Skip to content
Snippets Groups Projects
test_sfg.py 63.5 KiB
Newer Older
  • Learn to ignore specific revisions
  • Oscar Gustafsson's avatar
    Oscar Gustafsson committed
    import io
    
    Frans Skarman's avatar
    Frans Skarman committed
    import itertools
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
    import re
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
    from os import path, remove
    
    Frans Skarman's avatar
    Frans Skarman committed
    from typing import Counter, Dict, Type
    
    Samuel Fagerlund's avatar
    Samuel Fagerlund committed
    import numpy as np
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
    import pytest
    
    from b_asic import Input, Output, Signal
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
    from b_asic.core_operations import (
        Addition,
        Butterfly,
        Constant,
        ConstantMultiplication,
        Multiplication,
        SquareRoot,
        Subtraction,
    
        SymmetricTwoportAdaptor,
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
    )
    
    Frans Skarman's avatar
    Frans Skarman committed
    from b_asic.operation import ResultKey
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
    from b_asic.save_load_structure import python_to_sfg, sfg_to_python
    
    from b_asic.sfg_generators import wdf_allpass
    
    from b_asic.signal_flow_graph import SFG, GraphID
    
    Frans Skarman's avatar
    Frans Skarman committed
    from b_asic.simulation import Simulation
    
    from b_asic.special_operations import Delay
    
    
    
    class TestInit:
        """Construct SFGs from operations or from signals and check their size."""

        def test_direct_input_to_output_sfg_construction(self):
            """An Input wired straight to an Output: 3 components, 2 operations."""
            source = Input("IN1")
            sink = Output(None, "OUT1")
            sink.input(0).connect(source, "S1")

            # source ---S1---> sink
            graph = SFG(inputs=[source], outputs=[sink])

            assert len(list(graph.components)) == 3
            assert len(list(graph.operations)) == 2
            assert (graph.input_count, graph.output_count) == (1, 1)

        def test_same_signal_input_and_output_sfg_construction(self):
            """One signal used as both the input signal and the output signal."""
            first_adder = Addition(None, None, "ADD1")
            second_adder = Addition(None, None, "ADD2")

            # first_adder ---S1---> second_adder
            shared_signal = second_adder.input(0).connect(first_adder, "S1")

            graph = SFG(input_signals=[shared_signal], output_signals=[shared_signal])

            assert len(list(graph.components)) == 3
            assert len(list(graph.operations)) == 2
            assert (graph.input_count, graph.output_count) == (1, 1)

        def test_outputs_construction(self, operation_tree):
            """Build from an Output only; the fixture tree supplies the rest."""
            graph = SFG(outputs=[Output(operation_tree)])

            assert len(list(graph.components)) == 7
            assert len(list(graph.operations)) == 4
            assert (graph.input_count, graph.output_count) == (0, 1)

        def test_signals_construction(self, operation_tree):
            """Build from an output signal whose source is the fixture tree."""
            tail_signal = Signal(source=operation_tree.output(0))
            graph = SFG(output_signals=[tail_signal])

            assert len(list(graph.components)) == 7
            assert len(list(graph.operations)) == 4
            assert (graph.input_count, graph.output_count) == (0, 1)
    
    
    class TestPrintSfg:
        def test_one_addition(self):
            inp1 = Input("INP1")
            inp2 = Input("INP2")
            add1 = Addition(inp1, inp2, "ADD1")
            out1 = Output(add1, "OUT1")
            sfg = SFG(inputs=[inp1, inp2], outputs=[out1], name="SFG1")
    
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
            assert (
                sfg.__str__()
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                == "id: no_id, \tname: SFG1, \tinputs: {0: '-'}, \toutputs: {0: '-'}\n"
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                + "Internal Operations:\n"
    
                + "--------------------------------------------------------------------"
                + "--------------------------------\n"
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                + str(sfg.find_by_name("INP1")[0])
                + "\n"
                + str(sfg.find_by_name("INP2")[0])
                + "\n"
                + str(sfg.find_by_name("ADD1")[0])
                + "\n"
                + str(sfg.find_by_name("OUT1")[0])
                + "\n"
    
                + "--------------------------------------------------------------------"
                + "--------------------------------\n"
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
            )
    
    
        def test_add_mul(self):
            inp1 = Input("INP1")
            inp2 = Input("INP2")
            inp3 = Input("INP3")
            add1 = Addition(inp1, inp2, "ADD1")
            mul1 = Multiplication(add1, inp3, "MUL1")
            out1 = Output(mul1, "OUT1")
            sfg = SFG(inputs=[inp1, inp2, inp3], outputs=[out1], name="mac_sfg")
    
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
            assert (
                sfg.__str__()
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                == "id: no_id, \tname: mac_sfg, \tinputs: {0: '-'}, \toutputs: {0: '-'}\n"
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                + "Internal Operations:\n"
    
                + "--------------------------------------------------------------------"
                + "--------------------------------\n"
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                + str(sfg.find_by_name("INP1")[0])
                + "\n"
                + str(sfg.find_by_name("INP2")[0])
                + "\n"
                + str(sfg.find_by_name("ADD1")[0])
                + "\n"
                + str(sfg.find_by_name("INP3")[0])
                + "\n"
                + str(sfg.find_by_name("MUL1")[0])
                + "\n"
                + str(sfg.find_by_name("OUT1")[0])
                + "\n"
    
                + "--------------------------------------------------------------------"
                + "--------------------------------\n"
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
            )
    
    
        def test_constant(self):
            inp1 = Input("INP1")
            const1 = Constant(3, "CONST")
            add1 = Addition(const1, inp1, "ADD1")
            out1 = Output(add1, "OUT1")
    
            sfg = SFG(inputs=[inp1], outputs=[out1], name="sfg")
    
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
            assert (
                sfg.__str__()
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                == "id: no_id, \tname: sfg, \tinputs: {0: '-'}, \toutputs: {0: '-'}\n"
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                + "Internal Operations:\n"
    
                + "--------------------------------------------------------------------"
                + "--------------------------------\n"
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                + str(sfg.find_by_name("CONST")[0])
                + "\n"
                + str(sfg.find_by_name("INP1")[0])
                + "\n"
                + str(sfg.find_by_name("ADD1")[0])
                + "\n"
                + str(sfg.find_by_name("OUT1")[0])
                + "\n"
    
                + "--------------------------------------------------------------------"
                + "--------------------------------\n"
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
            )
    
    
        def test_simple_filter(self, sfg_simple_filter):
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
            assert (
                sfg_simple_filter.__str__()
    
                == "id: no_id, \tname: simple_filter, \tinputs: {0: '-'},"
                " \toutputs: {0: '-'}\n"
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                + "Internal Operations:\n"
    
                + "--------------------------------------------------------------------"
                + "--------------------------------\n"
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                + str(sfg_simple_filter.find_by_name("IN")[0])
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                + "\n"
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                + str(sfg_simple_filter.find_by_name("ADD")[0])
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                + "\n"
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                + str(sfg_simple_filter.find_by_name("T")[0])
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                + "\n"
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                + str(sfg_simple_filter.find_by_name("CMUL")[0])
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                + "\n"
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                + str(sfg_simple_filter.find_by_name("OUT")[0])
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                + "\n"
    
                + "--------------------------------------------------------------------"
                + "--------------------------------\n"
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
            )
    
    
    
    class TestDeepCopy:
        """Calling an SFG instance (``sfg(...)``) is expected to yield a copy."""

        def test_deep_copy_no_duplicates(self):
            """The copy gets an empty name; every graph id resolves to an equally named component."""
            inp1 = Input("INP1")
            inp2 = Input("INP2")
            inp3 = Input("INP3")
            add1 = Addition(inp1, inp2, "ADD1")
            mul1 = Multiplication(add1, inp3, "MUL1")
            out1 = Output(mul1, "OUT1")

            mac_sfg = SFG(inputs=[inp1, inp2], outputs=[out1], name="mac_sfg")
            mac_sfg_new = mac_sfg()

            assert mac_sfg.name == "mac_sfg"
            assert mac_sfg_new.name == ""

            for g_id, component in mac_sfg._components_by_id.items():
                component_copy = mac_sfg_new.find_by_id(g_id)
                assert component.name == component_copy.name

        def test_deep_copy(self):
            """A copy made with ``name=`` keeps the id number offset and component names."""
            inp1 = Input("INP1")
            inp2 = Input("INP2")
            inp3 = Input("INP3")
            add1 = Addition(None, None, "ADD1")
            add2 = Addition(None, None, "ADD2")
            mul1 = Multiplication(None, None, "MUL1")
            out1 = Output(None, "OUT1")

            add1.input(0).connect(inp1, "S1")
            add1.input(1).connect(inp2, "S2")
            add2.input(0).connect(add1, "S4")
            add2.input(1).connect(inp3, "S3")
            mul1.input(0).connect(add1, "S5")
            mul1.input(1).connect(add2, "S6")
            out1.input(0).connect(mul1, "S7")

            mac_sfg = SFG(
                inputs=[inp1, inp2],
                outputs=[out1],
                id_number_offset=100,
                name="mac_sfg",
            )

            mac_sfg_new = mac_sfg(name="mac_sfg2")

            assert mac_sfg.name == "mac_sfg"
            assert mac_sfg_new.name == "mac_sfg2"
            assert mac_sfg.id_number_offset == 100
            assert mac_sfg_new.id_number_offset == 100

            for g_id, component in mac_sfg._components_by_id.items():
                component_copy = mac_sfg_new.find_by_id(g_id)
                assert component.name == component_copy.name

        def test_deep_copy_with_new_sources(self):
            """Positional arguments to the call become the sources of the copy's inputs."""
            inp1 = Input("INP1")
            inp2 = Input("INP2")
            inp3 = Input("INP3")
            add1 = Addition(inp1, inp2, "ADD1")
            mul1 = Multiplication(add1, inp3, "MUL1")
            out1 = Output(mul1, "OUT1")

            mac_sfg = SFG(inputs=[inp1, inp2], outputs=[out1], name="mac_sfg")

            a = Addition(Constant(3), Constant(5))
            b = Constant(2)
            mac_sfg_new = mac_sfg(a, b)
            assert mac_sfg_new.input(0).signals[0].source.operation is a
            assert mac_sfg_new.input(1).signals[0].source.operation is b
    
    
    class TestEvaluateOutput:
        """Evaluate output 0 of SFGs built from fixture operation graphs."""

        def test_evaluate_output(self, operation_tree):
            """The small fixture tree is expected to evaluate to 5."""
            sfg = SFG(outputs=[Output(operation_tree)])
            assert sfg.evaluate_output(0, []) == 5

        def test_evaluate_output_large(self, large_operation_tree):
            """The large fixture tree is expected to evaluate to 14."""
            sfg = SFG(outputs=[Output(large_operation_tree)])
            assert sfg.evaluate_output(0, []) == 14

        def test_evaluate_output_cycle(self, operation_graph_with_cycle):
            """Evaluating a graph with a cycle must raise ``RuntimeError``."""
            sfg = SFG(outputs=[Output(operation_graph_with_cycle)])
            with pytest.raises(RuntimeError, match="Direct feedback loop detected"):
                sfg.evaluate_output(0, [])
    
    
    class TestComponents:
        """Check which components an SFG reports."""

        def test_advanced_components(self):
            """All operations and all named signals of the MAC graph are listed."""
            inp1 = Input("INP1")
            inp2 = Input("INP2")
            inp3 = Input("INP3")
            add1 = Addition(None, None, "ADD1")
            add2 = Addition(None, None, "ADD2")
            mul1 = Multiplication(None, None, "MUL1")
            out1 = Output(None, "OUT1")

            add1.input(0).connect(inp1, "S1")
            add1.input(1).connect(inp2, "S2")
            add2.input(0).connect(add1, "S4")
            add2.input(1).connect(inp3, "S3")
            mul1.input(0).connect(add1, "S5")
            mul1.input(1).connect(add2, "S6")
            out1.input(0).connect(mul1, "S7")

            mac_sfg = SFG(inputs=[inp1, inp2], outputs=[out1], name="mac_sfg")

            assert {comp.name for comp in mac_sfg.components} == {
                "INP1",
                "INP2",
                "INP3",
                "ADD1",
                "ADD2",
                "MUL1",
                "OUT1",
                "S1",
                "S2",
                "S3",
                "S4",
                "S5",
                "S6",
                "S7",
            }

    
    class TestReplaceOperation:
        """Exercise ``SFG.replace_operation`` on fixture operation trees."""

        def test_replace_addition_by_id(self, operation_tree):
            """Replacing ``add0`` removes that id and registers the new name."""
            sfg = SFG(outputs=[Output(operation_tree)])
            component_id = "add0"

            sfg = sfg.replace_operation(Multiplication(name="Multi"), graph_id=component_id)

            assert component_id not in sfg._components_by_id
            assert "Multi" in sfg._components_by_name

        def test_replace_addition_large_tree(self, large_operation_tree):
            """Same check as above, on ``add2`` of the larger tree."""
            sfg = SFG(outputs=[Output(large_operation_tree)])
            component_id = "add2"

            sfg = sfg.replace_operation(Multiplication(name="Multi"), graph_id=component_id)

            assert "Multi" in sfg._components_by_name
            assert component_id not in sfg._components_by_id

        def test_replace_no_input_component(self, operation_tree):
            """After replacing ``c0``, that id resolves to a different object."""
            sfg = SFG(outputs=[Output(operation_tree)])
            component_id = "c0"

            # Capture the component *before* the replacement so the identity
            # comparison below has something to compare against.
            const_ = sfg.find_by_id(component_id)

            sfg = sfg.replace_operation(Constant(1), graph_id=component_id)

            assert const_ is not sfg.find_by_id(component_id)

        def test_no_match_on_replace(self, large_operation_tree):
            """An unknown graph id must raise ``ValueError``."""
            sfg = SFG(outputs=[Output(large_operation_tree)])
            component_id = "addd0"  # deliberately misspelled id

            with pytest.raises(
                ValueError, match="No operation matching the criteria found"
            ):
                sfg.replace_operation(Multiplication(name="Multi"), graph_id=component_id)

        def test_not_equal_input(self, large_operation_tree):
            """Replacing ``c0`` with a Multiplication must raise ``TypeError``."""
            sfg = SFG(outputs=[Output(large_operation_tree)])
            component_id = "c0"

            with pytest.raises(
                TypeError,
                match="The input count may not differ between the operations",
            ):
                sfg.replace_operation(Multiplication(name="Multi"), graph_id=component_id)
    
    
    
    class TestInsertComponent:
        """Exercise ``SFG.insert_operation``.

        In every test ``sfg`` is the graph before insertion and ``_sfg`` the
        graph returned by ``insert_operation``.
        """

        def test_insert_component_in_sfg(self, large_operation_tree_names):
            """Insert a SquareRoot after ``constant4``; the original graph is untouched."""
            sfg = SFG(outputs=[Output(large_operation_tree_names)])
            sqrt = SquareRoot()

            _sfg = sfg.insert_operation(sqrt, sfg.find_by_name("constant4")[0].graph_id)
            assert _sfg.evaluate() != sfg.evaluate()

            assert any(isinstance(comp, SquareRoot) for comp in _sfg.operations)
            assert not any(isinstance(comp, SquareRoot) for comp in sfg.operations)

            # Operation that constant4 drives, before and after the insertion.
            old_dest = (
                sfg.find_by_name("constant4")[0].output(0).signals[0].destination.operation
            )
            new_dest = (
                _sfg.find_by_name("constant4")[0].output(0).signals[0].destination.operation
            )

            assert not isinstance(old_dest, SquareRoot)
            assert isinstance(new_dest, SquareRoot)

            assert old_dest is sfg.find_by_id("add2")
            assert new_dest is not _sfg.find_by_id("add2")
            assert _sfg.find_by_id("sqrt0").output(0).signals[
                0
            ].destination.operation is _sfg.find_by_id("add2")

        def test_insert_invalid_component_in_sfg(self, large_operation_tree):
            """Inserting an Addition after ``c0`` must raise ``TypeError``."""
            sfg = SFG(outputs=[Output(large_operation_tree)])

            # Should raise an exception for not matching input count to output count.
            add4 = Addition()
            with pytest.raises(TypeError, match="Source operation output count"):
                sfg.insert_operation(add4, "c0")

        def test_insert_at_output(self, large_operation_tree):
            """Inserting after ``out0`` must raise ``TypeError``."""
            sfg = SFG(outputs=[Output(large_operation_tree)])

            # Should raise an exception for trying to insert an operation after an output.
            sqrt = SquareRoot()
            with pytest.raises(TypeError, match="Source operation cannot be an"):
                _ = sfg.insert_operation(sqrt, "out0")

        def test_insert_multiple_output_ports(self, butterfly_operation_tree):
            """Insert a two-port Butterfly at ``bfly2`` and check both ports' wiring."""
            sfg = SFG(outputs=list(map(Output, butterfly_operation_tree.outputs)))
            _sfg = sfg.insert_operation(Butterfly(name="n_bfly"), "bfly2")

            assert sfg.evaluate() != _sfg.evaluate()

            assert len(sfg.find_by_name("n_bfly")) == 0
            assert len(_sfg.find_by_name("n_bfly")) == 1

            # Correctly connected old output -> new input
            assert (
                _sfg.find_by_name("bfly3")[0].output(0).signals[0].destination.operation
                is _sfg.find_by_name("n_bfly")[0]
            )
            assert (
                _sfg.find_by_name("bfly3")[0].output(1).signals[0].destination.operation
                is _sfg.find_by_name("n_bfly")[0]
            )
            assert (
                _sfg.find_by_name("n_bfly")[0].input(0).signals[0].source.operation
                is _sfg.find_by_name("bfly3")[0]
            )
            assert (
                _sfg.find_by_name("n_bfly")[0].input(1).signals[0].source.operation
                is _sfg.find_by_name("bfly3")[0]
            )

            # Correctly connected new output -> next input
            assert (
                _sfg.find_by_name("n_bfly")[0].output(0).signals[0].destination.operation
                is _sfg.find_by_name("bfly2")[0]
            )
            assert (
                _sfg.find_by_name("n_bfly")[0].output(1).signals[0].destination.operation
                is _sfg.find_by_name("bfly2")[0]
            )

            # Correctly connected next input -> new output
            assert (
                _sfg.find_by_name("bfly2")[0].input(0).signals[0].source.operation
                is _sfg.find_by_name("n_bfly")[0]
            )
            assert (
                _sfg.find_by_name("bfly2")[0].input(1).signals[0].source.operation
                is _sfg.find_by_name("n_bfly")[0]
            )
    
    
    
    class TestFindComponentsWithTypeName:
        """Exercise ``SFG.find_by_type_name`` on the MAC graph."""

        def test_mac_components(self):
            """Each type name returns exactly the components of that type."""
            inp1 = Input("INP1")
            inp2 = Input("INP2")
            inp3 = Input("INP3")
            add1 = Addition(None, None, "ADD1")
            add2 = Addition(None, None, "ADD2")
            mul1 = Multiplication(None, None, "MUL1")
            out1 = Output(None, "OUT1")

            add1.input(0).connect(inp1, "S1")
            add1.input(1).connect(inp2, "S2")
            add2.input(0).connect(add1, "S4")
            add2.input(1).connect(inp3, "S3")
            mul1.input(0).connect(add1, "S5")
            mul1.input(1).connect(add2, "S6")
            out1.input(0).connect(mul1, "S7")

            mac_sfg = SFG(inputs=[inp1, inp2], outputs=[out1], name="mac_sfg")

            assert {comp.name for comp in mac_sfg.find_by_type_name(inp1.type_name())} == {
                "INP1",
                "INP2",
                "INP3",
            }

            assert {comp.name for comp in mac_sfg.find_by_type_name(add1.type_name())} == {
                "ADD1",
                "ADD2",
            }

            assert {comp.name for comp in mac_sfg.find_by_type_name(mul1.type_name())} == {
                "MUL1"
            }

            assert {comp.name for comp in mac_sfg.find_by_type_name(out1.type_name())} == {
                "OUT1"
            }

            assert {
                comp.name for comp in mac_sfg.find_by_type_name(Signal.type_name())
            } == {"S1", "S2", "S3", "S4", "S5", "S6", "S7"}
    
    
    
    class TestGetPrecedenceList:
        def test_inputs_delays(self, precedence_sfg_delays):
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
            # No cached precedence list
            assert precedence_sfg_delays._precedence_list is None
    
    
            precedence_list = precedence_sfg_delays.get_precedence_list()
    
            assert len(precedence_list) == 7
    
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
            # Cached precedence list
            assert len(precedence_sfg_delays._precedence_list) == 7
    
    
            assert {
                port.operation.key(port.index, port.operation.name)
                for port in precedence_list[0]
            } == {"IN1", "T1", "T2"}
    
            assert {
                port.operation.key(port.index, port.operation.name)
                for port in precedence_list[1]
            } == {"C0", "B1", "B2", "A1", "A2"}
    
            assert {
                port.operation.key(port.index, port.operation.name)
                for port in precedence_list[2]
            } == {"ADD2", "ADD3"}
    
            assert {
                port.operation.key(port.index, port.operation.name)
                for port in precedence_list[3]
            } == {"ADD1"}
    
            assert {
                port.operation.key(port.index, port.operation.name)
                for port in precedence_list[4]
            } == {"Q1"}
    
            assert {
                port.operation.key(port.index, port.operation.name)
                for port in precedence_list[5]
            } == {"A0"}
    
            assert {
                port.operation.key(port.index, port.operation.name)
                for port in precedence_list[6]
            } == {"ADD4"}
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
            # Trigger cache
            precedence_list = precedence_sfg_delays.get_precedence_list()
    
            assert len(precedence_list) == 7
    
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
        def test_inputs_constants_delays_multiple_outputs(
            self, precedence_sfg_delays_and_constants
        ):
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
            precedence_list = precedence_sfg_delays_and_constants.get_precedence_list()
    
            assert {
                port.operation.key(port.index, port.operation.name)
                for port in precedence_list[0]
            } == {"IN1", "T1", "CONST1"}
    
            assert {
                port.operation.key(port.index, port.operation.name)
                for port in precedence_list[1]
            } == {"C0", "B1", "B2", "A1", "A2"}
    
            assert {
                port.operation.key(port.index, port.operation.name)
                for port in precedence_list[2]
            } == {"ADD2", "ADD3"}
    
            assert {
                port.operation.key(port.index, port.operation.name)
                for port in precedence_list[3]
            } == {"ADD1"}
    
            assert {
                port.operation.key(port.index, port.operation.name)
                for port in precedence_list[4]
            } == {"Q1"}
    
            assert {
                port.operation.key(port.index, port.operation.name)
                for port in precedence_list[5]
            } == {"A0"}
    
            assert {
                port.operation.key(port.index, port.operation.name)
                for port in precedence_list[6]
            } == {"BFLY1.0", "BFLY1.1"}
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
    
        def test_precedence_multiple_outputs_same_precedence(
            self, sfg_two_inputs_two_outputs
        ):
    
            sfg_two_inputs_two_outputs.name = "NESTED_SFG"
    
            in1 = Input("IN1")
            sfg_two_inputs_two_outputs.input(0).connect(in1, "S1")
            in2 = Input("IN2")
            cmul1 = ConstantMultiplication(10, None, "CMUL1")
            cmul1.input(0).connect(in2, "S2")
            sfg_two_inputs_two_outputs.input(1).connect(cmul1, "S3")
    
            out1 = Output(sfg_two_inputs_two_outputs.output(0), "OUT1")
            out2 = Output(sfg_two_inputs_two_outputs.output(1), "OUT2")
    
            sfg = SFG(inputs=[in1, in2], outputs=[out1, out2])
    
            precedence_list = sfg.get_precedence_list()
    
            assert len(precedence_list) == 3
    
    
            assert {
                port.operation.key(port.index, port.operation.name)
                for port in precedence_list[0]
            } == {"IN1", "IN2"}
    
            assert {
                port.operation.key(port.index, port.operation.name)
                for port in precedence_list[1]
            } == {"CMUL1"}
    
            assert {
                port.operation.key(port.index, port.operation.name)
                for port in precedence_list[2]
            } == {"NESTED_SFG.0", "NESTED_SFG.1"}
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
    
        def test_precedence_sfg_multiple_outputs_different_precedences(
            self, sfg_two_inputs_two_outputs_independent
        ):
    
            sfg_two_inputs_two_outputs_independent.name = "NESTED_SFG"
    
            in1 = Input("IN1")
            in2 = Input("IN2")
            sfg_two_inputs_two_outputs_independent.input(0).connect(in1, "S1")
            cmul1 = ConstantMultiplication(10, None, "CMUL1")
            cmul1.input(0).connect(in2, "S2")
            sfg_two_inputs_two_outputs_independent.input(1).connect(cmul1, "S3")
            out1 = Output(sfg_two_inputs_two_outputs_independent.output(0), "OUT1")
            out2 = Output(sfg_two_inputs_two_outputs_independent.output(1), "OUT2")
    
            sfg = SFG(inputs=[in1, in2], outputs=[out1, out2])
    
            precedence_list = sfg.get_precedence_list()
    
            assert len(precedence_list) == 3
    
    
            assert {
                port.operation.key(port.index, port.operation.name)
                for port in precedence_list[0]
            } == {"IN1", "IN2"}
    
            assert {
                port.operation.key(port.index, port.operation.name)
                for port in precedence_list[1]
            } == {"CMUL1"}
    
            assert {
                port.operation.key(port.index, port.operation.name)
                for port in precedence_list[2]
            } == {"NESTED_SFG.0", "NESTED_SFG.1"}
    
    
    
    class TestPrintPrecedence:
        """Check the text that ``print_precedence_graph()`` writes to stdout."""

        def test_delays(self, precedence_sfg_delays):
            """Printed precedence listing matches the expected levels and order.

            The expected text has a rule of 120 dashes before every level and
            after the last one.  Each operation line is
            ``<level>.<position>``, a space, a tab and ``str(operation)``.
            """
            # Imported locally so the test carries its own dependency.
            from contextlib import redirect_stdout

            sfg = precedence_sfg_delays

            # redirect_stdout reinstalls whatever stream was active before the
            # block (for example a test runner's capture object), and does so
            # even when print_precedence_graph() raises.
            buffer = io.StringIO()
            with redirect_stdout(buffer):
                sfg.print_precedence_graph()

            # Operation names per precedence level, in the order they are printed.
            levels = [
                ["IN1", "T1", "T2"],
                ["C0", "A1", "B1", "A2", "B2"],
                ["ADD3", "ADD2"],
                ["ADD1"],
                ["Q1"],
                ["A0"],
                ["ADD4"],
            ]

            rule = "-" * 120 + "\n"
            expected_parts = []
            for level_no, names in enumerate(levels, start=1):
                expected_parts.append(rule)
                for position, name in enumerate(names, start=1):
                    operation = sfg.find_by_name(name)[0]
                    expected_parts.append(f"{level_no}.{position} \t{operation!s}\n")
            expected_parts.append(rule)

            assert buffer.getvalue() == "".join(expected_parts)
    
    
    
    class TestDepends:
        """Check which input indices ``inputs_required_for_output`` reports."""

        def test_depends_sfg(self, sfg_two_inputs_two_outputs):
            """Both outputs are reported to require inputs 0 and 1."""
            assert set(sfg_two_inputs_two_outputs.inputs_required_for_output(0)) == {0, 1}
            assert set(sfg_two_inputs_two_outputs.inputs_required_for_output(1)) == {0, 1}

        def test_depends_sfg_independent(self, sfg_two_inputs_two_outputs_independent):
            """Output 0 requires only input 0 and output 1 only input 1."""
            assert set(
                sfg_two_inputs_two_outputs_independent.inputs_required_for_output(0)
            ) == {0}
            assert set(
                sfg_two_inputs_two_outputs_independent.inputs_required_for_output(1)
            ) == {1}
    
    
    
    class TestConnectExternalSignalsToComponentsSoloComp:
        """Tests where the nested SFG is the only component of the outer SFG.

        Every test evaluates the outer SFG, calls
        ``connect_external_signals_to_components()`` on the nested SFG, and
        checks that the outer SFG still evaluates to the same value.
        """

        def test_connect_external_signals_to_components_mac(self):
            """Replace a MAC with inner components in an SFG"""
            inp1 = Input("INP1")
            inp2 = Input("INP2")
            inp3 = Input("INP3")
            add1 = Addition(None, None, "ADD1")
            add2 = Addition(None, None, "ADD2")
            mul1 = Multiplication(None, None, "MUL1")
            out1 = Output(None, "OUT1")

            add1.input(0).connect(inp1, "S1")
            add1.input(1).connect(inp2, "S2")
            add2.input(0).connect(add1, "S3")
            add2.input(1).connect(inp3, "S4")
            mul1.input(0).connect(add1, "S5")
            mul1.input(1).connect(add2, "S6")
            out1.input(0).connect(mul1, "S7")

            mac_sfg = SFG(inputs=[inp1, inp2], outputs=[out1])

            inp4 = Input("INP4")
            inp5 = Input("INP5")
            out2 = Output(None, "OUT2")

            mac_sfg.input(0).connect(inp4, "S8")
            mac_sfg.input(1).connect(inp5, "S9")
            out2.input(0).connect(mac_sfg.outputs[0], "S10")

            test_sfg = SFG(inputs=[inp4, inp5], outputs=[out2])
            assert test_sfg.evaluate(1, 2) == 9
            mac_sfg.connect_external_signals_to_components()
            assert test_sfg.evaluate(1, 2) == 9
            # The outer SFG is expected to return a falsy value here.
            assert not test_sfg.connect_external_signals_to_components()

        def test_connect_external_signals_to_components_operation_tree(
            self, operation_tree
        ):
            """
            Replaces an SFG with only an operation_tree component with its inner
            components
            """
            sfg1 = SFG(outputs=[Output(operation_tree)])
            out1 = Output(None, "OUT1")
            out1.input(0).connect(sfg1.outputs[0], "S1")
            test_sfg = SFG(outputs=[out1])
            assert test_sfg.evaluate_output(0, []) == 5
            sfg1.connect_external_signals_to_components()
            assert test_sfg.evaluate_output(0, []) == 5
            assert not test_sfg.connect_external_signals_to_components()

        def test_connect_external_signals_to_components_large_operation_tree(
            self, large_operation_tree
        ):
            """
            Replaces an SFG with only a large_operation_tree component with its inner
            components
            """
            sfg1 = SFG(outputs=[Output(large_operation_tree)])
            out1 = Output(None, "OUT1")
            out1.input(0).connect(sfg1.outputs[0], "S1")
            test_sfg = SFG(outputs=[out1])
            assert test_sfg.evaluate_output(0, []) == 14
            sfg1.connect_external_signals_to_components()
            assert test_sfg.evaluate_output(0, []) == 14
            assert not test_sfg.connect_external_signals_to_components()

        def test_connect_external_signals_to_components_multiple_operations_after_input(
            self,
        ):
            """
            Replaces an SFG with a symmetric two-port adaptor to test when the input
            port goes to multiple operations
            """
            sfg1 = wdf_allpass(0.5)
            sfg2 = sfg1.replace_operation(sfg1.find_by_id('sym2p0').to_sfg(), 'sym2p0')
            sfg2.find_by_id('sfg0').connect_external_signals_to_components()
            test_sfg = SFG(sfg2.input_operations, sfg2.output_operations)
            assert sfg1.evaluate(1) == -0.5
            assert test_sfg.evaluate(1) == -0.5
            assert not test_sfg.connect_external_signals_to_components()

    
    class TestConnectExternalSignalsToComponentsMultipleComp:
        """Tests where the nested SFG sits next to other operations in the outer SFG.

        Every test calls ``connect_external_signals_to_components()`` on a nested
        SFG and checks the outer SFG afterwards.
        """

        def test_connect_external_signals_to_components_operation_tree(
            self, operation_tree
        ):
            """Replaces a operation_tree in an SFG with other components"""
            sfg1 = SFG(outputs=[Output(operation_tree)])

            inp1 = Input("INP1")
            inp2 = Input("INP2")
            out1 = Output(None, "OUT1")

            add1 = Addition(None, None, "ADD1")
            add2 = Addition(None, None, "ADD2")

            add1.input(0).connect(inp1, "S1")
            add1.input(1).connect(inp2, "S2")
            add2.input(0).connect(add1, "S3")
            add2.input(1).connect(sfg1.outputs[0], "S4")
            out1.input(0).connect(add2, "S5")

            test_sfg = SFG(inputs=[inp1, inp2], outputs=[out1])
            assert test_sfg.evaluate(1, 2) == 8
            sfg1.connect_external_signals_to_components()
            assert test_sfg.evaluate(1, 2) == 8
            # The outer SFG is expected to return a falsy value here.
            assert not test_sfg.connect_external_signals_to_components()

        def test_connect_external_signals_to_components_large_operation_tree(
            self, large_operation_tree
        ):
            """Replaces a large_operation_tree in an SFG with other components"""
            sfg1 = SFG(outputs=[Output(large_operation_tree)])

            inp1 = Input("INP1")
            inp2 = Input("INP2")
            out1 = Output(None, "OUT1")
            add1 = Addition(None, None, "ADD1")
            add2 = Addition(None, None, "ADD2")

            add1.input(0).connect(inp1, "S1")
            add1.input(1).connect(inp2, "S2")
            add2.input(0).connect(add1, "S3")
            add2.input(1).connect(sfg1.outputs[0], "S4")
            out1.input(0).connect(add2, "S5")

            test_sfg = SFG(inputs=[inp1, inp2], outputs=[out1])
            assert test_sfg.evaluate(1, 2) == 17
            sfg1.connect_external_signals_to_components()
            assert test_sfg.evaluate(1, 2) == 17
            assert not test_sfg.connect_external_signals_to_components()

        def create_sfg(self, op_tree):
            """Create a simple SFG with either operation_tree or large_operation_tree"""
            sfg1 = SFG(outputs=[Output(op_tree)])

            inp1 = Input("INP1")
            inp2 = Input("INP2")
            out1 = Output(None, "OUT1")
            add1 = Addition(None, None, "ADD1")
            add2 = Addition(None, None, "ADD2")

            add1.input(0).connect(inp1, "S1")
            add1.input(1).connect(inp2, "S2")
            add2.input(0).connect(add1, "S3")
            add2.input(1).connect(sfg1.outputs[0], "S4")
            out1.input(0).connect(add2, "S5")

            return SFG(inputs=[inp1, inp2], outputs=[out1])

        def test_connect_external_signals_to_components_many_op(self, large_operation_tree):
            """Replace an sfg component in a larger SFG with several component operations"""
            inp1 = Input("INP1")
            inp2 = Input("INP2")
            inp3 = Input("INP3")
            inp4 = Input("INP4")
            out1 = Output(None, "OUT1")
            add1 = Addition(None, None, "ADD1")
            sub1 = Subtraction(None, None, "SUB1")

            add1.input(0).connect(inp1, "S1")
            add1.input(1).connect(inp2, "S2")

            # Nested SFG built by the helper; wired into the outer graph below.
            sfg1 = self.create_sfg(large_operation_tree)

            sfg1.input(0).connect(add1, "S3")
            sfg1.input(1).connect(inp3, "S4")
            sub1.input(0).connect(sfg1.outputs[0], "S5")
            sub1.input(1).connect(inp4, "S6")
            out1.input(0).connect(sub1, "S7")

            test_sfg = SFG(inputs=[inp1, inp2, inp3, inp4], outputs=[out1])

            assert test_sfg.evaluate(1, 2, 3, 4) == 16
            sfg1.connect_external_signals_to_components()
            assert test_sfg.evaluate(1, 2, 3, 4) == 16
            assert not test_sfg.connect_external_signals_to_components()

        def test_add_two_sfgs(self):
            """Sum two single-operation SFGs and expand them one at a time.

            An SFG built from the sum finds no ConstantMultiplication at first,
            one after the first nested SFG is expanded, and two after the second.
            """
            c1 = ConstantMultiplication(0.5)
            c1_sfg = c1.to_sfg()

            c2 = ConstantMultiplication(0.5)
            c2_sfg = c2.to_sfg()

            in1 = Input()
            in2 = Input()

            output = Output(c1_sfg + c2_sfg)

            sfg = SFG([in1, in2], [output])
            assert not sfg.find_by_type_name(ConstantMultiplication.type_name())

            # The outer SFG is rebuilt after each expansion before it is queried.
            c1_sfg.connect_external_signals_to_components()
            sfg = SFG([in1, in2], [output])
            assert len(sfg.find_by_type_name(ConstantMultiplication.type_name())) == 1

            c2_sfg.connect_external_signals_to_components()
            sfg = SFG([in1, in2], [output])
            assert len(sfg.find_by_type_name(ConstantMultiplication.type_name())) == 2
    
    
    
    class TestTopologicalOrderOperations:
        def test_feedback_sfg(self, sfg_simple_filter):
            """Operation names of ``sfg_simple_filter`` come out in the expected order."""
            topological_order = sfg_simple_filter.get_operations_topological_order()

            assert [comp.name for comp in topological_order] == [
                "IN",
                "ADD",
                "T",
                "CMUL",
                "OUT",
            ]

        def test_multiple_independent_inputs(self, sfg_two_inputs_two_outputs_independent):
            """Operation names of the two-input, two-output SFG come out in the expected order."""
            topological_order = (
                sfg_two_inputs_two_outputs_independent.get_operations_topological_order()
            )

            assert [comp.name for comp in topological_order] == [
                "IN1",
                "OUT1",
                "IN2",
                "C1",
                "ADD1",
                "OUT2",
            ]
    
    
        def test_complex_graph(self, precedence_sfg_delays):
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
            topological_order = precedence_sfg_delays.get_operations_topological_order()
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
            assert [comp.name for comp in topological_order] == [
                "IN1",
                "C0",
                "ADD1",
                "Q1",
                "A0",
                "T1",
                "B1",
                "A1",
                "T2",
                "B2",
                "ADD2",
                "A2",
                "ADD3",
                "ADD4",