Skip to content
Snippets Groups Projects
test_sfg.py 50.6 KiB
Newer Older
  • Learn to ignore specific revisions
  • class TestConnectExternalSignalsToComponentsMultipleComp:
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
        def test_connect_external_signals_to_components_operation_tree(
            self, operation_tree
        ):
            """Replaces a operation_tree in an SFG with other components"""
    
            sfg1 = SFG(outputs=[Output(operation_tree)])
    
            inp1 = Input("INP1")
            inp2 = Input("INP2")
            out1 = Output(None, "OUT1")
    
            add1 = Addition(None, None, "ADD1")
            add2 = Addition(None, None, "ADD2")
    
            add1.input(0).connect(inp1, "S1")
            add1.input(1).connect(inp2, "S2")
            add2.input(0).connect(add1, "S3")
            add2.input(1).connect(sfg1.outputs[0], "S4")
            out1.input(0).connect(add2, "S5")
    
            test_sfg = SFG(inputs=[inp1, inp2], outputs=[out1])
            assert test_sfg.evaluate(1, 2) == 8
            sfg1.connect_external_signals_to_components()
            assert test_sfg.evaluate(1, 2) == 8
            assert not test_sfg.connect_external_signals_to_components()
    
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
        def test_connect_external_signals_to_components_large_operation_tree(
            self, large_operation_tree
        ):
            """Replaces a large_operation_tree in an SFG with other components"""
    
            sfg1 = SFG(outputs=[Output(large_operation_tree)])
    
            inp1 = Input("INP1")
            inp2 = Input("INP2")
            out1 = Output(None, "OUT1")
            add1 = Addition(None, None, "ADD1")
            add2 = Addition(None, None, "ADD2")
    
            add1.input(0).connect(inp1, "S1")
            add1.input(1).connect(inp2, "S2")
            add2.input(0).connect(add1, "S3")
            add2.input(1).connect(sfg1.outputs[0], "S4")
            out1.input(0).connect(add2, "S5")
    
            test_sfg = SFG(inputs=[inp1, inp2], outputs=[out1])
            assert test_sfg.evaluate(1, 2) == 17
            sfg1.connect_external_signals_to_components()
            assert test_sfg.evaluate(1, 2) == 17
            assert not test_sfg.connect_external_signals_to_components()
    
        def create_sfg(self, op_tree):
    
            """Create a simple SFG with either operation_tree or large_operation_tree
            """
    
            sfg1 = SFG(outputs=[Output(op_tree)])
    
            inp1 = Input("INP1")
            inp2 = Input("INP2")
            out1 = Output(None, "OUT1")
            add1 = Addition(None, None, "ADD1")
            add2 = Addition(None, None, "ADD2")
    
            add1.input(0).connect(inp1, "S1")
            add1.input(1).connect(inp2, "S2")
            add2.input(0).connect(add1, "S3")
            add2.input(1).connect(sfg1.outputs[0], "S4")
            out1.input(0).connect(add2, "S5")
    
            return SFG(inputs=[inp1, inp2], outputs=[out1])
    
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
        def test_connect_external_signals_to_components_many_op(
            self, large_operation_tree
        ):
    
            """Replaces an sfg component in a larger SFG with several component operations
            """
    
            inp1 = Input("INP1")
            inp2 = Input("INP2")
            inp3 = Input("INP3")
            inp4 = Input("INP4")
            out1 = Output(None, "OUT1")
            add1 = Addition(None, None, "ADD1")
            sub1 = Subtraction(None, None, "SUB1")
    
            add1.input(0).connect(inp1, "S1")
            add1.input(1).connect(inp2, "S2")
    
            sfg1 = self.create_sfg(large_operation_tree)
    
            sfg1.input(0).connect(add1, "S3")
            sfg1.input(1).connect(inp3, "S4")
            sub1.input(0).connect(sfg1.outputs[0], "S5")
            sub1.input(1).connect(inp4, "S6")
            out1.input(0).connect(sub1, "S7")
    
            test_sfg = SFG(inputs=[inp1, inp2, inp3, inp4], outputs=[out1])
    
            assert test_sfg.evaluate(1, 2, 3, 4) == 16
            sfg1.connect_external_signals_to_components()
            assert test_sfg.evaluate(1, 2, 3, 4) == 16
            assert not test_sfg.connect_external_signals_to_components()
    
    
    class TestTopologicalOrderOperations:
        def test_feedback_sfg(self, sfg_simple_filter):
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
            topological_order = (
                sfg_simple_filter.get_operations_topological_order()
            )
    
    
            assert [comp.name for comp in topological_order] == [
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                "IN1",
                "ADD1",
                "T1",
                "CMUL1",
                "OUT1",
            ]
    
        def test_multiple_independent_inputs(
            self, sfg_two_inputs_two_outputs_independent
        ):
            topological_order = (
                sfg_two_inputs_two_outputs_independent.get_operations_topological_order()
            )
    
    
            assert [comp.name for comp in topological_order] == [
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                "IN1",
                "OUT1",
                "IN2",
                "C1",
                "ADD1",
                "OUT2",
            ]
    
    
        def test_complex_graph(self, precedence_sfg_delays):
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
            topological_order = (
                precedence_sfg_delays.get_operations_topological_order()
            )
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
            assert [comp.name for comp in topological_order] == [
                "IN1",
                "C0",
                "ADD1",
                "Q1",
                "A0",
                "T1",
                "B1",
                "A1",
                "T2",
                "B2",
                "ADD2",
                "A2",
                "ADD3",
                "ADD4",
                "OUT1",
            ]
    
    
    
    class TestRemove:
        def test_remove_single_input_outputs(self, sfg_simple_filter):
            new_sfg = sfg_simple_filter.remove_operation("cmul1")
    
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
            assert set(
                op.name
                for op in sfg_simple_filter.find_by_name("T1")[
                    0
                ].subsequent_operations
            ) == {"CMUL1", "OUT1"}
            assert set(
                op.name
                for op in new_sfg.find_by_name("T1")[0].subsequent_operations
            ) == {"ADD1", "OUT1"}
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
            assert set(
                op.name
                for op in sfg_simple_filter.find_by_name("ADD1")[
                    0
                ].preceding_operations
            ) == {"CMUL1", "IN1"}
            assert set(
                op.name
                for op in new_sfg.find_by_name("ADD1")[0].preceding_operations
            ) == {"T1", "IN1"}
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                [
                    sig.name
                    for sig in sfg_simple_filter.find_by_name("T1")[0]
                    .output(0)
                    .signals
                ]
            )
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                [
                    sig.name
                    for sig in new_sfg.find_by_name("T1")[0].output(0).signals
                ]
            )
    
    
        def test_remove_multiple_inputs_outputs(self, butterfly_operation_tree):
            out1 = Output(butterfly_operation_tree.output(0), "OUT1")
            out2 = Output(butterfly_operation_tree.output(1), "OUT2")
    
            sfg = SFG(outputs=[out1, out2])
    
            new_sfg = sfg.remove_operation(sfg.find_by_name("bfly2")[0].graph_id)
    
            assert sfg.find_by_name("bfly3")[0].output(0).signal_count == 1
            assert new_sfg.find_by_name("bfly3")[0].output(0).signal_count == 1
    
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
            sfg_dest_0 = (
                sfg.find_by_name("bfly3")[0].output(0).signals[0].destination
            )
            new_sfg_dest_0 = (
                new_sfg.find_by_name("bfly3")[0].output(0).signals[0].destination
            )
    
    
            assert sfg_dest_0.index == 0
            assert new_sfg_dest_0.index == 0
            assert sfg_dest_0.operation.name == "bfly2"
            assert new_sfg_dest_0.operation.name == "bfly1"
    
            assert sfg.find_by_name("bfly3")[0].output(1).signal_count == 1
            assert new_sfg.find_by_name("bfly3")[0].output(1).signal_count == 1
    
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
            sfg_dest_1 = (
                sfg.find_by_name("bfly3")[0].output(1).signals[0].destination
            )
            new_sfg_dest_1 = (
                new_sfg.find_by_name("bfly3")[0].output(1).signals[0].destination
            )
    
    
            assert sfg_dest_1.index == 1
            assert new_sfg_dest_1.index == 1
            assert sfg_dest_1.operation.name == "bfly2"
            assert new_sfg_dest_1.operation.name == "bfly1"
    
            assert sfg.find_by_name("bfly1")[0].input(0).signal_count == 1
            assert new_sfg.find_by_name("bfly1")[0].input(0).signal_count == 1
    
            sfg_source_0 = sfg.find_by_name("bfly1")[0].input(0).signals[0].source
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
            new_sfg_source_0 = (
                new_sfg.find_by_name("bfly1")[0].input(0).signals[0].source
            )
    
    
            assert sfg_source_0.index == 0
            assert new_sfg_source_0.index == 0
            assert sfg_source_0.operation.name == "bfly2"
            assert new_sfg_source_0.operation.name == "bfly3"
    
            sfg_source_1 = sfg.find_by_name("bfly1")[0].input(1).signals[0].source
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
            new_sfg_source_1 = (
                new_sfg.find_by_name("bfly1")[0].input(1).signals[0].source
            )
    
    
            assert sfg_source_1.index == 1
            assert new_sfg_source_1.index == 1
            assert sfg_source_1.operation.name == "bfly2"
            assert new_sfg_source_1.operation.name == "bfly3"
    
            assert "bfly2" not in set(op.name for op in new_sfg.operations)
    
        def remove_different_number_inputs_outputs(self, sfg_simple_filter):
            with pytest.raises(ValueError):
                sfg_simple_filter.remove_operation("add1")
    
    
    class TestSaveLoadSFG:
        def get_path(self, existing=False):
            path_ = "".join(random.choices(string.ascii_uppercase, k=4)) + ".py"
            while path.exists(path_) if not existing else not path.exists(path_):
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                path_ = (
                    "".join(random.choices(string.ascii_uppercase, k=4)) + ".py"
                )
    
    
            return path_
    
        def test_save_simple_sfg(self, sfg_simple_filter):
            result = sfg_to_python(sfg_simple_filter)
            path_ = self.get_path()
    
            assert not path.exists(path_)
            with open(path_, "w") as file_obj:
                file_obj.write(result)
    
            assert path.exists(path_)
    
            with open(path_, "r") as file_obj:
                assert file_obj.read() == result
    
            remove(path_)
    
        def test_save_complex_sfg(self, precedence_sfg_delays_and_constants):
            result = sfg_to_python(precedence_sfg_delays_and_constants)
            path_ = self.get_path()
    
            assert not path.exists(path_)
            with open(path_, "w") as file_obj:
                file_obj.write(result)
    
            assert path.exists(path_)
    
            with open(path_, "r") as file_obj:
                assert file_obj.read() == result
    
            remove(path_)
    
        def test_load_simple_sfg(self, sfg_simple_filter):
            result = sfg_to_python(sfg_simple_filter)
            path_ = self.get_path()
    
            assert not path.exists(path_)
            with open(path_, "w") as file_obj:
                file_obj.write(result)
    
            assert path.exists(path_)
    
            simple_filter_, _ = python_to_sfg(path_)
    
            assert str(sfg_simple_filter) == str(simple_filter_)
            assert sfg_simple_filter.evaluate([2]) == simple_filter_.evaluate([2])
    
            remove(path_)
    
        def test_load_complex_sfg(self, precedence_sfg_delays_and_constants):
            result = sfg_to_python(precedence_sfg_delays_and_constants)
            path_ = self.get_path()
    
            assert not path.exists(path_)
            with open(path_, "w") as file_obj:
                file_obj.write(result)
    
            assert path.exists(path_)
    
            precedence_sfg_registers_and_constants_, _ = python_to_sfg(path_)
    
            assert str(precedence_sfg_delays_and_constants) == str(
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                precedence_sfg_registers_and_constants_
            )
    
    
            remove(path_)
    
        def test_load_invalid_path(self):
            path_ = self.get_path(existing=False)
    
            with pytest.raises(FileNotFoundError):
    
                python_to_sfg(path_)
    
    
    class TestGetComponentsOfType:
        def test_get_no_operations_of_type(self, sfg_two_inputs_two_outputs):
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
            assert [
                op.name
                for op in sfg_two_inputs_two_outputs.find_by_type_name(
                    Multiplication.type_name()
                )
            ] == []
    
    
        def test_get_multple_operations_of_type(self, sfg_two_inputs_two_outputs):
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
            assert [
                op.name
                for op in sfg_two_inputs_two_outputs.find_by_type_name(
                    Addition.type_name()
                )
            ] == ["ADD1", "ADD2"]
    
            assert [
                op.name
                for op in sfg_two_inputs_two_outputs.find_by_type_name(
                    Input.type_name()
                )
            ] == ["IN1", "IN2"]
    
            assert [
                op.name
                for op in sfg_two_inputs_two_outputs.find_by_type_name(
                    Output.type_name()
                )
            ] == ["OUT1", "OUT2"]
    
    
    
    class TestPrecedenceGraph:
        def test_precedence_graph(self, sfg_simple_filter):
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
            res = (
    
                "digraph {\n\trankdir=LR\n\tsubgraph cluster_0"
                " {\n\t\tlabel=N1\n\t\t\"in1.0\" [label=in1]\n\t\t\"t1.0\""
                " [label=t1]\n\t}\n\tsubgraph cluster_1"
                " {\n\t\tlabel=N2\n\t\t\"cmul1.0\" [label=cmul1]\n\t}\n\tsubgraph"
                " cluster_2 {\n\t\tlabel=N3\n\t\t\"add1.0\""
                " [label=add1]\n\t}\n\t\"in1.0\" -> add1\n\tadd1 [label=add1"
                " shape=square]\n\tin1 -> \"in1.0\"\n\tin1 [label=in1"
                " shape=square]\n\t\"t1.0\" -> cmul1\n\tcmul1 [label=cmul1"
                " shape=square]\n\t\"t1.0\" -> out1\n\tout1 [label=out1"
                " shape=square]\n\tt1Out -> \"t1.0\"\n\tt1Out [label=t1"
                " shape=square]\n\t\"cmul1.0\" -> add1\n\tadd1 [label=add1"
                " shape=square]\n\tcmul1 -> \"cmul1.0\"\n\tcmul1 [label=cmul1"
                " shape=square]\n\t\"add1.0\" -> t1In\n\tt1In [label=t1"
                " shape=square]\n\tadd1 -> \"add1.0\"\n\tadd1 [label=add1"
    
                " shape=square]\n}"
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
            )
    
            assert sfg_simple_filter.precedence_graph().source in (res, res + "\n")
    
    
    
    class TestSFGGraph:
        def test_sfg(self, sfg_simple_filter):
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
            res = (
                "digraph {\n\trankdir=LR\n\tin1\n\tin1 -> "
                "add1\n\tout1\n\tt1 -> out1\n\tadd1\n\tcmul1 -> "
                "add1\n\tcmul1\n\tadd1 -> t1\n\tt1 [shape=square]\n\tt1 "
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
            )
    
            assert sfg_simple_filter.sfg().source in (res, res + "\n")
    
    
        def test_sfg_show_id(self, sfg_simple_filter):
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
            res = (
                "digraph {\n\trankdir=LR\n\tin1\n\tin1 -> add1 "
                "[label=s1]\n\tout1\n\tt1 -> out1 [label=s2]\n\tadd1"
                "\n\tcmul1 -> add1 [label=s3]\n\tcmul1\n\tadd1 -> t1 "
    
                "[label=s4]\n\tt1 [shape=square]\n\tt1 -> cmul1 [label=s5]\n}"
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
            )
    
            assert sfg_simple_filter.sfg(show_id=True).source in (res, res + "\n")
    
    
        def test_show_sfg_invalid_format(self, sfg_simple_filter):
    
            with pytest.raises(ValueError):
    
                sfg_simple_filter.show_sfg(format="ppddff")
    
        def test_show_sfg_invalid_engine(self, sfg_simple_filter):
    
            with pytest.raises(ValueError):
    
                sfg_simple_filter.show_sfg(engine="ppddff")
    
    
    
    class TestSFGErrors:
        def test_dangling_output(self):
            in1 = Input()
            in2 = Input()
            adaptor = SymmetricTwoportAdaptor(0.5, in1, in2)
            out1 = Output(adaptor.output(0))
            # No error, maybe should be?
            _ = SFG([in1, in2], [out1])
    
        def test_unconnected_input_port(self):
            in1 = Input()
            adaptor = SymmetricTwoportAdaptor(0.5, in1)
            out1 = Output(adaptor.output(0))
            with pytest.raises(ValueError, match="Unconnected input port in SFG"):
                SFG([in1], [out1])
    
        def test_unconnected_output(self):
            in1 = Input()
            in2 = Input()
            adaptor = SymmetricTwoportAdaptor(0.5, in1, in2)
            out1 = Output(adaptor.output(0))
            out2 = Output()
            # No error, should be
            SFG([in1, in2], [out1, out2])
    
        def test_unconnected_input(self):
            in1 = Input()
            in2 = Input()
            adaptor = SymmetricTwoportAdaptor(0.5, in1)
            out1 = Output(adaptor.output(0))
            out2 = Output(adaptor.output(1))
            # Correct error?
            with pytest.raises(ValueError, match="Unconnected input port in SFG"):
                SFG([in1, in2], [out1, out2])
    
        def test_duplicate_input(self):
            in1 = Input()
            in2 = Input()
            adaptor = SymmetricTwoportAdaptor(0.5, in1, in2)
            out1 = Output(adaptor.output(0))
            out2 = Output(adaptor.output(1))
            with pytest.raises(ValueError, match="Duplicate input operation"):
                SFG([in1, in1], [out1, out2])
    
        def test_duplicate_output(self):
            in1 = Input()
            in2 = Input()
            adaptor = SymmetricTwoportAdaptor(0.5, in1, in2)
            out1 = Output(adaptor.output(0))
            out2 = Output(adaptor.output(1))
            with pytest.raises(ValueError, match="Duplicate output operation"):
                SFG([in1, in2], [out1, out1])
    
        def test_unconnected_input_signal(self):
            in1 = Input()
            in2 = Input()
            adaptor = SymmetricTwoportAdaptor(0.5, in1, in2)
            out1 = Output(adaptor.output(0))
            out2 = Output(adaptor.output(1))
            signal = Signal()
            with pytest.raises(
                ValueError, match="Input signal #0 is missing destination in SFG"
            ):
                SFG([in1, in2], [out1, out2], [signal])
    
        def test_unconnected_output_signal(self):
            in1 = Input()
            in2 = Input()
            adaptor = SymmetricTwoportAdaptor(0.5, in1, in2)
            out1 = Output(adaptor.output(0))
            out2 = Output(adaptor.output(1))
            signal = Signal()
            with pytest.raises(
                ValueError, match="Output signal #0 is missing source in SFG"
            ):
                SFG([in1, in2], [out1, out2], output_signals=[signal])
    
        def test_duplicate_input_signal(self):
            in1 = Input()
            signal = Signal()
            adaptor = SymmetricTwoportAdaptor(0.5, in1, signal)
            out1 = Output(adaptor.output(0))
            out2 = Output(adaptor.output(1))
            with pytest.raises(ValueError, match="Duplicate input signal"):
                SFG([in1], [out1, out2], [signal, signal])
    
        def test_duplicate_output_signal(self):
            in1 = Input()
            in2 = Input()
            adaptor = SymmetricTwoportAdaptor(0.5, in1, in2)
            out1 = Output(adaptor.output(0))
            signal = Signal(adaptor.output(1))
            # Should raise?
            SFG([in1, in2], [out1], output_signals=[signal, signal])
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
    
        def test_dangling_input_signal(self):
            in1 = Input()
            signal = Signal()
            adaptor = SymmetricTwoportAdaptor(0.5, in1, signal)
            out1 = Output(adaptor.output(0))
            out2 = Output(adaptor.output(1))
            with pytest.raises(
                ValueError, match="Dangling signal without source in SFG"
            ):
                SFG([in1], [out1, out2])
    
        def test_remove_signal_with_different_number_of_inputs_and_outputs(self):
            in1 = Input()
            in2 = Input()
            add1 = Addition(in1, in2, name="addition")
            out1 = Output(add1)
            sfg = SFG([in1, in2], [out1])
            # Try to remove non-existent operation
            sfg1 = sfg.remove_operation("foo")
            assert sfg1 is None
            with pytest.raises(
                ValueError,
                match=(
                    "Different number of input and output ports of operation with"
                ),
            ):
                sfg.remove_operation('add1')
    
        def test_inputs_required_for_output(self):
            in1 = Input()
            in2 = Input()
            add1 = Addition(in1, in2, name="addition")
            out1 = Output(add1)
            sfg = SFG([in1, in2], [out1])
            with pytest.raises(
                IndexError,
                match=re.escape("Output index out of range (expected 0-0, got 1)"),
            ):
                sfg.inputs_required_for_output(1)