Newer
Older
Angus Lothian
committed
class TestConnectExternalSignalsToComponentsMultipleComp:
def test_connect_external_signals_to_components_operation_tree(
self, operation_tree
):
"""Replaces a operation_tree in an SFG with other components"""
Angus Lothian
committed
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
sfg1 = SFG(outputs=[Output(operation_tree)])
inp1 = Input("INP1")
inp2 = Input("INP2")
out1 = Output(None, "OUT1")
add1 = Addition(None, None, "ADD1")
add2 = Addition(None, None, "ADD2")
add1.input(0).connect(inp1, "S1")
add1.input(1).connect(inp2, "S2")
add2.input(0).connect(add1, "S3")
add2.input(1).connect(sfg1.outputs[0], "S4")
out1.input(0).connect(add2, "S5")
test_sfg = SFG(inputs=[inp1, inp2], outputs=[out1])
assert test_sfg.evaluate(1, 2) == 8
sfg1.connect_external_signals_to_components()
assert test_sfg.evaluate(1, 2) == 8
assert not test_sfg.connect_external_signals_to_components()
def test_connect_external_signals_to_components_large_operation_tree(
self, large_operation_tree
):
"""Replaces a large_operation_tree in an SFG with other components"""
Angus Lothian
committed
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
sfg1 = SFG(outputs=[Output(large_operation_tree)])
inp1 = Input("INP1")
inp2 = Input("INP2")
out1 = Output(None, "OUT1")
add1 = Addition(None, None, "ADD1")
add2 = Addition(None, None, "ADD2")
add1.input(0).connect(inp1, "S1")
add1.input(1).connect(inp2, "S2")
add2.input(0).connect(add1, "S3")
add2.input(1).connect(sfg1.outputs[0], "S4")
out1.input(0).connect(add2, "S5")
test_sfg = SFG(inputs=[inp1, inp2], outputs=[out1])
assert test_sfg.evaluate(1, 2) == 17
sfg1.connect_external_signals_to_components()
assert test_sfg.evaluate(1, 2) == 17
assert not test_sfg.connect_external_signals_to_components()
def create_sfg(self, op_tree):
"""Create a simple SFG with either operation_tree or large_operation_tree
"""
Angus Lothian
committed
sfg1 = SFG(outputs=[Output(op_tree)])
inp1 = Input("INP1")
inp2 = Input("INP2")
out1 = Output(None, "OUT1")
add1 = Addition(None, None, "ADD1")
add2 = Addition(None, None, "ADD2")
add1.input(0).connect(inp1, "S1")
add1.input(1).connect(inp2, "S2")
add2.input(0).connect(add1, "S3")
add2.input(1).connect(sfg1.outputs[0], "S4")
out1.input(0).connect(add2, "S5")
return SFG(inputs=[inp1, inp2], outputs=[out1])
def test_connect_external_signals_to_components_many_op(
self, large_operation_tree
):
"""Replaces an sfg component in a larger SFG with several component operations
"""
Angus Lothian
committed
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
inp1 = Input("INP1")
inp2 = Input("INP2")
inp3 = Input("INP3")
inp4 = Input("INP4")
out1 = Output(None, "OUT1")
add1 = Addition(None, None, "ADD1")
sub1 = Subtraction(None, None, "SUB1")
add1.input(0).connect(inp1, "S1")
add1.input(1).connect(inp2, "S2")
sfg1 = self.create_sfg(large_operation_tree)
sfg1.input(0).connect(add1, "S3")
sfg1.input(1).connect(inp3, "S4")
sub1.input(0).connect(sfg1.outputs[0], "S5")
sub1.input(1).connect(inp4, "S6")
out1.input(0).connect(sub1, "S7")
test_sfg = SFG(inputs=[inp1, inp2, inp3, inp4], outputs=[out1])
assert test_sfg.evaluate(1, 2, 3, 4) == 16
sfg1.connect_external_signals_to_components()
assert test_sfg.evaluate(1, 2, 3, 4) == 16
assert not test_sfg.connect_external_signals_to_components()
class TestTopologicalOrderOperations:
def test_feedback_sfg(self, sfg_simple_filter):
topological_order = (
sfg_simple_filter.get_operations_topological_order()
)
Angus Lothian
committed
assert [comp.name for comp in topological_order] == [
"IN1",
"ADD1",
"T1",
"CMUL1",
"OUT1",
]
def test_multiple_independent_inputs(
self, sfg_two_inputs_two_outputs_independent
):
topological_order = (
sfg_two_inputs_two_outputs_independent.get_operations_topological_order()
)
Angus Lothian
committed
assert [comp.name for comp in topological_order] == [
"IN1",
"OUT1",
"IN2",
"C1",
"ADD1",
"OUT2",
]
Angus Lothian
committed
def test_complex_graph(self, precedence_sfg_delays):
topological_order = (
precedence_sfg_delays.get_operations_topological_order()
)
Angus Lothian
committed
assert [comp.name for comp in topological_order] == [
"IN1",
"C0",
"ADD1",
"Q1",
"A0",
"T1",
"B1",
"A1",
"T2",
"B2",
"ADD2",
"A2",
"ADD3",
"ADD4",
"OUT1",
]
Angus Lothian
committed
class TestRemove:
def test_remove_single_input_outputs(self, sfg_simple_filter):
new_sfg = sfg_simple_filter.remove_operation("cmul1")
assert set(
op.name
for op in sfg_simple_filter.find_by_name("T1")[
0
].subsequent_operations
) == {"CMUL1", "OUT1"}
assert set(
op.name
for op in new_sfg.find_by_name("T1")[0].subsequent_operations
) == {"ADD1", "OUT1"}
Angus Lothian
committed
assert set(
op.name
for op in sfg_simple_filter.find_by_name("ADD1")[
0
].preceding_operations
) == {"CMUL1", "IN1"}
assert set(
op.name
for op in new_sfg.find_by_name("ADD1")[0].preceding_operations
) == {"T1", "IN1"}
Angus Lothian
committed
assert "S1" in set(
[
sig.name
for sig in sfg_simple_filter.find_by_name("T1")[0]
.output(0)
.signals
]
)
Angus Lothian
committed
assert "S2" in set(
[
sig.name
for sig in new_sfg.find_by_name("T1")[0].output(0).signals
]
)
Angus Lothian
committed
def test_remove_multiple_inputs_outputs(self, butterfly_operation_tree):
out1 = Output(butterfly_operation_tree.output(0), "OUT1")
out2 = Output(butterfly_operation_tree.output(1), "OUT2")
sfg = SFG(outputs=[out1, out2])
new_sfg = sfg.remove_operation(sfg.find_by_name("bfly2")[0].graph_id)
assert sfg.find_by_name("bfly3")[0].output(0).signal_count == 1
assert new_sfg.find_by_name("bfly3")[0].output(0).signal_count == 1
sfg_dest_0 = (
sfg.find_by_name("bfly3")[0].output(0).signals[0].destination
)
new_sfg_dest_0 = (
new_sfg.find_by_name("bfly3")[0].output(0).signals[0].destination
)
Angus Lothian
committed
assert sfg_dest_0.index == 0
assert new_sfg_dest_0.index == 0
assert sfg_dest_0.operation.name == "bfly2"
assert new_sfg_dest_0.operation.name == "bfly1"
assert sfg.find_by_name("bfly3")[0].output(1).signal_count == 1
assert new_sfg.find_by_name("bfly3")[0].output(1).signal_count == 1
sfg_dest_1 = (
sfg.find_by_name("bfly3")[0].output(1).signals[0].destination
)
new_sfg_dest_1 = (
new_sfg.find_by_name("bfly3")[0].output(1).signals[0].destination
)
Angus Lothian
committed
assert sfg_dest_1.index == 1
assert new_sfg_dest_1.index == 1
assert sfg_dest_1.operation.name == "bfly2"
assert new_sfg_dest_1.operation.name == "bfly1"
assert sfg.find_by_name("bfly1")[0].input(0).signal_count == 1
assert new_sfg.find_by_name("bfly1")[0].input(0).signal_count == 1
sfg_source_0 = sfg.find_by_name("bfly1")[0].input(0).signals[0].source
new_sfg_source_0 = (
new_sfg.find_by_name("bfly1")[0].input(0).signals[0].source
)
Angus Lothian
committed
assert sfg_source_0.index == 0
assert new_sfg_source_0.index == 0
assert sfg_source_0.operation.name == "bfly2"
assert new_sfg_source_0.operation.name == "bfly3"
sfg_source_1 = sfg.find_by_name("bfly1")[0].input(1).signals[0].source
new_sfg_source_1 = (
new_sfg.find_by_name("bfly1")[0].input(1).signals[0].source
)
Angus Lothian
committed
assert sfg_source_1.index == 1
assert new_sfg_source_1.index == 1
assert sfg_source_1.operation.name == "bfly2"
assert new_sfg_source_1.operation.name == "bfly3"
assert "bfly2" not in set(op.name for op in new_sfg.operations)
def remove_different_number_inputs_outputs(self, sfg_simple_filter):
with pytest.raises(ValueError):
sfg_simple_filter.remove_operation("add1")
class TestSaveLoadSFG:
def get_path(self, existing=False):
path_ = "".join(random.choices(string.ascii_uppercase, k=4)) + ".py"
while path.exists(path_) if not existing else not path.exists(path_):
path_ = (
"".join(random.choices(string.ascii_uppercase, k=4)) + ".py"
)
Angus Lothian
committed
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
return path_
def test_save_simple_sfg(self, sfg_simple_filter):
result = sfg_to_python(sfg_simple_filter)
path_ = self.get_path()
assert not path.exists(path_)
with open(path_, "w") as file_obj:
file_obj.write(result)
assert path.exists(path_)
with open(path_, "r") as file_obj:
assert file_obj.read() == result
remove(path_)
def test_save_complex_sfg(self, precedence_sfg_delays_and_constants):
result = sfg_to_python(precedence_sfg_delays_and_constants)
path_ = self.get_path()
assert not path.exists(path_)
with open(path_, "w") as file_obj:
file_obj.write(result)
assert path.exists(path_)
with open(path_, "r") as file_obj:
assert file_obj.read() == result
remove(path_)
def test_load_simple_sfg(self, sfg_simple_filter):
result = sfg_to_python(sfg_simple_filter)
path_ = self.get_path()
assert not path.exists(path_)
with open(path_, "w") as file_obj:
file_obj.write(result)
assert path.exists(path_)
simple_filter_, _ = python_to_sfg(path_)
assert str(sfg_simple_filter) == str(simple_filter_)
assert sfg_simple_filter.evaluate([2]) == simple_filter_.evaluate([2])
remove(path_)
def test_load_complex_sfg(self, precedence_sfg_delays_and_constants):
result = sfg_to_python(precedence_sfg_delays_and_constants)
path_ = self.get_path()
assert not path.exists(path_)
with open(path_, "w") as file_obj:
file_obj.write(result)
assert path.exists(path_)
precedence_sfg_registers_and_constants_, _ = python_to_sfg(path_)
assert str(precedence_sfg_delays_and_constants) == str(
Angus Lothian
committed
remove(path_)
def test_load_invalid_path(self):
path_ = self.get_path(existing=False)
with pytest.raises(FileNotFoundError):
Angus Lothian
committed
python_to_sfg(path_)
class TestGetComponentsOfType:
def test_get_no_operations_of_type(self, sfg_two_inputs_two_outputs):
assert [
op.name
for op in sfg_two_inputs_two_outputs.find_by_type_name(
Multiplication.type_name()
)
] == []
Angus Lothian
committed
def test_get_multple_operations_of_type(self, sfg_two_inputs_two_outputs):
assert [
op.name
for op in sfg_two_inputs_two_outputs.find_by_type_name(
Addition.type_name()
)
] == ["ADD1", "ADD2"]
assert [
op.name
for op in sfg_two_inputs_two_outputs.find_by_type_name(
Input.type_name()
)
] == ["IN1", "IN2"]
assert [
op.name
for op in sfg_two_inputs_two_outputs.find_by_type_name(
Output.type_name()
)
] == ["OUT1", "OUT2"]
class TestPrecedenceGraph:
def test_precedence_graph(self, sfg_simple_filter):
"digraph {\n\trankdir=LR\n\tsubgraph cluster_0"
" {\n\t\tlabel=N1\n\t\t\"in1.0\" [label=in1]\n\t\t\"t1.0\""
" [label=t1]\n\t}\n\tsubgraph cluster_1"
" {\n\t\tlabel=N2\n\t\t\"cmul1.0\" [label=cmul1]\n\t}\n\tsubgraph"
" cluster_2 {\n\t\tlabel=N3\n\t\t\"add1.0\""
" [label=add1]\n\t}\n\t\"in1.0\" -> add1\n\tadd1 [label=add1"
" shape=square]\n\tin1 -> \"in1.0\"\n\tin1 [label=in1"
" shape=square]\n\t\"t1.0\" -> cmul1\n\tcmul1 [label=cmul1"
" shape=square]\n\t\"t1.0\" -> out1\n\tout1 [label=out1"
" shape=square]\n\tt1Out -> \"t1.0\"\n\tt1Out [label=t1"
" shape=square]\n\t\"cmul1.0\" -> add1\n\tadd1 [label=add1"
" shape=square]\n\tcmul1 -> \"cmul1.0\"\n\tcmul1 [label=cmul1"
" shape=square]\n\t\"add1.0\" -> t1In\n\tt1In [label=t1"
" shape=square]\n\tadd1 -> \"add1.0\"\n\tadd1 [label=add1"
assert sfg_simple_filter.precedence_graph().source in (res, res + "\n")
class TestSFGGraph:
def test_sfg(self, sfg_simple_filter):
res = (
"digraph {\n\trankdir=LR\n\tin1\n\tin1 -> "
"add1\n\tout1\n\tt1 -> out1\n\tadd1\n\tcmul1 -> "
"add1\n\tcmul1\n\tadd1 -> t1\n\tt1 [shape=square]\n\tt1 "
assert sfg_simple_filter.sfg().source in (res, res + "\n")
def test_sfg_show_id(self, sfg_simple_filter):
res = (
"digraph {\n\trankdir=LR\n\tin1\n\tin1 -> add1 "
"[label=s1]\n\tout1\n\tt1 -> out1 [label=s2]\n\tadd1"
"\n\tcmul1 -> add1 [label=s3]\n\tcmul1\n\tadd1 -> t1 "
"[label=s4]\n\tt1 [shape=square]\n\tt1 -> cmul1 [label=s5]\n}"
assert sfg_simple_filter.sfg(show_id=True).source in (res, res + "\n")
def test_show_sfg_invalid_format(self, sfg_simple_filter):
sfg_simple_filter.show_sfg(format="ppddff")
def test_show_sfg_invalid_engine(self, sfg_simple_filter):
sfg_simple_filter.show_sfg(engine="ppddff")
1429
1430
1431
1432
1433
1434
1435
1436
1437
1438
1439
1440
1441
1442
1443
1444
1445
1446
1447
1448
1449
1450
1451
1452
1453
1454
1455
1456
1457
1458
1459
1460
1461
1462
1463
1464
1465
1466
1467
1468
1469
1470
1471
1472
1473
1474
1475
1476
1477
1478
1479
1480
1481
1482
1483
1484
1485
1486
1487
1488
1489
1490
1491
1492
1493
1494
1495
1496
1497
1498
1499
1500
1501
1502
1503
1504
1505
1506
1507
1508
1509
1510
1511
1512
1513
1514
1515
1516
1517
1518
1519
1520
1521
1522
1523
1524
class TestSFGErrors:
def test_dangling_output(self):
in1 = Input()
in2 = Input()
adaptor = SymmetricTwoportAdaptor(0.5, in1, in2)
out1 = Output(adaptor.output(0))
# No error, maybe should be?
_ = SFG([in1, in2], [out1])
def test_unconnected_input_port(self):
in1 = Input()
adaptor = SymmetricTwoportAdaptor(0.5, in1)
out1 = Output(adaptor.output(0))
with pytest.raises(ValueError, match="Unconnected input port in SFG"):
SFG([in1], [out1])
def test_unconnected_output(self):
in1 = Input()
in2 = Input()
adaptor = SymmetricTwoportAdaptor(0.5, in1, in2)
out1 = Output(adaptor.output(0))
out2 = Output()
# No error, should be
SFG([in1, in2], [out1, out2])
def test_unconnected_input(self):
in1 = Input()
in2 = Input()
adaptor = SymmetricTwoportAdaptor(0.5, in1)
out1 = Output(adaptor.output(0))
out2 = Output(adaptor.output(1))
# Correct error?
with pytest.raises(ValueError, match="Unconnected input port in SFG"):
SFG([in1, in2], [out1, out2])
def test_duplicate_input(self):
in1 = Input()
in2 = Input()
adaptor = SymmetricTwoportAdaptor(0.5, in1, in2)
out1 = Output(adaptor.output(0))
out2 = Output(adaptor.output(1))
with pytest.raises(ValueError, match="Duplicate input operation"):
SFG([in1, in1], [out1, out2])
def test_duplicate_output(self):
in1 = Input()
in2 = Input()
adaptor = SymmetricTwoportAdaptor(0.5, in1, in2)
out1 = Output(adaptor.output(0))
out2 = Output(adaptor.output(1))
with pytest.raises(ValueError, match="Duplicate output operation"):
SFG([in1, in2], [out1, out1])
def test_unconnected_input_signal(self):
in1 = Input()
in2 = Input()
adaptor = SymmetricTwoportAdaptor(0.5, in1, in2)
out1 = Output(adaptor.output(0))
out2 = Output(adaptor.output(1))
signal = Signal()
with pytest.raises(
ValueError, match="Input signal #0 is missing destination in SFG"
):
SFG([in1, in2], [out1, out2], [signal])
def test_unconnected_output_signal(self):
in1 = Input()
in2 = Input()
adaptor = SymmetricTwoportAdaptor(0.5, in1, in2)
out1 = Output(adaptor.output(0))
out2 = Output(adaptor.output(1))
signal = Signal()
with pytest.raises(
ValueError, match="Output signal #0 is missing source in SFG"
):
SFG([in1, in2], [out1, out2], output_signals=[signal])
def test_duplicate_input_signal(self):
in1 = Input()
signal = Signal()
adaptor = SymmetricTwoportAdaptor(0.5, in1, signal)
out1 = Output(adaptor.output(0))
out2 = Output(adaptor.output(1))
with pytest.raises(ValueError, match="Duplicate input signal"):
SFG([in1], [out1, out2], [signal, signal])
def test_duplicate_output_signal(self):
in1 = Input()
in2 = Input()
adaptor = SymmetricTwoportAdaptor(0.5, in1, in2)
out1 = Output(adaptor.output(0))
signal = Signal(adaptor.output(1))
# Should raise?
SFG([in1, in2], [out1], output_signals=[signal, signal])
1525
1526
1527
1528
1529
1530
1531
1532
1533
1534
1535
1536
1537
1538
1539
1540
1541
1542
1543
1544
1545
1546
1547
1548
1549
1550
1551
1552
1553
1554
1555
1556
1557
1558
1559
1560
1561
1562
1563
1564
def test_dangling_input_signal(self):
in1 = Input()
signal = Signal()
adaptor = SymmetricTwoportAdaptor(0.5, in1, signal)
out1 = Output(adaptor.output(0))
out2 = Output(adaptor.output(1))
with pytest.raises(
ValueError, match="Dangling signal without source in SFG"
):
SFG([in1], [out1, out2])
def test_remove_signal_with_different_number_of_inputs_and_outputs(self):
in1 = Input()
in2 = Input()
add1 = Addition(in1, in2, name="addition")
out1 = Output(add1)
sfg = SFG([in1, in2], [out1])
# Try to remove non-existent operation
sfg1 = sfg.remove_operation("foo")
assert sfg1 is None
with pytest.raises(
ValueError,
match=(
"Different number of input and output ports of operation with"
),
):
sfg.remove_operation('add1')
def test_inputs_required_for_output(self):
in1 = Input()
in2 = Input()
add1 = Addition(in1, in2, name="addition")
out1 = Output(add1)
sfg = SFG([in1, in2], [out1])
with pytest.raises(
IndexError,
match=re.escape("Output index out of range (expected 0-0, got 1)"),
):
sfg.inputs_required_for_output(1)