Newer
Older
Angus Lothian
committed
class TestRemove:
    """Tests for ``SFG.remove_operation``."""

    def test_remove_single_input_outputs(self, sfg_simple_filter):
        """Remove ``cmul0`` and compare the neighbours of "T" and "ADD".

        The original fixture is inspected next to the returned graph, so the
        assertions also cover that the fixture still contains "CMUL".
        """
        new_sfg = sfg_simple_filter.remove_operation("cmul0")

        old_delay = sfg_simple_filter.find_by_name("T")[0]
        new_delay = new_sfg.find_by_name("T")[0]
        assert {op.name for op in old_delay.subsequent_operations} == {
            "CMUL",
            "OUT",
        }
        assert {op.name for op in new_delay.subsequent_operations} == {
            "ADD",
            "OUT",
        }

        old_add = sfg_simple_filter.find_by_name("ADD")[0]
        new_add = new_sfg.find_by_name("ADD")[0]
        assert {op.name for op in old_add.preceding_operations} == {"CMUL", "IN"}
        assert {op.name for op in new_add.preceding_operations} == {"T", "IN"}

        assert "S1" in {sig.name for sig in old_delay.output(0).signals}
        assert "S2" in {sig.name for sig in new_delay.output(0).signals}

    def test_remove_multiple_inputs_outputs(self, butterfly_operation_tree):
        """Remove ``bfly2`` and check ``bfly3`` and ``bfly1`` are joined directly."""
        out1 = Output(butterfly_operation_tree.output(0), "OUT1")
        out2 = Output(butterfly_operation_tree.output(1), "OUT2")
        sfg = SFG(outputs=[out1, out2])
        new_sfg = sfg.remove_operation(sfg.find_by_name("bfly2")[0].graph_id)

        # Outputs of bfly3: one signal each, retargeted from bfly2 to bfly1
        # while keeping the destination port index.
        for port in (0, 1):
            old_out = sfg.find_by_name("bfly3")[0].output(port)
            new_out = new_sfg.find_by_name("bfly3")[0].output(port)
            assert old_out.signal_count == 1
            assert new_out.signal_count == 1
            old_dest = old_out.signals[0].destination
            new_dest = new_out.signals[0].destination
            assert old_dest.index == port
            assert new_dest.index == port
            assert old_dest.operation.name == "bfly2"
            assert new_dest.operation.name == "bfly1"

        assert sfg.find_by_name("bfly1")[0].input(0).signal_count == 1
        assert new_sfg.find_by_name("bfly1")[0].input(0).signal_count == 1

        # Inputs of bfly1: sourced from bfly2 before and from bfly3 after.
        for port in (0, 1):
            old_source = sfg.find_by_name("bfly1")[0].input(port).signals[0].source
            new_source = (
                new_sfg.find_by_name("bfly1")[0].input(port).signals[0].source
            )
            assert old_source.index == port
            assert new_source.index == port
            assert old_source.operation.name == "bfly2"
            assert new_source.operation.name == "bfly3"

        assert "bfly2" not in {op.name for op in new_sfg.operations}

    # NOTE(review): this method has no ``test_`` prefix, so pytest's default
    # naming rules will not collect it. The name is kept because enabling it
    # may expose a failing check -- confirm the intended graph id ("add1")
    # before renaming.
    def remove_different_number_inputs_outputs(self, sfg_simple_filter):
        """Expect ``ValueError`` from ``remove_operation("add1")``."""
        with pytest.raises(ValueError):
            sfg_simple_filter.remove_operation("add1")
class TestSaveLoadSFG:
    """Round-trip tests for ``sfg_to_python`` and ``python_to_sfg``."""

    def get_path(self, existing=False):
        """Return a random four-letter ``.py`` file name.

        With ``existing`` false the name is redrawn until no such path exists;
        with ``existing`` true it is redrawn until the path does exist.
        """
        want_existing = bool(existing)
        path_ = "".join(random.choices(string.ascii_uppercase, k=4)) + ".py"
        while path.exists(path_) != want_existing:
            path_ = "".join(random.choices(string.ascii_uppercase, k=4)) + ".py"
        return path_

    def test_save_simple_sfg(self, sfg_simple_filter):
        """Write the generated source to a file and read it back unchanged."""
        result = sfg_to_python(sfg_simple_filter)
        path_ = self.get_path()
        assert not path.exists(path_)
        try:
            with open(path_, "w") as file_obj:
                file_obj.write(result)
            assert path.exists(path_)
            with open(path_) as file_obj:
                assert file_obj.read() == result
        finally:
            # Clean up even when an assertion above fails.
            if path.exists(path_):
                remove(path_)

    def test_save_complex_sfg(self, precedence_sfg_delays_and_constants):
        """Same round trip as the simple case, for the larger fixture."""
        result = sfg_to_python(precedence_sfg_delays_and_constants)
        path_ = self.get_path()
        assert not path.exists(path_)
        try:
            with open(path_, "w") as file_obj:
                file_obj.write(result)
            assert path.exists(path_)
            with open(path_) as file_obj:
                assert file_obj.read() == result
        finally:
            if path.exists(path_):
                remove(path_)

    def test_load_simple_sfg(self, sfg_simple_filter):
        """Load a saved SFG and compare its string form and evaluation."""
        result = sfg_to_python(sfg_simple_filter)
        path_ = self.get_path()
        assert not path.exists(path_)
        try:
            with open(path_, "w") as file_obj:
                file_obj.write(result)
            assert path.exists(path_)
            simple_filter_, _ = python_to_sfg(path_)
            assert str(sfg_simple_filter) == str(simple_filter_)
            assert sfg_simple_filter.evaluate([2]) == simple_filter_.evaluate([2])
        finally:
            if path.exists(path_):
                remove(path_)

    def test_load_complex_sfg(self, precedence_sfg_delays_and_constants):
        """Load the larger saved SFG and compare its string form."""
        result = sfg_to_python(precedence_sfg_delays_and_constants)
        path_ = self.get_path()
        assert not path.exists(path_)
        try:
            with open(path_, "w") as file_obj:
                file_obj.write(result)
            assert path.exists(path_)
            precedence_sfg_registers_and_constants_, _ = python_to_sfg(path_)
            assert str(precedence_sfg_delays_and_constants) == str(
                precedence_sfg_registers_and_constants_
            )
        finally:
            if path.exists(path_):
                remove(path_)

    def test_load_invalid_path(self):
        """Loading from a path that does not exist raises ``FileNotFoundError``."""
        path_ = self.get_path(existing=False)
        with pytest.raises(FileNotFoundError):
            python_to_sfg(path_)
class TestGetComponentsOfType:
def test_get_no_operations_of_type(self, sfg_two_inputs_two_outputs):
    """Looking up the ``Multiplication`` type name yields no operations."""
    matches = sfg_two_inputs_two_outputs.find_by_type_name(
        Multiplication.type_name()
    )
    found_names = [match.name for match in matches]
    assert found_names == []
Angus Lothian
committed
def test_get_multiple_operations_of_type(self, sfg_two_inputs_two_outputs):
for op in sfg_two_inputs_two_outputs.find_by_type_name(Addition.type_name())
for op in sfg_two_inputs_two_outputs.find_by_type_name(Input.type_name())
for op in sfg_two_inputs_two_outputs.find_by_type_name(Output.type_name())
class TestPrecedenceGraph:
    """Checks the DOT source produced by ``SFG.precedence_graph``."""

    def test_precedence_graph(self, sfg_simple_filter):
        """Compare the DOT source with the expected text.

        A trailing newline is tolerated, as the final assertion accepts the
        source with or without one.
        """
        res = (
            'digraph {\n\trankdir=LR\n\tsubgraph cluster_0 {\n\t\tlabel=N0\n\t\t"in0.0"'
            ' [label=in0 height=0.1 shape=rectangle width=0.1]\n\t\t"t0.0" [label=t0'
            ' height=0.1 shape=rectangle width=0.1]\n\t}\n\tsubgraph cluster_1'
            ' {\n\t\tlabel=N1\n\t\t"cmul0.0" [label=cmul0 height=0.1 shape=rectangle'
            ' width=0.1]\n\t}\n\tsubgraph cluster_2 {\n\t\tlabel=N2\n\t\t"add0.0"'
            ' [label=add0 height=0.1 shape=rectangle width=0.1]\n\t}\n\t"in0.0" ->'
            ' add0\n\tadd0 [label=add0 shape=ellipse]\n\tin0 -> "in0.0"\n\tin0'
            ' [label=in0 shape=cds]\n\t"t0.0" -> cmul0\n\tcmul0 [label=cmul0'
            ' shape=ellipse]\n\t"t0.0" -> out0\n\tout0 [label=out0 shape=cds]\n\tt0Out'
            ' -> "t0.0"\n\tt0Out [label=t0 shape=square]\n\t"cmul0.0" -> add0\n\tadd0'
            ' [label=add0 shape=ellipse]\n\tcmul0 -> "cmul0.0"\n\tcmul0 [label=cmul0'
            ' shape=ellipse]\n\t"add0.0" -> t0In\n\tt0In [label=t0'
            ' shape=square]\n\tadd0 -> "add0.0"\n\tadd0 [label=add0 shape=ellipse]\n}'
        )
        assert sfg_simple_filter.precedence_graph.source in (res, res + "\n")
class TestSFGGraph:
def test_sfg(self, sfg_simple_filter):
    """Compare ``sfg_digraph().source`` with the expected DOT text.

    A trailing newline is tolerated.
    """
    # NOTE(review): each DOT statement is written with an explicit leading
    # tab, matching the tab-indented DOT source used by the precedence-graph
    # test in this file; the second half of a two-line label carries no tab.
    # Confirm against the generated source.
    res = "\n".join(
        [
            "digraph {",
            "\trankdir=LR splines=spline",
            '\tin0 [label="IN\n(in0)" shape=cds]',
            "\tin0 -> add0 [headlabel=0]",
            '\tout0 [label="OUT\n(out0)" shape=cds]',
            '\t"t0.0" -> out0',
            '\t"t0.0" [shape=point]',
            '\tt0 -> "t0.0" [arrowhead=none]',
            '\tadd0 [label="ADD\n(add0)" shape=ellipse]',
            "\tcmul0 -> add0 [headlabel=1]",
            '\tcmul0 [label="CMUL\n(cmul0)" shape=ellipse]',
            "\tadd0 -> t0",
            '\tt0 [label="T\n(t0)" shape=square]',
            '\t"t0.0" -> cmul0',
            "}",
        ]
    )
    assert sfg_simple_filter.sfg_digraph().source in (
        res,
        res + "\n",
    )
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
def test_sfg_show_signal_id(self, sfg_simple_filter):
    """Compare ``sfg_digraph(show_signal_id=True).source`` with the expected text.

    Every edge in the expected text carries a ``label=sN`` attribute.
    A trailing newline is tolerated.
    """
    # NOTE(review): each DOT statement is written with an explicit leading
    # tab, matching the tab-indented DOT source used by the precedence-graph
    # test in this file; the second half of a two-line label carries no tab.
    # Confirm against the generated source.
    res = "\n".join(
        [
            "digraph {",
            "\trankdir=LR splines=spline",
            '\tin0 [label="IN\n(in0)" shape=cds]',
            "\tin0 -> add0 [label=s0 headlabel=0]",
            '\tout0 [label="OUT\n(out0)" shape=cds]',
            '\t"t0.0" -> out0 [label=s1]',
            '\t"t0.0" [shape=point]',
            '\tt0 -> "t0.0" [arrowhead=none]',
            '\tadd0 [label="ADD\n(add0)" shape=ellipse]',
            "\tcmul0 -> add0 [label=s2 headlabel=1]",
            '\tcmul0 [label="CMUL\n(cmul0)" shape=ellipse]',
            "\tadd0 -> t0 [label=s3]",
            '\tt0 [label="T\n(t0)" shape=square]',
            '\t"t0.0" -> cmul0 [label=s4]',
            "}",
        ]
    )
    assert sfg_simple_filter.sfg_digraph(show_signal_id=True).source in (
        res,
        res + "\n",
    )
def test_sfg_no_branch(self, sfg_simple_filter):
    """Compare ``sfg_digraph(branch_node=False).source`` with the expected text.

    The expected text has no ``"t0.0"`` point node; ``t0`` connects to its
    destinations directly. A trailing newline is tolerated.
    """
    # NOTE(review): each DOT statement is written with an explicit leading
    # tab, matching the tab-indented DOT source used by the precedence-graph
    # test in this file; the second half of a two-line label carries no tab.
    # Confirm against the generated source.
    res = "\n".join(
        [
            "digraph {",
            "\trankdir=LR splines=spline",
            '\tin0 [label="IN\n(in0)" shape=cds]',
            "\tin0 -> add0 [headlabel=0]",
            '\tout0 [label="OUT\n(out0)" shape=cds]',
            "\tt0 -> out0",
            '\tadd0 [label="ADD\n(add0)" shape=ellipse]',
            "\tcmul0 -> add0 [headlabel=1]",
            '\tcmul0 [label="CMUL\n(cmul0)" shape=ellipse]',
            "\tadd0 -> t0",
            '\tt0 [label="T\n(t0)" shape=square]',
            "\tt0 -> cmul0",
            "}",
        ]
    )
    assert sfg_simple_filter.sfg_digraph(branch_node=False).source in (
        res,
        res + "\n",
    )
def test_sfg_no_port_numbering(self, sfg_simple_filter):
    """Compare ``sfg_digraph(port_numbering=False).source`` with the expected text.

    The expected text has no ``headlabel`` attributes on the edges.
    A trailing newline is tolerated.
    """
    # NOTE(review): each DOT statement is written with an explicit leading
    # tab, matching the tab-indented DOT source used by the precedence-graph
    # test in this file; the second half of a two-line label carries no tab.
    # Confirm against the generated source.
    res = "\n".join(
        [
            "digraph {",
            "\trankdir=LR splines=spline",
            '\tin0 [label="IN\n(in0)" shape=cds]',
            "\tin0 -> add0",
            '\tout0 [label="OUT\n(out0)" shape=cds]',
            '\t"t0.0" -> out0',
            '\t"t0.0" [shape=point]',
            '\tt0 -> "t0.0" [arrowhead=none]',
            '\tadd0 [label="ADD\n(add0)" shape=ellipse]',
            "\tcmul0 -> add0",
            '\tcmul0 [label="CMUL\n(cmul0)" shape=ellipse]',
            "\tadd0 -> t0",
            '\tt0 [label="T\n(t0)" shape=square]',
            '\t"t0.0" -> cmul0',
            "}",
        ]
    )
    assert sfg_simple_filter.sfg_digraph(port_numbering=False).source in (
        res,
        res + "\n",
    )
# Calls ``show`` on the fixture with the format string "ppddff".
# NOTE(review): the test name says the format is invalid, yet no exception is
# asserted here; an enclosing ``pytest.raises`` block may be missing -- confirm
# the expected exception type and message before adding one.
def test_show_sfg_invalid_format(self, sfg_simple_filter):
sfg_simple_filter.show(fmt="ppddff")
def test_show_sfg_invalid_engine(self, sfg_simple_filter):
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
class TestSFGErrors:
    """Checks of the errors raised by ``SFG`` construction and related calls."""

    def test_dangling_output(self):
        """An adaptor output left unconnected does not raise."""
        in1 = Input()
        in2 = Input()
        adaptor = SymmetricTwoportAdaptor(0.5, in1, in2)
        out1 = Output(adaptor.output(0))
        # No error, maybe should be?
        _ = SFG([in1, in2], [out1])

    def test_unconnected_input_port(self):
        """An adaptor with only one of its inputs connected is rejected."""
        in1 = Input()
        adaptor = SymmetricTwoportAdaptor(0.5, in1)
        out1 = Output(adaptor.output(0))
        with pytest.raises(ValueError, match="Unconnected input port in SFG"):
            SFG([in1], [out1])

    def test_unconnected_output(self):
        """An ``Output`` created without a source is rejected."""
        in1 = Input()
        in2 = Input()
        adaptor = SymmetricTwoportAdaptor(0.5, in1, in2)
        out1 = Output(adaptor.output(0))
        out2 = Output()
        with pytest.raises(
            ValueError,
            match=(
                "At least one output operation is not connected!, Tips: Check"
                " for output ports that are connected to the same signal"
            ),
        ):
            SFG([in1, in2], [out1, out2])

    def test_unconnected_input(self):
        """A listed input that feeds nothing, next to a half-connected adaptor."""
        in1 = Input()
        in2 = Input()
        adaptor = SymmetricTwoportAdaptor(0.5, in1)
        out1 = Output(adaptor.output(0))
        out2 = Output(adaptor.output(1))
        # Correct error?
        with pytest.raises(ValueError, match="Unconnected input port in SFG"):
            SFG([in1, in2], [out1, out2])

    def test_duplicate_input(self):
        """Listing the same input operation twice is rejected."""
        in1 = Input()
        in2 = Input()
        adaptor = SymmetricTwoportAdaptor(0.5, in1, in2)
        out1 = Output(adaptor.output(0))
        out2 = Output(adaptor.output(1))
        with pytest.raises(ValueError, match="Duplicate input operation"):
            SFG([in1, in1], [out1, out2])

    def test_duplicate_output(self):
        """Listing the same output operation twice is rejected."""
        in1 = Input()
        in2 = Input()
        adaptor = SymmetricTwoportAdaptor(0.5, in1, in2)
        out1 = Output(adaptor.output(0))
        with pytest.raises(ValueError, match="Duplicate output operation"):
            SFG([in1, in2], [out1, out1])

    def test_unconnected_input_signal(self):
        """An input signal without a destination is rejected."""
        in1 = Input()
        in2 = Input()
        adaptor = SymmetricTwoportAdaptor(0.5, in1, in2)
        out1 = Output(adaptor.output(0))
        out2 = Output(adaptor.output(1))
        signal = Signal()
        with pytest.raises(
            ValueError, match="Input signal #0 is missing destination in SFG"
        ):
            SFG([in1, in2], [out1, out2], [signal])

    def test_unconnected_output_signal(self):
        """An output signal without a source is rejected."""
        in1 = Input()
        in2 = Input()
        adaptor = SymmetricTwoportAdaptor(0.5, in1, in2)
        out1 = Output(adaptor.output(0))
        out2 = Output(adaptor.output(1))
        signal = Signal()
        with pytest.raises(
            ValueError, match="Output signal #0 is missing source in SFG"
        ):
            SFG([in1, in2], [out1, out2], output_signals=[signal])

    def test_duplicate_input_signal(self):
        """Listing the same input signal twice is rejected."""
        in1 = Input()
        signal = Signal()
        adaptor = SymmetricTwoportAdaptor(0.5, in1, signal)
        out1 = Output(adaptor.output(0))
        out2 = Output(adaptor.output(1))
        with pytest.raises(ValueError, match="Duplicate input signal"):
            SFG([in1], [out1, out2], [signal, signal])

    def test_duplicate_output_signal(self):
        """Listing the same output signal twice is rejected."""
        in1 = Input()
        in2 = Input()
        adaptor = SymmetricTwoportAdaptor(0.5, in1, in2)
        out1 = Output(adaptor.output(0))
        signal = Signal(adaptor.output(1))
        with pytest.raises(
            ValueError,
            match=(
                "At least one output operation is not connected!, Tips: Check"
                " for output ports that are connected to the same signal"
            ),
        ):
            SFG([in1, in2], [out1], output_signals=[signal, signal])

    def test_dangling_input_signal(self):
        """A signal used as an adaptor input but not listed is rejected."""
        in1 = Input()
        signal = Signal()
        adaptor = SymmetricTwoportAdaptor(0.5, in1, signal)
        out1 = Output(adaptor.output(0))
        out2 = Output(adaptor.output(1))
        with pytest.raises(ValueError, match="Dangling signal without source in SFG"):
            SFG([in1], [out1, out2])

    def test_remove_signal_with_different_number_of_inputs_and_outputs(self):
        """``remove_operation`` on an unknown id and on a 2-in/1-out operation."""
        in1 = Input()
        in2 = Input()
        add1 = Addition(in1, in2, name="addition")
        out1 = Output(add1)
        sfg = SFG([in1, in2], [out1])
        # Try to remove non-existent operation
        sfg1 = sfg.remove_operation("foo")
        assert sfg1 is None
        with pytest.raises(
            ValueError,
            match="Different number of input and output ports of operation with",
        ):
            # NOTE(review): assumes the addition was assigned graph id "add0"
            # -- confirm against the SFG id scheme.
            sfg.remove_operation("add0")

    def test_inputs_required_for_output(self):
        """An out-of-range output index raises ``IndexError``."""
        in1 = Input()
        in2 = Input()
        add1 = Addition(in1, in2, name="addition")
        out1 = Output(add1)
        sfg = SFG([in1, in2], [out1])
        with pytest.raises(
            IndexError,
            match=re.escape("Output index out of range (expected 0-0, got 1)"),
        ):
            sfg.inputs_required_for_output(1)
class TestInputDuplicationBug:
    """Regression check for the input count in ``SFG.operations``."""

    def test_input_is_not_duplicated_in_operation_list(self):
        """An input used by two operations appears once in ``operations``."""
        # Ports of the graph
        source = Input(name="in1")
        sink = Output(name="out1")

        # The input feeds both the delay and the adder.
        delay = Delay(initial_value=0, name="")
        delay.inputs[0].connect(source)
        total = delay + source
        sink.inputs[0].connect(total)

        twotapfir = SFG(inputs=[source], outputs=[sink], name='twotapfir')
        input_ops = [op for op in twotapfir.operations if isinstance(op, Input)]
        assert len(input_ops) == 1
class TestCriticalPath:
    """Checks ``SFG.critical_path_time`` against the configured latency."""

    def test_single_accumulator(self, sfg_simple_accumulator: SFG):
        """The critical path time equals the latency set on additions."""
        add_type = Addition.type_name()
        sfg_simple_accumulator.set_latency_of_type(add_type, 5)
        first = sfg_simple_accumulator.critical_path_time()
        assert first == 5

        add_type = Addition.type_name()
        sfg_simple_accumulator.set_latency_of_type(add_type, 6)
        second = sfg_simple_accumulator.critical_path_time()
        assert second == 6
1498
1499
1500
1501
1502
1503
1504
1505
1506
1507
1508
1509
1510
1511
1512
1513
1514
1515
1516
1517
1518
1519
def count_kinds(self, sfg: SFG) -> Dict[Type, int]:
    """Return a ``Counter`` mapping each operation type in ``sfg`` to its count."""
    tally = Counter()
    for operation in sfg.operations:
        tally[type(operation)] += 1
    return tally
def assert_counts_is_correct(self, sfg1: SFG, sfg2: SFG, multiple: int):
    """Assert that ``sfg2`` holds ``multiple`` times the operations of ``sfg1``.

    The comparison is made per operation type. Delays are the exception:
    their count must be the same in both graphs, after which they are
    excluded from the multiplied comparison.
    """
    base_counts = self.count_kinds(sfg1)
    scaled_counts = self.count_kinds(sfg2)

    # ``get`` yields None for a graph without delays, so two delay-free
    # graphs still compare equal here.
    assert base_counts.get(Delay) == scaled_counts.get(Delay)
    base_counts[Delay] = 0
    scaled_counts[Delay] = 0

    # Both graphs must use exactly the same set of operation types.
    assert base_counts.keys() == scaled_counts.keys()

    for kind, amount in base_counts.items():
        assert amount * multiple == scaled_counts[kind]
# One thin wrapper per fixture: each simply forwards its fixture to
# ``do_tests``. Kept as separate methods (an acknowledged hack) because no
# way of running one test over several fixtures was found.
def test_two_inputs_two_outputs(self, sfg_two_inputs_two_outputs: SFG):
    """Run the shared checks on ``sfg_two_inputs_two_outputs``."""
    graph = sfg_two_inputs_two_outputs
    self.do_tests(graph)

def test_twotapfir(self, sfg_two_tap_fir: SFG):
    """Run the shared checks on ``sfg_two_tap_fir``."""
    graph = sfg_two_tap_fir
    self.do_tests(graph)

def test_delay(self, sfg_delay: SFG):
    """Run the shared checks on ``sfg_delay``."""
    graph = sfg_delay
    self.do_tests(graph)

def test_sfg_two_inputs_two_outputs_independent(
    self, sfg_two_inputs_two_outputs_independent: SFG
):
    """Run the shared checks on ``sfg_two_inputs_two_outputs_independent``."""
    graph = sfg_two_inputs_two_outputs_independent
    self.do_tests(graph)

def test_threetapiir(self, sfg_direct_form_iir_lp_filter: SFG):
    """Run the shared checks on ``sfg_direct_form_iir_lp_filter``."""
    graph = sfg_direct_form_iir_lp_filter
    self.do_tests(graph)
def do_tests(self, sfg: SFG):
    """Unfold ``sfg`` by factors 2 and 3 and compare it with the original.

    For each factor the operation counts of the unfolded (and twice
    unfolded) graph are checked, then both graphs are simulated on the same
    random input samples and their outputs compared sample by sample.
    """
    for factor in range(2, 4):
        # Operation counts after one and after two rounds of unfolding.
        unfolded = sfg.unfold(factor)
        self.assert_counts_is_correct(sfg, unfolded, factor)

        twice_unfolded = sfg.unfold(factor).unfold(factor)
        self.assert_counts_is_correct(sfg, twice_unfolded, factor * factor)

        NUM_TESTS = 5
        sample_count = NUM_TESTS * factor

        # Random stimuli: ``factor`` samples per unfolded iteration, so the
        # original graph consumes exactly what the unfolded one does.
        input_list = [
            [random.random() for _ in range(sample_count)] for _ in sfg.inputs
        ]

        reference_sim = Simulation(sfg, input_list)
        reference_sim.run()
        ref = reference_sim.results

        # The unfolded graph has ``factor`` copies of every input. Copy
        # ``n`` of input ``k`` sits at index ``k + n * num_inputs`` and
        # receives every ``factor``-th sample starting at offset ``n``.
        num_inputs = len(sfg.inputs)
        unfolded_input_lists = [
            [input_list[k][t * factor + n] for t in range(NUM_TESTS)]
            for n in range(factor)
            for k in range(num_inputs)
        ]

        unfolded_sim = Simulation(unfolded, unfolded_input_lists)
        unfolded_sim.run()
        unfolded_results = unfolded_sim.results

        for n, _ in enumerate(sfg.outputs):
            # Samples produced by the original output ``n``.
            ref_values = list(ref[ResultKey(str(n))])

            # Output ``n`` is split into ``factor`` ports of the unfolded
            # graph; interleave their samples to rebuild the sequence.
            out_indices = [n + k * len(sfg.outputs) for k in range(factor)]
            flat_u_values = [
                unfolded_results[ResultKey(str(idx))][k]
                for k in range(NUM_TESTS)
                for idx in out_indices
            ]

            assert flat_u_values == ref_values
def test_value_error(self, sfg_two_inputs_two_outputs: SFG):
    """Unfolding with factor 0 raises ``ValueError``."""
    sfg = sfg_two_inputs_two_outputs
    with pytest.raises(ValueError, match="Unfolding 0 times removes the SFG"):
        # The factor follows from the expected message ("Unfolding 0 times").
        sfg.unfold(0)
class TestIsLinear:
    """Checks the ``is_linear`` property of two SFG fixtures."""

    def test_single_accumulator(self, sfg_simple_accumulator: SFG):
        """The simple accumulator reports itself as linear."""
        linear = sfg_simple_accumulator.is_linear
        assert linear

    def test_sfg_nested(self, sfg_nested: SFG):
        """The nested SFG does not report itself as linear."""
        linear = sfg_nested.is_linear
        assert not linear
class TestIsConstant:
    """Checks the ``is_constant`` property of two SFG fixtures."""

    def test_single_accumulator(self, sfg_simple_accumulator: SFG):
        """The simple accumulator does not report itself as constant."""
        constant = sfg_simple_accumulator.is_constant
        assert not constant

    def test_sfg_nested(self, sfg_nested: SFG):
        """The nested SFG does not report itself as constant."""
        constant = sfg_nested.is_constant
        assert not constant
1614
1615
1616
1617
1618
1619
1620
1621
1622
1623
1624
1625
1626
1627
1628
1629
1630
1631
1632
1633
1634
1635
1636
1637
class TestSwapIOOfOperation:
    """Checks that ``swap_io_of_operation`` leaves simulation results unchanged."""

    def do_test(self, sfg: SFG, graph_id: GraphID):
        """Simulate ``sfg`` before and after swapping the I/O of ``graph_id``.

        Both runs use the same random input samples, and every output
        sequence is required to be identical.
        """
        NUM_TESTS = 5

        # One list of random samples per input of the graph.
        stimuli = [[random.random() for _ in range(NUM_TESTS)] for _ in sfg.inputs]

        before = Simulation(sfg, stimuli)
        before.run()

        sfg.swap_io_of_operation(graph_id)

        after = Simulation(sfg, stimuli)
        after.run()

        for n, _ in enumerate(sfg.outputs):
            expected = list(before.results[ResultKey(str(n))])
            actual = list(after.results[ResultKey(str(n))])
            assert expected == actual

    def test_single_accumulator(self, sfg_simple_accumulator: SFG):
        """Swap the I/O of operation ``add1`` in the simple accumulator."""
        self.do_test(sfg_simple_accumulator, 'add1')
1638
1639
1640
1641
1642
1643
1644
1645
1646
1647
1648
1649
1650
1651
1652
1653
1654
1655
1656
1657
1658
1659
1660
1661
1662
1663
1664
class TestInsertComponentAfter:
    """Tests for ``SFG.insert_operation_after``."""

    def test_insert_component_after_in_sfg(self, large_operation_tree_names):
        """Insert a ``SquareRoot`` after ``constant4`` and check the rewiring.

        The original SFG is inspected next to the returned one, so the
        assertions also cover that the original graph is left unmodified.
        """
        sfg = SFG(outputs=[Output(large_operation_tree_names)])
        sqrt = SquareRoot()

        _sfg = sfg.insert_operation_after(
            sfg.find_by_name("constant4")[0].graph_id, sqrt
        )
        assert _sfg.evaluate() != sfg.evaluate()

        assert any(isinstance(comp, SquareRoot) for comp in _sfg.operations)
        assert not any(isinstance(comp, SquareRoot) for comp in sfg.operations)

        assert not isinstance(
            sfg.find_by_name("constant4")[0].output(0).signals[0].destination.operation,
            SquareRoot,
        )
        assert isinstance(
            _sfg.find_by_name("constant4")[0]
            .output(0)
            .signals[0]
            .destination.operation,
            SquareRoot,
        )

        # constant4 fed add2 directly before the insertion ...
        assert sfg.find_by_name("constant4")[0].output(0).signals[
            0
        ].destination.operation is sfg.find_by_id("add2")
        # ... and no longer does afterwards ...
        assert _sfg.find_by_name("constant4")[0].output(0).signals[
            0
        ].destination.operation is not _sfg.find_by_id("add2")
        # ... because the new square root now sits in between.
        assert _sfg.find_by_id("sqrt0").output(0).signals[
            0
        ].destination.operation is _sfg.find_by_id("add2")

    def test_insert_component_after_mimo_operation_error(
        self, large_operation_tree_names
    ):
        """Inserting a ``SymmetricTwoportAdaptor`` raises ``TypeError``."""
        sfg = SFG(outputs=[Output(large_operation_tree_names)])
        with pytest.raises(
            TypeError, match="Only operations with one input and one output"
        ):
            sfg.insert_operation_after('constant4', SymmetricTwoportAdaptor(0.5))

    def test_insert_component_after_unknown_component_error(
        self, large_operation_tree_names
    ):
        """Inserting after an unknown graph id raises ``ValueError``."""
        sfg = SFG(outputs=[Output(large_operation_tree_names)])
        with pytest.raises(ValueError, match="Unknown component:"):
            sfg.insert_operation_after('foo', SquareRoot())
Hugo Winbladh
committed
class TestInsertComponentBefore:
    """Tests for ``SFG.insert_operation_before``."""

    def test_insert_component_before_in_sfg(self, butterfly_operation_tree):
        """Insert a ``SquareRoot`` before port 0 of ``bfly1`` and check the rewiring.

        The original SFG is inspected next to the returned one, so the
        assertions also cover that the original graph is left unmodified.
        """
        sfg = SFG(outputs=list(map(Output, butterfly_operation_tree.outputs)))
        sqrt = SquareRoot()

        _sfg = sfg.insert_operation_before(
            sfg.find_by_name("bfly1")[0].graph_id, sqrt, port=0
        )
        assert _sfg.evaluate() != sfg.evaluate()

        assert any(isinstance(comp, SquareRoot) for comp in _sfg.operations)
        assert not any(isinstance(comp, SquareRoot) for comp in sfg.operations)

        assert not isinstance(
            sfg.find_by_name("bfly1")[0].input(0).signals[0].source.operation,
            SquareRoot,
        )
        assert isinstance(
            _sfg.find_by_name("bfly1")[0].input(0).signals[0].source.operation,
            SquareRoot,
        )

        # bfly2 drove input 0 of bfly1 before the insertion ...
        assert (
            sfg.find_by_name("bfly1")[0].input(0).signals[0].source.operation
            is sfg.find_by_name("bfly2")[0]
        )
        # ... and no longer does afterwards. This inspects the *source* of the
        # input signal; the destination of a signal on bfly1's own input port
        # could never be bfly2, which would make the check vacuous.
        assert (
            _sfg.find_by_name("bfly1")[0].input(0).signals[0].source.operation
            is not _sfg.find_by_name("bfly2")[0]
        )
        # The new square root is driven by bfly2 instead.
        assert (
            _sfg.find_by_id("sqrt0").input(0).signals[0].source.operation
            is _sfg.find_by_name("bfly2")[0]
        )

    def test_insert_component_before_mimo_operation_error(
        self, large_operation_tree_names
    ):
        """Inserting a ``SymmetricTwoportAdaptor`` raises ``TypeError``."""
        sfg = SFG(outputs=[Output(large_operation_tree_names)])
        with pytest.raises(
            TypeError, match="Only operations with one input and one output"
        ):
            sfg.insert_operation_before('add0', SymmetricTwoportAdaptor(0.5), port=0)

    def test_insert_component_before_unknown_component_error(
        self, large_operation_tree_names
    ):
        """Inserting before an unknown graph id raises ``ValueError``."""
        sfg = SFG(outputs=[Output(large_operation_tree_names)])
        with pytest.raises(ValueError, match="Unknown component:"):
            sfg.insert_operation_before('foo', SquareRoot())
class TestGetUsedTypeNames:
    """Checks the list returned by ``SFG.get_used_type_names``."""

    def test_single_accumulator(self, sfg_simple_accumulator: SFG):
        """The simple accumulator uses the add, in, out and t type names."""
        expected = ['add', 'in', 'out', 't']
        assert sfg_simple_accumulator.get_used_type_names() == expected

    def test_sfg_nested(self, sfg_nested: SFG):
        """The nested SFG uses the in, out and sfg type names."""
        expected = ['in', 'out', 'sfg']
        assert sfg_nested.get_used_type_names() == expected

    def test_large_operation_tree(self, large_operation_tree):
        """An SFG built on the large operation tree uses add, c and out."""
        graph = SFG(outputs=[Output(large_operation_tree)])
        expected = ['add', 'c', 'out']
        assert graph.get_used_type_names() == expected
1754
1755
1756
1757
1758
1759
1760
1761
1762
1763
1764
1765
1766
1767
1768
1769
1770
1771
1772
1773
1774
1775
1776
1777
1778
1779
1780
1781
1782
1783
class Test_Keep_GraphIDs:
    """Checks the graph ids in use after inserting operations into an SFG."""

    def test_single_accumulator(self):
        """Insert three constant multiplications around ``t0`` and list the ids."""
        source = Input()
        delay = Delay()
        sink = Output(delay)
        feedback_gain = ConstantMultiplication(0.5, delay)
        adder = Addition(source, feedback_gain)
        delay.input(0).connect(adder)

        sfg = SFG([source], [sink])
        sfg = sfg.insert_operation_before('t0', ConstantMultiplication(8))
        sfg = sfg.insert_operation_after('t0', ConstantMultiplication(8))
        sfg = sfg.insert_operation(ConstantMultiplication(8), 't0')

        expected_ids = {
            'add0',
            'cmul0',
            'cmul1',
            'cmul2',
            'cmul3',
            'in0',
            'out0',
            't0',
        }
        assert sfg.get_used_graph_ids() == expected_ids

    # NOTE(review): this checks type names, not graph ids, and matches the
    # test of the same name in TestGetUsedTypeNames -- possibly a copy-paste
    # leftover; confirm what was meant to be asserted here.
    def test_large_operation_tree(self, large_operation_tree):
        """An SFG built on the large operation tree uses add, c and out."""
        graph = SFG(outputs=[Output(large_operation_tree)])
        expected = ['add', 'c', 'out']
        assert graph.get_used_type_names() == expected
1784
1785
1786
1787
1788
1789
1790
1791
1792
1793
1794
1795
1796
1797
1798
1799
1800
1801
1802
1803
1804
1805
1806
1807
1808
1809
1810
1811
class TestInsertDelays:
    """Checks delays added through the ``delay`` method of input ports."""

    def test_insert_delays_before_operation(self):
        """Delay both butterfly inputs, then an output, and walk the chain."""
        in1 = Input()
        bfly = Butterfly()

        # Request 2 + 1 delays in front of the butterfly, then drive both
        # delay chains from the same input.
        d1 = bfly.input(0).delay(2)
        d2 = bfly.input(1).delay(1)
        d1 <<= in1
        d2 <<= in1

        out1 = Output(bfly.output(0))
        out2 = Output(bfly.output(1))
        sfg = SFG([in1], [out1, out2])

        d_type_name = d1.operation.type_name()
        delays_before = sfg.find_by_type_name(d_type_name)
        assert len(delays_before) == 3

        # Request three more delays in front of 'out1' and rebuild the SFG by
        # calling it.
        sfg.find_by_id('out1').input(0).delay(3)
        sfg = sfg()

        delays_after = sfg.find_by_type_name(d_type_name)
        assert len(delays_after) == 6

        # Walk backwards from 'out1' through input 0 of each operation.
        upstream = []
        current = sfg.find_by_id('out1')
        for _ in range(4):
            current = current.input(0).signals[0].source.operation
            upstream.append(current)

        # Three delays, then the butterfly itself.
        assert upstream[0].type_name() == d_type_name
        assert upstream[1].type_name() == d_type_name
        assert upstream[2].type_name() == d_type_name
        assert upstream[3].type_name() == bfly.type_name()
1812
1813
1814
1815
1816
1817
1818
1819
1820
1821
1822
1823
1824
1825
1826
1827
1828
1829
1830
1831
1832
1833
1834
1835
1836
1837
1838
1839
1840
1841
1842
1843
1844
1845
class TestIterationPeriodBound:
    """Checks ``SFG.iteration_period_bound`` on several fixtures."""

    def test_accumulator(self, sfg_simple_accumulator):
        """With additions at latency 2 the accumulator's bound is 2."""
        sfg_simple_accumulator.set_latency_of_type('add', 2)
        bound = sfg_simple_accumulator.iteration_period_bound()
        assert bound == 2

    def test_no_latency(self, sfg_simple_accumulator):
        """Asking for the bound without setting latencies raises ``ValueError``."""
        expected_message = "All native offsets have to set to a non-negative value to"
        with pytest.raises(ValueError, match=expected_message):
            sfg_simple_accumulator.iteration_period_bound()

    def test_secondorder_iir(self, precedence_sfg_delays):
        """With add latency 2 and cmul latency 3 the bound is 10."""
        precedence_sfg_delays.set_latency_of_type('add', 2)
        precedence_sfg_delays.set_latency_of_type('cmul', 3)
        bound = precedence_sfg_delays.iteration_period_bound()
        assert bound == 10

    def test_no_delays(self, sfg_two_inputs_two_outputs):
        """For the two-input, two-output fixture the bound is -1."""
        bound = sfg_two_inputs_two_outputs.iteration_period_bound()
        assert bound == -1
class TestStateSpace:
def test_accumulator(self, sfg_simple_accumulator):
    """Check the three parts of the accumulator's state-space representation."""
    representation = sfg_simple_accumulator.state_space_representation()
    expected_matrix = np.array([[1.0, 1.0], [0.0, 1.0]])

    assert representation[0] == ['v0', 'y0']
    assert (representation[1] == expected_matrix).all()
    assert representation[2] == ['v0', 'x0']
def test_secondorder_iir(self, precedence_sfg_delays):
    """Check the state-space representation of the ``precedence_sfg_delays`` fixture."""
    representation = precedence_sfg_delays.state_space_representation()
    expected_matrix = np.array(
        [[3.0, 2.0, 5.0], [1.0, 0.0, 0.0], [4.0, 6.0, 35.0]]
    )

    assert representation[0] == ['v0', 'v1', 'y0']
    assert (representation[1] == expected_matrix).all()
    assert representation[2] == ['v0', 'v1', 'x0']
# @pytest.mark.xfail()
def test_sfg_two_inputs_two_outputs(self, sfg_two_inputs_two_outputs):
    """Check the state-space representation of the two-input, two-output fixture."""
    representation = sfg_two_inputs_two_outputs.state_space_representation()
    expected_matrix = np.array([[1.0, 1.0], [1.0, 2.0]])

    assert representation[0] == ['y0', 'y1']
    assert (representation[1] == expected_matrix).all()
    assert representation[2] == ['x0', 'x1']
1857
1858
1859
1860
1861
1862
1863
1864
1865
1866
1867
1868
1869
1870
1871
1872
1873
1874
1875
1876
1877
def test_sfg_two_inputs_two_outputs_independent(
    self, sfg_two_inputs_two_outputs_independent
):
    """Check the state-space representation of the independent two-path fixture.

    The expected matrix is diagonal.
    """
    representation = (
        sfg_two_inputs_two_outputs_independent.state_space_representation()
    )
    expected_matrix = np.array([[1.0, 0.0], [0.0, 4.0]])

    assert representation[0] == ['y0', 'y1']
    assert (representation[1] == expected_matrix).all()
    assert representation[2] == ['x0', 'x1']
def test_sfg_two_inputs_two_outputs_independent_with_cmul(
    self, sfg_two_inputs_two_outputs_independent_with_cmul
):
    """Check the state-space representation of the independent fixture with cmul.

    The expected matrix is diagonal.
    """
    fixture = sfg_two_inputs_two_outputs_independent_with_cmul
    representation = fixture.state_space_representation()
    expected_matrix = np.array([[20.0, 0.0], [0.0, 8.0]])

    assert representation[0] == ['y0', 'y1']
    assert (representation[1] == expected_matrix).all()
    assert representation[2] == ['x0', 'x1']