Newer
Older
for op in sfg_simple_filter.find_by_name("ADD1")[0].preceding_operations
} == {"CMUL1", "IN1"}
assert {
op.name for op in new_sfg.find_by_name("ADD1")[0].preceding_operations
} == {"T1", "IN1"}
Angus Lothian
committed
assert "S1" in {
sig.name
for sig in sfg_simple_filter.find_by_name("T1")[0].output(0).signals
}
assert "S2" in {
sig.name for sig in new_sfg.find_by_name("T1")[0].output(0).signals
}
Angus Lothian
committed
def test_remove_multiple_inputs_outputs(self, butterfly_operation_tree):
out1 = Output(butterfly_operation_tree.output(0), "OUT1")
out2 = Output(butterfly_operation_tree.output(1), "OUT2")
sfg = SFG(outputs=[out1, out2])
new_sfg = sfg.remove_operation(sfg.find_by_name("bfly2")[0].graph_id)
assert sfg.find_by_name("bfly3")[0].output(0).signal_count == 1
assert new_sfg.find_by_name("bfly3")[0].output(0).signal_count == 1
sfg_dest_0 = sfg.find_by_name("bfly3")[0].output(0).signals[0].destination
new_sfg_dest_0 = (
new_sfg.find_by_name("bfly3")[0].output(0).signals[0].destination
)
Angus Lothian
committed
assert sfg_dest_0.index == 0
assert new_sfg_dest_0.index == 0
assert sfg_dest_0.operation.name == "bfly2"
assert new_sfg_dest_0.operation.name == "bfly1"
assert sfg.find_by_name("bfly3")[0].output(1).signal_count == 1
assert new_sfg.find_by_name("bfly3")[0].output(1).signal_count == 1
sfg_dest_1 = sfg.find_by_name("bfly3")[0].output(1).signals[0].destination
new_sfg_dest_1 = (
new_sfg.find_by_name("bfly3")[0].output(1).signals[0].destination
)
Angus Lothian
committed
assert sfg_dest_1.index == 1
assert new_sfg_dest_1.index == 1
assert sfg_dest_1.operation.name == "bfly2"
assert new_sfg_dest_1.operation.name == "bfly1"
assert sfg.find_by_name("bfly1")[0].input(0).signal_count == 1
assert new_sfg.find_by_name("bfly1")[0].input(0).signal_count == 1
sfg_source_0 = sfg.find_by_name("bfly1")[0].input(0).signals[0].source
new_sfg_source_0 = new_sfg.find_by_name("bfly1")[0].input(0).signals[0].source
Angus Lothian
committed
assert sfg_source_0.index == 0
assert new_sfg_source_0.index == 0
assert sfg_source_0.operation.name == "bfly2"
assert new_sfg_source_0.operation.name == "bfly3"
sfg_source_1 = sfg.find_by_name("bfly1")[0].input(1).signals[0].source
new_sfg_source_1 = new_sfg.find_by_name("bfly1")[0].input(1).signals[0].source
Angus Lothian
committed
assert sfg_source_1.index == 1
assert new_sfg_source_1.index == 1
assert sfg_source_1.operation.name == "bfly2"
assert new_sfg_source_1.operation.name == "bfly3"
assert "bfly2" not in {op.name for op in new_sfg.operations}
Angus Lothian
committed
def remove_different_number_inputs_outputs(self, sfg_simple_filter):
    """Expect ``remove_operation("add1")`` on the simple filter to raise."""
    # NOTE(review): the method name lacks the ``test_`` prefix, so pytest's
    # default discovery rules will presumably never collect and run it.
    # Confirm whether that is intentional before renaming.
    with pytest.raises(ValueError):
        sfg_simple_filter.remove_operation("add1")
class TestSaveLoadSFG:
def get_path(self, existing=False):
path_ = "".join(random.choices(string.ascii_uppercase, k=4)) + ".py"
while path.exists(path_) if not existing else not path.exists(path_):
path_ = "".join(random.choices(string.ascii_uppercase, k=4)) + ".py"
Angus Lothian
committed
return path_
def test_save_simple_sfg(self, sfg_simple_filter):
result = sfg_to_python(sfg_simple_filter)
path_ = self.get_path()
assert not path.exists(path_)
with open(path_, "w") as file_obj:
file_obj.write(result)
assert path.exists(path_)
with open(path_) as file_obj:
Angus Lothian
committed
assert file_obj.read() == result
remove(path_)
def test_save_complex_sfg(self, precedence_sfg_delays_and_constants):
result = sfg_to_python(precedence_sfg_delays_and_constants)
path_ = self.get_path()
assert not path.exists(path_)
with open(path_, "w") as file_obj:
file_obj.write(result)
assert path.exists(path_)
with open(path_) as file_obj:
Angus Lothian
committed
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
assert file_obj.read() == result
remove(path_)
def test_load_simple_sfg(self, sfg_simple_filter):
    """Round-trip the simple filter through ``sfg_to_python``/``python_to_sfg``.

    The generated source is written to a scratch file and loaded back. The
    loaded SFG must match the original both as a string and when evaluated
    with the input ``[2]``.
    """
    result = sfg_to_python(sfg_simple_filter)
    path_ = self.get_path()

    assert not path.exists(path_)
    try:
        with open(path_, "w") as file_obj:
            file_obj.write(result)

        assert path.exists(path_)

        simple_filter_, _ = python_to_sfg(path_)

        assert str(sfg_simple_filter) == str(simple_filter_)
        assert sfg_simple_filter.evaluate([2]) == simple_filter_.evaluate([2])
    finally:
        # Clean up even when loading or an assertion fails, so that a failing
        # run does not leave the scratch file behind.
        if path.exists(path_):
            remove(path_)
def test_load_complex_sfg(self, precedence_sfg_delays_and_constants):
result = sfg_to_python(precedence_sfg_delays_and_constants)
path_ = self.get_path()
assert not path.exists(path_)
with open(path_, "w") as file_obj:
file_obj.write(result)
assert path.exists(path_)
precedence_sfg_registers_and_constants_, _ = python_to_sfg(path_)
assert str(precedence_sfg_delays_and_constants) == str(
Angus Lothian
committed
remove(path_)
def test_load_invalid_path(self):
path_ = self.get_path(existing=False)
with pytest.raises(FileNotFoundError):
Angus Lothian
committed
python_to_sfg(path_)
class TestGetComponentsOfType:
def test_get_no_operations_of_type(self, sfg_two_inputs_two_outputs):
assert [
op.name
for op in sfg_two_inputs_two_outputs.find_by_type_name(
Multiplication.type_name()
)
] == []
Angus Lothian
committed
def test_get_multple_operations_of_type(self, sfg_two_inputs_two_outputs):
for op in sfg_two_inputs_two_outputs.find_by_type_name(Addition.type_name())
for op in sfg_two_inputs_two_outputs.find_by_type_name(Input.type_name())
for op in sfg_two_inputs_two_outputs.find_by_type_name(Output.type_name())
class TestPrecedenceGraph:
def test_precedence_graph(self, sfg_simple_filter):
'digraph {\n\trankdir=LR\n\tsubgraph cluster_0 {\n\t\tlabel=N0\n\t\t"in1.0"'
' [label=in1 height=0.1 shape=rectangle width=0.1]\n\t\t"t1.0" [label=t1'
' height=0.1 shape=rectangle width=0.1]\n\t}\n\tsubgraph cluster_1'
' {\n\t\tlabel=N1\n\t\t"cmul1.0" [label=cmul1 height=0.1 shape=rectangle'
' width=0.1]\n\t}\n\tsubgraph cluster_2 {\n\t\tlabel=N2\n\t\t"add1.0"'
' [label=add1 height=0.1 shape=rectangle width=0.1]\n\t}\n\t"in1.0" ->'
' add1\n\tadd1 [label=add1 shape=ellipse]\n\tin1 -> "in1.0"\n\tin1'
' [label=in1 shape=cds]\n\t"t1.0" -> cmul1\n\tcmul1 [label=cmul1'
' shape=ellipse]\n\t"t1.0" -> out1\n\tout1 [label=out1 shape=cds]\n\tt1Out'
' -> "t1.0"\n\tt1Out [label=t1 shape=square]\n\t"cmul1.0" -> add1\n\tadd1'
' [label=add1 shape=ellipse]\n\tcmul1 -> "cmul1.0"\n\tcmul1 [label=cmul1'
' shape=ellipse]\n\t"add1.0" -> t1In\n\tt1In [label=t1'
' shape=square]\n\tadd1 -> "add1.0"\n\tadd1 [label=add1 shape=ellipse]\n}'
assert sfg_simple_filter.precedence_graph.source in (res, res + "\n")
class TestSFGGraph:
def test_sfg(self, sfg_simple_filter):
'digraph {\n\trankdir=LR splines=spline\n\tin1 [shape=cds]\n\tin1 -> add1'
' [headlabel=0]\n\tout1 [shape=cds]\n\tt1 -> out1\n\tadd1'
' [shape=ellipse]\n\tcmul1 -> add1 [headlabel=1]\n\tcmul1'
' [shape=ellipse]\n\tadd1 -> t1\n\tt1 [shape=square]\n\tt1 -> cmul1\n}'
assert sfg_simple_filter.sfg_digraph(branch_node=False).source in (
res,
res + "\n",
)
def test_sfg_show_id(self, sfg_simple_filter):
'digraph {\n\trankdir=LR splines=spline\n\tin1 [shape=cds]\n\tin1 -> add1'
' [label=s1 headlabel=0]\n\tout1 [shape=cds]\n\tt1 -> out1'
' [label=s2]\n\tadd1 [shape=ellipse]\n\tcmul1 -> add1 [label=s3'
' headlabel=1]\n\tcmul1 [shape=ellipse]\n\tadd1 -> t1 [label=s4]\n\tt1'
' [shape=square]\n\tt1 -> cmul1 [label=s5]\n}'
assert sfg_simple_filter.sfg_digraph(
show_id=True, branch_node=False
).source in (
def test_sfg_branch(self, sfg_simple_filter):
    """Compare the default ``sfg_digraph()`` source with the expected DOT text."""
    expected = (
        'digraph {\n\trankdir=LR splines=spline\n\tin1 [shape=cds]\n\tin1 -> add1'
        ' [headlabel=0]\n\tout1 [shape=cds]\n\t"t1.0" -> out1\n\t"t1.0"'
        ' [shape=point]\n\tt1 -> "t1.0" [arrowhead=none]\n\tadd1'
        ' [shape=ellipse]\n\tcmul1 -> add1 [headlabel=1]\n\tcmul1'
        ' [shape=ellipse]\n\tadd1 -> t1\n\tt1 [shape=square]\n\t"t1.0" ->'
        ' cmul1\n}'
    )
    generated = sfg_simple_filter.sfg_digraph().source

    # The generated source is accepted with or without one trailing newline.
    assert generated in (expected, expected + "\n")
def test_sfg_no_port_numbering(self, sfg_simple_filter):
    """Compare the DOT text produced with port numbering and branch nodes off."""
    expected = (
        'digraph {\n\trankdir=LR splines=spline\n\tin1 [shape=cds]\n\tin1 ->'
        ' add1\n\tout1 [shape=cds]\n\tt1 -> out1\n\tadd1 [shape=ellipse]\n\tcmul1'
        ' -> add1\n\tcmul1 [shape=ellipse]\n\tadd1 -> t1\n\tt1 [shape=square]\n\tt1'
        ' -> cmul1\n}'
    )
    digraph = sfg_simple_filter.sfg_digraph(port_numbering=False, branch_node=False)

    # The generated source is accepted with or without one trailing newline.
    assert digraph.source in (expected, expected + "\n")
def test_show_sfg_invalid_format(self, sfg_simple_filter):
sfg_simple_filter.show(fmt="ppddff")
def test_show_sfg_invalid_engine(self, sfg_simple_filter):
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
class TestSFGErrors:
def test_dangling_output(self):
    """Build an SFG that only uses output 0 of the adaptor."""
    first_input = Input()
    second_input = Input()
    twoport = SymmetricTwoportAdaptor(0.5, first_input, second_input)
    used_output = Output(twoport.output(0))

    # No error, maybe should be?
    _ = SFG([first_input, second_input], [used_output])
def test_unconnected_input_port(self):
    """Expect a ``ValueError`` when the adaptor is given only one source."""
    only_input = Input()
    twoport = SymmetricTwoportAdaptor(0.5, only_input)
    used_output = Output(twoport.output(0))

    expected_message = "Unconnected input port in SFG"
    with pytest.raises(ValueError, match=expected_message):
        SFG([only_input], [used_output])
def test_unconnected_output(self):
    """Build an SFG whose second ``Output`` was created without a source."""
    first_input = Input()
    second_input = Input()
    twoport = SymmetricTwoportAdaptor(0.5, first_input, second_input)
    connected_output = Output(twoport.output(0))
    sourceless_output = Output()

    # No error, should be
    SFG([first_input, second_input], [connected_output, sourceless_output])
def test_unconnected_input(self):
    """Expect a ``ValueError`` when a listed input is not wired to the adaptor."""
    wired_input = Input()
    unwired_input = Input()
    twoport = SymmetricTwoportAdaptor(0.5, wired_input)
    first_output = Output(twoport.output(0))
    second_output = Output(twoport.output(1))

    # Correct error?
    expected_message = "Unconnected input port in SFG"
    with pytest.raises(ValueError, match=expected_message):
        SFG([wired_input, unwired_input], [first_output, second_output])
def test_duplicate_input(self):
    """Expect a ``ValueError`` when the same ``Input`` is listed twice."""
    first_input = Input()
    second_input = Input()
    twoport = SymmetricTwoportAdaptor(0.5, first_input, second_input)
    first_output = Output(twoport.output(0))
    second_output = Output(twoport.output(1))

    repeated_inputs = [first_input, first_input]
    with pytest.raises(ValueError, match="Duplicate input operation"):
        SFG(repeated_inputs, [first_output, second_output])
def test_duplicate_output(self):
in1 = Input()
in2 = Input()
adaptor = SymmetricTwoportAdaptor(0.5, in1, in2)
out1 = Output(adaptor.output(0))
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357
1358
1359
1360
1361
1362
1363
with pytest.raises(ValueError, match="Duplicate output operation"):
SFG([in1, in2], [out1, out1])
def test_unconnected_input_signal(self):
    """Expect a ``ValueError`` for an input signal that has no destination."""
    first_input = Input()
    second_input = Input()
    twoport = SymmetricTwoportAdaptor(0.5, first_input, second_input)
    first_output = Output(twoport.output(0))
    second_output = Output(twoport.output(1))

    loose_signal = Signal()
    expected_message = "Input signal #0 is missing destination in SFG"
    with pytest.raises(ValueError, match=expected_message):
        SFG(
            [first_input, second_input],
            [first_output, second_output],
            [loose_signal],
        )
def test_unconnected_output_signal(self):
    """Expect a ``ValueError`` for an output signal that has no source."""
    first_input = Input()
    second_input = Input()
    twoport = SymmetricTwoportAdaptor(0.5, first_input, second_input)
    first_output = Output(twoport.output(0))
    second_output = Output(twoport.output(1))

    loose_signal = Signal()
    expected_message = "Output signal #0 is missing source in SFG"
    with pytest.raises(ValueError, match=expected_message):
        SFG(
            [first_input, second_input],
            [first_output, second_output],
            output_signals=[loose_signal],
        )
def test_duplicate_input_signal(self):
    """Expect a ``ValueError`` when one signal is listed twice as input signal."""
    only_input = Input()
    feed_signal = Signal()
    twoport = SymmetricTwoportAdaptor(0.5, only_input, feed_signal)
    first_output = Output(twoport.output(0))
    second_output = Output(twoport.output(1))

    repeated_signals = [feed_signal, feed_signal]
    with pytest.raises(ValueError, match="Duplicate input signal"):
        SFG([only_input], [first_output, second_output], repeated_signals)
def test_duplicate_output_signal(self):
    """Build an SFG in which one signal is listed twice as output signal."""
    first_input = Input()
    second_input = Input()
    twoport = SymmetricTwoportAdaptor(0.5, first_input, second_input)
    used_output = Output(twoport.output(0))
    tap_signal = Signal(twoport.output(1))

    # Should raise?
    repeated_signals = [tap_signal, tap_signal]
    SFG([first_input, second_input], [used_output], output_signals=repeated_signals)
def test_dangling_input_signal(self):
    """Expect a ``ValueError`` when a source-less signal feeds the adaptor."""
    only_input = Input()
    sourceless_signal = Signal()
    twoport = SymmetricTwoportAdaptor(0.5, only_input, sourceless_signal)
    first_output = Output(twoport.output(0))
    second_output = Output(twoport.output(1))

    expected_message = "Dangling signal without source in SFG"
    with pytest.raises(ValueError, match=expected_message):
        SFG([only_input], [first_output, second_output])
def test_remove_signal_with_different_number_of_inputs_and_outputs(self):
    """Check ``remove_operation`` for an unknown id and for the addition."""
    first_input = Input()
    second_input = Input()
    adder = Addition(first_input, second_input, name="addition")
    result_output = Output(adder)
    sfg = SFG([first_input, second_input], [result_output])

    # Try to remove non-existent operation
    assert sfg.remove_operation("foo") is None

    expected_message = "Different number of input and output ports of operation with"
    with pytest.raises(ValueError, match=expected_message):
        sfg.remove_operation("add1")
def test_inputs_required_for_output(self):
    """Expect an ``IndexError`` when asking about output index 1 of a 1-output SFG."""
    first_input = Input()
    second_input = Input()
    adder = Addition(first_input, second_input, name="addition")
    result_output = Output(adder)
    sfg = SFG([first_input, second_input], [result_output])

    # The message contains regex metacharacters, hence the escaping.
    expected_message = re.escape("Output index out of range (expected 0-0, got 1)")
    with pytest.raises(IndexError, match=expected_message):
        sfg.inputs_required_for_output(1)
class TestInputDuplicationBug:
    """Check that an ``Input`` shows up only once in ``SFG.operations``."""

    def test_input_is_not_duplicated_in_operation_list(self):
        """An input feeding both a delay and an addition is listed once."""
        # Inputs:
        source = Input(name="in1")
        sink = Output(name="out1")

        # Operations:
        delay = Delay(initial_value=0, name="")
        delay.inputs[0].connect(source)
        total = delay + source
        sink.inputs[0].connect(total)

        twotapfir = SFG(inputs=[source], outputs=[sink], name='twotapfir')

        input_count = sum(isinstance(op, Input) for op in twotapfir.operations)
        assert input_count == 1
class TestCriticalPath:
    """Checks of ``SFG.critical_path_time`` against configured latencies."""

    def test_single_accumulator(self, sfg_simple_accumulator: SFG):
        """The critical path time equals the latency set for additions."""
        for latency in (5, 6):
            sfg_simple_accumulator.set_latency_of_type(Addition.type_name(), latency)
            assert sfg_simple_accumulator.critical_path_time() == latency
1430
1431
1432
1433
1434
1435
1436
1437
1438
1439
1440
1441
1442
1443
1444
1445
1446
1447
1448
1449
1450
1451
def count_kinds(self, sfg: SFG) -> Dict[Type, int]:
    """Count how many operations of each class *sfg* contains."""
    return Counter(map(type, sfg.operations))
# Checks that the number of each kind of operation in sfg2 is multiple*count
# of the same operation in sfg1.
# Delays are compared separately and excluded from the multiplied counts
def assert_counts_is_correct(self, sfg1: SFG, sfg2: SFG, multiple: int):
    """Assert that *sfg2* holds ``multiple`` times the operations of *sfg1*.

    Delays are the exception: their count must be equal in both graphs.
    """
    base_counts = self.count_kinds(sfg1)
    scaled_counts = self.count_kinds(sfg2)

    # Delays should not be duplicated. Check that and then clear them
    # Using get to avoid issues if there are no delays in the sfg
    assert base_counts.get(Delay) == scaled_counts.get(Delay)
    for counts in (base_counts, scaled_counts):
        counts[Delay] = 0

    # Ensure that we aren't missing any keys, or have any extras
    assert base_counts.keys() == scaled_counts.keys()

    for kind, base_count in base_counts.items():
        assert base_count * multiple == scaled_counts[kind]
# This is horrifying, but I can't figure out a way to run the test on multiple
# fixtures, so this is an ugly hack until someone that knows pytest comes along
def test_two_inputs_two_outputs(self, sfg_two_inputs_two_outputs: SFG):
    """Run the shared ``do_tests`` checks on the two-input, two-output fixture."""
    self.do_tests(sfg_two_inputs_two_outputs)
def test_twotapfir(self, sfg_two_tap_fir: SFG):
    """Run the shared ``do_tests`` checks on the ``sfg_two_tap_fir`` fixture."""
    self.do_tests(sfg_two_tap_fir)
def test_delay(self, sfg_delay: SFG):
    """Run the shared ``do_tests`` checks on the ``sfg_delay`` fixture."""
    self.do_tests(sfg_delay)
def test_sfg_two_inputs_two_outputs_independent(
    self, sfg_two_inputs_two_outputs_independent: SFG
):
    """Run the shared ``do_tests`` checks on the independent-paths fixture."""
    self.do_tests(sfg_two_inputs_two_outputs_independent)
def do_tests(self, sfg: SFG):
    """Check unfolding of *sfg* by the factors 2 and 3.

    For each factor the operation counts of the unfolded (and twice unfolded)
    graph are checked, and the unfolded graph is simulated with random input
    and compared sample by sample against a simulation of *sfg* itself.
    """
    for factor in range(2, 4):
        # Ensure that the correct number of operations get created
        unfolded = sfg.unfold(factor)

        self.assert_counts_is_correct(sfg, unfolded, factor)

        double_unfolded = sfg.unfold(factor).unfold(factor)

        self.assert_counts_is_correct(sfg, double_unfolded, factor * factor)

        NUM_TESTS = 5
        # Evaluate with some random values
        # Each original input gets NUM_TESTS * factor samples so that every
        # unfolded input can be fed NUM_TESTS samples below.
        input_list = [
            [random.random() for _ in range(0, NUM_TESTS * factor)]
            for _ in sfg.inputs
        ]

        sim = Simulation(sfg, input_list)
        sim.run()
        ref = sim.results

        # There are `factor` copies of each input. Copy n of input k sits at
        # index k + n * len(sfg.inputs) and receives sample t * factor + n of
        # the original input k.
        unfolded_input_lists = [[] for _ in range(len(sfg.inputs) * factor)]
        for t in range(0, NUM_TESTS):
            for n in range(0, factor):
                for k in range(0, len(sfg.inputs)):
                    unfolded_input_lists[k + n * len(sfg.inputs)].append(
                        input_list[k][t * factor + n]
                    )

        sim = Simulation(unfolded, unfolded_input_lists)
        sim.run()
        unfolded_results = sim.results

        for n, _ in enumerate(sfg.outputs):
            # Outputs for an original output
            ref_values = list(ref[ResultKey(f"{n}")])

            # Output n will be split into `factor` output ports, compute the
            # indices where we find the outputs
            out_indices = [n + k * len(sfg.outputs) for k in range(factor)]
            # One row per unfolded time step, holding the values of all copies
            # of output n in order.
            u_values = [
                [unfolded_results[ResultKey(f"{idx}")][k] for idx in out_indices]
                for k in range(int(NUM_TESTS))
            ]

            # Flattening the rows interleaves the copies back into one sequence.
            flat_u_values = list(itertools.chain.from_iterable(u_values))

            assert flat_u_values == ref_values
def test_value_error(self, sfg_two_inputs_two_outputs: SFG):
sfg = sfg_two_inputs_two_outputs
with pytest.raises(ValueError, match="Unfolding 0 times removes the SFG"):
class TestIsLinear:
    """Checks of the ``is_linear`` attribute on two SFG fixtures."""

    def test_single_accumulator(self, sfg_simple_accumulator: SFG):
        """The simple accumulator fixture is expected to report as linear."""
        assert sfg_simple_accumulator.is_linear

    def test_sfg_nested(self, sfg_nested: SFG):
        """The nested SFG fixture is expected to report as not linear."""
        assert not sfg_nested.is_linear
class TestIsConstant:
    """Checks of the ``is_constant`` attribute on two SFG fixtures."""

    def test_single_accumulator(self, sfg_simple_accumulator: SFG):
        """The simple accumulator fixture is expected to report as not constant."""
        assert not sfg_simple_accumulator.is_constant

    def test_sfg_nested(self, sfg_nested: SFG):
        """The nested SFG fixture is expected to report as not constant."""
        assert not sfg_nested.is_constant
1543
1544
1545
1546
1547
1548
1549
1550
1551
1552
1553
1554
1555
1556
1557
1558
1559
1560
1561
1562
1563
1564
1565
1566
class TestSwapIOOfOperation:
    """Checks that swapping the I/O of an operation keeps the outputs equal."""

    def do_test(self, sfg: SFG, graph_id: GraphID):
        """Simulate *sfg* before and after swapping the I/O of *graph_id*.

        Both simulations are fed the same random input sequences and every
        output must yield the same values in both runs.
        """
        sample_count = 5

        # One random sequence of `sample_count` values per SFG input.
        stimuli = [[random.random() for _ in range(sample_count)] for _ in sfg.inputs]

        reference = Simulation(sfg, stimuli)
        reference.run()

        sfg.swap_io_of_operation(graph_id)

        swapped = Simulation(sfg, stimuli)
        swapped.run()

        for index, _output in enumerate(sfg.outputs):
            before = list(reference.results[ResultKey(f"{index}")])
            after = list(swapped.results[ResultKey(f"{index}")])
            assert before == after

    def test_single_accumulator(self, sfg_simple_accumulator: SFG):
        """Swap the I/O of ``add1`` in the simple accumulator fixture."""
        self.do_test(sfg_simple_accumulator, "add1")
1567
1568
1569
1570
1571
1572
1573
1574
1575
1576
1577
1578
1579
1580
1581
1582
1583
1584
1585
1586
1587
1588
1589
1590
1591
1592
1593
1594
1595
1596
1597
1598
1599
1600
1601
1602
1603
1604
1605
1606
1607
1608
1609
1610
1611
1612
1613
1614
1615
1616
class TestInsertComponentAfter:
    """Checks of ``SFG.insert_operation_after``."""

    def test_insert_component_after_in_sfg(self, large_operation_tree_names):
        """Insert a square root after ``constant4`` and check the rewiring."""

        def successor_of_constant4(graph):
            """Return the operation fed by the first signal out of ``constant4``."""
            constant = graph.find_by_name("constant4")[0]
            return constant.output(0).signals[0].destination.operation

        sfg = SFG(outputs=[Output(large_operation_tree_names)])
        sqrt = SquareRoot()

        target_id = sfg.find_by_name("constant4")[0].graph_id
        _sfg = sfg.insert_operation_after(target_id, sqrt)
        assert _sfg.evaluate() != sfg.evaluate()

        # Only the returned SFG contains a square root; the original is untouched.
        assert any(isinstance(comp, SquareRoot) for comp in _sfg.operations)
        assert not any(isinstance(comp, SquareRoot) for comp in sfg.operations)

        assert not isinstance(successor_of_constant4(sfg), SquareRoot)
        assert isinstance(successor_of_constant4(_sfg), SquareRoot)

        assert successor_of_constant4(sfg) is sfg.find_by_id("add3")
        assert successor_of_constant4(_sfg) is not _sfg.find_by_id("add3")

        inserted = _sfg.find_by_id("sqrt1")
        after_inserted = inserted.output(0).signals[0].destination.operation
        assert after_inserted is _sfg.find_by_id("add3")

    def test_insert_component_after_mimo_operation_error(
        self, large_operation_tree_names
    ):
        """Expect a ``TypeError`` when inserting a ``SymmetricTwoportAdaptor``."""
        sfg = SFG(outputs=[Output(large_operation_tree_names)])
        expected_message = "Only operations with one input and one output"
        with pytest.raises(TypeError, match=expected_message):
            sfg.insert_operation_after("constant4", SymmetricTwoportAdaptor(0.5))

    def test_insert_component_after_unknown_component_error(
        self, large_operation_tree_names
    ):
        """Expect a ``ValueError`` when the target graph id does not exist."""
        sfg = SFG(outputs=[Output(large_operation_tree_names)])
        expected_message = "Unknown component:"
        with pytest.raises(ValueError, match=expected_message):
            sfg.insert_operation_after("foo", SquareRoot())
class TestGetUsedTypeNames:
    """Checks of the list returned by ``SFG.get_used_type_names``."""

    def test_single_accumulator(self, sfg_simple_accumulator: SFG):
        """The simple accumulator reports addition, input, output and ``t``."""
        assert sfg_simple_accumulator.get_used_type_names() == ['add', 'in', 'out', 't']

    def test_sfg_nested(self, sfg_nested: SFG):
        """The nested fixture reports input, output and ``sfg``."""
        assert sfg_nested.get_used_type_names() == ['in', 'out', 'sfg']

    def test_large_operation_tree(self, large_operation_tree):
        """An SFG wrapping the large operation tree reports ``add``, ``c``, ``out``."""
        sfg = SFG(outputs=[Output(large_operation_tree)])
        assert sfg.get_used_type_names() == ['add', 'c', 'out']