Newer
Older
Angus Lothian
committed
import random
Angus Lothian
committed
import string
import sys
Angus Lothian
committed
Angus Lothian
committed
from b_asic import Input, Output, Signal
from b_asic.core_operations import (
Addition,
Butterfly,
Constant,
ConstantMultiplication,
Multiplication,
SquareRoot,
Subtraction,
SymmetricTwoportAdaptor,
from b_asic.save_load_structure import python_to_sfg, sfg_to_python
from b_asic.sfg_generators import wdf_allpass
from b_asic.signal_flow_graph import SFG, GraphID
from b_asic.special_operations import Delay
Angus Lothian
committed
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
class TestInit:
    """Construction of an SFG from operations and from signals."""

    @staticmethod
    def _check_sizes(sfg, components, operations, inputs, outputs):
        """Assert the component, operation, input and output counts of *sfg*."""
        assert len(list(sfg.components)) == components
        assert len(list(sfg.operations)) == operations
        assert sfg.input_count == inputs
        assert sfg.output_count == outputs

    def test_direct_input_to_output_sfg_construction(self):
        """Build an SFG from an input connected directly to an output."""
        source = Input("IN1")
        sink = Output(None, "OUT1")
        sink.input(0).connect(source, "S1")

        # in1 ---s1---> out1
        graph = SFG(inputs=[source], outputs=[sink])

        self._check_sizes(graph, components=3, operations=2, inputs=1, outputs=1)

    def test_same_signal_input_and_output_sfg_construction(self):
        """Build an SFG where one signal is both the input and the output signal."""
        first_adder = Addition(None, None, "ADD1")
        second_adder = Addition(None, None, "ADD2")
        shared_signal = second_adder.input(0).connect(first_adder, "S1")

        # in1 ---s1---> out1
        graph = SFG(input_signals=[shared_signal], output_signals=[shared_signal])

        self._check_sizes(graph, components=3, operations=2, inputs=1, outputs=1)

    def test_outputs_construction(self, operation_tree):
        """Build an SFG from a single Output wrapped around the fixture tree."""
        graph = SFG(outputs=[Output(operation_tree)])

        self._check_sizes(graph, components=7, operations=4, inputs=0, outputs=1)

    def test_signals_construction(self, operation_tree):
        """Build an SFG from an output signal sourced at the fixture tree."""
        tail_signal = Signal(source=operation_tree.output(0))
        graph = SFG(output_signals=[tail_signal])

        self._check_sizes(graph, components=7, operations=4, inputs=0, outputs=1)
class TestPrintSfg:
def test_one_addition(self):
inp1 = Input("INP1")
inp2 = Input("INP2")
add1 = Addition(inp1, inp2, "ADD1")
out1 = Output(add1, "OUT1")
sfg = SFG(inputs=[inp1, inp2], outputs=[out1], name="SFG1")
== "id: no_id, \tname: SFG1, \tinputs: {0: '-'}, \toutputs: {0: '-'}\n"
+ "--------------------------------------------------------------------"
+ "--------------------------------\n"
+ str(sfg.find_by_name("INP1")[0])
+ "\n"
+ str(sfg.find_by_name("INP2")[0])
+ "\n"
+ str(sfg.find_by_name("ADD1")[0])
+ "\n"
+ str(sfg.find_by_name("OUT1")[0])
+ "\n"
+ "--------------------------------------------------------------------"
+ "--------------------------------\n"
Angus Lothian
committed
def test_add_mul(self):
inp1 = Input("INP1")
inp2 = Input("INP2")
inp3 = Input("INP3")
add1 = Addition(inp1, inp2, "ADD1")
mul1 = Multiplication(add1, inp3, "MUL1")
out1 = Output(mul1, "OUT1")
sfg = SFG(inputs=[inp1, inp2, inp3], outputs=[out1], name="mac_sfg")
== "id: no_id, \tname: mac_sfg, \tinputs: {0: '-'}, \toutputs: {0: '-'}\n"
+ "--------------------------------------------------------------------"
+ "--------------------------------\n"
+ str(sfg.find_by_name("INP1")[0])
+ "\n"
+ str(sfg.find_by_name("INP2")[0])
+ "\n"
+ str(sfg.find_by_name("ADD1")[0])
+ "\n"
+ str(sfg.find_by_name("INP3")[0])
+ "\n"
+ str(sfg.find_by_name("MUL1")[0])
+ "\n"
+ str(sfg.find_by_name("OUT1")[0])
+ "\n"
+ "--------------------------------------------------------------------"
+ "--------------------------------\n"
Angus Lothian
committed
def test_constant(self):
inp1 = Input("INP1")
const1 = Constant(3, "CONST")
add1 = Addition(const1, inp1, "ADD1")
out1 = Output(add1, "OUT1")
sfg = SFG(inputs=[inp1], outputs=[out1], name="sfg")
== "id: no_id, \tname: sfg, \tinputs: {0: '-'}, \toutputs: {0: '-'}\n"
+ "--------------------------------------------------------------------"
+ "--------------------------------\n"
+ str(sfg.find_by_name("CONST")[0])
+ "\n"
+ str(sfg.find_by_name("INP1")[0])
+ "\n"
+ str(sfg.find_by_name("ADD1")[0])
+ "\n"
+ str(sfg.find_by_name("OUT1")[0])
+ "\n"
+ "--------------------------------------------------------------------"
+ "--------------------------------\n"
Angus Lothian
committed
def test_simple_filter(self, sfg_simple_filter):
== "id: no_id, \tname: simple_filter, \tinputs: {0: '-'},"
" \toutputs: {0: '-'}\n"
+ "--------------------------------------------------------------------"
+ "--------------------------------\n"
+ "--------------------------------------------------------------------"
+ "--------------------------------\n"
Angus Lothian
committed
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
class TestDeepCopy:
    """Copies of an SFG obtained by calling the SFG instance."""

    def test_deep_copy_no_duplicates(self):
        """A copy made without arguments has an empty name and matching components."""
        first = Input("INP1")
        second = Input("INP2")
        third = Input("INP3")
        adder = Addition(first, second, "ADD1")
        multiplier = Multiplication(adder, third, "MUL1")
        sink = Output(multiplier, "OUT1")

        mac_sfg = SFG(inputs=[first, second], outputs=[sink], name="mac_sfg")
        mac_sfg_new = mac_sfg()

        assert mac_sfg.name == "mac_sfg"
        assert mac_sfg_new.name == ""

        # Every graph ID of the original must resolve to a same-named component
        # in the copy.
        for graph_id, original in mac_sfg._components_by_id.items():
            duplicate = mac_sfg_new.find_by_id(graph_id)
            assert original.name == duplicate.name

    def test_deep_copy(self):
        """A copy made with ``name=`` takes the new name and keeps the ID offset."""
        first = Input("INP1")
        second = Input("INP2")
        third = Input("INP3")
        adder_a = Addition(None, None, "ADD1")
        adder_b = Addition(None, None, "ADD2")
        multiplier = Multiplication(None, None, "MUL1")
        sink = Output(None, "OUT1")

        # (destination operation, input port index, source operation, signal name)
        wiring = [
            (adder_a, 0, first, "S1"),
            (adder_a, 1, second, "S2"),
            (adder_b, 0, adder_a, "S4"),
            (adder_b, 1, third, "S3"),
            (multiplier, 0, adder_a, "S5"),
            (multiplier, 1, adder_b, "S6"),
            (sink, 0, multiplier, "S7"),
        ]
        for destination, port_index, source, signal_name in wiring:
            destination.input(port_index).connect(source, signal_name)

        mac_sfg = SFG(
            inputs=[first, second],
            outputs=[sink],
            id_number_offset=100,
            name="mac_sfg",
        )
        mac_sfg_new = mac_sfg(name="mac_sfg2")

        assert mac_sfg.name == "mac_sfg"
        assert mac_sfg_new.name == "mac_sfg2"
        assert mac_sfg.id_number_offset == 100
        assert mac_sfg_new.id_number_offset == 100

        for graph_id, original in mac_sfg._components_by_id.items():
            duplicate = mac_sfg_new.find_by_id(graph_id)
            assert original.name == duplicate.name

    def test_deep_copy_with_new_sources(self):
        """Operations passed to the call become the sources of the copy's inputs."""
        first = Input("INP1")
        second = Input("INP2")
        third = Input("INP3")
        adder = Addition(first, second, "ADD1")
        multiplier = Multiplication(adder, third, "MUL1")
        sink = Output(multiplier, "OUT1")

        mac_sfg = SFG(inputs=[first, second], outputs=[sink], name="mac_sfg")

        a = Addition(Constant(3), Constant(5))
        b = Constant(2)
        mac_sfg_new = mac_sfg(a, b)

        for port_index, expected_source in enumerate((a, b)):
            signal = mac_sfg_new.input(port_index).signals[0]
            assert signal.source.operation is expected_source
class TestEvaluateOutput:
    """Evaluation of output 0 of SFGs built from the operation fixtures."""

    def test_evaluate_output(self, operation_tree):
        """The small operation tree evaluates to 5."""
        graph = SFG(outputs=[Output(operation_tree)])
        result = graph.evaluate_output(0, [])
        assert result == 5

    def test_evaluate_output_large(self, large_operation_tree):
        """The large operation tree evaluates to 14."""
        graph = SFG(outputs=[Output(large_operation_tree)])
        result = graph.evaluate_output(0, [])
        assert result == 14

    def test_evaluate_output_cycle(self, operation_graph_with_cycle):
        """Evaluating the cyclic fixture graph raises a RuntimeError."""
        graph = SFG(outputs=[Output(operation_graph_with_cycle)])
        expected_error = pytest.raises(
            RuntimeError, match="Direct feedback loop detected"
        )
        with expected_error:
            graph.evaluate_output(0, [])
class TestComponents:
    """Contents of ``SFG.components`` for a hand-wired graph."""

    def test_advanced_components(self):
        """All named operations and signals reachable in the graph are listed."""
        first = Input("INP1")
        second = Input("INP2")
        third = Input("INP3")
        adder_a = Addition(None, None, "ADD1")
        adder_b = Addition(None, None, "ADD2")
        multiplier = Multiplication(None, None, "MUL1")
        sink = Output(None, "OUT1")

        # (destination operation, input port index, source operation, signal name)
        wiring = [
            (adder_a, 0, first, "S1"),
            (adder_a, 1, second, "S2"),
            (adder_b, 0, adder_a, "S4"),
            (adder_b, 1, third, "S3"),
            (multiplier, 0, adder_a, "S5"),
            (multiplier, 1, adder_b, "S6"),
            (sink, 0, multiplier, "S7"),
        ]
        for destination, port_index, source, signal_name in wiring:
            destination.input(port_index).connect(source, signal_name)

        mac_sfg = SFG(inputs=[first, second], outputs=[sink], name="mac_sfg")

        operation_names = {"INP1", "INP2", "INP3", "ADD1", "ADD2", "MUL1", "OUT1"}
        signal_names = {"S1", "S2", "S3", "S4", "S5", "S6", "S7"}
        found_names = {component.name for component in mac_sfg.components}

        assert found_names == operation_names | signal_names
Angus Lothian
committed
class TestReplaceOperation:
Angus Lothian
committed
def test_replace_addition_by_id(self, operation_tree):
sfg = SFG(outputs=[Output(operation_tree)])
Angus Lothian
committed
sfg = sfg.replace_operation(Multiplication(name="Multi"), graph_id=component_id)
Angus Lothian
committed
assert component_id not in sfg._components_by_id.keys()
assert "Multi" in sfg._components_by_name.keys()
def test_replace_addition_large_tree(self, large_operation_tree):
sfg = SFG(outputs=[Output(large_operation_tree)])
Angus Lothian
committed
sfg = sfg.replace_operation(Multiplication(name="Multi"), graph_id=component_id)
Angus Lothian
committed
assert "Multi" in sfg._components_by_name.keys()
assert component_id not in sfg._components_by_id.keys()
def test_replace_no_input_component(self, operation_tree):
sfg = SFG(outputs=[Output(operation_tree)])
Angus Lothian
committed
const_ = sfg.find_by_id(component_id)
sfg = sfg.replace_operation(Constant(1), graph_id=component_id)
Angus Lothian
committed
assert const_ is not sfg.find_by_id(component_id)
def test_no_match_on_replace(self, large_operation_tree):
sfg = SFG(outputs=[Output(large_operation_tree)])
Angus Lothian
committed
with pytest.raises(
ValueError, match="No operation matching the criteria found"
):
sfg = sfg.replace_operation(
Multiplication(name="Multi"), graph_id=component_id
)
Angus Lothian
committed
def test_not_equal_input(self, large_operation_tree):
sfg = SFG(outputs=[Output(large_operation_tree)])
Angus Lothian
committed
with pytest.raises(
TypeError,
match="The input count may not differ between the operations",
):
sfg = sfg.replace_operation(
Multiplication(name="Multi"), graph_id=component_id
)
Angus Lothian
committed
class TestInsertComponent:
def test_insert_component_in_sfg(self, large_operation_tree_names):
sfg = SFG(outputs=[Output(large_operation_tree_names)])
sqrt = SquareRoot()
_sfg = sfg.insert_operation(sqrt, sfg.find_by_name("constant4")[0].graph_id)
Angus Lothian
committed
assert _sfg.evaluate() != sfg.evaluate()
assert any([isinstance(comp, SquareRoot) for comp in _sfg.operations])
assert not any([isinstance(comp, SquareRoot) for comp in sfg.operations])
sfg.find_by_name("constant4")[0].output(0).signals[0].destination.operation,
SquareRoot,
)
assert isinstance(
_sfg.find_by_name("constant4")[0]
.output(0)
.signals[0]
.destination.operation,
SquareRoot,
)
assert sfg.find_by_name("constant4")[0].output(0).signals[
0
].destination.operation is sfg.find_by_id("add2")
assert _sfg.find_by_name("constant4")[0].output(0).signals[
0
].destination.operation is not _sfg.find_by_id("add2")
assert _sfg.find_by_id("sqrt0").output(0).signals[
].destination.operation is _sfg.find_by_id("add2")
Angus Lothian
committed
def test_insert_invalid_component_in_sfg(self, large_operation_tree):
sfg = SFG(outputs=[Output(large_operation_tree)])
# Should raise an exception for not matching input count to output count.
add4 = Addition()
with pytest.raises(TypeError, match="Source operation output count"):
sfg.insert_operation(add4, "c0")
Angus Lothian
committed
def test_insert_at_output(self, large_operation_tree):
sfg = SFG(outputs=[Output(large_operation_tree)])
# Should raise an exception for trying to insert an operation after an output.
sqrt = SquareRoot()
with pytest.raises(TypeError, match="Source operation cannot be an"):
_ = sfg.insert_operation(sqrt, "out0")
Angus Lothian
committed
def test_insert_multiple_output_ports(self, butterfly_operation_tree):
sfg = SFG(outputs=list(map(Output, butterfly_operation_tree.outputs)))
_sfg = sfg.insert_operation(Butterfly(name="n_bfly"), "bfly2")
Angus Lothian
committed
assert sfg.evaluate() != _sfg.evaluate()
assert len(sfg.find_by_name("n_bfly")) == 0
assert len(_sfg.find_by_name("n_bfly")) == 1
# Correctly connected old output -> new input
_sfg.find_by_name("bfly3")[0].output(0).signals[0].destination.operation
_sfg.find_by_name("bfly3")[0].output(1).signals[0].destination.operation
Angus Lothian
committed
# Correctly connected new input -> old output
assert (
_sfg.find_by_name("n_bfly")[0].input(0).signals[0].source.operation
is _sfg.find_by_name("bfly3")[0]
)
assert (
_sfg.find_by_name("n_bfly")[0].input(1).signals[0].source.operation
is _sfg.find_by_name("bfly3")[0]
)
Angus Lothian
committed
# Correctly connected new output -> next input
_sfg.find_by_name("n_bfly")[0].output(0).signals[0].destination.operation
_sfg.find_by_name("n_bfly")[0].output(1).signals[0].destination.operation
Angus Lothian
committed
# Correctly connected next input -> new output
assert (
_sfg.find_by_name("bfly2")[0].input(0).signals[0].source.operation
is _sfg.find_by_name("n_bfly")[0]
)
assert (
_sfg.find_by_name("bfly2")[0].input(1).signals[0].source.operation
is _sfg.find_by_name("n_bfly")[0]
)
Angus Lothian
committed
class TestFindComponentsWithTypeName:
def test_mac_components(self):
inp1 = Input("INP1")
inp2 = Input("INP2")
inp3 = Input("INP3")
add1 = Addition(None, None, "ADD1")
add2 = Addition(None, None, "ADD2")
mul1 = Multiplication(None, None, "MUL1")
out1 = Output(None, "OUT1")
add1.input(0).connect(inp1, "S1")
add1.input(1).connect(inp2, "S2")
add2.input(0).connect(add1, "S4")
add2.input(1).connect(inp3, "S3")
mul1.input(0).connect(add1, "S5")
mul1.input(1).connect(add2, "S6")
out1.input(0).connect(mul1, "S7")
mac_sfg = SFG(inputs=[inp1, inp2], outputs=[out1], name="mac_sfg")
assert {comp.name for comp in mac_sfg.find_by_type_name(inp1.type_name())} == {
Angus Lothian
committed
assert {comp.name for comp in mac_sfg.find_by_type_name(add1.type_name())} == {
Angus Lothian
committed
assert {comp.name for comp in mac_sfg.find_by_type_name(mul1.type_name())} == {
"MUL1"
}
Angus Lothian
committed
assert {comp.name for comp in mac_sfg.find_by_type_name(out1.type_name())} == {
"OUT1"
}
Angus Lothian
committed
assert {
comp.name for comp in mac_sfg.find_by_type_name(Signal.type_name())
} == {"S1", "S2", "S3", "S4", "S5", "S6", "S7"}
Angus Lothian
committed
class TestGetPrecedenceList:
    """Checks of the levels returned by ``SFG.get_precedence_list``."""

    @staticmethod
    def _assert_levels(precedence_list, expected_levels):
        """Assert that each level holds exactly the expected set of port keys.

        The key of a port is ``port.operation.key(port.index, port.operation.name)``.
        Levels are checked in order, so the first mismatching level fails.
        """
        for level_index, expected_keys in enumerate(expected_levels):
            found_keys = {
                port.operation.key(port.index, port.operation.name)
                for port in precedence_list[level_index]
            }
            assert found_keys == expected_keys

    def test_inputs_delays(self, precedence_sfg_delays):
        """Seven levels are produced and stored in ``_precedence_list``."""
        # No cached precedence list
        assert precedence_sfg_delays._precedence_list is None

        levels = precedence_sfg_delays.get_precedence_list()
        assert len(levels) == 7

        # Cached precedence list
        assert len(precedence_sfg_delays._precedence_list) == 7

        self._assert_levels(
            levels,
            [
                {"IN1", "T1", "T2"},
                {"C0", "B1", "B2", "A1", "A2"},
                {"ADD2", "ADD3"},
                {"ADD1"},
                {"Q1"},
                {"A0"},
                {"ADD4"},
            ],
        )

        # Trigger cache
        levels = precedence_sfg_delays.get_precedence_list()
        assert len(levels) == 7

    def test_inputs_constants_delays_multiple_outputs(
        self, precedence_sfg_delays_and_constants
    ):
        """Seven levels are produced; the last one holds both butterfly keys."""
        levels = precedence_sfg_delays_and_constants.get_precedence_list()
        assert len(levels) == 7

        self._assert_levels(
            levels,
            [
                {"IN1", "T1", "CONST1"},
                {"C0", "B1", "B2", "A1", "A2"},
                {"ADD2", "ADD3"},
                {"ADD1"},
                {"Q1"},
                {"A0"},
                {"BFLY1.0", "BFLY1.1"},
            ],
        )

    def test_precedence_multiple_outputs_same_precedence(
        self, sfg_two_inputs_two_outputs
    ):
        """A nested SFG fed by an input and a constant multiplier gives three levels."""
        nested = sfg_two_inputs_two_outputs
        nested.name = "NESTED_SFG"

        first = Input("IN1")
        nested.input(0).connect(first, "S1")

        second = Input("IN2")
        scaler = ConstantMultiplication(10, None, "CMUL1")
        scaler.input(0).connect(second, "S2")
        nested.input(1).connect(scaler, "S3")

        sink_a = Output(nested.output(0), "OUT1")
        sink_b = Output(nested.output(1), "OUT2")

        outer = SFG(inputs=[first, second], outputs=[sink_a, sink_b])

        levels = outer.get_precedence_list()
        assert len(levels) == 3

        self._assert_levels(
            levels,
            [
                {"IN1", "IN2"},
                {"CMUL1"},
                {"NESTED_SFG.0", "NESTED_SFG.1"},
            ],
        )

    def test_precedence_sfg_multiple_outputs_different_precedences(
        self, sfg_two_inputs_two_outputs_independent
    ):
        """Same wiring as above, using the ``_independent`` nested fixture."""
        nested = sfg_two_inputs_two_outputs_independent
        nested.name = "NESTED_SFG"

        first = Input("IN1")
        second = Input("IN2")

        nested.input(0).connect(first, "S1")
        scaler = ConstantMultiplication(10, None, "CMUL1")
        scaler.input(0).connect(second, "S2")
        nested.input(1).connect(scaler, "S3")

        sink_a = Output(nested.output(0), "OUT1")
        sink_b = Output(nested.output(1), "OUT2")

        outer = SFG(inputs=[first, second], outputs=[sink_a, sink_b])

        levels = outer.get_precedence_list()
        assert len(levels) == 3

        self._assert_levels(
            levels,
            [
                {"IN1", "IN2"},
                {"CMUL1"},
                {"NESTED_SFG.0", "NESTED_SFG.1"},
            ],
        )
Angus Lothian
committed
class TestPrintPrecedence:
    """Checks the text written to stdout by ``SFG.print_precedence_graph``."""

    def test_delays(self, precedence_sfg_delays):
        """Compare the printed precedence graph with the expected listing.

        The expected text is a 120-dash ruler, then for every precedence level
        one line per operation (``"<level>.<position> \\t" + str(operation)``),
        with a ruler after each level.
        """
        # Function-scope import keeps this block self-contained.
        from contextlib import redirect_stdout

        sfg = precedence_sfg_delays

        # redirect_stdout puts back whatever stream was installed before, even if
        # print_precedence_graph() raises. Assigning sys.stdout by hand and
        # resetting it to sys.__stdout__ would skip the reset on error and would
        # discard a previously installed replacement stream.
        buffer = io.StringIO()
        with redirect_stdout(buffer):
            sfg.print_precedence_graph()
        captured_output = buffer.getvalue()

        ruler = "-" * 120 + "\n"
        # Operation names in the order they are expected to be printed per level.
        expected_levels = [
            ["IN1", "T1", "T2"],
            ["C0", "A1", "B1", "A2", "B2"],
            ["ADD3", "ADD2"],
            ["ADD1"],
            ["Q1"],
            ["A0"],
            ["ADD4"],
        ]

        pieces = []
        for level_number, names in enumerate(expected_levels, start=1):
            pieces.append(ruler)
            for position, name in enumerate(names, start=1):
                operation = sfg.find_by_name(name)[0]
                pieces.append(f"{level_number}.{position} \t" + str(operation) + "\n")
        pieces.append(ruler)

        assert captured_output == "".join(pieces)
Angus Lothian
committed
class TestDepends:
def test_depends_sfg(self, sfg_two_inputs_two_outputs):
assert set(sfg_two_inputs_two_outputs.inputs_required_for_output(0)) == {0, 1}
assert set(sfg_two_inputs_two_outputs.inputs_required_for_output(1)) == {0, 1}
Angus Lothian
committed
def test_depends_sfg_independent(self, sfg_two_inputs_two_outputs_independent):
Angus Lothian
committed
assert set(
sfg_two_inputs_two_outputs_independent.inputs_required_for_output(0)
Angus Lothian
committed
assert set(
sfg_two_inputs_two_outputs_independent.inputs_required_for_output(1)
Angus Lothian
committed
class TestConnectExternalSignalsToComponentsSoloComp:
def test_connect_external_signals_to_components_mac(self):
Angus Lothian
committed
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
inp1 = Input("INP1")
inp2 = Input("INP2")
inp3 = Input("INP3")
add1 = Addition(None, None, "ADD1")
add2 = Addition(None, None, "ADD2")
mul1 = Multiplication(None, None, "MUL1")
out1 = Output(None, "OUT1")
add1.input(0).connect(inp1, "S1")
add1.input(1).connect(inp2, "S2")
add2.input(0).connect(add1, "S3")
add2.input(1).connect(inp3, "S4")
mul1.input(0).connect(add1, "S5")
mul1.input(1).connect(add2, "S6")
out1.input(0).connect(mul1, "S7")
mac_sfg = SFG(inputs=[inp1, inp2], outputs=[out1])
inp4 = Input("INP4")
inp5 = Input("INP5")
out2 = Output(None, "OUT2")
mac_sfg.input(0).connect(inp4, "S8")
mac_sfg.input(1).connect(inp5, "S9")
out2.input(0).connect(mac_sfg.outputs[0], "S10")
test_sfg = SFG(inputs=[inp4, inp5], outputs=[out2])
assert test_sfg.evaluate(1, 2) == 9
mac_sfg.connect_external_signals_to_components()
assert test_sfg.evaluate(1, 2) == 9
assert not test_sfg.connect_external_signals_to_components()
def test_connect_external_signals_to_components_operation_tree(
self, operation_tree
):
"""
Replaces an SFG with only a operation_tree component with its inner components
Angus Lothian
committed
sfg1 = SFG(outputs=[Output(operation_tree)])
out1 = Output(None, "OUT1")
out1.input(0).connect(sfg1.outputs[0], "S1")
test_sfg = SFG(outputs=[out1])
assert test_sfg.evaluate_output(0, []) == 5
sfg1.connect_external_signals_to_components()
assert test_sfg.evaluate_output(0, []) == 5
assert not test_sfg.connect_external_signals_to_components()
def test_connect_external_signals_to_components_large_operation_tree(
self, large_operation_tree
):
"""
Replaces an SFG with only a large_operation_tree component with its inner
components
Angus Lothian
committed
sfg1 = SFG(outputs=[Output(large_operation_tree)])
out1 = Output(None, "OUT1")
out1.input(0).connect(sfg1.outputs[0], "S1")
test_sfg = SFG(outputs=[out1])
assert test_sfg.evaluate_output(0, []) == 14
sfg1.connect_external_signals_to_components()
assert test_sfg.evaluate_output(0, []) == 14
assert not test_sfg.connect_external_signals_to_components()
Hugo Winbladh
committed
def test_connect_external_signals_to_components_multiple_operations_after_input(
Hugo Winbladh
committed
):
"""
Replaces an SFG with a symmetric two-port adaptor to test when the input
port goes to multiple operations
"""
sfg1 = wdf_allpass(0.5)
sfg2 = sfg1.replace_operation(sfg1.find_by_id('sym2p0').to_sfg(), 'sym2p0')
sfg2.find_by_id('sfg0').connect_external_signals_to_components()
test_sfg = SFG(sfg2.input_operations, sfg2.output_operations)
assert sfg1.evaluate(1) == -0.5
assert test_sfg.evaluate(1) == -0.5
assert not test_sfg.connect_external_signals_to_components()
Angus Lothian
committed
Angus Lothian
committed
class TestConnectExternalSignalsToComponentsMultipleComp:
    """``connect_external_signals_to_components`` on SFGs nested among other operations."""

    @staticmethod
    def _surround_with_adders(inner_sfg):
        """Return an SFG computing ``(INP1 + INP2) + inner_sfg.outputs[0]``.

        Two inputs feed ADD1; ADD1 and the first output of *inner_sfg* feed
        ADD2, which drives OUT1.
        """
        first = Input("INP1")
        second = Input("INP2")
        sink = Output(None, "OUT1")
        adder_a = Addition(None, None, "ADD1")
        adder_b = Addition(None, None, "ADD2")

        adder_a.input(0).connect(first, "S1")
        adder_a.input(1).connect(second, "S2")
        adder_b.input(0).connect(adder_a, "S3")
        adder_b.input(1).connect(inner_sfg.outputs[0], "S4")
        sink.input(0).connect(adder_b, "S5")

        return SFG(inputs=[first, second], outputs=[sink])

    def test_connect_external_signals_to_components_operation_tree(
        self, operation_tree
    ):
        """Replace an operation_tree SFG inside a larger SFG with its components."""
        inner = SFG(outputs=[Output(operation_tree)])
        test_sfg = self._surround_with_adders(inner)

        assert test_sfg.evaluate(1, 2) == 8
        inner.connect_external_signals_to_components()
        assert test_sfg.evaluate(1, 2) == 8
        assert not test_sfg.connect_external_signals_to_components()

    def test_connect_external_signals_to_components_large_operation_tree(
        self, large_operation_tree
    ):
        """Replace a large_operation_tree SFG inside a larger SFG with its components."""
        inner = SFG(outputs=[Output(large_operation_tree)])
        test_sfg = self._surround_with_adders(inner)

        assert test_sfg.evaluate(1, 2) == 17
        inner.connect_external_signals_to_components()
        assert test_sfg.evaluate(1, 2) == 17
        assert not test_sfg.connect_external_signals_to_components()

    def create_sfg(self, op_tree):
        """Create a simple SFG around either operation_tree or large_operation_tree."""
        inner = SFG(outputs=[Output(op_tree)])
        return self._surround_with_adders(inner)

    def test_connect_external_signals_to_components_many_op(self, large_operation_tree):
        """Replace an SFG component in a larger SFG that has several other operations."""
        first = Input("INP1")
        second = Input("INP2")
        third = Input("INP3")
        fourth = Input("INP4")
        sink = Output(None, "OUT1")
        adder = Addition(None, None, "ADD1")
        subtractor = Subtraction(None, None, "SUB1")

        adder.input(0).connect(first, "S1")
        adder.input(1).connect(second, "S2")

        nested = self.create_sfg(large_operation_tree)

        nested.input(0).connect(adder, "S3")
        nested.input(1).connect(third, "S4")
        subtractor.input(0).connect(nested.outputs[0], "S5")
        subtractor.input(1).connect(fourth, "S6")
        sink.input(0).connect(subtractor, "S7")

        test_sfg = SFG(inputs=[first, second, third, fourth], outputs=[sink])

        assert test_sfg.evaluate(1, 2, 3, 4) == 16
        nested.connect_external_signals_to_components()
        assert test_sfg.evaluate(1, 2, 3, 4) == 16
        assert not test_sfg.connect_external_signals_to_components()

    def test_add_two_sfgs(self):
        """Expanding two added SFGs one at a time exposes their multipliers in turn."""
        left_sfg = ConstantMultiplication(0.5).to_sfg()
        right_sfg = ConstantMultiplication(0.5).to_sfg()

        in1 = Input()
        in2 = Input()
        output = Output(left_sfg + right_sfg)
        left_sfg <<= in1
        right_sfg <<= in2

        cmul_type = ConstantMultiplication.type_name

        # Before any expansion, no constant multiplication is found.
        sfg = SFG([in1, in2], [output])
        assert not sfg.find_by_type_name(cmul_type())

        # Each expansion makes one more constant multiplication findable.
        for expected_count, nested in enumerate((left_sfg, right_sfg), start=1):
            nested.connect_external_signals_to_components()
            sfg = SFG([in1, in2], [output])
            assert len(sfg.find_by_type_name(cmul_type())) == expected_count
Angus Lothian
committed
class TestTopologicalOrderOperations:
def test_feedback_sfg(self, sfg_simple_filter):
topological_order = sfg_simple_filter.get_operations_topological_order()
Angus Lothian
committed
assert [comp.name for comp in topological_order] == [
def test_multiple_independent_inputs(self, sfg_two_inputs_two_outputs_independent):
topological_order = (
sfg_two_inputs_two_outputs_independent.get_operations_topological_order()
)
Angus Lothian
committed
assert [comp.name for comp in topological_order] == [
"IN1",
"OUT1",
"IN2",
"C1",
"ADD1",
"OUT2",
]
Angus Lothian
committed
def test_complex_graph(self, precedence_sfg_delays):
topological_order = precedence_sfg_delays.get_operations_topological_order()
Angus Lothian
committed