Newer
Older
for op in sfg_simple_filter.find_by_name("T1")[0].subsequent_operations
op.name for op in new_sfg.find_by_name("T1")[0].subsequent_operations
Angus Lothian
committed
for op in sfg_simple_filter.find_by_name("ADD1")[0].preceding_operations
op.name for op in new_sfg.find_by_name("ADD1")[0].preceding_operations
Angus Lothian
committed
assert "S1" in set(
for sig in sfg_simple_filter.find_by_name("T1")[0].output(0).signals
Angus Lothian
committed
assert "S2" in set(
[sig.name for sig in new_sfg.find_by_name("T1")[0].output(0).signals]
Angus Lothian
committed
def test_remove_multiple_inputs_outputs(self, butterfly_operation_tree):
out1 = Output(butterfly_operation_tree.output(0), "OUT1")
out2 = Output(butterfly_operation_tree.output(1), "OUT2")
sfg = SFG(outputs=[out1, out2])
new_sfg = sfg.remove_operation(sfg.find_by_name("bfly2")[0].graph_id)
assert sfg.find_by_name("bfly3")[0].output(0).signal_count == 1
assert new_sfg.find_by_name("bfly3")[0].output(0).signal_count == 1
sfg_dest_0 = sfg.find_by_name("bfly3")[0].output(0).signals[0].destination
new_sfg_dest_0 = (
new_sfg.find_by_name("bfly3")[0].output(0).signals[0].destination
)
Angus Lothian
committed
assert sfg_dest_0.index == 0
assert new_sfg_dest_0.index == 0
assert sfg_dest_0.operation.name == "bfly2"
assert new_sfg_dest_0.operation.name == "bfly1"
assert sfg.find_by_name("bfly3")[0].output(1).signal_count == 1
assert new_sfg.find_by_name("bfly3")[0].output(1).signal_count == 1
sfg_dest_1 = sfg.find_by_name("bfly3")[0].output(1).signals[0].destination
new_sfg_dest_1 = (
new_sfg.find_by_name("bfly3")[0].output(1).signals[0].destination
)
Angus Lothian
committed
assert sfg_dest_1.index == 1
assert new_sfg_dest_1.index == 1
assert sfg_dest_1.operation.name == "bfly2"
assert new_sfg_dest_1.operation.name == "bfly1"
assert sfg.find_by_name("bfly1")[0].input(0).signal_count == 1
assert new_sfg.find_by_name("bfly1")[0].input(0).signal_count == 1
sfg_source_0 = sfg.find_by_name("bfly1")[0].input(0).signals[0].source
new_sfg_source_0 = new_sfg.find_by_name("bfly1")[0].input(0).signals[0].source
Angus Lothian
committed
assert sfg_source_0.index == 0
assert new_sfg_source_0.index == 0
assert sfg_source_0.operation.name == "bfly2"
assert new_sfg_source_0.operation.name == "bfly3"
sfg_source_1 = sfg.find_by_name("bfly1")[0].input(1).signals[0].source
new_sfg_source_1 = new_sfg.find_by_name("bfly1")[0].input(1).signals[0].source
Angus Lothian
committed
assert sfg_source_1.index == 1
assert new_sfg_source_1.index == 1
assert sfg_source_1.operation.name == "bfly2"
assert new_sfg_source_1.operation.name == "bfly3"
assert "bfly2" not in set(op.name for op in new_sfg.operations)
def remove_different_number_inputs_outputs(self, sfg_simple_filter):
with pytest.raises(ValueError):
sfg_simple_filter.remove_operation("add1")
class TestSaveLoadSFG:
def get_path(self, existing=False):
path_ = "".join(random.choices(string.ascii_uppercase, k=4)) + ".py"
while path.exists(path_) if not existing else not path.exists(path_):
path_ = "".join(random.choices(string.ascii_uppercase, k=4)) + ".py"
Angus Lothian
committed
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
return path_
def test_save_simple_sfg(self, sfg_simple_filter):
result = sfg_to_python(sfg_simple_filter)
path_ = self.get_path()
assert not path.exists(path_)
with open(path_, "w") as file_obj:
file_obj.write(result)
assert path.exists(path_)
with open(path_, "r") as file_obj:
assert file_obj.read() == result
remove(path_)
def test_save_complex_sfg(self, precedence_sfg_delays_and_constants):
result = sfg_to_python(precedence_sfg_delays_and_constants)
path_ = self.get_path()
assert not path.exists(path_)
with open(path_, "w") as file_obj:
file_obj.write(result)
assert path.exists(path_)
with open(path_, "r") as file_obj:
assert file_obj.read() == result
remove(path_)
def test_load_simple_sfg(self, sfg_simple_filter):
result = sfg_to_python(sfg_simple_filter)
path_ = self.get_path()
assert not path.exists(path_)
with open(path_, "w") as file_obj:
file_obj.write(result)
assert path.exists(path_)
simple_filter_, _ = python_to_sfg(path_)
assert str(sfg_simple_filter) == str(simple_filter_)
assert sfg_simple_filter.evaluate([2]) == simple_filter_.evaluate([2])
remove(path_)
def test_load_complex_sfg(self, precedence_sfg_delays_and_constants):
result = sfg_to_python(precedence_sfg_delays_and_constants)
path_ = self.get_path()
assert not path.exists(path_)
with open(path_, "w") as file_obj:
file_obj.write(result)
assert path.exists(path_)
precedence_sfg_registers_and_constants_, _ = python_to_sfg(path_)
assert str(precedence_sfg_delays_and_constants) == str(
Angus Lothian
committed
remove(path_)
def test_load_invalid_path(self):
path_ = self.get_path(existing=False)
with pytest.raises(FileNotFoundError):
Angus Lothian
committed
python_to_sfg(path_)
class TestGetComponentsOfType:
def test_get_no_operations_of_type(self, sfg_two_inputs_two_outputs):
assert [
op.name
for op in sfg_two_inputs_two_outputs.find_by_type_name(
Multiplication.type_name()
)
] == []
Angus Lothian
committed
def test_get_multple_operations_of_type(self, sfg_two_inputs_two_outputs):
for op in sfg_two_inputs_two_outputs.find_by_type_name(Addition.type_name())
for op in sfg_two_inputs_two_outputs.find_by_type_name(Input.type_name())
for op in sfg_two_inputs_two_outputs.find_by_type_name(Output.type_name())
class TestPrecedenceGraph:
def test_precedence_graph(self, sfg_simple_filter):
'digraph {\n\trankdir=LR\n\tsubgraph cluster_0 {\n\t\tlabel=N0\n\t\t"in1.0"'
' [label=in1 height=0.1 shape=rectangle width=0.1]\n\t\t"t1.0" [label=t1'
' height=0.1 shape=rectangle width=0.1]\n\t}\n\tsubgraph cluster_1'
' {\n\t\tlabel=N1\n\t\t"cmul1.0" [label=cmul1 height=0.1 shape=rectangle'
' width=0.1]\n\t}\n\tsubgraph cluster_2 {\n\t\tlabel=N2\n\t\t"add1.0"'
' [label=add1 height=0.1 shape=rectangle width=0.1]\n\t}\n\t"in1.0" ->'
' add1\n\tadd1 [label=add1 shape=ellipse]\n\tin1 -> "in1.0"\n\tin1'
' [label=in1 shape=cds]\n\t"t1.0" -> cmul1\n\tcmul1 [label=cmul1'
' shape=ellipse]\n\t"t1.0" -> out1\n\tout1 [label=out1 shape=cds]\n\tt1Out'
' -> "t1.0"\n\tt1Out [label=t1 shape=square]\n\t"cmul1.0" -> add1\n\tadd1'
' [label=add1 shape=ellipse]\n\tcmul1 -> "cmul1.0"\n\tcmul1 [label=cmul1'
' shape=ellipse]\n\t"add1.0" -> t1In\n\tt1In [label=t1'
' shape=square]\n\tadd1 -> "add1.0"\n\tadd1 [label=add1 shape=ellipse]\n}'
assert sfg_simple_filter.precedence_graph().source in (res, res + "\n")
class TestSFGGraph:
def test_sfg(self, sfg_simple_filter):
'digraph {\n\trankdir=LR\n\tin1 [shape=cds]\n\tin1 -> add1\n\tout1'
' [shape=cds]\n\tt1 -> out1\n\tadd1 [shape=ellipse]\n\tcmul1 ->'
' add1\n\tcmul1 [shape=ellipse]\n\tadd1 -> t1\n\tt1'
' [shape=square]\n\tt1 -> cmul1\n}'
assert sfg_simple_filter.sfg_digraph().source in (res, res + "\n")
def test_sfg_show_id(self, sfg_simple_filter):
'digraph {\n\trankdir=LR\n\tin1 [shape=cds]\n\tin1 -> add1'
' [label=s1]\n\tout1 [shape=cds]\n\tt1 -> out1 [label=s2]\n\tadd1'
' [shape=ellipse]\n\tcmul1 -> add1 [label=s3]\n\tcmul1'
' [shape=ellipse]\n\tadd1 -> t1 [label=s4]\n\tt1'
' [shape=square]\n\tt1 -> cmul1 [label=s5]\n}'
assert sfg_simple_filter.sfg_digraph(show_id=True).source in (
res,
res + "\n",
)
def test_show_sfg_invalid_format(self, sfg_simple_filter):
sfg_simple_filter.show(fmt="ppddff")
def test_show_sfg_invalid_engine(self, sfg_simple_filter):
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
class TestSFGErrors:
def test_dangling_output(self):
in1 = Input()
in2 = Input()
adaptor = SymmetricTwoportAdaptor(0.5, in1, in2)
out1 = Output(adaptor.output(0))
# No error, maybe should be?
_ = SFG([in1, in2], [out1])
def test_unconnected_input_port(self):
in1 = Input()
adaptor = SymmetricTwoportAdaptor(0.5, in1)
out1 = Output(adaptor.output(0))
with pytest.raises(ValueError, match="Unconnected input port in SFG"):
SFG([in1], [out1])
def test_unconnected_output(self):
in1 = Input()
in2 = Input()
adaptor = SymmetricTwoportAdaptor(0.5, in1, in2)
out1 = Output(adaptor.output(0))
out2 = Output()
# No error, should be
SFG([in1, in2], [out1, out2])
def test_unconnected_input(self):
in1 = Input()
in2 = Input()
adaptor = SymmetricTwoportAdaptor(0.5, in1)
out1 = Output(adaptor.output(0))
out2 = Output(adaptor.output(1))
# Correct error?
with pytest.raises(ValueError, match="Unconnected input port in SFG"):
SFG([in1, in2], [out1, out2])
def test_duplicate_input(self):
in1 = Input()
in2 = Input()
adaptor = SymmetricTwoportAdaptor(0.5, in1, in2)
out1 = Output(adaptor.output(0))
out2 = Output(adaptor.output(1))
with pytest.raises(ValueError, match="Duplicate input operation"):
SFG([in1, in1], [out1, out2])
def test_duplicate_output(self):
in1 = Input()
in2 = Input()
adaptor = SymmetricTwoportAdaptor(0.5, in1, in2)
out1 = Output(adaptor.output(0))
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
with pytest.raises(ValueError, match="Duplicate output operation"):
SFG([in1, in2], [out1, out1])
def test_unconnected_input_signal(self):
in1 = Input()
in2 = Input()
adaptor = SymmetricTwoportAdaptor(0.5, in1, in2)
out1 = Output(adaptor.output(0))
out2 = Output(adaptor.output(1))
signal = Signal()
with pytest.raises(
ValueError, match="Input signal #0 is missing destination in SFG"
):
SFG([in1, in2], [out1, out2], [signal])
def test_unconnected_output_signal(self):
in1 = Input()
in2 = Input()
adaptor = SymmetricTwoportAdaptor(0.5, in1, in2)
out1 = Output(adaptor.output(0))
out2 = Output(adaptor.output(1))
signal = Signal()
with pytest.raises(
ValueError, match="Output signal #0 is missing source in SFG"
):
SFG([in1, in2], [out1, out2], output_signals=[signal])
def test_duplicate_input_signal(self):
in1 = Input()
signal = Signal()
adaptor = SymmetricTwoportAdaptor(0.5, in1, signal)
out1 = Output(adaptor.output(0))
out2 = Output(adaptor.output(1))
with pytest.raises(ValueError, match="Duplicate input signal"):
SFG([in1], [out1, out2], [signal, signal])
def test_duplicate_output_signal(self):
in1 = Input()
in2 = Input()
adaptor = SymmetricTwoportAdaptor(0.5, in1, in2)
out1 = Output(adaptor.output(0))
signal = Signal(adaptor.output(1))
# Should raise?
SFG([in1, in2], [out1], output_signals=[signal, signal])
def test_dangling_input_signal(self):
in1 = Input()
signal = Signal()
adaptor = SymmetricTwoportAdaptor(0.5, in1, signal)
out1 = Output(adaptor.output(0))
out2 = Output(adaptor.output(1))
with pytest.raises(ValueError, match="Dangling signal without source in SFG"):
SFG([in1], [out1, out2])
def test_remove_signal_with_different_number_of_inputs_and_outputs(self):
in1 = Input()
in2 = Input()
add1 = Addition(in1, in2, name="addition")
out1 = Output(add1)
sfg = SFG([in1, in2], [out1])
# Try to remove non-existent operation
sfg1 = sfg.remove_operation("foo")
assert sfg1 is None
with pytest.raises(
ValueError,
match="Different number of input and output ports of operation with",
):
sfg.remove_operation('add1')
def test_inputs_required_for_output(self):
in1 = Input()
in2 = Input()
add1 = Addition(in1, in2, name="addition")
out1 = Output(add1)
sfg = SFG([in1, in2], [out1])
with pytest.raises(
IndexError,
match=re.escape("Output index out of range (expected 0-0, got 1)"),
):
sfg.inputs_required_for_output(1)
class TestInputDuplicationBug:
def test_input_is_not_duplicated_in_operation_list(self):
# Inputs:
in1 = Input(name="in1")
out1 = Output(name="out1")
# Operations:
t1 = Delay(initial_value=0, name="")
t1.inputs[0].connect(in1)
add1 = t1 + in1
out1.inputs[0].connect(add1)
twotapfir = SFG(inputs=[in1], outputs=[out1], name='twotapfir')
assert len([op for op in twotapfir.operations if isinstance(op, Input)]) == 1
class TestCriticalPath:
def test_single_accumulator(self, sfg_simple_accumulator: SFG):
sfg_simple_accumulator.set_latency_of_type(Addition.type_name(), 5)
assert sfg_simple_accumulator.critical_path_time() == 5
sfg_simple_accumulator.set_latency_of_type(Addition.type_name(), 6)
assert sfg_simple_accumulator.critical_path_time() == 6
1406
1407
1408
1409
1410
1411
1412
1413
1414
1415
1416
1417
1418
1419
1420
1421
1422
1423
1424
1425
1426
1427
1428
1429
1430
1431
1432
1433
1434
1435
1436
1437
1438
1439
1440
1441
1442
1443
1444
1445
1446
1447
1448
1449
def count_kinds(self, sfg: SFG) -> Dict[Type, int]:
return Counter([type(op) for op in sfg.operations])
# Checks that the number of each kind of operation in sfg2 is multiple*count
# of the same operation in sfg1.
# Filters out delay delays
def assert_counts_is_correct(self, sfg1: SFG, sfg2: SFG, multiple: int):
count1 = self.count_kinds(sfg1)
count2 = self.count_kinds(sfg2)
# Delays should not be duplicated. Check that and then clear them
# Using get to avoid issues if there are no delays in the sfg
assert count1.get(Delay) == count2.get(Delay)
count1[Delay] = 0
count2[Delay] = 0
# Ensure that we aren't missing any keys, or have any extras
assert count1.keys() == count2.keys()
for k in count1.keys():
assert count1[k] * multiple == count2[k]
# This is horrifying, but I can't figure out a way to run the test on multiple fixtures,
# so this is an ugly hack until someone that knows pytest comes along
def test_two_inputs_two_outputs(self, sfg_two_inputs_two_outputs: SFG):
self.do_tests(sfg_two_inputs_two_outputs)
def test_twotapfir(self, sfg_two_tap_fir: SFG):
self.do_tests(sfg_two_tap_fir)
def test_delay(self, sfg_delay: SFG):
self.do_tests(sfg_delay)
def test_sfg_two_inputs_two_outputs_independent(
self, sfg_two_inputs_two_outputs_independent: SFG
):
self.do_tests(sfg_two_inputs_two_outputs_independent)
def do_tests(self, sfg: SFG):
for factor in range(2, 4):
# Ensure that the correct number of operations get created
unfolded = sfg.unfold(factor)
self.assert_counts_is_correct(sfg, unfolded, factor)
double_unfolded = sfg.unfold(factor).unfold(factor)
self.assert_counts_is_correct(sfg, double_unfolded, factor * factor)
NUM_TESTS = 5
# Evaluate with some random values
# To avoid problems with missing inputs at the end of the sequence,
# we generate i*(some large enough) number
input_list = [
[random.random() for _ in range(0, NUM_TESTS * factor)]
for _ in sfg.inputs
]
sim = Simulation(sfg, input_list)
sim.run()
ref = sim.results
# We have i copies of the inputs, each sourcing their input from the orig
unfolded_input_lists = [[] for _ in range(len(sfg.inputs) * factor)]
for t in range(0, NUM_TESTS):
for n in range(0, factor):
for k in range(0, len(sfg.inputs)):
unfolded_input_lists[k + n * len(sfg.inputs)].append(
input_list[k][t * factor + n]
)
sim = Simulation(unfolded, unfolded_input_lists)
sim.run()
unfolded_results = sim.results
for n, _ in enumerate(sfg.outputs):
# Outputs for an original output
ref_values = list(ref[ResultKey(f"{n}")])
# Output n will be split into `factor` output ports, compute the
# indicies where we find the outputs
out_indices = [n + k * len(sfg.outputs) for k in range(factor)]
u_values = [
[unfolded_results[ResultKey(f"{idx}")][k] for idx in out_indices]
for k in range(int(NUM_TESTS))
]
flat_u_values = list(itertools.chain.from_iterable(u_values))
assert flat_u_values == ref_values
def test_value_error(self, sfg_two_inputs_two_outputs: SFG):
sfg = sfg_two_inputs_two_outputs
with pytest.raises(ValueError, match="Unfolding 0 times removes the SFG"):
class TestIsLinear:
def test_single_accumulator(self, sfg_simple_accumulator: SFG):
assert sfg_simple_accumulator.is_linear
def test_sfg_nested(self, sfg_nested: SFG):
assert not sfg_nested.is_linear
class TestIsConstant:
def test_single_accumulator(self, sfg_simple_accumulator: SFG):
assert not sfg_simple_accumulator.is_constant
def test_sfg_nested(self, sfg_nested: SFG):
assert not sfg_nested.is_constant