Skip to content
Snippets Groups Projects
test_sfg.py 63.5 KiB
Newer Older
  • Learn to ignore specific revisions
  • Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                "OUT1",
            ]
    
    
    
    class TestRemove:
        def test_remove_single_input_outputs(self, sfg_simple_filter):
            # Removes the operation with graph id "cmul0" from the fixture SFG and
            # compares the neighbours of the "T" and "ADD" operations before and
            # after the removal.
            #
            # NOTE(review): this method is not valid Python as it stands. The
            # "... avatar" / "... committed" lines are web-page residue, the
            # opening `assert {` of each set comparison appears to be missing, and
            # the last two comprehensions (over output(0).signals) have lost both
            # their wrapper and their expected value. Restore the body from
            # version control rather than guessing the missing expectations.
    
            new_sfg = sfg_simple_filter.remove_operation("cmul0")
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                op.name
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                for op in sfg_simple_filter.find_by_name("T")[0].subsequent_operations
            } == {"CMUL", "OUT"}
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                op.name for op in new_sfg.find_by_name("T")[0].subsequent_operations
            } == {"ADD", "OUT"}
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                op.name
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                for op in sfg_simple_filter.find_by_name("ADD")[0].preceding_operations
            } == {"CMUL", "IN"}
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                op.name for op in new_sfg.find_by_name("ADD")[0].preceding_operations
            } == {"T", "IN"}
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                sig.name for sig in sfg_simple_filter.find_by_name("T")[0].output(0).signals
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                sig.name for sig in new_sfg.find_by_name("T")[0].output(0).signals
    
    
        def test_remove_multiple_inputs_outputs(self, butterfly_operation_tree):
            """Remove "bfly2" and check that its neighbours are reconnected.

            In the original SFG, both outputs of "bfly3" feed "bfly2" and both
            inputs of "bfly1" are driven by "bfly2". After the removal the test
            expects "bfly3" to drive "bfly1" directly, port index for port index,
            and "bfly2" to be absent from the new SFG. The original SFG must be
            left as it was.
            """
            out1 = Output(butterfly_operation_tree.output(0), "OUT1")
            out2 = Output(butterfly_operation_tree.output(1), "OUT2")

            sfg = SFG(outputs=[out1, out2])

            new_sfg = sfg.remove_operation(sfg.find_by_name("bfly2")[0].graph_id)

            # Output 0 of bfly3: one signal, now ending in bfly1 instead of bfly2.
            assert sfg.find_by_name("bfly3")[0].output(0).signal_count == 1
            assert new_sfg.find_by_name("bfly3")[0].output(0).signal_count == 1

            sfg_dest_0 = sfg.find_by_name("bfly3")[0].output(0).signals[0].destination
            new_sfg_dest_0 = (
                new_sfg.find_by_name("bfly3")[0].output(0).signals[0].destination
            )

            assert sfg_dest_0.index == 0
            assert new_sfg_dest_0.index == 0
            assert sfg_dest_0.operation.name == "bfly2"
            assert new_sfg_dest_0.operation.name == "bfly1"

            # Output 1 of bfly3: same expectation on port index 1.
            assert sfg.find_by_name("bfly3")[0].output(1).signal_count == 1
            assert new_sfg.find_by_name("bfly3")[0].output(1).signal_count == 1

            sfg_dest_1 = sfg.find_by_name("bfly3")[0].output(1).signals[0].destination
            new_sfg_dest_1 = (
                new_sfg.find_by_name("bfly3")[0].output(1).signals[0].destination
            )

            assert sfg_dest_1.index == 1
            assert new_sfg_dest_1.index == 1
            assert sfg_dest_1.operation.name == "bfly2"
            assert new_sfg_dest_1.operation.name == "bfly1"

            # Inputs of bfly1: driven by bfly2 before, by bfly3 afterwards.
            assert sfg.find_by_name("bfly1")[0].input(0).signal_count == 1
            assert new_sfg.find_by_name("bfly1")[0].input(0).signal_count == 1

            sfg_source_0 = sfg.find_by_name("bfly1")[0].input(0).signals[0].source
            new_sfg_source_0 = new_sfg.find_by_name("bfly1")[0].input(0).signals[0].source

            assert sfg_source_0.index == 0
            assert new_sfg_source_0.index == 0
            assert sfg_source_0.operation.name == "bfly2"
            assert new_sfg_source_0.operation.name == "bfly3"

            sfg_source_1 = sfg.find_by_name("bfly1")[0].input(1).signals[0].source
            new_sfg_source_1 = new_sfg.find_by_name("bfly1")[0].input(1).signals[0].source

            assert sfg_source_1.index == 1
            assert new_sfg_source_1.index == 1
            assert sfg_source_1.operation.name == "bfly2"
            assert new_sfg_source_1.operation.name == "bfly3"

            assert "bfly2" not in {op.name for op in new_sfg.operations}
    
    
        def remove_different_number_inputs_outputs(self, sfg_simple_filter):
            """Expect ``ValueError`` when removing the operation with id "add1".

            NOTE(review): the name lacks the ``test_`` prefix, so pytest's default
            discovery rules presumably never run this. Confirm that "add1" is a
            valid graph id in the fixture before enabling it.
            """
            expecting_value_error = pytest.raises(ValueError)
            with expecting_value_error:
                sfg_simple_filter.remove_operation("add1")
    
    
    class TestSaveLoadSFG:
        """Round-trip tests for ``sfg_to_python`` / ``python_to_sfg``.

        Each test writes the generated source to a randomly named ``.py`` file in
        the current working directory. The file is removed in a ``finally``
        clause so that a failing assertion does not leave it behind.
        """

        def get_path(self, existing=False):
            """Return a random four-uppercase-letter ``.py`` file name.

            With ``existing`` false (the default) names are drawn until one is
            found that does not exist; with ``existing`` true, until one is found
            that does exist.

            NOTE(review): with ``existing=True`` this only terminates if such a
            file is actually present in the working directory.
            """
            wanted = bool(existing)
            while True:
                path_ = "".join(random.choices(string.ascii_uppercase, k=4)) + ".py"
                if path.exists(path_) == wanted:
                    return path_

        def test_save_simple_sfg(self, sfg_simple_filter):
            """The text written to disk reads back identical to the generated code."""
            result = sfg_to_python(sfg_simple_filter)
            path_ = self.get_path()

            assert not path.exists(path_)
            try:
                with open(path_, "w") as file_obj:
                    file_obj.write(result)

                assert path.exists(path_)

                with open(path_) as file_obj:
                    assert file_obj.read() == result
            finally:
                # Clean up even when an assertion above fails.
                if path.exists(path_):
                    remove(path_)

        def test_save_complex_sfg(self, precedence_sfg_delays_and_constants):
            """Same write/read-back check for the larger fixture SFG."""
            result = sfg_to_python(precedence_sfg_delays_and_constants)
            path_ = self.get_path()

            assert not path.exists(path_)
            try:
                with open(path_, "w") as file_obj:
                    file_obj.write(result)

                assert path.exists(path_)

                with open(path_) as file_obj:
                    assert file_obj.read() == result
            finally:
                if path.exists(path_):
                    remove(path_)

        def test_load_simple_sfg(self, sfg_simple_filter):
            """A saved SFG loads back with the same string form and evaluation."""
            result = sfg_to_python(sfg_simple_filter)
            path_ = self.get_path()

            assert not path.exists(path_)
            try:
                with open(path_, "w") as file_obj:
                    file_obj.write(result)

                assert path.exists(path_)

                simple_filter_, _ = python_to_sfg(path_)

                assert str(sfg_simple_filter) == str(simple_filter_)
                assert sfg_simple_filter.evaluate([2]) == simple_filter_.evaluate([2])
            finally:
                if path.exists(path_):
                    remove(path_)

        def test_load_complex_sfg(self, precedence_sfg_delays_and_constants):
            """A saved larger SFG loads back with the same string form."""
            result = sfg_to_python(precedence_sfg_delays_and_constants)
            path_ = self.get_path()

            assert not path.exists(path_)
            try:
                with open(path_, "w") as file_obj:
                    file_obj.write(result)

                assert path.exists(path_)

                precedence_sfg_registers_and_constants_, _ = python_to_sfg(path_)

                assert str(precedence_sfg_delays_and_constants) == str(
                    precedence_sfg_registers_and_constants_
                )
            finally:
                if path.exists(path_):
                    remove(path_)

        def test_load_invalid_path(self):
            """Loading from a path that does not exist raises ``FileNotFoundError``."""
            path_ = self.get_path(existing=False)

            with pytest.raises(FileNotFoundError):
                python_to_sfg(path_)
    
    
    class TestGetComponentsOfType:
        """Tests for ``find_by_type_name`` on the two-input/two-output fixture."""

        def test_get_no_operations_of_type(self, sfg_two_inputs_two_outputs):
            """A type that does not occur in the SFG yields an empty result."""
            assert [
                op.name
                for op in sfg_two_inputs_two_outputs.find_by_type_name(
                    Multiplication.type_name()
                )
            ] == []

        def test_get_multiple_operations_of_type(self, sfg_two_inputs_two_outputs):
            """Additions, inputs and outputs are each returned in the listed order."""
            assert [
                op.name
                for op in sfg_two_inputs_two_outputs.find_by_type_name(Addition.type_name())
            ] == ["ADD1", "ADD2"]

            assert [
                op.name
                for op in sfg_two_inputs_two_outputs.find_by_type_name(Input.type_name())
            ] == ["IN1", "IN2"]

            assert [
                op.name
                for op in sfg_two_inputs_two_outputs.find_by_type_name(Output.type_name())
            ] == ["OUT1", "OUT2"]
    
    
    
    class TestPrecedenceGraph:
        """Checks the DOT source produced by ``precedence_graph``."""

        def test_precedence_graph(self, sfg_simple_filter):
            """The DOT source equals the expected text, with or without a final newline."""
            res = (
                'digraph {\n\trankdir=LR\n\tsubgraph cluster_0 {\n\t\tlabel=N0\n\t\t"in0.0"'
                ' [label=in0 height=0.1 shape=rectangle width=0.1]\n\t\t"t0.0" [label=t0'
                ' height=0.1 shape=rectangle width=0.1]\n\t}\n\tsubgraph cluster_1'
                ' {\n\t\tlabel=N1\n\t\t"cmul0.0" [label=cmul0 height=0.1 shape=rectangle'
                ' width=0.1]\n\t}\n\tsubgraph cluster_2 {\n\t\tlabel=N2\n\t\t"add0.0"'
                ' [label=add0 height=0.1 shape=rectangle width=0.1]\n\t}\n\t"in0.0" ->'
                ' add0\n\tadd0 [label=add0 shape=ellipse]\n\tin0 -> "in0.0"\n\tin0'
                ' [label=in0 shape=cds]\n\t"t0.0" -> cmul0\n\tcmul0 [label=cmul0'
                ' shape=ellipse]\n\t"t0.0" -> out0\n\tout0 [label=out0 shape=cds]\n\tt0Out'
                ' -> "t0.0"\n\tt0Out [label=t0 shape=square]\n\t"cmul0.0" -> add0\n\tadd0'
                ' [label=add0 shape=ellipse]\n\tcmul0 -> "cmul0.0"\n\tcmul0 [label=cmul0'
                ' shape=ellipse]\n\t"add0.0" -> t0In\n\tt0In [label=t0'
                ' shape=square]\n\tadd0 -> "add0.0"\n\tadd0 [label=add0 shape=ellipse]\n}'
            )

            # Accept both forms: the source may or may not end with a newline.
            assert sfg_simple_filter.precedence_graph.source in (res, res + "\n")
    
    
    
    class TestSFGGraph:
        """Checks the DOT source produced by ``sfg_digraph`` and ``show`` errors.

        The expected DOT text is written with explicit ``\\n`` / ``\\t`` escapes
        so that it does not depend on the indentation of this file or on hard
        tabs surviving an editor. Every expectation is accepted with or without
        a trailing newline.
        """

        def test_sfg(self, sfg_simple_filter):
            """Default rendering: branch node for "t0" and numbered input ports."""
            res = (
                'digraph {\n'
                '\trankdir=LR splines=spline\n'
                '\tin0 [label="IN\n(in0)" shape=cds]\n'
                '\tin0 -> add0 [headlabel=0]\n'
                '\tout0 [label="OUT\n(out0)" shape=cds]\n'
                '\t"t0.0" -> out0\n'
                '\t"t0.0" [shape=point]\n'
                '\tt0 -> "t0.0" [arrowhead=none]\n'
                '\tadd0 [label="ADD\n(add0)" shape=ellipse]\n'
                '\tcmul0 -> add0 [headlabel=1]\n'
                '\tcmul0 [label="CMUL\n(cmul0)" shape=ellipse]\n'
                '\tadd0 -> t0\n'
                '\tt0 [label="T\n(t0)" shape=square]\n'
                '\t"t0.0" -> cmul0\n'
                '}'
            )
            assert sfg_simple_filter.sfg_digraph().source in (
                res,
                res + "\n",
            )

        def test_sfg_show_signal_id(self, sfg_simple_filter):
            """With ``show_signal_id=True`` every signal edge carries its id label."""
            res = (
                'digraph {\n'
                '\trankdir=LR splines=spline\n'
                '\tin0 [label="IN\n(in0)" shape=cds]\n'
                '\tin0 -> add0 [label=s0 headlabel=0]\n'
                '\tout0 [label="OUT\n(out0)" shape=cds]\n'
                '\t"t0.0" -> out0 [label=s1]\n'
                '\t"t0.0" [shape=point]\n'
                '\tt0 -> "t0.0" [arrowhead=none]\n'
                '\tadd0 [label="ADD\n(add0)" shape=ellipse]\n'
                '\tcmul0 -> add0 [label=s2 headlabel=1]\n'
                '\tcmul0 [label="CMUL\n(cmul0)" shape=ellipse]\n'
                '\tadd0 -> t0 [label=s3]\n'
                '\tt0 [label="T\n(t0)" shape=square]\n'
                '\t"t0.0" -> cmul0 [label=s4]\n'
                '}'
            )

            assert sfg_simple_filter.sfg_digraph(show_signal_id=True).source in (
                res,
                res + "\n",
            )

        def test_sfg_no_branch(self, sfg_simple_filter):
            """With ``branch_node=False`` "t0" connects directly to its consumers."""
            res = (
                'digraph {\n'
                '\trankdir=LR splines=spline\n'
                '\tin0 [label="IN\n(in0)" shape=cds]\n'
                '\tin0 -> add0 [headlabel=0]\n'
                '\tout0 [label="OUT\n(out0)" shape=cds]\n'
                '\tt0 -> out0\n'
                '\tadd0 [label="ADD\n(add0)" shape=ellipse]\n'
                '\tcmul0 -> add0 [headlabel=1]\n'
                '\tcmul0 [label="CMUL\n(cmul0)" shape=ellipse]\n'
                '\tadd0 -> t0\n'
                '\tt0 [label="T\n(t0)" shape=square]\n'
                '\tt0 -> cmul0\n'
                '}'
            )
            assert sfg_simple_filter.sfg_digraph(branch_node=False).source in (
                res,
                res + "\n",
            )

        def test_sfg_no_port_numbering(self, sfg_simple_filter):
            """With ``port_numbering=False`` no ``headlabel`` attributes are emitted."""
            res = (
                'digraph {\n'
                '\trankdir=LR splines=spline\n'
                '\tin0 [label="IN\n(in0)" shape=cds]\n'
                '\tin0 -> add0\n'
                '\tout0 [label="OUT\n(out0)" shape=cds]\n'
                '\t"t0.0" -> out0\n'
                '\t"t0.0" [shape=point]\n'
                '\tt0 -> "t0.0" [arrowhead=none]\n'
                '\tadd0 [label="ADD\n(add0)" shape=ellipse]\n'
                '\tcmul0 -> add0\n'
                '\tcmul0 [label="CMUL\n(cmul0)" shape=ellipse]\n'
                '\tadd0 -> t0\n'
                '\tt0 [label="T\n(t0)" shape=square]\n'
                '\t"t0.0" -> cmul0\n'
                '}'
            )

            assert sfg_simple_filter.sfg_digraph(port_numbering=False).source in (
                res,
                res + "\n",
            )

        def test_show_sfg_invalid_format(self, sfg_simple_filter):
            """An unknown ``fmt`` passed to ``show`` raises ``ValueError``."""
            with pytest.raises(ValueError):
                sfg_simple_filter.show(fmt="ppddff")

        def test_show_sfg_invalid_engine(self, sfg_simple_filter):
            """An unknown ``engine`` passed to ``show`` raises ``ValueError``."""
            with pytest.raises(ValueError):
                sfg_simple_filter.show(engine="ppddff")
    
    
    
    class TestSFGErrors:
        """Construction and manipulation errors reported by ``SFG``.

        Most cases build a small graph around a ``SymmetricTwoportAdaptor`` and
        check the exception type and message raised by the ``SFG`` constructor.
        """

        def test_dangling_output(self):
            """Leaving adaptor output 1 unused is currently accepted."""
            in1 = Input()
            in2 = Input()
            adaptor = SymmetricTwoportAdaptor(0.5, in1, in2)
            out1 = Output(adaptor.output(0))
            # No error, maybe should be?
            _ = SFG([in1, in2], [out1])

        def test_unconnected_input_port(self):
            """An adaptor with only one of its inputs driven is rejected."""
            in1 = Input()
            adaptor = SymmetricTwoportAdaptor(0.5, in1)
            out1 = Output(adaptor.output(0))
            with pytest.raises(ValueError, match="Unconnected input port in SFG"):
                SFG([in1], [out1])

        def test_unconnected_output(self):
            """An ``Output`` that is not connected to anything is rejected."""
            in1 = Input()
            in2 = Input()
            adaptor = SymmetricTwoportAdaptor(0.5, in1, in2)
            out1 = Output(adaptor.output(0))
            out2 = Output()

            with pytest.raises(
                ValueError,
                match="At least one output operation is not connected!, Tips: Check for output ports that are connected to the same signal",
            ):
                SFG([in1, in2], [out1, out2])

        def test_unconnected_input(self):
            """Listing an ``Input`` that drives nothing fails on the open port."""
            in1 = Input()
            in2 = Input()
            adaptor = SymmetricTwoportAdaptor(0.5, in1)
            out1 = Output(adaptor.output(0))
            out2 = Output(adaptor.output(1))
            # Correct error?
            with pytest.raises(ValueError, match="Unconnected input port in SFG"):
                SFG([in1, in2], [out1, out2])

        def test_duplicate_input(self):
            """Passing the same ``Input`` twice is rejected."""
            in1 = Input()
            in2 = Input()
            adaptor = SymmetricTwoportAdaptor(0.5, in1, in2)
            out1 = Output(adaptor.output(0))
            out2 = Output(adaptor.output(1))
            with pytest.raises(ValueError, match="Duplicate input operation"):
                SFG([in1, in1], [out1, out2])

        def test_duplicate_output(self):
            """Passing the same ``Output`` twice is rejected."""
            in1 = Input()
            in2 = Input()
            adaptor = SymmetricTwoportAdaptor(0.5, in1, in2)
            out1 = Output(adaptor.output(0))
            # Second output is created (and connected) but deliberately not passed.
            Output(adaptor.output(1))

            with pytest.raises(ValueError, match="Duplicate output operation"):
                SFG([in1, in2], [out1, out1])

        def test_unconnected_input_signal(self):
            """An input signal without a destination is rejected."""
            in1 = Input()
            in2 = Input()
            adaptor = SymmetricTwoportAdaptor(0.5, in1, in2)
            out1 = Output(adaptor.output(0))
            out2 = Output(adaptor.output(1))
            signal = Signal()
            with pytest.raises(
                ValueError, match="Input signal #0 is missing destination in SFG"
            ):
                SFG([in1, in2], [out1, out2], [signal])

        def test_unconnected_output_signal(self):
            """An output signal without a source is rejected."""
            in1 = Input()
            in2 = Input()
            adaptor = SymmetricTwoportAdaptor(0.5, in1, in2)
            out1 = Output(adaptor.output(0))
            out2 = Output(adaptor.output(1))
            signal = Signal()
            with pytest.raises(
                ValueError, match="Output signal #0 is missing source in SFG"
            ):
                SFG([in1, in2], [out1, out2], output_signals=[signal])

        def test_duplicate_input_signal(self):
            """Passing the same input signal twice is rejected."""
            in1 = Input()
            signal = Signal()
            adaptor = SymmetricTwoportAdaptor(0.5, in1, signal)
            out1 = Output(adaptor.output(0))
            out2 = Output(adaptor.output(1))
            with pytest.raises(ValueError, match="Duplicate input signal"):
                SFG([in1], [out1, out2], [signal, signal])

        def test_duplicate_output_signal(self):
            """Passing the same output signal twice is rejected."""
            in1 = Input()
            in2 = Input()
            adaptor = SymmetricTwoportAdaptor(0.5, in1, in2)
            out1 = Output(adaptor.output(0))
            signal = Signal(adaptor.output(1))

            with pytest.raises(
                ValueError,
                match="At least one output operation is not connected!, Tips: Check for output ports that are connected to the same signal",
            ):
                SFG([in1, in2], [out1], output_signals=[signal, signal])

        def test_dangling_input_signal(self):
            """A source-less signal that is not declared as an input is rejected."""
            in1 = Input()
            signal = Signal()
            adaptor = SymmetricTwoportAdaptor(0.5, in1, signal)
            out1 = Output(adaptor.output(0))
            out2 = Output(adaptor.output(1))
            with pytest.raises(ValueError, match="Dangling signal without source in SFG"):
                SFG([in1], [out1, out2])

        def test_remove_signal_with_different_number_of_inputs_and_outputs(self):
            """Unknown ids give ``None``; a two-input/one-output op cannot be removed."""
            in1 = Input()
            in2 = Input()
            add1 = Addition(in1, in2, name="addition")
            out1 = Output(add1)
            sfg = SFG([in1, in2], [out1])
            # Try to remove non-existent operation
            sfg1 = sfg.remove_operation("foo")
            assert sfg1 is None
            with pytest.raises(
                ValueError,
                match="Different number of input and output ports of operation with",
            ):
                sfg.remove_operation('add0')

        def test_inputs_required_for_output(self):
            """An out-of-range output index raises ``IndexError`` with the range."""
            in1 = Input()
            in2 = Input()
            add1 = Addition(in1, in2, name="addition")
            out1 = Output(add1)
            sfg = SFG([in1, in2], [out1])
            with pytest.raises(
                IndexError,
                # The message contains parentheses, so escape it for the regex match.
                match=re.escape("Output index out of range (expected 0-0, got 1)"),
            ):
                sfg.inputs_required_for_output(1)
    
    class TestInputDuplicationBug:
        """Regression test: an input used twice must appear once in ``operations``."""

        def test_input_is_not_duplicated_in_operation_list(self):
            """``in1`` feeds both the delay and the adder, yet is listed only once."""
            # Inputs:
            in1 = Input(name="in1")
            out1 = Output(name="out1")

            # Operations:
            t1 = Delay(initial_value=0, name="")
            t1.inputs[0].connect(in1)
            add1 = t1 + in1

            out1.inputs[0].connect(add1)

            twotapfir = SFG(inputs=[in1], outputs=[out1], name='twotapfir')

            assert len([op for op in twotapfir.operations if isinstance(op, Input)]) == 1
    
    class TestCriticalPath:
        """Checks ``critical_path_time`` on the simple accumulator fixture."""

        def test_single_accumulator(self, sfg_simple_accumulator: SFG):
            """The critical path time follows the latency set on additions."""
            addition_type = Addition.type_name
            for latency in (5, 6):
                sfg_simple_accumulator.set_latency_of_type(addition_type(), latency)
                assert sfg_simple_accumulator.critical_path_time() == latency
    
    class TestUnfold:
        """Tests for ``SFG.unfold``: operation counts and simulation equivalence."""

        def count_kinds(self, sfg: SFG) -> Dict[Type, int]:
            """Return how many operations of each concrete type ``sfg`` contains."""
            return Counter(type(op) for op in sfg.operations)

        def assert_counts_is_correct(self, sfg1: SFG, sfg2: SFG, multiple: int):
            """Assert that ``sfg2`` has ``multiple`` times each operation kind of ``sfg1``.

            Delays are the exception: their count must be equal in both SFGs.
            """
            count1 = self.count_kinds(sfg1)
            count2 = self.count_kinds(sfg2)

            # Delays should not be duplicated. Check that and then clear them
            # Using get to avoid issues if there are no delays in the sfg
            assert count1.get(Delay) == count2.get(Delay)
            count1[Delay] = 0
            count2[Delay] = 0

            # Ensure that we aren't missing any keys, or have any extras
            assert count1.keys() == count2.keys()

            for kind, count in count1.items():
                assert count * multiple == count2[kind]

        # This is horrifying, but I can't figure out a way to run the test on multiple
        # fixtures, so this is an ugly hack until someone that knows pytest comes along
        def test_two_inputs_two_outputs(self, sfg_two_inputs_two_outputs: SFG):
            self.do_tests(sfg_two_inputs_two_outputs)

        def test_twotapfir(self, sfg_two_tap_fir: SFG):
            self.do_tests(sfg_two_tap_fir)

        def test_delay(self, sfg_delay: SFG):
            self.do_tests(sfg_delay)

        def test_sfg_two_inputs_two_outputs_independent(
            self, sfg_two_inputs_two_outputs_independent: SFG
        ):
            self.do_tests(sfg_two_inputs_two_outputs_independent)

        def test_threetapiir(self, sfg_direct_form_iir_lp_filter: SFG):
            self.do_tests(sfg_direct_form_iir_lp_filter)

        def do_tests(self, sfg: SFG):
            """Unfold ``sfg`` by factors 2 and 3 and compare against the original.

            For each factor this checks the operation counts of the unfolded and
            the twice-unfolded SFG, then simulates the original and the unfolded
            SFG on the same random samples and requires identical outputs.
            """
            NUM_TESTS = 5
            for factor in range(2, 4):
                # Ensure that the correct number of operations get created
                unfolded = sfg.unfold(factor)

                self.assert_counts_is_correct(sfg, unfolded, factor)

                double_unfolded = sfg.unfold(factor).unfold(factor)

                self.assert_counts_is_correct(sfg, double_unfolded, factor * factor)

                # Evaluate with some random values.
                # The original SFG consumes NUM_TESTS * factor samples per input so
                # that the unfolded SFG can run NUM_TESTS iterations on them.
                input_list = [
                    [random.random() for _ in range(0, NUM_TESTS * factor)]
                    for _ in sfg.inputs
                ]

                sim = Simulation(sfg, input_list)
                sim.run()
                ref = sim.results

                # Unfolded input k + n * len(sfg.inputs) receives sample
                # t * factor + n of original input k at iteration t.
                unfolded_input_lists = [[] for _ in range(len(sfg.inputs) * factor)]
                for t in range(0, NUM_TESTS):
                    for n in range(0, factor):
                        for k in range(0, len(sfg.inputs)):
                            unfolded_input_lists[k + n * len(sfg.inputs)].append(
                                input_list[k][t * factor + n]
                            )

                sim = Simulation(unfolded, unfolded_input_lists)
                sim.run()
                unfolded_results = sim.results

                for n, _ in enumerate(sfg.outputs):
                    # Outputs for an original output
                    ref_values = list(ref[ResultKey(f"{n}")])

                    # Output n will be split into `factor` output ports, compute the
                    # indices where we find the outputs
                    out_indices = [n + k * len(sfg.outputs) for k in range(factor)]
                    u_values = [
                        [unfolded_results[ResultKey(f"{idx}")][k] for idx in out_indices]
                        for k in range(NUM_TESTS)
                    ]

                    # Interleave the per-iteration groups back into one sequence.
                    flat_u_values = list(itertools.chain.from_iterable(u_values))

                    assert flat_u_values == ref_values

        def test_value_error(self, sfg_two_inputs_two_outputs: SFG):
            """Unfolding by a factor of zero raises ``ValueError``."""
            sfg = sfg_two_inputs_two_outputs
            with pytest.raises(ValueError, match="Unfolding 0 times removes the SFG"):
                sfg.unfold(0)
    
    
    
    class TestIsLinear:
        """Checks the ``is_linear`` property on two fixture SFGs."""

        def test_single_accumulator(self, sfg_simple_accumulator: SFG):
            """The simple accumulator is expected to report as linear."""
            linear = sfg_simple_accumulator.is_linear
            assert linear

        def test_sfg_nested(self, sfg_nested: SFG):
            """The nested SFG is expected to report as not linear."""
            linear = sfg_nested.is_linear
            assert not linear
    
    
    class TestIsConstant:
        """Checks the ``is_constant`` property on two fixture SFGs."""

        def test_single_accumulator(self, sfg_simple_accumulator: SFG):
            """The simple accumulator is expected to report as not constant."""
            constant = sfg_simple_accumulator.is_constant
            assert not constant

        def test_sfg_nested(self, sfg_nested: SFG):
            """The nested SFG is expected to report as not constant."""
            constant = sfg_nested.is_constant
            assert not constant
    
    class TestSwapIOOfOperation:
        """Checks that ``swap_io_of_operation`` leaves simulation results unchanged."""

        def do_test(self, sfg: SFG, graph_id: GraphID):
            """Simulate ``sfg`` before and after swapping the I/O of ``graph_id``.

            The same random input samples are used for both runs, and every
            output must produce the same sequence. The swap is applied to ``sfg``
            itself.
            """
            NUM_TESTS = 5
            # One list of NUM_TESTS random samples per SFG input.
            input_list = []
            for _ in sfg.inputs:
                input_list.append([random.random() for _ in range(0, NUM_TESTS)])

            sim_ref = Simulation(sfg, input_list)
            sim_ref.run()

            sfg.swap_io_of_operation(graph_id)

            sim_swap = Simulation(sfg, input_list)
            sim_swap.run()

            for port, _ in enumerate(sfg.outputs):
                key = f"{port}"
                before = list(sim_ref.results[ResultKey(key)])
                after = list(sim_swap.results[ResultKey(key)])
                assert before == after

        def test_single_accumulator(self, sfg_simple_accumulator: SFG):
            """Swap the I/O of the operation with graph id 'add1' in the accumulator."""
            self.do_test(sfg_simple_accumulator, 'add1')
    
    
    
    class TestInsertComponentAfter:
        """Tests for ``SFG.insert_operation_after``."""

        def test_insert_component_after_in_sfg(self, large_operation_tree_names):
            """Insert a ``SquareRoot`` after "constant4" and check the wiring.

            The returned SFG must contain the new operation between "constant4"
            and "add2", while the original SFG stays unchanged.
            """
            sfg = SFG(outputs=[Output(large_operation_tree_names)])
            sqrt = SquareRoot()

            _sfg = sfg.insert_operation_after(
                sfg.find_by_name("constant4")[0].graph_id, sqrt
            )
            assert _sfg.evaluate() != sfg.evaluate()

            assert any(isinstance(comp, SquareRoot) for comp in _sfg.operations)
            assert not any(isinstance(comp, SquareRoot) for comp in sfg.operations)

            assert not isinstance(
                sfg.find_by_name("constant4")[0].output(0).signals[0].destination.operation,
                SquareRoot,
            )
            assert isinstance(
                _sfg.find_by_name("constant4")[0]
                .output(0)
                .signals[0]
                .destination.operation,
                SquareRoot,
            )

            assert sfg.find_by_name("constant4")[0].output(0).signals[
                0
            ].destination.operation is sfg.find_by_id("add2")
            assert _sfg.find_by_name("constant4")[0].output(0).signals[
                0
            ].destination.operation is not _sfg.find_by_id("add2")
            # The inserted operation now feeds "add2" in the new SFG.
            assert _sfg.find_by_id("sqrt0").output(0).signals[
                0
            ].destination.operation is _sfg.find_by_id("add2")

        def test_insert_component_after_mimo_operation_error(
            self, large_operation_tree_names
        ):
            """Inserting a two-port adaptor is rejected with ``TypeError``."""
            sfg = SFG(outputs=[Output(large_operation_tree_names)])
            with pytest.raises(
                TypeError, match="Only operations with one input and one output"
            ):
                sfg.insert_operation_after('constant4', SymmetricTwoportAdaptor(0.5))

        def test_insert_component_after_unknown_component_error(
            self, large_operation_tree_names
        ):
            """Inserting after an unknown graph id is rejected with ``ValueError``."""
            sfg = SFG(outputs=[Output(large_operation_tree_names)])
            with pytest.raises(ValueError, match="Unknown component:"):
                sfg.insert_operation_after('foo', SquareRoot())
    
    class TestInsertComponentBefore:
        """Tests for ``SFG.insert_operation_before``."""

        def test_insert_component_before_in_sfg(self, butterfly_operation_tree):
            """Insert a ``SquareRoot`` before input 0 of "bfly1" and check the wiring.

            The returned SFG must contain the new operation between "bfly2" and
            "bfly1", while the original SFG stays unchanged.
            """
            sfg = SFG(outputs=list(map(Output, butterfly_operation_tree.outputs)))
            sqrt = SquareRoot()

            _sfg = sfg.insert_operation_before(
                sfg.find_by_name("bfly1")[0].graph_id, sqrt, port=0
            )
            assert _sfg.evaluate() != sfg.evaluate()

            assert any(isinstance(comp, SquareRoot) for comp in _sfg.operations)
            assert not any(isinstance(comp, SquareRoot) for comp in sfg.operations)

            assert not isinstance(
                sfg.find_by_name("bfly1")[0].input(0).signals[0].source.operation,
                SquareRoot,
            )
            assert isinstance(
                _sfg.find_by_name("bfly1")[0].input(0).signals[0].source.operation,
                SquareRoot,
            )

            assert (
                sfg.find_by_name("bfly1")[0].input(0).signals[0].source.operation
                is sfg.find_by_name("bfly2")[0]
            )
            # Check the *source* of bfly1's input signal: its destination is bfly1
            # itself, so comparing the destination with bfly2 could never fail.
            assert (
                _sfg.find_by_name("bfly1")[0].input(0).signals[0].source.operation
                is not _sfg.find_by_name("bfly2")[0]
            )
            assert (
                _sfg.find_by_id("sqrt0").input(0).signals[0].source.operation
                is _sfg.find_by_name("bfly2")[0]
            )

        def test_insert_component_before_mimo_operation_error(
            self, large_operation_tree_names
        ):
            """Inserting a two-port adaptor is rejected with ``TypeError``."""
            sfg = SFG(outputs=[Output(large_operation_tree_names)])
            with pytest.raises(
                TypeError, match="Only operations with one input and one output"
            ):
                sfg.insert_operation_before('add0', SymmetricTwoportAdaptor(0.5), port=0)

        def test_insert_component_before_unknown_component_error(
            self, large_operation_tree_names
        ):
            """Inserting before an unknown graph id is rejected with ``ValueError``."""
            sfg = SFG(outputs=[Output(large_operation_tree_names)])
            with pytest.raises(ValueError, match="Unknown component:"):
                sfg.insert_operation_before('foo', SquareRoot())
    
    
    
    class TestGetUsedTypeNames:
        """Checks the list returned by ``get_used_type_names`` for several SFGs."""

        def test_single_accumulator(self, sfg_simple_accumulator: SFG):
            """The ``sfg_simple_accumulator`` fixture uses add, in, out and t."""
            expected = ['add', 'in', 'out', 't']
            assert sfg_simple_accumulator.get_used_type_names() == expected

        def test_sfg_nested(self, sfg_nested: SFG):
            """The ``sfg_nested`` fixture uses in, out and sfg."""
            expected = ['in', 'out', 'sfg']
            assert sfg_nested.get_used_type_names() == expected

        def test_large_operation_tree(self, large_operation_tree):
            """An SFG built around ``large_operation_tree`` uses add, c and out."""
            sink = Output(large_operation_tree)
            sfg = SFG(outputs=[sink])
            expected = ['add', 'c', 'out']
            assert sfg.get_used_type_names() == expected
    
    class Test_Keep_GraphIDs:
        """Checks the graph IDs reported by an SFG after operations are inserted."""

        def test_single_accumulator(self):
            """Insert three multipliers around ``t0`` and compare all graph IDs."""
            source = Input()
            delay = Delay()
            sink = Output(delay)
            feedback = ConstantMultiplication(0.5, delay)
            adder = Addition(source, feedback)
            # Close the loop: the adder drives the delay element.
            delay.input(0).connect(adder)

            sfg = SFG([source], [sink])
            # Each insertion returns an SFG, which replaces the previous one.
            sfg = sfg.insert_operation_before('t0', ConstantMultiplication(8))
            sfg = sfg.insert_operation_after('t0', ConstantMultiplication(8))
            sfg = sfg.insert_operation(ConstantMultiplication(8), 't0')

            expected_ids = {
                'add0',
                'cmul0',
                'cmul1',
                'cmul2',
                'cmul3',
                'in0',
                'out0',
                't0',
            }
            assert sfg.get_used_graph_ids() == expected_ids

        def test_large_operation_tree(self, large_operation_tree):
            """An SFG built around ``large_operation_tree`` uses add, c and out."""
            # NOTE(review): this checks type names rather than graph IDs and
            # looks identical to TestGetUsedTypeNames.test_large_operation_tree;
            # confirm whether a graph-ID check was intended here.
            sink = Output(large_operation_tree)
            sfg = SFG(outputs=[sink])
            expected = ['add', 'c', 'out']
            assert sfg.get_used_type_names() == expected
    
    
    
    class TestInsertDelays:
        """Tests for delays added through ``delay()`` on operation input ports."""

        def test_insert_delays_before_operation(self):
            """Delay both butterfly inputs, then an output's input, and count delays.

            The SFG first holds three operations of the delay type (2 + 1). After
            three more are requested on the input of ``out1`` and the SFG is
            called, six are found, and the three operations directly upstream of
            ``out1`` are delays followed by the butterfly.
            """
            source = Input()
            bfly = Butterfly()
            upper = bfly.input(0).delay(2)
            lower = bfly.input(1).delay(1)
            upper <<= source
            lower <<= source
            sinks = [Output(bfly.output(0)), Output(bfly.output(1))]
            sfg = SFG([source], sinks)

            delay_type = upper.operation.type_name()

            assert len(sfg.find_by_type_name(delay_type)) == 3

            # Request three more delays in front of 'out1', then rebind to the
            # result of calling the SFG.
            sfg.find_by_id('out1').input(0).delay(3)
            sfg = sfg()

            assert len(sfg.find_by_type_name(delay_type)) == 6

            # Walk four steps upstream from 'out1', always along input port 0.
            upstream = []
            current = sfg.find_by_id('out1')
            for _ in range(4):
                current = current.input(0).signals[0].source.operation
                upstream.append(current)

            *delays, origin = upstream
            for op in delays:
                assert op.type_name() == delay_type
            assert origin.type_name() == bfly.type_name()
    
    
    
    class TestIterationPeriodBound:
        """Tests for ``SFG.iteration_period_bound``."""

        def test_accumulator(self, sfg_simple_accumulator):
            """With latency 2 set for type 'add', the bound is 2."""
            sfg = sfg_simple_accumulator
            sfg.set_latency_of_type('add', 2)
            bound = sfg.iteration_period_bound()
            assert bound == 2

        def test_no_latency(self, sfg_simple_accumulator):
            """Without any latency set, requesting the bound raises ``ValueError``."""
            expected_msg = "All native offsets have to set to a non-negative value to"
            with pytest.raises(ValueError, match=expected_msg):
                sfg_simple_accumulator.iteration_period_bound()

        def test_secondorder_iir(self, precedence_sfg_delays):
            """With latency 2 for 'add' and 3 for 'cmul', the bound is 10."""
            sfg = precedence_sfg_delays
            for type_name, latency in (('add', 2), ('cmul', 3)):
                sfg.set_latency_of_type(type_name, latency)
            bound = sfg.iteration_period_bound()
            assert bound == 10

        def test_no_delays(self, sfg_two_inputs_two_outputs):
            """For the ``sfg_two_inputs_two_outputs`` fixture the bound is -1."""
            bound = sfg_two_inputs_two_outputs.iteration_period_bound()
            assert bound == -1
    
    
    class TestStateSpace:
        """Tests for ``SFG.state_space_representation``.

        Each test indexes the returned value at 0, 1 and 2 and compares those
        items with a list of names, a NumPy matrix and a second list of names.
        The matrix is compared with ``np.array_equal`` so that a result of the
        wrong shape fails instead of being broadcast against the expected one.
        """

        def test_accumulator(self, sfg_simple_accumulator):
            """Check the representation of the ``sfg_simple_accumulator`` fixture."""
            ss = sfg_simple_accumulator.state_space_representation()
            assert ss[0] == ['v0', 'y0']
            assert np.array_equal(ss[1], np.array([[1.0, 1.0], [0.0, 1.0]]))
            assert ss[2] == ['v0', 'x0']

        def test_secondorder_iir(self, precedence_sfg_delays):
            """Check the representation of the ``precedence_sfg_delays`` fixture."""
            ss = precedence_sfg_delays.state_space_representation()
            assert ss[0] == ['v0', 'v1', 'y0']

            mat = np.array([[3.0, 2.0, 5.0], [1.0, 0.0, 0.0], [4.0, 6.0, 35.0]])

            assert np.array_equal(ss[1], mat)
            assert ss[2] == ['v0', 'v1', 'x0']

        def test_sfg_two_inputs_two_outputs(self, sfg_two_inputs_two_outputs):
            """Check the representation of the ``sfg_two_inputs_two_outputs`` fixture."""
            ss = sfg_two_inputs_two_outputs.state_space_representation()

            assert ss[0] == ['y0', 'y1']
            assert np.array_equal(ss[1], np.array([[1.0, 1.0], [1.0, 2.0]]))
            assert ss[2] == ['x0', 'x1']

        def test_sfg_two_inputs_two_outputs_independent(
            self, sfg_two_inputs_two_outputs_independent
        ):
            """Check the representation when the expected matrix is diagonal."""
            ss = sfg_two_inputs_two_outputs_independent.state_space_representation()

            assert ss[0] == ['y0', 'y1']
            assert np.array_equal(ss[1], np.array([[1.0, 0.0], [0.0, 4.0]]))
            assert ss[2] == ['x0', 'x1']

        def test_sfg_two_inputs_two_outputs_independent_with_cmul(
            self, sfg_two_inputs_two_outputs_independent_with_cmul
        ):
            """Check the representation of the ``..._independent_with_cmul`` fixture."""
            ss = (
                sfg_two_inputs_two_outputs_independent_with_cmul.state_space_representation()
            )

            assert ss[0] == ['y0', 'y1']
            assert np.array_equal(ss[1], np.array([[20.0, 0.0], [0.0, 8.0]]))
            assert ss[2] == ['x0', 'x1']