Skip to content
Snippets Groups Projects
test_sfg.py 55.9 KiB
Newer Older
  • Learn to ignore specific revisions
  • Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                op.name
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                for op in sfg_simple_filter.find_by_name("ADD1")[0].preceding_operations
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                op.name for op in new_sfg.find_by_name("ADD1")[0].preceding_operations
    
            assert "S1" in {
                sig.name
                for sig in sfg_simple_filter.find_by_name("T1")[0].output(0).signals
            }
            assert "S2" in {
                sig.name for sig in new_sfg.find_by_name("T1")[0].output(0).signals
            }
    
    
        def test_remove_multiple_inputs_outputs(self, butterfly_operation_tree):
            """Remove ``bfly2`` and check how ``bfly3`` and ``bfly1`` are wired.

            In the original SFG both outputs of ``bfly3`` lead to ``bfly2`` and
            both inputs of ``bfly1`` come from ``bfly2``. In the SFG returned by
            ``remove_operation`` the same ports must connect ``bfly3`` and
            ``bfly1`` directly, with matching port indices.
            """
            outputs = [
                Output(butterfly_operation_tree.output(0), "OUT1"),
                Output(butterfly_operation_tree.output(1), "OUT2"),
            ]
            sfg = SFG(outputs=outputs)

            bfly2_id = sfg.find_by_name("bfly2")[0].graph_id
            new_sfg = sfg.remove_operation(bfly2_id)

            # Output side of bfly3: one signal per port, same destination index.
            old_bfly3 = sfg.find_by_name("bfly3")[0]
            new_bfly3 = new_sfg.find_by_name("bfly3")[0]
            for port in (0, 1):
                assert old_bfly3.output(port).signal_count == 1
                assert new_bfly3.output(port).signal_count == 1

                old_dest = old_bfly3.output(port).signals[0].destination
                new_dest = new_bfly3.output(port).signals[0].destination

                assert old_dest.index == port
                assert new_dest.index == port
                assert old_dest.operation.name == "bfly2"
                assert new_dest.operation.name == "bfly1"

            # Input side of bfly1: the sources mirror the destinations above.
            old_bfly1 = sfg.find_by_name("bfly1")[0]
            new_bfly1 = new_sfg.find_by_name("bfly1")[0]
            assert old_bfly1.input(0).signal_count == 1
            assert new_bfly1.input(0).signal_count == 1
            for port in (0, 1):
                old_source = old_bfly1.input(port).signals[0].source
                new_source = new_bfly1.input(port).signals[0].source

                assert old_source.index == port
                assert new_source.index == port
                assert old_source.operation.name == "bfly2"
                assert new_source.operation.name == "bfly3"

            remaining_names = {op.name for op in new_sfg.operations}
            assert "bfly2" not in remaining_names
    
    
        def test_remove_different_number_inputs_outputs(self, sfg_simple_filter):
            """``remove_operation("add1")`` on the simple filter raises ``ValueError``.

            The method previously lacked the ``test_`` prefix, so pytest never
            collected it and the check silently did not run.
            """
            with pytest.raises(ValueError):
                sfg_simple_filter.remove_operation("add1")
    
    
    class TestSaveLoadSFG:
        """Round-trip tests for ``sfg_to_python`` and ``python_to_sfg``.

        Each test writes to a randomly named ``.py`` file in the current
        working directory. Cleanup happens in ``finally`` so that a failing
        assertion does not leave the file behind.
        """

        def get_path(self, existing=False):
            """Return a random four-letter upper-case ``.py`` file name.

            The name is re-drawn until its presence on disk matches *existing*:
            a free name when *existing* is false, a name that already exists
            otherwise.
            """
            wanted = bool(existing)
            path_ = "".join(random.choices(string.ascii_uppercase, k=4)) + ".py"
            while path.exists(path_) != wanted:
                path_ = "".join(random.choices(string.ascii_uppercase, k=4)) + ".py"

            return path_

        def test_save_simple_sfg(self, sfg_simple_filter):
            """The generated source survives a write/read cycle unchanged."""
            result = sfg_to_python(sfg_simple_filter)
            path_ = self.get_path()

            assert not path.exists(path_)
            try:
                with open(path_, "w") as file_obj:
                    file_obj.write(result)

                assert path.exists(path_)

                with open(path_) as file_obj:
                    assert file_obj.read() == result
            finally:
                # Guarded so a failed open() is not masked by a second error.
                if path.exists(path_):
                    remove(path_)

        def test_save_complex_sfg(self, precedence_sfg_delays_and_constants):
            """Same write/read check for the SFG with delays and constants."""
            result = sfg_to_python(precedence_sfg_delays_and_constants)
            path_ = self.get_path()

            assert not path.exists(path_)
            try:
                with open(path_, "w") as file_obj:
                    file_obj.write(result)

                assert path.exists(path_)

                with open(path_) as file_obj:
                    assert file_obj.read() == result
            finally:
                if path.exists(path_):
                    remove(path_)

        def test_load_simple_sfg(self, sfg_simple_filter):
            """A saved simple filter loads back with equal ``str`` and output."""
            result = sfg_to_python(sfg_simple_filter)
            path_ = self.get_path()

            assert not path.exists(path_)
            try:
                with open(path_, "w") as file_obj:
                    file_obj.write(result)

                assert path.exists(path_)

                simple_filter_, _ = python_to_sfg(path_)

                assert str(sfg_simple_filter) == str(simple_filter_)
                assert sfg_simple_filter.evaluate([2]) == simple_filter_.evaluate([2])
            finally:
                if path.exists(path_):
                    remove(path_)

        def test_load_complex_sfg(self, precedence_sfg_delays_and_constants):
            """A saved SFG with delays and constants loads back with equal ``str``."""
            result = sfg_to_python(precedence_sfg_delays_and_constants)
            path_ = self.get_path()

            assert not path.exists(path_)
            try:
                with open(path_, "w") as file_obj:
                    file_obj.write(result)

                assert path.exists(path_)

                precedence_sfg_registers_and_constants_, _ = python_to_sfg(path_)

                assert str(precedence_sfg_delays_and_constants) == str(
                    precedence_sfg_registers_and_constants_
                )
            finally:
                if path.exists(path_):
                    remove(path_)

        def test_load_invalid_path(self):
            """Loading from a file name that does not exist raises ``FileNotFoundError``."""
            path_ = self.get_path(existing=False)

            with pytest.raises(FileNotFoundError):
                python_to_sfg(path_)
    
    
    class TestGetComponentsOfType:
        """Lookups through ``find_by_type_name`` on the two-input/two-output SFG."""

        def test_get_no_operations_of_type(self, sfg_two_inputs_two_outputs):
            """A type name with no matching operation yields no names."""
            multiplications = sfg_two_inputs_two_outputs.find_by_type_name(
                Multiplication.type_name()
            )
            assert [op.name for op in multiplications] == []

        def test_get_multple_operations_of_type(self, sfg_two_inputs_two_outputs):
            """Additions, inputs and outputs are each returned in name order."""
            additions = sfg_two_inputs_two_outputs.find_by_type_name(
                Addition.type_name()
            )
            assert [op.name for op in additions] == ["ADD1", "ADD2"]

            inputs = sfg_two_inputs_two_outputs.find_by_type_name(Input.type_name())
            assert [op.name for op in inputs] == ["IN1", "IN2"]

            outputs = sfg_two_inputs_two_outputs.find_by_type_name(Output.type_name())
            assert [op.name for op in outputs] == ["OUT1", "OUT2"]
    
    
    
    class TestPrecedenceGraph:
        """Checks the DOT source produced for the precedence graph."""

        def test_precedence_graph(self, sfg_simple_filter):
            """The simple filter's precedence graph matches the reference DOT text."""
            expected = (
                'digraph {\n\trankdir=LR\n\tsubgraph cluster_0 {\n\t\tlabel=N0\n\t\t"in1.0"'
                ' [label=in1 height=0.1 shape=rectangle width=0.1]\n\t\t"t1.0" [label=t1'
                ' height=0.1 shape=rectangle width=0.1]\n\t}\n\tsubgraph cluster_1'
                ' {\n\t\tlabel=N1\n\t\t"cmul1.0" [label=cmul1 height=0.1 shape=rectangle'
                ' width=0.1]\n\t}\n\tsubgraph cluster_2 {\n\t\tlabel=N2\n\t\t"add1.0"'
                ' [label=add1 height=0.1 shape=rectangle width=0.1]\n\t}\n\t"in1.0" ->'
                ' add1\n\tadd1 [label=add1 shape=ellipse]\n\tin1 -> "in1.0"\n\tin1'
                ' [label=in1 shape=cds]\n\t"t1.0" -> cmul1\n\tcmul1 [label=cmul1'
                ' shape=ellipse]\n\t"t1.0" -> out1\n\tout1 [label=out1 shape=cds]\n\tt1Out'
                ' -> "t1.0"\n\tt1Out [label=t1 shape=square]\n\t"cmul1.0" -> add1\n\tadd1'
                ' [label=add1 shape=ellipse]\n\tcmul1 -> "cmul1.0"\n\tcmul1 [label=cmul1'
                ' shape=ellipse]\n\t"add1.0" -> t1In\n\tt1In [label=t1'
                ' shape=square]\n\tadd1 -> "add1.0"\n\tadd1 [label=add1 shape=ellipse]\n}'
            )
            # The source is accepted with or without a trailing newline.
            accepted = (expected, expected + "\n")
            actual = sfg_simple_filter.precedence_graph.source
            assert actual in accepted
    
    
    
    class TestSFGGraph:
        """Checks the DOT source produced by ``sfg_digraph`` and ``show`` errors.

        Every expected source is accepted with or without a trailing newline.
        """

        def test_sfg(self, sfg_simple_filter):
            """Default rendering with branch nodes disabled."""
            res = (
                'digraph {\n\trankdir=LR splines=spline\n\tin1 [shape=cds]\n\tin1 -> add1'
                ' [headlabel=0]\n\tout1 [shape=cds]\n\tt1 -> out1\n\tadd1'
                ' [shape=ellipse]\n\tcmul1 -> add1 [headlabel=1]\n\tcmul1'
                ' [shape=ellipse]\n\tadd1 -> t1\n\tt1 [shape=square]\n\tt1 -> cmul1\n}'
            )
            assert sfg_simple_filter.sfg_digraph(branch_node=False).source in (
                res,
                res + "\n",
            )

        def test_sfg_show_id(self, sfg_simple_filter):
            """Rendering with ``show_id=True`` labels every edge with its signal id."""
            res = (
                'digraph {\n\trankdir=LR splines=spline\n\tin1 [shape=cds]\n\tin1 -> add1'
                ' [label=s1 headlabel=0]\n\tout1 [shape=cds]\n\tt1 -> out1'
                ' [label=s2]\n\tadd1 [shape=ellipse]\n\tcmul1 -> add1 [label=s3'
                ' headlabel=1]\n\tcmul1 [shape=ellipse]\n\tadd1 -> t1 [label=s4]\n\tt1'
                ' [shape=square]\n\tt1 -> cmul1 [label=s5]\n}'
            )
            assert sfg_simple_filter.sfg_digraph(
                show_id=True, branch_node=False
            ).source in (
                res,
                res + "\n",
            )

        def test_sfg_branch(self, sfg_simple_filter):
            """Default arguments: the fan-out of ``t1`` gets a point-shaped node."""
            res = (
                'digraph {\n\trankdir=LR splines=spline\n\tin1 [shape=cds]\n\tin1 -> add1'
                ' [headlabel=0]\n\tout1 [shape=cds]\n\t"t1.0" -> out1\n\t"t1.0"'
                ' [shape=point]\n\tt1 -> "t1.0" [arrowhead=none]\n\tadd1'
                ' [shape=ellipse]\n\tcmul1 -> add1 [headlabel=1]\n\tcmul1'
                ' [shape=ellipse]\n\tadd1 -> t1\n\tt1 [shape=square]\n\t"t1.0" ->'
                ' cmul1\n}'
            )
            assert sfg_simple_filter.sfg_digraph().source in (
                res,
                res + "\n",
            )

        def test_sfg_no_port_numbering(self, sfg_simple_filter):
            """Rendering with ``port_numbering=False`` omits the head labels."""
            res = (
                'digraph {\n\trankdir=LR splines=spline\n\tin1 [shape=cds]\n\tin1 ->'
                ' add1\n\tout1 [shape=cds]\n\tt1 -> out1\n\tadd1 [shape=ellipse]\n\tcmul1'
                ' -> add1\n\tcmul1 [shape=ellipse]\n\tadd1 -> t1\n\tt1 [shape=square]\n\tt1'
                ' -> cmul1\n}'
            )
            # The closing operands of this assertion were missing, leaving the
            # parenthesis open; restored to match the sibling tests.
            assert sfg_simple_filter.sfg_digraph(
                port_numbering=False, branch_node=False
            ).source in (
                res,
                res + "\n",
            )

        def test_show_sfg_invalid_format(self, sfg_simple_filter):
            """``show`` raises ``ValueError`` for the format string ``"ppddff"``."""
            with pytest.raises(ValueError):
                sfg_simple_filter.show(fmt="ppddff")

        def test_show_sfg_invalid_engine(self, sfg_simple_filter):
            """``show`` raises ``ValueError`` for the engine string ``"ppddff"``."""
            with pytest.raises(ValueError):
                sfg_simple_filter.show(engine="ppddff")
    
    
    
    class TestSFGErrors:
        """Error handling when an SFG is built from, or edited into, a bad graph."""

        def test_dangling_output(self):
            """Leaving the adaptor's second output unused is accepted."""
            first, second = Input(), Input()
            twoport = SymmetricTwoportAdaptor(0.5, first, second)
            sink = Output(twoport.output(0))
            # No error, maybe should be?
            _ = SFG([first, second], [sink])

        def test_unconnected_input_port(self):
            """An adaptor with only one input connected is rejected."""
            source = Input()
            twoport = SymmetricTwoportAdaptor(0.5, source)
            sink = Output(twoport.output(0))
            with pytest.raises(ValueError, match="Unconnected input port in SFG"):
                SFG([source], [sink])

        def test_unconnected_output(self):
            """An ``Output`` without any source is accepted."""
            first, second = Input(), Input()
            twoport = SymmetricTwoportAdaptor(0.5, first, second)
            connected_sink = Output(twoport.output(0))
            floating_sink = Output()
            # No error, should be
            SFG([first, second], [connected_sink, floating_sink])

        def test_unconnected_input(self):
            """A listed ``Input`` that feeds nothing is reported as an unconnected port."""
            first, second = Input(), Input()
            twoport = SymmetricTwoportAdaptor(0.5, first)
            sink_a = Output(twoport.output(0))
            sink_b = Output(twoport.output(1))
            # Correct error?
            with pytest.raises(ValueError, match="Unconnected input port in SFG"):
                SFG([first, second], [sink_a, sink_b])

        def test_duplicate_input(self):
            """Listing the same ``Input`` twice is rejected."""
            first, second = Input(), Input()
            twoport = SymmetricTwoportAdaptor(0.5, first, second)
            sink_a = Output(twoport.output(0))
            sink_b = Output(twoport.output(1))
            with pytest.raises(ValueError, match="Duplicate input operation"):
                SFG([first, first], [sink_a, sink_b])

        def test_duplicate_output(self):
            """Listing the same ``Output`` twice is rejected."""
            first, second = Input(), Input()
            twoport = SymmetricTwoportAdaptor(0.5, first, second)
            sink = Output(twoport.output(0))
            # The second adaptor output is connected but not passed to the SFG.
            Output(twoport.output(1))

            with pytest.raises(ValueError, match="Duplicate output operation"):
                SFG([first, second], [sink, sink])

        def test_unconnected_input_signal(self):
            """A free ``Signal`` given as input signal is rejected."""
            first, second = Input(), Input()
            twoport = SymmetricTwoportAdaptor(0.5, first, second)
            sink_a = Output(twoport.output(0))
            sink_b = Output(twoport.output(1))
            loose = Signal()
            expected_message = "Input signal #0 is missing destination in SFG"
            with pytest.raises(ValueError, match=expected_message):
                SFG([first, second], [sink_a, sink_b], [loose])

        def test_unconnected_output_signal(self):
            """A free ``Signal`` given as output signal is rejected."""
            first, second = Input(), Input()
            twoport = SymmetricTwoportAdaptor(0.5, first, second)
            sink_a = Output(twoport.output(0))
            sink_b = Output(twoport.output(1))
            loose = Signal()
            expected_message = "Output signal #0 is missing source in SFG"
            with pytest.raises(ValueError, match=expected_message):
                SFG([first, second], [sink_a, sink_b], output_signals=[loose])

        def test_duplicate_input_signal(self):
            """Listing the same input signal twice is rejected."""
            source = Input()
            feed = Signal()
            twoport = SymmetricTwoportAdaptor(0.5, source, feed)
            sink_a = Output(twoport.output(0))
            sink_b = Output(twoport.output(1))
            with pytest.raises(ValueError, match="Duplicate input signal"):
                SFG([source], [sink_a, sink_b], [feed, feed])

        def test_duplicate_output_signal(self):
            """Listing the same output signal twice is accepted."""
            first, second = Input(), Input()
            twoport = SymmetricTwoportAdaptor(0.5, first, second)
            sink = Output(twoport.output(0))
            tap = Signal(twoport.output(1))
            # Should raise?
            SFG([first, second], [sink], output_signals=[tap, tap])

        def test_dangling_input_signal(self):
            """A source-less signal that is not declared as input signal is rejected."""
            source = Input()
            feed = Signal()
            twoport = SymmetricTwoportAdaptor(0.5, source, feed)
            sink_a = Output(twoport.output(0))
            sink_b = Output(twoport.output(1))
            with pytest.raises(ValueError, match="Dangling signal without source in SFG"):
                SFG([source], [sink_a, sink_b])

        def test_remove_signal_with_different_number_of_inputs_and_outputs(self):
            """``remove_operation``: ``None`` for an unknown id, error for ``add1``."""
            first, second = Input(), Input()
            adder = Addition(first, second, name="addition")
            sink = Output(adder)
            sfg = SFG([first, second], [sink])

            # Try to remove non-existent operation
            missing = sfg.remove_operation("foo")
            assert missing is None

            expected_message = (
                "Different number of input and output ports of operation with"
            )
            with pytest.raises(ValueError, match=expected_message):
                sfg.remove_operation('add1')

        def test_inputs_required_for_output(self):
            """Asking about output 1 of a single-output SFG raises ``IndexError``."""
            first, second = Input(), Input()
            adder = Addition(first, second, name="addition")
            sink = Output(adder)
            sfg = SFG([first, second], [sink])

            expected_message = re.escape(
                "Output index out of range (expected 0-0, got 1)"
            )
            with pytest.raises(IndexError, match=expected_message):
                sfg.inputs_required_for_output(1)
    
    class TestInputDuplicationBug:
        """Regression check: a shared input appears once in ``SFG.operations``."""

        def test_input_is_not_duplicated_in_operation_list(self):
            """``in1`` feeds both a delay and an addition but is listed only once."""
            # Inputs:
            in1 = Input(name="in1")
            out1 = Output(name="out1")

            # Operations:
            t1 = Delay(initial_value=0, name="")
            t1.inputs[0].connect(in1)
            add1 = t1 + in1

            out1.inputs[0].connect(add1)

            twotapfir = SFG(inputs=[in1], outputs=[out1], name='twotapfir')

            input_count = sum(1 for op in twotapfir.operations if isinstance(op, Input))
            assert input_count == 1
    
    class TestCriticalPath:
        """``critical_path_time`` follows the latency assigned to additions."""

        def test_single_accumulator(self, sfg_simple_accumulator: SFG):
            """The critical path time equals the addition latency, for 5 then 6."""
            for latency in (5, 6):
                sfg_simple_accumulator.set_latency_of_type(
                    Addition.type_name(), latency
                )
                assert sfg_simple_accumulator.critical_path_time() == latency
    
    class TestUnfold:
        """Checks ``SFG.unfold``: operation counts and simulated output values."""

        def count_kinds(self, sfg: SFG) -> Dict[Type, int]:
            """Count the operations of *sfg* grouped by their class."""
            return Counter(type(op) for op in sfg.operations)

        def assert_counts_is_correct(self, sfg1: SFG, sfg2: SFG, multiple: int):
            """Assert that *sfg2* holds *multiple* times the operations of *sfg1*.

            The comparison is done per operation class. ``Delay`` is the
            exception: its count has to be equal in both graphs.
            """
            base_counts = self.count_kinds(sfg1)
            unfolded_counts = self.count_kinds(sfg2)

            # Delays should not be duplicated. ``get`` is used so that graphs
            # without any delay compare ``None`` with ``None``.
            assert base_counts.get(Delay) == unfolded_counts.get(Delay)
            # Zero the delay entries so they pass the multiplicative check below.
            base_counts[Delay] = 0
            unfolded_counts[Delay] = 0

            # Both graphs must contain exactly the same operation classes.
            assert base_counts.keys() == unfolded_counts.keys()

            for kind, amount in base_counts.items():
                assert amount * multiple == unfolded_counts[kind]

        # One thin wrapper per fixture; each forwards its fixture to do_tests.
        def test_two_inputs_two_outputs(self, sfg_two_inputs_two_outputs: SFG):
            self.do_tests(sfg_two_inputs_two_outputs)

        def test_twotapfir(self, sfg_two_tap_fir: SFG):
            self.do_tests(sfg_two_tap_fir)

        def test_delay(self, sfg_delay: SFG):
            self.do_tests(sfg_delay)

        def test_sfg_two_inputs_two_outputs_independent(
            self, sfg_two_inputs_two_outputs_independent: SFG
        ):
            self.do_tests(sfg_two_inputs_two_outputs_independent)

        def do_tests(self, sfg: SFG):
            """Unfold *sfg* by 2 and 3 and compare it against the original.

            For each factor the operation counts of the unfolded and the twice
            unfolded graph are checked, then both the original and the unfolded
            graph are simulated on the same random data and the interleaved
            outputs of the unfolded graph must equal the original outputs.
            """
            NUM_TESTS = 5
            for factor in range(2, 4):
                # Ensure that the correct number of operations get created
                unfolded = sfg.unfold(factor)
                self.assert_counts_is_correct(sfg, unfolded, factor)

                double_unfolded = sfg.unfold(factor).unfold(factor)
                self.assert_counts_is_correct(sfg, double_unfolded, factor * factor)

                # NUM_TESTS * factor samples per input, so the unfolded graph
                # gets NUM_TESTS samples on each of its inputs.
                input_list = [
                    [random.random() for _ in range(NUM_TESTS * factor)]
                    for _ in sfg.inputs
                ]

                reference_sim = Simulation(sfg, input_list)
                reference_sim.run()
                ref = reference_sim.results

                # Unfolded input ``k + n * len(sfg.inputs)`` takes every
                # ``factor``-th sample of original input ``k``, starting at ``n``.
                input_count = len(sfg.inputs)
                unfolded_input_lists = [
                    [input_list[k][t * factor + n] for t in range(NUM_TESTS)]
                    for n in range(factor)
                    for k in range(input_count)
                ]

                unfolded_sim = Simulation(unfolded, unfolded_input_lists)
                unfolded_sim.run()
                unfolded_results = unfolded_sim.results

                output_count = len(sfg.outputs)
                for n in range(output_count):
                    # Outputs for an original output
                    ref_values = list(ref[ResultKey(f"{n}")])

                    # Output n is split over `factor` output ports; these are
                    # the indices of those ports.
                    out_indices = [n + k * output_count for k in range(factor)]

                    # Interleave the ports sample by sample.
                    flat_u_values = [
                        unfolded_results[ResultKey(f"{idx}")][k]
                        for k in range(NUM_TESTS)
                        for idx in out_indices
                    ]

                    assert flat_u_values == ref_values

        def test_value_error(self, sfg_two_inputs_two_outputs: SFG):
            """Unfolding by a factor of zero raises ``ValueError``."""
            sfg = sfg_two_inputs_two_outputs
            with pytest.raises(ValueError, match="Unfolding 0 times removes the SFG"):
                sfg.unfold(0)
    
    
    
    class TestIsLinear:
        """Checks of the ``is_linear`` property."""

        def test_single_accumulator(self, sfg_simple_accumulator: SFG):
            """The simple accumulator reports itself as linear."""
            linear = sfg_simple_accumulator.is_linear
            assert linear

        def test_sfg_nested(self, sfg_nested: SFG):
            """The nested SFG fixture does not report itself as linear."""
            linear = sfg_nested.is_linear
            assert not linear
    
    
    class TestIsConstant:
        """Checks of the ``is_constant`` property."""

        def test_single_accumulator(self, sfg_simple_accumulator: SFG):
            """The simple accumulator does not report itself as constant."""
            constant = sfg_simple_accumulator.is_constant
            assert not constant

        def test_sfg_nested(self, sfg_nested: SFG):
            """The nested SFG fixture does not report itself as constant."""
            constant = sfg_nested.is_constant
            assert not constant
    
    class TestSwapIOOfOperation:
        """``swap_io_of_operation`` must leave the simulated outputs unchanged."""

        def do_test(self, sfg: SFG, graph_id: GraphID):
            """Simulate *sfg* before and after swapping the I/O of *graph_id*.

            Both simulations use the same random input sequences, and every
            output sequence has to be identical in the two runs.
            """
            NUM_TESTS = 5
            # One random sequence of NUM_TESTS samples per SFG input.
            input_list = [
                [random.random() for _ in range(NUM_TESTS)] for _ in sfg.inputs
            ]

            sim_ref = Simulation(sfg, input_list)
            sim_ref.run()

            # The swap is applied to the same SFG object that was just simulated.
            sfg.swap_io_of_operation(graph_id)

            sim_swap = Simulation(sfg, input_list)
            sim_swap.run()

            for n in range(len(sfg.outputs)):
                key = ResultKey(f"{n}")
                ref_values = list(sim_ref.results[key])
                swap_values = list(sim_swap.results[key])
                assert ref_values == swap_values

        def test_single_accumulator(self, sfg_simple_accumulator: SFG):
            """Run the swap check on ``add1`` of the simple accumulator."""
            self.do_test(sfg_simple_accumulator, 'add1')
    
    
    
    class TestInsertComponentAfter:
        """Behaviour of ``SFG.insert_operation_after``."""

        def test_insert_component_after_in_sfg(self, large_operation_tree_names):
            """Insert a ``SquareRoot`` after ``constant4`` and inspect both SFGs.

            The returned SFG contains the new operation between ``constant4``
            and ``add3``; the SFG the method was called on stays as it was.
            """
            sfg = SFG(outputs=[Output(large_operation_tree_names)])
            sqrt = SquareRoot()

            constant_id = sfg.find_by_name("constant4")[0].graph_id
            _sfg = sfg.insert_operation_after(constant_id, sqrt)
            assert _sfg.evaluate() != sfg.evaluate()

            assert any(isinstance(comp, SquareRoot) for comp in _sfg.operations)
            assert not any(isinstance(comp, SquareRoot) for comp in sfg.operations)

            # Operation directly fed by constant4 in the original / the new SFG.
            old_successor = (
                sfg.find_by_name("constant4")[0]
                .output(0)
                .signals[0]
                .destination.operation
            )
            new_successor = (
                _sfg.find_by_name("constant4")[0]
                .output(0)
                .signals[0]
                .destination.operation
            )
            assert not isinstance(old_successor, SquareRoot)
            assert isinstance(new_successor, SquareRoot)

            assert old_successor is sfg.find_by_id("add3")
            assert new_successor is not _sfg.find_by_id("add3")

            sqrt_successor = (
                _sfg.find_by_id("sqrt1").output(0).signals[0].destination.operation
            )
            assert sqrt_successor is _sfg.find_by_id("add3")

        def test_insert_component_after_mimo_operation_error(
            self, large_operation_tree_names
        ):
            """Inserting a ``SymmetricTwoportAdaptor`` raises ``TypeError``."""
            sfg = SFG(outputs=[Output(large_operation_tree_names)])
            expected_message = "Only operations with one input and one output"
            with pytest.raises(TypeError, match=expected_message):
                sfg.insert_operation_after('constant4', SymmetricTwoportAdaptor(0.5))

        def test_insert_component_after_unknown_component_error(
            self, large_operation_tree_names
        ):
            """Inserting after the unknown id ``foo`` raises ``ValueError``."""
            sfg = SFG(outputs=[Output(large_operation_tree_names)])
            with pytest.raises(ValueError, match="Unknown component:"):
                sfg.insert_operation_after('foo', SquareRoot())
    
    
    
    class TestGetUsedTypeNames:
        """Expected results of ``get_used_type_names`` for three graphs."""

        def test_single_accumulator(self, sfg_simple_accumulator: SFG):
            """The simple accumulator uses addition, input, output and delay."""
            expected = ["add", "in", "out", "t"]
            assert sfg_simple_accumulator.get_used_type_names() == expected

        def test_sfg_nested(self, sfg_nested: SFG):
            """The nested fixture reports input, output and the ``sfg`` type."""
            expected = ["in", "out", "sfg"]
            assert sfg_nested.get_used_type_names() == expected

        def test_large_operation_tree(self, large_operation_tree):
            """An SFG wrapped around the large tree uses ``add``, ``c`` and ``out``."""
            sfg = SFG(outputs=[Output(large_operation_tree)])
            expected = ["add", "c", "out"]
            assert sfg.get_used_type_names() == expected