Newer
Older
Angus Lothian
committed
import random
Angus Lothian
committed
import string
import sys
Angus Lothian
committed
Angus Lothian
committed
from b_asic import SFG, FastSimulation, Input, Output, Signal
from b_asic.core_operations import (
Absolute,
Addition,
Butterfly,
ComplexConjugate,
Constant,
ConstantMultiplication,
Division,
Max,
Min,
Multiplication,
SquareRoot,
Subtraction,
SymmetricTwoportAdaptor,
)
from b_asic.save_load_structure import python_to_sfg, sfg_to_python
Angus Lothian
committed
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
class TestInit:
    """Construction of an SFG from operations, signals and output trees."""

    def test_direct_input_to_output_sfg_construction(self):
        """An Input wired straight to an Output gives 3 components, 2 operations."""
        source = Input("IN1")
        sink = Output(None, "OUT1")
        sink.input(0).connect(source, "S1")

        # Topology: IN1 ---S1---> OUT1
        graph = SFG(inputs=[source], outputs=[sink])

        assert len(list(graph.components)) == 3
        assert len(list(graph.operations)) == 2
        assert graph.input_count == 1
        assert graph.output_count == 1

    def test_same_signal_input_and_output_sfg_construction(self):
        """The same signal may be passed as both input signal and output signal."""
        first_add = Addition(None, None, "ADD1")
        second_add = Addition(None, None, "ADD2")
        shared_signal = second_add.input(0).connect(first_add, "S1")

        graph = SFG(input_signals=[shared_signal], output_signals=[shared_signal])

        assert len(list(graph.components)) == 3
        assert len(list(graph.operations)) == 2
        assert graph.input_count == 1
        assert graph.output_count == 1

    def test_outputs_construction(self, operation_tree):
        """Wrapping the ``operation_tree`` fixture in an Output builds the SFG."""
        graph = SFG(outputs=[Output(operation_tree)])

        assert len(list(graph.components)) == 7
        assert len(list(graph.operations)) == 4
        assert graph.input_count == 0
        assert graph.output_count == 1

    def test_signals_construction(self, operation_tree):
        """A Signal sourced from the tree's first output port builds the same SFG."""
        tail_signal = Signal(source=operation_tree.output(0))
        graph = SFG(output_signals=[tail_signal])

        assert len(list(graph.components)) == 7
        assert len(list(graph.operations)) == 4
        assert graph.input_count == 0
        assert graph.output_count == 1
class TestPrintSfg:
    """Checks of the textual representation of an SFG.

    NOTE(review): in the reviewed copy of this file every test began with a
    dangling ``== "..."`` expression; the ``assert`` and its left-hand operand
    were missing. They are restored here as ``str(sfg)``, which is what the
    expected strings appear to describe -- confirm against the upstream file.
    """

    @staticmethod
    def _expected_str(sfg, header, operation_names):
        """Assemble the expected text for ``sfg``.

        ``header`` is the first line (including its trailing newline) and
        ``operation_names`` lists, in order, the names of the operations whose
        ``str()`` is expected between the two separator rules.
        """
        rule = "----------------------------------------------------------------------------------------------------\n"
        listing = "".join(
            str(sfg.find_by_name(name)[0]) + "\n" for name in operation_names
        )
        return header + "Internal Operations:\n" + rule + listing + rule

    def test_one_addition(self):
        """Two inputs feeding one addition."""
        inp1 = Input("INP1")
        inp2 = Input("INP2")
        add1 = Addition(inp1, inp2, "ADD1")
        out1 = Output(add1, "OUT1")
        sfg = SFG(inputs=[inp1, inp2], outputs=[out1], name="SFG1")

        expected = self._expected_str(
            sfg,
            "id: no_id, \tname: SFG1, \tinputs: {0: '-'}, \toutputs: {0: '-'}\n",
            ["INP1", "INP2", "ADD1", "OUT1"],
        )
        assert str(sfg) == expected

    def test_add_mul(self):
        """An addition followed by a multiplication (three inputs)."""
        inp1 = Input("INP1")
        inp2 = Input("INP2")
        inp3 = Input("INP3")
        add1 = Addition(inp1, inp2, "ADD1")
        mul1 = Multiplication(add1, inp3, "MUL1")
        out1 = Output(mul1, "OUT1")
        sfg = SFG(inputs=[inp1, inp2, inp3], outputs=[out1], name="mac_sfg")

        expected = self._expected_str(
            sfg,
            "id: no_id, \tname: mac_sfg, \tinputs: {0: '-'}, \toutputs: {0: '-'}\n",
            ["INP1", "INP2", "ADD1", "INP3", "MUL1", "OUT1"],
        )
        assert str(sfg) == expected

    def test_constant(self):
        """A constant added to a single input."""
        inp1 = Input("INP1")
        const1 = Constant(3, "CONST")
        add1 = Addition(const1, inp1, "ADD1")
        out1 = Output(add1, "OUT1")
        sfg = SFG(inputs=[inp1], outputs=[out1], name="sfg")

        expected = self._expected_str(
            sfg,
            "id: no_id, \tname: sfg, \tinputs: {0: '-'}, \toutputs: {0: '-'}\n",
            ["CONST", "INP1", "ADD1", "OUT1"],
        )
        assert str(sfg) == expected

    def test_simple_filter(self, sfg_simple_filter):
        """The ``sfg_simple_filter`` fixture."""
        expected = self._expected_str(
            sfg_simple_filter,
            "id: no_id, \tname: simple_filter, \tinputs: {0: '-'},"
            " \toutputs: {0: '-'}\n",
            ["IN1", "ADD1", "T1", "CMUL1", "OUT1"],
        )
        assert str(sfg_simple_filter) == expected
Angus Lothian
committed
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
class TestDeepCopy:
    """Calling an SFG instance to obtain a new SFG."""

    def test_deep_copy_no_duplicates(self):
        """A call without arguments keeps component names but clears the SFG name."""
        inp1 = Input("INP1")
        inp2 = Input("INP2")
        inp3 = Input("INP3")
        add1 = Addition(inp1, inp2, "ADD1")
        mul1 = Multiplication(add1, inp3, "MUL1")
        out1 = Output(mul1, "OUT1")

        original = SFG(inputs=[inp1, inp2], outputs=[out1], name="mac_sfg")
        duplicate = original()

        assert original.name == "mac_sfg"
        assert duplicate.name == ""

        # Every id known to the original must resolve to a same-named
        # component in the duplicate.
        for graph_id, source_component in original._components_by_id.items():
            twin = duplicate.find_by_id(graph_id)
            assert source_component.name == twin.name

    def test_deep_copy(self):
        """A call with ``name=`` renames the new SFG and keeps the id offset."""
        inp1 = Input("INP1")
        inp2 = Input("INP2")
        inp3 = Input("INP3")
        add1 = Addition(None, None, "ADD1")
        add2 = Addition(None, None, "ADD2")
        mul1 = Multiplication(None, None, "MUL1")
        out1 = Output(None, "OUT1")

        # (destination operation, input index, source, signal name)
        wiring = [
            (add1, 0, inp1, "S1"),
            (add1, 1, inp2, "S2"),
            (add2, 0, add1, "S4"),
            (add2, 1, inp3, "S3"),
            (mul1, 0, add1, "S5"),
            (mul1, 1, add2, "S6"),
            (out1, 0, mul1, "S7"),
        ]
        for destination, index, source, signal_name in wiring:
            destination.input(index).connect(source, signal_name)

        original = SFG(
            inputs=[inp1, inp2],
            outputs=[out1],
            id_number_offset=100,
            name="mac_sfg",
        )
        duplicate = original(name="mac_sfg2")

        assert original.name == "mac_sfg"
        assert duplicate.name == "mac_sfg2"
        assert original.id_number_offset == 100
        assert duplicate.id_number_offset == 100

        for graph_id, source_component in original._components_by_id.items():
            twin = duplicate.find_by_id(graph_id)
            assert source_component.name == twin.name

    def test_deep_copy_with_new_sources(self):
        """Positional arguments to the call become the sources of the new inputs."""
        inp1 = Input("INP1")
        inp2 = Input("INP2")
        inp3 = Input("INP3")
        add1 = Addition(inp1, inp2, "ADD1")
        mul1 = Multiplication(add1, inp3, "MUL1")
        out1 = Output(mul1, "OUT1")
        original = SFG(inputs=[inp1, inp2], outputs=[out1], name="mac_sfg")

        a = Addition(Constant(3), Constant(5))
        b = Constant(2)
        rewired = original(a, b)

        first_source = rewired.input(0).signals[0].source.operation
        assert first_source is a
        second_source = rewired.input(1).signals[0].source.operation
        assert second_source is b
class TestEvaluateOutput:
    """Evaluation of a single SFG output via ``evaluate_output``."""

    def test_evaluate_output(self, operation_tree):
        """Output 0 of the ``operation_tree`` fixture evaluates to 5."""
        graph = SFG(outputs=[Output(operation_tree)])
        result = graph.evaluate_output(0, [])
        assert result == 5

    def test_evaluate_output_large(self, large_operation_tree):
        """Output 0 of the ``large_operation_tree`` fixture evaluates to 14."""
        graph = SFG(outputs=[Output(large_operation_tree)])
        result = graph.evaluate_output(0, [])
        assert result == 14

    def test_evaluate_output_cycle(self, operation_graph_with_cycle):
        """Evaluating the cyclic fixture graph raises a RuntimeError."""
        graph = SFG(outputs=[Output(operation_graph_with_cycle)])
        expected_message = "Direct feedback loop detected"
        with pytest.raises(RuntimeError, match=expected_message):
            graph.evaluate_output(0, [])
class TestComponents:
    """The ``components`` view of an SFG."""

    def test_advanced_components(self):
        """All operations and all named signals are reported as components."""
        inp1 = Input("INP1")
        inp2 = Input("INP2")
        inp3 = Input("INP3")
        add1 = Addition(None, None, "ADD1")
        add2 = Addition(None, None, "ADD2")
        mul1 = Multiplication(None, None, "MUL1")
        out1 = Output(None, "OUT1")

        # (destination operation, input index, source, signal name)
        wiring = [
            (add1, 0, inp1, "S1"),
            (add1, 1, inp2, "S2"),
            (add2, 0, add1, "S4"),
            (add2, 1, inp3, "S3"),
            (mul1, 0, add1, "S5"),
            (mul1, 1, add2, "S6"),
            (out1, 0, mul1, "S7"),
        ]
        for destination, index, source, signal_name in wiring:
            destination.input(index).connect(source, signal_name)

        mac_sfg = SFG(inputs=[inp1, inp2], outputs=[out1], name="mac_sfg")

        found_names = {component.name for component in mac_sfg.components}
        operation_names = {"INP1", "INP2", "INP3", "ADD1", "ADD2", "MUL1", "OUT1"}
        signal_names = {"S1", "S2", "S3", "S4", "S5", "S6", "S7"}
        assert found_names == operation_names | signal_names
Angus Lothian
committed
class TestReplaceComponents:
    """Behaviour of ``SFG.replace_component``."""

    def test_replace_addition_by_id(self, operation_tree):
        """Replacing ``add1`` removes that id and registers the new name."""
        sfg = SFG(outputs=[Output(operation_tree)])
        component_id = "add1"

        sfg = sfg.replace_component(
            Multiplication(name="Multi"), graph_id=component_id
        )

        assert component_id not in sfg._components_by_id
        assert "Multi" in sfg._components_by_name

    def test_replace_addition_large_tree(self, large_operation_tree):
        """Same replacement on the larger fixture tree, targeting ``add3``."""
        sfg = SFG(outputs=[Output(large_operation_tree)])
        component_id = "add3"

        sfg = sfg.replace_component(
            Multiplication(name="Multi"), graph_id=component_id
        )

        assert "Multi" in sfg._components_by_name
        assert component_id not in sfg._components_by_id

    def test_replace_no_input_component(self, operation_tree):
        """Replacing ``c1`` by a new Constant yields a different object under that id."""
        sfg = SFG(outputs=[Output(operation_tree)])
        component_id = "c1"
        const_ = sfg.find_by_id(component_id)

        sfg = sfg.replace_component(Constant(1), graph_id=component_id)

        assert const_ is not sfg.find_by_id(component_id)

    def test_no_match_on_replace(self, large_operation_tree):
        """An id that is not looked up successfully ("addd1") raises AssertionError."""
        sfg = SFG(outputs=[Output(large_operation_tree)])
        component_id = "addd1"

        # pytest.raises fails the test if nothing is raised and lets any
        # other exception type propagate, like the former try/except/else.
        with pytest.raises(AssertionError):
            sfg.replace_component(
                Multiplication(name="Multi"), graph_id=component_id
            )

    def test_not_equal_input(self, large_operation_tree):
        """Replacing ``c1`` with a Multiplication raises AssertionError.

        NOTE(review): judging by the test name the failure is due to differing
        input counts -- confirm against ``replace_component``.
        """
        sfg = SFG(outputs=[Output(large_operation_tree)])
        component_id = "c1"

        with pytest.raises(AssertionError):
            sfg.replace_component(
                Multiplication(name="Multi"), graph_id=component_id
            )
class TestConstructSFG:
def test_1k_additions(self):
    """Chain 1000 additions (1 + 1, then 999 times + 2) and expect 2000."""
    chain = Addition(Constant(1), Constant(1))
    for _ in range(999):
        chain = Addition(chain, Constant(2))

    simulation = FastSimulation(SFG(outputs=[Output(chain)]))
    simulation.step()

    first_sample = simulation.results["0"][0]
    assert first_sample.real == 2000
def test_1k_subtractions(self):
    """Chain 1000 subtractions of 2 starting from 0 and expect -2000."""
    chain = Subtraction(Constant(0), Constant(2))
    for _ in range(999):
        chain = Subtraction(chain, Constant(2))

    simulation = FastSimulation(SFG(outputs=[Output(chain)]))
    simulation.step()

    first_sample = simulation.results["0"][0]
    assert first_sample.real == -2000
def test_1k_butterfly(self):
    """Feed a 500-addition chain and a 500-subtraction chain into a Butterfly."""
    sum_chain = Addition(Constant(1), Constant(1))
    diff_chain = Subtraction(Constant(-1), Constant(1))

    for _ in range(499):
        sum_chain = Addition(sum_chain, Constant(2))
    for _ in range(499):
        diff_chain = Subtraction(diff_chain, Constant(2))

    bfly = Butterfly(sum_chain, diff_chain)
    sinks = [Output(bfly.output(0)), Output(bfly.output(1))]
    simulation = FastSimulation(SFG(outputs=sinks))
    simulation.step()

    assert simulation.results["0"][0].real == 0
    assert simulation.results["1"][0].real == 2000
def test_1k_multiplications(self):
    """Chain 1000 multiplications: 3 * 0.5, then 999 times * 1.01."""
    chain = Multiplication(Constant(3), Constant(0.5))
    for _ in range(999):
        chain = Multiplication(chain, Constant(1.01))

    simulation = FastSimulation(SFG(outputs=[Output(chain)]))
    simulation.step()

    first_sample = simulation.results["0"][0]
    assert first_sample.real == 31127.458868040336
def test_1k_divisions(self):
    """Chain 1000 divisions: 3 / 0.5, then 999 times / 1.01."""
    chain = Division(Constant(3), Constant(0.5))
    for _ in range(999):
        chain = Division(chain, Constant(1.01))

    simulation = FastSimulation(SFG(outputs=[Output(chain)]))
    simulation.step()

    first_sample = simulation.results["0"][0]
    assert first_sample.real == 0.00028913378500165966
def test_1k_mins(self):
    """Chain 1000 Min operations; the smaller constant must survive."""
    chain = Min(Constant(3.14159), Constant(43.14123843))
    for _ in range(999):
        chain = Min(chain, Constant(43.14123843))

    simulation = FastSimulation(SFG(outputs=[Output(chain)]))
    simulation.step()

    first_sample = simulation.results["0"][0]
    assert first_sample.real == 3.14159
def test_1k_maxs(self):
    """Chain 1000 Max operations; the larger constant must survive."""
    chain = Max(Constant(3.14159), Constant(43.14123843))
    for _ in range(999):
        chain = Max(chain, Constant(3.14159))

    simulation = FastSimulation(SFG(outputs=[Output(chain)]))
    simulation.step()

    first_sample = simulation.results["0"][0]
    assert first_sample.real == 43.14123843
def test_1k_square_roots(self):
    """Nest five square roots of 1000000 (one initial plus four in the loop)."""
    chain = SquareRoot(Constant(1000000))
    for _ in range(4):
        chain = SquareRoot(chain)

    simulation = FastSimulation(SFG(outputs=[Output(chain)]))
    simulation.step()

    first_sample = simulation.results["0"][0]
    assert first_sample.real == 1.539926526059492
def test_1k_complex_conjugates(self):
Angus Lothian
committed
for _ in range(999):
prev_op = ComplexConjugate(prev_op)
sfg = SFG(outputs=[Output(prev_op)])
sim = FastSimulation(sfg)
sim.step()
Angus Lothian
committed
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
def test_1k_absolutes(self):
    """Chain 1000 Absolute operations starting from -3.14159."""
    chain = Absolute(Constant(-3.14159))
    for _ in range(999):
        chain = Absolute(chain)

    simulation = FastSimulation(SFG(outputs=[Output(chain)]))
    simulation.step()

    first_sample = simulation.results["0"][0]
    assert first_sample.real == 3.14159
def test_1k_constant_multiplications(self):
    """Chain 1000 constant multiplications by 1.02 starting from 3.14159."""
    chain = ConstantMultiplication(1.02, Constant(3.14159))
    for _ in range(999):
        chain = ConstantMultiplication(1.02, chain)

    simulation = FastSimulation(SFG(outputs=[Output(chain)]))
    simulation.step()

    first_sample = simulation.results["0"][0]
    assert first_sample.real == 1251184247.0026844
class TestInsertComponent:
def test_insert_component_in_sfg(self, large_operation_tree_names):
sfg = SFG(outputs=[Output(large_operation_tree_names)])
sqrt = SquareRoot()
_sfg = sfg.insert_operation(
Angus Lothian
committed
assert _sfg.evaluate() != sfg.evaluate()
assert any([isinstance(comp, SquareRoot) for comp in _sfg.operations])
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
assert not any(
[isinstance(comp, SquareRoot) for comp in sfg.operations]
)
assert not isinstance(
sfg.find_by_name("constant4")[0]
.output(0)
.signals[0]
.destination.operation,
SquareRoot,
)
assert isinstance(
_sfg.find_by_name("constant4")[0]
.output(0)
.signals[0]
.destination.operation,
SquareRoot,
)
assert sfg.find_by_name("constant4")[0].output(0).signals[
0
].destination.operation is sfg.find_by_id("add3")
assert _sfg.find_by_name("constant4")[0].output(0).signals[
0
].destination.operation is not _sfg.find_by_id("add3")
assert _sfg.find_by_id("sqrt1").output(0).signals[
0
].destination.operation is _sfg.find_by_id("add3")
Angus Lothian
committed
def test_insert_invalid_component_in_sfg(self, large_operation_tree):
    """Inserting an Addition after ``c1`` is rejected with a TypeError."""
    graph = SFG(outputs=[Output(large_operation_tree)])

    # Expected to fail because the input count of the inserted operation
    # does not match the output count of the source operation.
    mismatched = Addition()
    expected_message = "Source operation output count"
    with pytest.raises(TypeError, match=expected_message):
        graph.insert_operation(mismatched, "c1")
def test_insert_at_output(self, large_operation_tree):
sfg = SFG(outputs=[Output(large_operation_tree)])
# Should raise an exception for trying to insert an operation after an output.
sqrt = SquareRoot()
with pytest.raises(TypeError, match="Source operation cannot be an"):
Angus Lothian
committed
def test_insert_multiple_output_ports(self, butterfly_operation_tree):
    """Insert a Butterfly after ``bfly3`` and check both ports are rewired."""
    sfg = SFG(outputs=list(map(Output, butterfly_operation_tree.outputs)))
    _sfg = sfg.insert_operation(Butterfly(name="n_bfly"), "bfly3")

    assert sfg.evaluate() != _sfg.evaluate()

    # Only the returned SFG contains the inserted operation.
    assert len(sfg.find_by_name("n_bfly")) == 0
    assert len(_sfg.find_by_name("n_bfly")) == 1

    ports = range(2)

    # Correctly connected old output -> new input
    for port in ports:
        target = (
            _sfg.find_by_name("bfly3")[0]
            .output(port)
            .signals[0]
            .destination.operation
        )
        assert target is _sfg.find_by_name("n_bfly")[0]

    # Correctly connected new input -> old output
    for port in ports:
        origin = (
            _sfg.find_by_name("n_bfly")[0].input(port).signals[0].source.operation
        )
        assert origin is _sfg.find_by_name("bfly3")[0]

    # Correctly connected new output -> next input
    for port in ports:
        target = (
            _sfg.find_by_name("n_bfly")[0]
            .output(port)
            .signals[0]
            .destination.operation
        )
        assert target is _sfg.find_by_name("bfly2")[0]

    # Correctly connected next input -> new output
    for port in ports:
        origin = (
            _sfg.find_by_name("bfly2")[0].input(port).signals[0].source.operation
        )
        assert origin is _sfg.find_by_name("n_bfly")[0]
Angus Lothian
committed
class TestFindComponentsWithTypeName:
    """Lookup of SFG components through ``find_by_type_name``."""

    def test_mac_components(self):
        """Each type name returns exactly the components of that type."""
        inp1 = Input("INP1")
        inp2 = Input("INP2")
        inp3 = Input("INP3")
        add1 = Addition(None, None, "ADD1")
        add2 = Addition(None, None, "ADD2")
        mul1 = Multiplication(None, None, "MUL1")
        out1 = Output(None, "OUT1")

        # (destination operation, input index, source, signal name)
        wiring = [
            (add1, 0, inp1, "S1"),
            (add1, 1, inp2, "S2"),
            (add2, 0, add1, "S4"),
            (add2, 1, inp3, "S3"),
            (mul1, 0, add1, "S5"),
            (mul1, 1, add2, "S6"),
            (out1, 0, mul1, "S7"),
        ]
        for destination, index, source, signal_name in wiring:
            destination.input(index).connect(source, signal_name)

        mac_sfg = SFG(inputs=[inp1, inp2], outputs=[out1], name="mac_sfg")

        # (object providing type_name(), names expected for that type)
        expectations = [
            (inp1, {"INP1", "INP2", "INP3"}),
            (add1, {"ADD1", "ADD2"}),
            (mul1, {"MUL1"}),
            (out1, {"OUT1"}),
            (Signal, {"S1", "S2", "S3", "S4", "S5", "S6", "S7"}),
        ]
        for kind, expected_names in expectations:
            matches = mac_sfg.find_by_type_name(kind.type_name())
            found_names = {component.name for component in matches}
            assert found_names == expected_names
Angus Lothian
committed
class TestGetPrecedenceList:
    """Contents of the list returned by ``SFG.get_precedence_list``."""

    @staticmethod
    def _port_keys(ports):
        """Return the set of ``operation.key(index, name)`` values for ``ports``."""
        return {
            port.operation.key(port.index, port.operation.name) for port in ports
        }

    def test_inputs_delays(self, precedence_sfg_delays):
        """Seven precedence levels are produced and then cached on the SFG."""
        # No cached precedence list
        assert precedence_sfg_delays._precedence_list is None

        precedence_list = precedence_sfg_delays.get_precedence_list()
        assert len(precedence_list) == 7

        # Cached precedence list
        assert len(precedence_sfg_delays._precedence_list) == 7

        expected_levels = [
            {"IN1", "T1", "T2"},
            {"C0", "B1", "B2", "A1", "A2"},
            {"ADD2", "ADD3"},
            {"ADD1"},
            {"Q1"},
            {"A0"},
            {"ADD4"},
        ]
        for level, expected in enumerate(expected_levels):
            assert self._port_keys(precedence_list[level]) == expected

        # Trigger cache
        precedence_list = precedence_sfg_delays.get_precedence_list()
        assert len(precedence_list) == 7

    def test_inputs_constants_delays_multiple_outputs(
        self, precedence_sfg_delays_and_constants
    ):
        """Seven levels; the last one holds both ports of ``BFLY1``."""
        precedence_list = (
            precedence_sfg_delays_and_constants.get_precedence_list()
        )
        assert len(precedence_list) == 7

        expected_levels = [
            {"IN1", "T1", "CONST1"},
            {"C0", "B1", "B2", "A1", "A2"},
            {"ADD2", "ADD3"},
            {"ADD1"},
            {"Q1"},
            {"A0"},
            {"BFLY1.0", "BFLY1.1"},
        ]
        for level, expected in enumerate(expected_levels):
            assert self._port_keys(precedence_list[level]) == expected

    def test_precedence_multiple_outputs_same_precedence(
        self, sfg_two_inputs_two_outputs
    ):
        """A nested SFG's two outputs land on the same (third) level."""
        nested = sfg_two_inputs_two_outputs
        nested.name = "NESTED_SFG"

        in1 = Input("IN1")
        nested.input(0).connect(in1, "S1")
        in2 = Input("IN2")
        cmul1 = ConstantMultiplication(10, None, "CMUL1")
        cmul1.input(0).connect(in2, "S2")
        nested.input(1).connect(cmul1, "S3")

        out1 = Output(nested.output(0), "OUT1")
        out2 = Output(nested.output(1), "OUT2")
        sfg = SFG(inputs=[in1, in2], outputs=[out1, out2])

        precedence_list = sfg.get_precedence_list()
        assert len(precedence_list) == 3

        expected_levels = [
            {"IN1", "IN2"},
            {"CMUL1"},
            {"NESTED_SFG.0", "NESTED_SFG.1"},
        ]
        for level, expected in enumerate(expected_levels):
            assert self._port_keys(precedence_list[level]) == expected

    def test_precedence_sfg_multiple_outputs_different_precedences(
        self, sfg_two_inputs_two_outputs_independent
    ):
        """Same wiring using the ``..._independent`` nested SFG fixture."""
        nested = sfg_two_inputs_two_outputs_independent
        nested.name = "NESTED_SFG"

        in1 = Input("IN1")
        in2 = Input("IN2")
        nested.input(0).connect(in1, "S1")
        cmul1 = ConstantMultiplication(10, None, "CMUL1")
        cmul1.input(0).connect(in2, "S2")
        nested.input(1).connect(cmul1, "S3")

        out1 = Output(nested.output(0), "OUT1")
        out2 = Output(nested.output(1), "OUT2")
        sfg = SFG(inputs=[in1, in2], outputs=[out1, out2])

        precedence_list = sfg.get_precedence_list()
        assert len(precedence_list) == 3

        expected_levels = [
            {"IN1", "IN2"},
            {"CMUL1"},
            {"NESTED_SFG.0", "NESTED_SFG.1"},
        ]
        for level, expected in enumerate(expected_levels):
            assert self._port_keys(precedence_list[level]) == expected
Angus Lothian
committed
class TestPrintPrecedence:
    """Text written to stdout by ``SFG.print_precedence_graph``."""

    def test_delays(self, precedence_sfg_delays):
        """The printed graph lists every precedence level between 120-dash rules."""
        # Local imports keep this test self-contained.
        import contextlib
        import io

        sfg = precedence_sfg_delays

        # redirect_stdout restores the previous sys.stdout even if the call
        # raises, and restores whatever stream was active before (e.g.
        # pytest's capture) instead of forcing sys.__stdout__.
        buffer = io.StringIO()
        with contextlib.redirect_stdout(buffer):
            sfg.print_precedence_graph()
        captured_output = buffer.getvalue()

        # Operation names per precedence level, in printed order.
        levels = [
            ["IN1", "T1", "T2"],
            ["C0", "A1", "B1", "A2", "B2"],
            ["ADD3", "ADD2"],
            ["ADD1"],
            ["Q1"],
            ["A0"],
            ["ADD4"],
        ]

        rule = "-" * 120 + "\n"
        parts = [rule]
        for level_number, names in enumerate(levels, start=1):
            for position, name in enumerate(names, start=1):
                parts.append(
                    f"{level_number}.{position} \t"
                    + str(sfg.find_by_name(name)[0])
                    + "\n"
                )
            # Every level is closed by a separator rule.
            parts.append(rule)
        expected = "".join(parts)

        assert captured_output == expected
Angus Lothian
committed
class TestDepends:
    """Input indices reported by ``inputs_required_for_output``."""

    def test_depends_sfg(self, sfg_two_inputs_two_outputs):
        """Both outputs of the fixture depend on both inputs."""
        sfg = sfg_two_inputs_two_outputs

        needed_for_first = set(sfg.inputs_required_for_output(0))
        assert needed_for_first == {0, 1}

        needed_for_second = set(sfg.inputs_required_for_output(1))
        assert needed_for_second == {0, 1}

    def test_depends_sfg_independent(
        self, sfg_two_inputs_two_outputs_independent
    ):
        """In the independent fixture each output depends only on its own input."""
        sfg = sfg_two_inputs_two_outputs_independent

        needed_for_first = set(sfg.inputs_required_for_output(0))
        assert needed_for_first == {0}

        needed_for_second = set(sfg.inputs_required_for_output(1))
        assert needed_for_second == {1}
Angus Lothian
committed
class TestConnectExternalSignalsToComponentsSoloComp:
def test_connect_external_signals_to_components_mac(self):
    """Embed a MAC SFG in an outer SFG; the result is unchanged after connecting."""
    inp1 = Input("INP1")
    inp2 = Input("INP2")
    inp3 = Input("INP3")
    add1 = Addition(None, None, "ADD1")
    add2 = Addition(None, None, "ADD2")
    mul1 = Multiplication(None, None, "MUL1")
    out1 = Output(None, "OUT1")

    # (destination operation, input index, source, signal name)
    inner_wiring = [
        (add1, 0, inp1, "S1"),
        (add1, 1, inp2, "S2"),
        (add2, 0, add1, "S3"),
        (add2, 1, inp3, "S4"),
        (mul1, 0, add1, "S5"),
        (mul1, 1, add2, "S6"),
        (out1, 0, mul1, "S7"),
    ]
    for destination, index, source, signal_name in inner_wiring:
        destination.input(index).connect(source, signal_name)

    mac_sfg = SFG(inputs=[inp1, inp2], outputs=[out1])

    # Wrap the MAC SFG between two new inputs and one new output.
    inp4 = Input("INP4")
    inp5 = Input("INP5")
    out2 = Output(None, "OUT2")
    mac_sfg.input(0).connect(inp4, "S8")
    mac_sfg.input(1).connect(inp5, "S9")
    out2.input(0).connect(mac_sfg.outputs[0], "S10")

    test_sfg = SFG(inputs=[inp4, inp5], outputs=[out2])

    value_before = test_sfg.evaluate(1, 2)
    assert value_before == 9

    mac_sfg.connect_external_signals_to_components()

    value_after = test_sfg.evaluate(1, 2)
    assert value_after == 9

    # The outer SFG reports a falsy result for the same call.
    assert not test_sfg.connect_external_signals_to_components()
def test_connect_external_signals_to_components_operation_tree(
    self, operation_tree
):
    """Replace an SFG holding only ``operation_tree`` by its inner components."""
    inner = SFG(outputs=[Output(operation_tree)])

    sink = Output(None, "OUT1")
    sink.input(0).connect(inner.outputs[0], "S1")
    outer = SFG(outputs=[sink])

    value_before = outer.evaluate_output(0, [])
    assert value_before == 5

    inner.connect_external_signals_to_components()

    value_after = outer.evaluate_output(0, [])
    assert value_after == 5

    # The outer SFG reports a falsy result for the same call.
    assert not outer.connect_external_signals_to_components()
def test_connect_external_signals_to_components_large_operation_tree(
    self, large_operation_tree
):
    """Replace an SFG holding only ``large_operation_tree`` by its inner components."""
    inner = SFG(outputs=[Output(large_operation_tree)])

    sink = Output(None, "OUT1")
    sink.input(0).connect(inner.outputs[0], "S1")
    outer = SFG(outputs=[sink])

    value_before = outer.evaluate_output(0, [])
    assert value_before == 14

    inner.connect_external_signals_to_components()

    value_after = outer.evaluate_output(0, [])
    assert value_after == 14

    # The outer SFG reports a falsy result for the same call.
    assert not outer.connect_external_signals_to_components()