Skip to content
Snippets Groups Projects
test_sfg.py 56 KiB
Newer Older
  • Learn to ignore specific revisions
  • 
    
    class TestConnectExternalSignalsToComponentsMultipleComp:
        """Tests of ``connect_external_signals_to_components`` on nested SFGs."""

        @staticmethod
        def _wrap_in_adder_sfg(op_tree):
            """Embed an SFG built from ``op_tree`` in a small two-adder SFG.

            The wiring is ``ADD2 = (INP1 + INP2) + inner_sfg`` feeding ``OUT1``.

            Returns a tuple ``(inner_sfg, outer_sfg)`` where ``inner_sfg`` wraps
            ``op_tree`` and ``outer_sfg`` is the SFG with inputs ``INP1``/``INP2``
            and output ``OUT1``.
            """
            inner_sfg = SFG(outputs=[Output(op_tree)])

            inp1 = Input("INP1")
            inp2 = Input("INP2")
            out1 = Output(None, "OUT1")
            add1 = Addition(None, None, "ADD1")
            add2 = Addition(None, None, "ADD2")

            add1.input(0).connect(inp1, "S1")
            add1.input(1).connect(inp2, "S2")
            add2.input(0).connect(add1, "S3")
            add2.input(1).connect(inner_sfg.outputs[0], "S4")
            out1.input(0).connect(add2, "S5")

            return inner_sfg, SFG(inputs=[inp1, inp2], outputs=[out1])

        def test_connect_external_signals_to_components_operation_tree(
            self, operation_tree
        ):
            """Replaces an operation_tree in an SFG with other components"""
            sfg1, test_sfg = self._wrap_in_adder_sfg(operation_tree)

            # The result must be the same before and after the replacement.
            assert test_sfg.evaluate(1, 2) == 8
            sfg1.connect_external_signals_to_components()
            assert test_sfg.evaluate(1, 2) == 8
            assert not test_sfg.connect_external_signals_to_components()

        def test_connect_external_signals_to_components_large_operation_tree(
            self, large_operation_tree
        ):
            """Replaces a large_operation_tree in an SFG with other components"""
            sfg1, test_sfg = self._wrap_in_adder_sfg(large_operation_tree)

            # The result must be the same before and after the replacement.
            assert test_sfg.evaluate(1, 2) == 17
            sfg1.connect_external_signals_to_components()
            assert test_sfg.evaluate(1, 2) == 17
            assert not test_sfg.connect_external_signals_to_components()

        def create_sfg(self, op_tree):
            """Create a simple SFG with either operation_tree or large_operation_tree

            Only the outer two-input SFG is returned; the inner SFG wrapping
            ``op_tree`` stays reachable through the outer SFG's signals.
            """
            _, outer_sfg = self._wrap_in_adder_sfg(op_tree)
            return outer_sfg

        def test_connect_external_signals_to_components_many_op(
            self, large_operation_tree
        ):
            """Replaces an SFG component in a larger SFG with several component
            operations
            """
            inp1 = Input("INP1")
            inp2 = Input("INP2")
            inp3 = Input("INP3")
            inp4 = Input("INP4")
            out1 = Output(None, "OUT1")
            add1 = Addition(None, None, "ADD1")
            sub1 = Subtraction(None, None, "SUB1")

            add1.input(0).connect(inp1, "S1")
            add1.input(1).connect(inp2, "S2")

            sfg1 = self.create_sfg(large_operation_tree)

            sfg1.input(0).connect(add1, "S3")
            sfg1.input(1).connect(inp3, "S4")
            sub1.input(0).connect(sfg1.outputs[0], "S5")
            sub1.input(1).connect(inp4, "S6")
            out1.input(0).connect(sub1, "S7")

            test_sfg = SFG(inputs=[inp1, inp2, inp3, inp4], outputs=[out1])

            # The result must be the same before and after the replacement.
            assert test_sfg.evaluate(1, 2, 3, 4) == 16
            sfg1.connect_external_signals_to_components()
            assert test_sfg.evaluate(1, 2, 3, 4) == 16
            assert not test_sfg.connect_external_signals_to_components()
    
    
    class TestTopologicalOrderOperations:
        """Tests of ``SFG.get_operations_topological_order``."""

        def test_feedback_sfg(self, sfg_simple_filter):
            """Check the operation order reported for the simple filter SFG."""
            topological_order = (
                sfg_simple_filter.get_operations_topological_order()
            )

            assert [comp.name for comp in topological_order] == [
                "IN1",
                "ADD1",
                "T1",
                "CMUL1",
                "OUT1",
            ]

        def test_multiple_independent_inputs(
            self, sfg_two_inputs_two_outputs_independent
        ):
            """Check the operation order for the independent two-in/two-out SFG."""
            sfg = sfg_two_inputs_two_outputs_independent
            topological_order = sfg.get_operations_topological_order()

            assert [comp.name for comp in topological_order] == [
                "IN1",
                "OUT1",
                "IN2",
                "C1",
                "ADD1",
                "OUT2",
            ]

        def test_complex_graph(self, precedence_sfg_delays):
            """Check the operation order for the precedence SFG with delays."""
            topological_order = (
                precedence_sfg_delays.get_operations_topological_order()
            )

            assert [comp.name for comp in topological_order] == [
                "IN1",
                "C0",
                "ADD1",
                "Q1",
                "A0",
                "T1",
                "B1",
                "A1",
                "T2",
                "B2",
                "ADD2",
                "A2",
                "ADD3",
                "ADD4",
                "OUT1",
            ]
    
    
    
    class TestRemove:
        """Tests of ``SFG.remove_operation``."""

        def test_remove_single_input_outputs(self, sfg_simple_filter):
            """Remove ``cmul1`` and check how its neighbours are reconnected."""
            new_sfg = sfg_simple_filter.remove_operation("cmul1")

            old_t1 = sfg_simple_filter.find_by_name("T1")[0]
            new_t1 = new_sfg.find_by_name("T1")[0]
            assert {op.name for op in old_t1.subsequent_operations} == {
                "CMUL1",
                "OUT1",
            }
            assert {op.name for op in new_t1.subsequent_operations} == {
                "ADD1",
                "OUT1",
            }

            old_add1 = sfg_simple_filter.find_by_name("ADD1")[0]
            new_add1 = new_sfg.find_by_name("ADD1")[0]
            assert {op.name for op in old_add1.preceding_operations} == {
                "CMUL1",
                "IN1",
            }
            assert {op.name for op in new_add1.preceding_operations} == {
                "T1",
                "IN1",
            }

            # NOTE(review): the opening lines of the two statements that wrapped
            # these comprehensions were lost from this copy of the file
            # (presumably membership assertions on the signal names -- confirm
            # against version control and restore them). The expected values are
            # unknown here, so the names are only collected, not asserted on.
            old_t1_signal_names = [  # noqa: F841
                sig.name
                for sig in sfg_simple_filter.find_by_name("T1")[0]
                .output(0)
                .signals
            ]
            new_t1_signal_names = [  # noqa: F841
                sig.name
                for sig in new_sfg.find_by_name("T1")[0].output(0).signals
            ]

        def test_remove_multiple_inputs_outputs(self, butterfly_operation_tree):
            """Remove ``bfly2`` and check ``bfly3`` is wired straight to ``bfly1``."""
            out1 = Output(butterfly_operation_tree.output(0), "OUT1")
            out2 = Output(butterfly_operation_tree.output(1), "OUT2")

            sfg = SFG(outputs=[out1, out2])

            new_sfg = sfg.remove_operation(sfg.find_by_name("bfly2")[0].graph_id)

            old_bfly3 = sfg.find_by_name("bfly3")[0]
            new_bfly3 = new_sfg.find_by_name("bfly3")[0]

            # Both outputs of bfly3 keep one signal each; its destination moves
            # from bfly2 to bfly1 while the port index is preserved.
            for index in range(2):
                assert old_bfly3.output(index).signal_count == 1
                assert new_bfly3.output(index).signal_count == 1

                sfg_dest = old_bfly3.output(index).signals[0].destination
                new_sfg_dest = new_bfly3.output(index).signals[0].destination

                assert sfg_dest.index == index
                assert new_sfg_dest.index == index
                assert sfg_dest.operation.name == "bfly2"
                assert new_sfg_dest.operation.name == "bfly1"

            old_bfly1 = sfg.find_by_name("bfly1")[0]
            new_bfly1 = new_sfg.find_by_name("bfly1")[0]

            assert old_bfly1.input(0).signal_count == 1
            assert new_bfly1.input(0).signal_count == 1

            # Seen from bfly1, the source of each input moves from bfly2 to bfly3.
            for index in range(2):
                sfg_source = old_bfly1.input(index).signals[0].source
                new_sfg_source = new_bfly1.input(index).signals[0].source

                assert sfg_source.index == index
                assert new_sfg_source.index == index
                assert sfg_source.operation.name == "bfly2"
                assert new_sfg_source.operation.name == "bfly3"

            assert "bfly2" not in {op.name for op in new_sfg.operations}

        def test_remove_different_number_inputs_outputs(self, sfg_simple_filter):
            """Removing ``add1`` from the simple filter must raise ``ValueError``.

            The method previously lacked the ``test_`` prefix, so pytest never
            collected it.
            """
            with pytest.raises(ValueError):
                sfg_simple_filter.remove_operation("add1")

        # Old (uncollected) name kept as an alias for backward compatibility.
        remove_different_number_inputs_outputs = (
            test_remove_different_number_inputs_outputs
        )
    
    
    class TestSaveLoadSFG:
        """Round-trip tests for ``sfg_to_python`` and ``python_to_sfg``."""

        def get_path(self, existing=False):
            """Return a random four-letter ``.py`` file name in the working directory.

            With ``existing=False`` (default) names are drawn until one that does
            not exist is found. With ``existing=True`` names are drawn until one
            that *does* exist is found, which may never terminate if no such file
            is present.
            """
            path_ = "".join(random.choices(string.ascii_uppercase, k=4)) + ".py"
            while path.exists(path_) != bool(existing):
                path_ = (
                    "".join(random.choices(string.ascii_uppercase, k=4)) + ".py"
                )

            return path_

        def _write_to_new_file(self, content):
            """Write ``content`` to a fresh file and return its path.

            The caller is responsible for removing the file.
            """
            path_ = self.get_path()

            assert not path.exists(path_)
            with open(path_, "w") as file_obj:
                file_obj.write(content)

            assert path.exists(path_)
            return path_

        def test_save_simple_sfg(self, sfg_simple_filter):
            """The generated source of the simple filter is written verbatim."""
            result = sfg_to_python(sfg_simple_filter)
            path_ = self._write_to_new_file(result)

            # Remove the file even when the assertion fails.
            try:
                with open(path_, "r") as file_obj:
                    assert file_obj.read() == result
            finally:
                remove(path_)

        def test_save_complex_sfg(self, precedence_sfg_delays_and_constants):
            """The generated source of the complex SFG is written verbatim."""
            result = sfg_to_python(precedence_sfg_delays_and_constants)
            path_ = self._write_to_new_file(result)

            # Remove the file even when the assertion fails.
            try:
                with open(path_, "r") as file_obj:
                    assert file_obj.read() == result
            finally:
                remove(path_)

        def test_load_simple_sfg(self, sfg_simple_filter):
            """A saved simple filter loads back with equal ``str`` and output."""
            result = sfg_to_python(sfg_simple_filter)
            path_ = self._write_to_new_file(result)

            # Remove the file even when loading or an assertion fails.
            try:
                simple_filter_, _ = python_to_sfg(path_)

                assert str(sfg_simple_filter) == str(simple_filter_)
                assert sfg_simple_filter.evaluate([2]) == simple_filter_.evaluate(
                    [2]
                )
            finally:
                remove(path_)

        def test_load_complex_sfg(self, precedence_sfg_delays_and_constants):
            """A saved complex SFG loads back with an equal ``str`` form."""
            result = sfg_to_python(precedence_sfg_delays_and_constants)
            path_ = self._write_to_new_file(result)

            # Remove the file even when loading or the assertion fails.
            try:
                loaded_sfg, _ = python_to_sfg(path_)

                assert str(precedence_sfg_delays_and_constants) == str(loaded_sfg)
            finally:
                remove(path_)

        def test_load_invalid_path(self):
            """Loading from a path that does not exist raises FileNotFoundError."""
            path_ = self.get_path(existing=False)

            with pytest.raises(FileNotFoundError):
                python_to_sfg(path_)
    
    
    class TestGetComponentsOfType:
        """Tests of ``SFG.find_by_type_name``."""

        def test_get_no_operations_of_type(self, sfg_two_inputs_two_outputs):
            """No ``Multiplication`` is found in the two-in/two-out SFG."""
            found = sfg_two_inputs_two_outputs.find_by_type_name(
                Multiplication.type_name()
            )
            assert [op.name for op in found] == []

        def test_get_multple_operations_of_type(self, sfg_two_inputs_two_outputs):
            """Additions, inputs and outputs are each found, in the listed order."""
            expected_names = (
                (Addition, ["ADD1", "ADD2"]),
                (Input, ["IN1", "IN2"]),
                (Output, ["OUT1", "OUT2"]),
            )
            for op_type, names in expected_names:
                found = sfg_two_inputs_two_outputs.find_by_type_name(
                    op_type.type_name()
                )
                assert [op.name for op in found] == names
    
    
    
    class TestPrecedenceGraph:
        """Tests of ``SFG.precedence_graph``."""

        def test_precedence_graph(self, sfg_simple_filter):
            """Compare the DOT source of the simple filter's precedence graph."""
            expected = (
                'digraph {\n\trankdir=LR\n\tsubgraph cluster_0'
                ' {\n\t\tlabel=N0\n\t\t"in1.0" [label=in1 height=0.1'
                ' shape=rectangle width=0.1]\n\t\t"t1.0" [label=t1 height=0.1'
                ' shape=rectangle width=0.1]\n\t}\n\tsubgraph cluster_1'
                ' {\n\t\tlabel=N1\n\t\t"cmul1.0" [label=cmul1 height=0.1'
                ' shape=rectangle width=0.1]\n\t}\n\tsubgraph cluster_2'
                ' {\n\t\tlabel=N2\n\t\t"add1.0" [label=add1 height=0.1'
                ' shape=rectangle width=0.1]\n\t}\n\t"in1.0" -> add1\n\tadd1'
                ' [label=add1 shape=ellipse]\n\tin1 -> "in1.0"\n\tin1 [label=in1'
                ' shape=cds]\n\t"t1.0" -> cmul1\n\tcmul1 [label=cmul1'
                ' shape=ellipse]\n\t"t1.0" -> out1\n\tout1 [label=out1'
                ' shape=cds]\n\tt1Out -> "t1.0"\n\tt1Out [label=t1'
                ' shape=square]\n\t"cmul1.0" -> add1\n\tadd1 [label=add1'
                ' shape=ellipse]\n\tcmul1 -> "cmul1.0"\n\tcmul1 [label=cmul1'
                ' shape=ellipse]\n\t"add1.0" -> t1In\n\tt1In [label=t1'
                ' shape=square]\n\tadd1 -> "add1.0"\n\tadd1 [label=add1'
                ' shape=ellipse]\n}'
            )

            # The source is accepted with or without a trailing newline.
            source = sfg_simple_filter.precedence_graph().source
            assert source in (expected, expected + "\n")
    
    
    
    class TestSFGGraph:
        """Tests of ``SFG.sfg_digraph`` and ``SFG.show``."""

        def test_sfg(self, sfg_simple_filter):
            """Compare the DOT source of the simple filter without signal ids."""
            expected = (
                'digraph {\n\trankdir=LR\n\tin1 [shape=cds]\n\tin1 -> add1\n\tout1'
                ' [shape=cds]\n\tt1 -> out1\n\tadd1 [shape=ellipse]\n\tcmul1 ->'
                ' add1\n\tcmul1 [shape=ellipse]\n\tadd1 -> t1\n\tt1'
                ' [shape=square]\n\tt1 -> cmul1\n}'
            )

            # The source is accepted with or without a trailing newline.
            source = sfg_simple_filter.sfg_digraph().source
            assert source in (expected, expected + "\n")

        def test_sfg_show_id(self, sfg_simple_filter):
            """Compare the DOT source when edges are labelled with signal ids."""
            expected = (
                'digraph {\n\trankdir=LR\n\tin1 [shape=cds]\n\tin1 -> add1'
                ' [label=s1]\n\tout1 [shape=cds]\n\tt1 -> out1 [label=s2]\n\tadd1'
                ' [shape=ellipse]\n\tcmul1 -> add1 [label=s3]\n\tcmul1'
                ' [shape=ellipse]\n\tadd1 -> t1 [label=s4]\n\tt1'
                ' [shape=square]\n\tt1 -> cmul1 [label=s5]\n}'
            )

            # The source is accepted with or without a trailing newline.
            source = sfg_simple_filter.sfg_digraph(show_id=True).source
            assert source in (expected, expected + "\n")

        def test_show_sfg_invalid_format(self, sfg_simple_filter):
            """An unknown output format passed to ``show`` raises ``ValueError``."""
            with pytest.raises(ValueError):
                sfg_simple_filter.show(fmt="ppddff")

        def test_show_sfg_invalid_engine(self, sfg_simple_filter):
            """An unknown engine passed to ``show`` raises ``ValueError``."""
            with pytest.raises(ValueError):
                sfg_simple_filter.show(engine="ppddff")
    
    
    
    class TestSFGErrors:
        """Tests of the errors raised (or not) for malformed SFGs."""

        def test_dangling_output(self):
            """An adaptor output left unconnected is currently accepted."""
            in1 = Input()
            in2 = Input()
            adaptor = SymmetricTwoportAdaptor(0.5, in1, in2)
            out1 = Output(adaptor.output(0))
            # No error, maybe should be?
            _ = SFG([in1, in2], [out1])

        def test_unconnected_input_port(self):
            """An adaptor with only one of its inputs connected is rejected."""
            in1 = Input()
            adaptor = SymmetricTwoportAdaptor(0.5, in1)
            out1 = Output(adaptor.output(0))
            with pytest.raises(ValueError, match="Unconnected input port in SFG"):
                SFG([in1], [out1])

        def test_unconnected_output(self):
            """An ``Output`` without a source is currently accepted."""
            in1 = Input()
            in2 = Input()
            adaptor = SymmetricTwoportAdaptor(0.5, in1, in2)
            out1 = Output(adaptor.output(0))
            out2 = Output()
            # No error, should be
            SFG([in1, in2], [out1, out2])

        def test_unconnected_input(self):
            """An ``Input`` that feeds nothing leaves an adaptor port unconnected."""
            in1 = Input()
            in2 = Input()
            adaptor = SymmetricTwoportAdaptor(0.5, in1)
            out1 = Output(adaptor.output(0))
            out2 = Output(adaptor.output(1))
            # Correct error?
            with pytest.raises(ValueError, match="Unconnected input port in SFG"):
                SFG([in1, in2], [out1, out2])

        def test_duplicate_input(self):
            """Listing the same ``Input`` twice is rejected."""
            in1 = Input()
            in2 = Input()
            adaptor = SymmetricTwoportAdaptor(0.5, in1, in2)
            out1 = Output(adaptor.output(0))
            out2 = Output(adaptor.output(1))
            with pytest.raises(ValueError, match="Duplicate input operation"):
                SFG([in1, in1], [out1, out2])

        def test_duplicate_output(self):
            """Listing the same ``Output`` twice is rejected."""
            in1 = Input()
            in2 = Input()
            adaptor = SymmetricTwoportAdaptor(0.5, in1, in2)
            out1 = Output(adaptor.output(0))
            out2 = Output(adaptor.output(1))
            with pytest.raises(ValueError, match="Duplicate output operation"):
                SFG([in1, in2], [out1, out1])

        def test_unconnected_input_signal(self):
            """An input signal without a destination is rejected."""
            in1 = Input()
            in2 = Input()
            adaptor = SymmetricTwoportAdaptor(0.5, in1, in2)
            out1 = Output(adaptor.output(0))
            out2 = Output(adaptor.output(1))
            signal = Signal()
            with pytest.raises(
                ValueError, match="Input signal #0 is missing destination in SFG"
            ):
                SFG([in1, in2], [out1, out2], [signal])

        def test_unconnected_output_signal(self):
            """An output signal without a source is rejected."""
            in1 = Input()
            in2 = Input()
            adaptor = SymmetricTwoportAdaptor(0.5, in1, in2)
            out1 = Output(adaptor.output(0))
            out2 = Output(adaptor.output(1))
            signal = Signal()
            with pytest.raises(
                ValueError, match="Output signal #0 is missing source in SFG"
            ):
                SFG([in1, in2], [out1, out2], output_signals=[signal])

        def test_duplicate_input_signal(self):
            """Listing the same input signal twice is rejected."""
            in1 = Input()
            signal = Signal()
            adaptor = SymmetricTwoportAdaptor(0.5, in1, signal)
            out1 = Output(adaptor.output(0))
            out2 = Output(adaptor.output(1))
            with pytest.raises(ValueError, match="Duplicate input signal"):
                SFG([in1], [out1, out2], [signal, signal])

        def test_duplicate_output_signal(self):
            """Listing the same output signal twice is currently accepted."""
            in1 = Input()
            in2 = Input()
            adaptor = SymmetricTwoportAdaptor(0.5, in1, in2)
            out1 = Output(adaptor.output(0))
            signal = Signal(adaptor.output(1))
            # Should raise?
            SFG([in1, in2], [out1], output_signals=[signal, signal])

        def test_dangling_input_signal(self):
            """A source-less signal not declared as an SFG input is rejected."""
            in1 = Input()
            signal = Signal()
            adaptor = SymmetricTwoportAdaptor(0.5, in1, signal)
            out1 = Output(adaptor.output(0))
            out2 = Output(adaptor.output(1))
            with pytest.raises(
                ValueError, match="Dangling signal without source in SFG"
            ):
                SFG([in1], [out1, out2])

        def test_remove_signal_with_different_number_of_inputs_and_outputs(self):
            """Removing an unknown id returns ``None``; removing ``add1`` raises."""
            in1 = Input()
            in2 = Input()
            add1 = Addition(in1, in2, name="addition")
            out1 = Output(add1)
            sfg = SFG([in1, in2], [out1])
            # Try to remove non-existent operation
            sfg1 = sfg.remove_operation("foo")
            assert sfg1 is None
            with pytest.raises(
                ValueError,
                match=(
                    "Different number of input and output ports of operation with"
                ),
            ):
                sfg.remove_operation('add1')

        def test_inputs_required_for_output(self):
            """An out-of-range output index raises ``IndexError``."""
            in1 = Input()
            in2 = Input()
            add1 = Addition(in1, in2, name="addition")
            out1 = Output(add1)
            sfg = SFG([in1, in2], [out1])
            with pytest.raises(
                IndexError,
                match=re.escape("Output index out of range (expected 0-0, got 1)"),
            ):
                sfg.inputs_required_for_output(1)
    
    class TestInputDuplicationBug:
        """Regression test for an ``Input`` showing up twice in an SFG."""

        def test_input_is_not_duplicated_in_operation_list(self):
            """An input feeding both a delay and an adder is listed exactly once."""
            # Ports of the graph.
            source = Input(name="in1")
            sink = Output(name="out1")

            # The input reaches the adder both directly and through the delay.
            delay = Delay(initial_value=0, name="")
            delay.inputs[0].connect(source)
            total = delay + source

            sink.inputs[0].connect(total)

            twotapfir = SFG(inputs=[source], outputs=[sink], name='twotapfir')

            input_ops = [
                op for op in twotapfir.operations if isinstance(op, Input)
            ]
            assert len(input_ops) == 1
    
    
    
    class TestCriticalPath:
        """Tests of ``SFG.critical_path``."""

        def test_single_accumulator(self, sfg_simple_accumulator: SFG):
            """The critical path equals the latency assigned to the additions."""
            addition_type = Addition.type_name()
            for latency in (5, 6):
                sfg_simple_accumulator.set_latency_of_type(addition_type, latency)
                assert sfg_simple_accumulator.critical_path() == latency
    
    class TestUnfold:
        """Tests of ``SFG.unfold``."""

        def count_kinds(self, sfg: SFG) -> Dict[Type, int]:
            """Return the number of operations of each concrete type in ``sfg``."""
            return Counter(type(op) for op in sfg.operations)

        def assert_counts_is_correct(self, sfg1: SFG, sfg2: SFG, multiple: int):
            """Assert ``sfg2`` holds ``multiple`` times each operation kind of ``sfg1``.

            ``Delay`` operations are the exception: their count must be equal in
            both SFGs.
            """
            count1 = self.count_kinds(sfg1)
            count2 = self.count_kinds(sfg2)

            # Delays should not be duplicated. Check that and then clear them
            # Using get to avoid issues if there are no delays in the sfg
            assert count1.get(Delay) == count2.get(Delay)
            count1[Delay] = 0
            count2[Delay] = 0

            # Ensure that we aren't missing any keys, or have any extras
            assert count1.keys() == count2.keys()

            for kind, count in count1.items():
                assert count * multiple == count2[kind]

        # This is horrifying, but I can't figure out a way to run the test on
        # multiple fixtures, so this is an ugly hack until someone that knows
        # pytest comes along
        def test_two_inputs_two_outputs(self, sfg_two_inputs_two_outputs: SFG):
            """Run the unfolding checks on the two-in/two-out SFG."""
            self.do_tests(sfg_two_inputs_two_outputs)

        def test_twotapfir(self, sfg_two_tap_fir: SFG):
            """Run the unfolding checks on the two-tap FIR SFG."""
            self.do_tests(sfg_two_tap_fir)

        def test_delay(self, sfg_delay: SFG):
            """Run the unfolding checks on the delay SFG."""
            self.do_tests(sfg_delay)

        def test_sfg_two_inputs_two_outputs_independent(
            self, sfg_two_inputs_two_outputs_independent: SFG
        ):
            """Run the unfolding checks on the independent two-in/two-out SFG."""
            self.do_tests(sfg_two_inputs_two_outputs_independent)

        def do_tests(self, sfg: SFG):
            """Check operation counts and simulated outputs of ``sfg`` unfolded.

            For the factors 2 and 3, the SFG is unfolded once and twice and the
            operation counts are checked. Then the original and the unfolded SFG
            are simulated on the same random input samples and the interleaved
            outputs of the unfolded SFG must equal the original outputs.
            """
            num_tests = 5
            for factor in range(2, 4):
                # Ensure that the correct number of operations get created
                unfolded = sfg.unfold(factor)
                self.assert_counts_is_correct(sfg, unfolded, factor)

                double_unfolded = sfg.unfold(factor).unfold(factor)
                self.assert_counts_is_correct(
                    sfg, double_unfolded, factor * factor
                )

                num_inputs = len(sfg.inputs)
                num_outputs = len(sfg.outputs)

                # Evaluate with some random values
                # To avoid problems with missing inputs at the end of the
                # sequence, each input gets num_tests * factor samples
                input_list = [
                    [random.random() for _ in range(num_tests * factor)]
                    for _ in range(num_inputs)
                ]

                sim = Simulation(sfg, input_list)
                sim.run()
                ref = sim.results

                # We have `factor` copies of the inputs. Copy n of input k sits
                # at index k + n * num_inputs and receives every factor-th
                # sample of the original input k, starting at offset n.
                unfolded_input_lists = [
                    input_list[k][n::factor]
                    for n in range(factor)
                    for k in range(num_inputs)
                ]

                sim = Simulation(unfolded, unfolded_input_lists)
                sim.run()
                unfolded_results = sim.results

                for n in range(num_outputs):
                    # Outputs for an original output
                    ref_values = list(ref[ResultKey(f"{n}")])

                    # Output n will be split into `factor` output ports, compute
                    # the indices where we find the outputs
                    out_indices = [n + k * num_outputs for k in range(factor)]

                    # Interleave the split outputs back into the original order
                    flat_u_values = [
                        unfolded_results[ResultKey(f"{idx}")][k]
                        for k in range(num_tests)
                        for idx in out_indices
                    ]

                    assert flat_u_values == ref_values

        def test_value_error(self, sfg_two_inputs_two_outputs: SFG):
            """Unfolding zero times raises ``ValueError``."""
            sfg = sfg_two_inputs_two_outputs
            with pytest.raises(
                ValueError, match="Unfolding 0 times removes the SFG"
            ):
                sfg.unfold(0)