Newer
Older

Ivar Härnqvist
committed
import pytest
Angus Lothian
committed
import io
import sys

Ivar Härnqvist
committed
from b_asic import SFG, Signal, Input, Output, Constant, ConstantMultiplication, Addition, Multiplication, Register, \

Ivar Härnqvist
committed
Kevin Scott
committed
class TestInit:
    """Construction of an SFG from operations or from bare signals."""

    @staticmethod
    def _check_sizes(sfg, components, operations, inputs, outputs):
        """Assert the component, operation, input and output counts of ``sfg``."""
        assert len(list(sfg.components)) == components
        assert len(list(sfg.operations)) == operations
        assert sfg.input_count == inputs
        assert sfg.output_count == outputs

    def test_direct_input_to_output_sfg_construction(self):
        """An input wired straight to an output: IN1 ---S1---> OUT1."""
        source = Input("IN1")
        sink = Output(None, "OUT1")
        sink.input(0).connect(source, "S1")
        graph = SFG(inputs=[source], outputs=[sink])

        self._check_sizes(graph, components=3, operations=2, inputs=1, outputs=1)

    def test_same_signal_input_and_output_sfg_construction(self):
        """The value returned by ``connect`` is passed as both input and output signal."""
        first = Addition(None, None, "ADD1")
        second = Addition(None, None, "ADD2")
        # ADD1 ---S1---> ADD2
        shared_signal = second.input(0).connect(first, "S1")
        graph = SFG(input_signals=[shared_signal], output_signals=[shared_signal])

        self._check_sizes(graph, components=3, operations=2, inputs=1, outputs=1)

    def test_outputs_construction(self, operation_tree):
        """Build the SFG from an ``Output`` wrapped around the tree fixture."""
        graph = SFG(outputs=[Output(operation_tree)])

        self._check_sizes(graph, components=7, operations=4, inputs=0, outputs=1)

    def test_signals_construction(self, operation_tree):
        """Build the SFG from a ``Signal`` whose source is the tree's output port 0."""
        tail = Signal(source=operation_tree.output(0))
        graph = SFG(output_signals=[tail])

        self._check_sizes(graph, components=7, operations=4, inputs=0, outputs=1)
Kevin Scott
committed
class TestPrintSfg:
    """Checks the text returned by ``SFG.__str__`` for a few small graphs."""

    @staticmethod
    def _expected_text(sfg, sfg_name, operation_names):
        """Build the expected text: header, divider, one row per operation, divider."""
        divider = "----------------------------------------------------------------------------------------------------\n"
        header = "id: no_id, \tname: " + sfg_name + ", \tinputs: {0: '-'}, \toutputs: {0: '-'}\n"
        rows = "".join(
            str(sfg.find_by_name(op_name)[0]) + "\n" for op_name in operation_names
        )
        return header + "Internal Operations:\n" + divider + rows + divider

    def test_one_addition(self):
        """Two inputs feeding one addition."""
        left = Input("INP1")
        right = Input("INP2")
        adder = Addition(left, right, "ADD1")
        result = Output(adder, "OUT1")
        graph = SFG(inputs=[left, right], outputs=[result], name="SFG1")

        wanted = self._expected_text(graph, "SFG1", ["INP1", "INP2", "ADD1", "OUT1"])
        assert str(graph) == wanted

    def test_add_mul(self):
        """An addition whose result is multiplied by a third input."""
        first = Input("INP1")
        second = Input("INP2")
        third = Input("INP3")
        adder = Addition(first, second, "ADD1")
        multiplier = Multiplication(adder, third, "MUL1")
        result = Output(multiplier, "OUT1")
        graph = SFG(inputs=[first, second, third], outputs=[result], name="mac_sfg")

        wanted = self._expected_text(
            graph, "mac_sfg", ["INP1", "INP2", "ADD1", "INP3", "MUL1", "OUT1"])
        assert str(graph) == wanted

    def test_constant(self):
        """A constant added to a single input; the constant row comes first."""
        only_input = Input("INP1")
        three = Constant(3, "CONST")
        adder = Addition(three, only_input, "ADD1")
        result = Output(adder, "OUT1")
        graph = SFG(inputs=[only_input], outputs=[result], name="sfg")

        wanted = self._expected_text(graph, "sfg", ["CONST", "INP1", "ADD1", "OUT1"])
        assert str(graph) == wanted

    def test_simple_filter(self, simple_filter):
        """The ``simple_filter`` fixture lists IN1, ADD1, REG1, CMUL1, OUT1 in that order."""
        wanted = self._expected_text(
            simple_filter, "simple_filter", ["IN1", "ADD1", "REG1", "CMUL1", "OUT1"])
        assert str(simple_filter) == wanted

Ivar Härnqvist
committed

Ivar Härnqvist
committed
class TestDeepCopy:
    """Calling an SFG produces a copy; these tests compare copy and original."""

    @staticmethod
    def _assert_names_match(original, duplicate):
        """Every graph id of ``original`` resolves to a same-named component in ``duplicate``."""
        for graph_id, component in original._components_by_id.items():
            twin = duplicate.find_by_id(graph_id)
            assert component.name == twin.name

    def test_deep_copy_no_duplicates(self):
        """A copy made with no arguments has an empty name and matching component names."""
        first = Input("INP1")
        second = Input("INP2")
        third = Input("INP3")
        adder = Addition(first, second, "ADD1")
        multiplier = Multiplication(adder, third, "MUL1")
        result = Output(multiplier, "OUT1")
        original = SFG(inputs=[first, second], outputs=[result], name="mac_sfg")

        duplicate = original()

        assert original.name == "mac_sfg"
        assert duplicate.name == ""
        self._assert_names_match(original, duplicate)

    def test_deep_copy(self):
        """A copy given a new name keeps the id_number_offset and the component names."""
        first = Input("INP1")
        second = Input("INP2")
        third = Input("INP3")
        adder_a = Addition(None, None, "ADD1")
        adder_b = Addition(None, None, "ADD2")
        multiplier = Multiplication(None, None, "MUL1")
        result = Output(None, "OUT1")

        adder_a.input(0).connect(first, "S1")
        adder_a.input(1).connect(second, "S2")
        adder_b.input(0).connect(adder_a, "S4")
        adder_b.input(1).connect(third, "S3")
        multiplier.input(0).connect(adder_a, "S5")
        multiplier.input(1).connect(adder_b, "S6")
        result.input(0).connect(multiplier, "S7")

        original = SFG(
            inputs=[first, second],
            outputs=[result],
            id_number_offset=100,
            name="mac_sfg",
        )
        duplicate = original(name="mac_sfg2")

        assert original.name == "mac_sfg"
        assert duplicate.name == "mac_sfg2"
        assert original.id_number_offset == 100
        assert duplicate.id_number_offset == 100
        self._assert_names_match(original, duplicate)

    def test_deep_copy_with_new_sources(self):
        """Positional arguments become the sources of the copy's input ports."""
        first = Input("INP1")
        second = Input("INP2")
        third = Input("INP3")
        adder = Addition(first, second, "ADD1")
        multiplier = Multiplication(adder, third, "MUL1")
        result = Output(multiplier, "OUT1")
        original = SFG(inputs=[first, second], outputs=[result], name="mac_sfg")

        source_a = Addition(Constant(3), Constant(5))
        source_b = Constant(2)
        duplicate = original(source_a, source_b)

        assert duplicate.input(0).signals[0].source.operation is source_a
        assert duplicate.input(1).signals[0].source.operation is source_b
Kevin Scott
committed
class TestEvaluateOutput:
    """Evaluation of output 0 for SFGs built from the tree and cycle fixtures."""

    def test_evaluate_output(self, operation_tree):
        """The small tree fixture evaluates to 5 with no input values."""
        graph = SFG(outputs=[Output(operation_tree)])
        value = graph.evaluate_output(0, [])
        assert value == 5

    def test_evaluate_output_large(self, large_operation_tree):
        """The large tree fixture evaluates to 14 with no input values."""
        graph = SFG(outputs=[Output(large_operation_tree)])
        value = graph.evaluate_output(0, [])
        assert value == 14

    def test_evaluate_output_cycle(self, operation_graph_with_cycle):
        """Evaluating the cyclic fixture is expected to raise."""
        graph = SFG(outputs=[Output(operation_graph_with_cycle)])
        with pytest.raises(Exception):
            graph.evaluate_output(0, [])

Ivar Härnqvist
committed

Ivar Härnqvist
committed
class TestComponents:
    """Checks which components ``SFG.components`` reports."""

    def test_advanced_components(self):
        """All named operations and signals of the hand-wired MAC graph are reported."""
        first = Input("INP1")
        second = Input("INP2")
        third = Input("INP3")
        adder_a = Addition(None, None, "ADD1")
        adder_b = Addition(None, None, "ADD2")
        multiplier = Multiplication(None, None, "MUL1")
        result = Output(None, "OUT1")

        adder_a.input(0).connect(first, "S1")
        adder_a.input(1).connect(second, "S2")
        adder_b.input(0).connect(adder_a, "S4")
        adder_b.input(1).connect(third, "S3")
        multiplier.input(0).connect(adder_a, "S5")
        multiplier.input(1).connect(adder_b, "S6")
        result.input(0).connect(multiplier, "S7")

        graph = SFG(inputs=[first, second], outputs=[result], name="mac_sfg")

        found_names = {component.name for component in graph.components}
        operation_names = {"INP1", "INP2", "INP3", "ADD1", "ADD2", "MUL1", "OUT1"}
        signal_names = {"S1", "S2", "S3", "S4", "S5", "S6", "S7"}
        assert found_names == operation_names | signal_names
class TestReplaceComponents:
    """Tests for ``SFG.replace_component``, addressed by graph id."""

    def test_replace_addition_by_id(self, operation_tree):
        """Replacing "add1" removes that id and registers the new component's name."""
        sfg = SFG(outputs=[Output(operation_tree)])
        component_id = "add1"

        sfg = sfg.replace_component(
            Multiplication(name="Multi"), _id=component_id)

        assert component_id not in sfg._components_by_id
        assert "Multi" in sfg._components_by_name

    def test_replace_addition_large_tree(self, large_operation_tree):
        """Same replacement on the large tree fixture, targeting "add3"."""
        sfg = SFG(outputs=[Output(large_operation_tree)])
        component_id = "add3"

        sfg = sfg.replace_component(
            Multiplication(name="Multi"), _id=component_id)

        assert "Multi" in sfg._components_by_name
        assert component_id not in sfg._components_by_id

    def test_replace_no_input_component(self, operation_tree):
        """After replacing "c1" with a new Constant, the id resolves to a different object."""
        sfg = SFG(outputs=[Output(operation_tree)])
        component_id = "c1"
        _const = sfg.find_by_id(component_id)

        sfg = sfg.replace_component(Constant(1), _id=component_id)

        assert _const is not sfg.find_by_id(component_id)

    def test_no_match_on_replace(self, large_operation_tree):
        """An id that is not looked up successfully ("addd1") must raise AssertionError."""
        sfg = SFG(outputs=[Output(large_operation_tree)])
        component_id = "addd1"

        # pytest.raises fails the test by itself when nothing is raised, so
        # no manual try/except/else bookkeeping is needed.
        with pytest.raises(AssertionError):
            sfg.replace_component(
                Multiplication(name="Multi"), _id=component_id)

    def test_not_equal_input(self, large_operation_tree):
        """Replacing "c1" with a Multiplication must raise AssertionError."""
        sfg = SFG(outputs=[Output(large_operation_tree)])
        component_id = "c1"

        with pytest.raises(AssertionError):
            sfg.replace_component(
                Multiplication(name="Multi"), _id=component_id)

Rasmus Karlsson
committed
class TestInsertComponent:
    """Tests for ``SFG.insert_operation``, which returns a new SFG."""

    def test_insert_component_in_sfg(self, large_operation_tree_names):
        """Insert a SquareRoot after "constant4"; the source SFG stays as it was."""
        before = SFG(outputs=[Output(large_operation_tree_names)])
        root_op = SquareRoot()
        anchor_id = before.find_by_name("constant4")[0].graph_id
        after = before.insert_operation(root_op, anchor_id)

        assert after.evaluate() != before.evaluate()

        assert any(isinstance(op, SquareRoot) for op in after.operations)
        assert not any(isinstance(op, SquareRoot) for op in before.operations)

        def successor_of_constant4(graph):
            """Operation at the far end of the first signal leaving constant4's port 0."""
            return graph.find_by_name("constant4")[0].output(0).signals[0].destination.operation

        assert not isinstance(successor_of_constant4(before), SquareRoot)
        assert isinstance(successor_of_constant4(after), SquareRoot)

        assert successor_of_constant4(before) is before.find_by_id("add3")
        assert successor_of_constant4(after) is not after.find_by_id("add3")
        assert after.find_by_id("sqrt1").output(0).signals[0].destination.operation is after.find_by_id("add3")

    def test_insert_invalid_component_in_sfg(self, large_operation_tree):
        """An Addition inserted after "c1" is expected to raise."""
        graph = SFG(outputs=[Output(large_operation_tree)])
        # Should raise an exception for not matching input count to output count.
        extra_adder = Addition()
        with pytest.raises(Exception):
            graph.insert_operation(extra_adder, "c1")

    def test_insert_at_output(self, large_operation_tree):
        """Inserting after "out1" is expected to raise."""
        graph = SFG(outputs=[Output(large_operation_tree)])
        # Should raise an exception for trying to insert an operation after an output.
        root_op = SquareRoot()
        with pytest.raises(Exception):
            graph.insert_operation(root_op, "out1")

    def test_insert_multiple_output_ports(self, butterfly_operation_tree):
        """Insert a Butterfly after "bfly3" and check both ports on each side."""
        before = SFG(outputs=[Output(port) for port in butterfly_operation_tree.outputs])
        after = before.insert_operation(Butterfly(name="n_bfly"), "bfly3")

        assert before.evaluate() != after.evaluate()

        assert len(before.find_by_name("n_bfly")) == 0
        assert len(after.find_by_name("n_bfly")) == 1

        upstream = after.find_by_name("bfly3")[0]
        inserted = after.find_by_name("n_bfly")[0]
        downstream = after.find_by_name("bfly2")[0]

        # Correctly connected old output -> new input
        for port in (0, 1):
            assert upstream.output(port).signals[0].destination.operation is inserted

        # Correctly connected new input -> old output
        for port in (0, 1):
            assert inserted.input(port).signals[0].source.operation is upstream

        # Correctly connected new output -> next input
        for port in (0, 1):
            assert inserted.output(port).signals[0].destination.operation is downstream

        # Correctly connected next input -> new output
        for port in (0, 1):
            assert downstream.input(port).signals[0].source.operation is inserted
Angus Lothian
committed
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
class TestFindComponentsWithTypeName:
    """Tests for ``SFG.get_components_with_type_name``."""

    def test_mac_components(self):
        """Each type name maps to the expected set of component names."""
        first = Input("INP1")
        second = Input("INP2")
        third = Input("INP3")
        adder_a = Addition(None, None, "ADD1")
        adder_b = Addition(None, None, "ADD2")
        multiplier = Multiplication(None, None, "MUL1")
        result = Output(None, "OUT1")

        adder_a.input(0).connect(first, "S1")
        adder_a.input(1).connect(second, "S2")
        adder_b.input(0).connect(adder_a, "S4")
        adder_b.input(1).connect(third, "S3")
        multiplier.input(0).connect(adder_a, "S5")
        multiplier.input(1).connect(adder_b, "S6")
        result.input(0).connect(multiplier, "S7")

        graph = SFG(inputs=[first, second], outputs=[result], name="mac_sfg")

        def names_for(type_name):
            """Names of the components the SFG reports for ``type_name``."""
            return {comp.name for comp in graph.get_components_with_type_name(type_name)}

        assert names_for(first.type_name()) == {"INP1", "INP2", "INP3"}
        assert names_for(adder_a.type_name()) == {"ADD1", "ADD2"}
        assert names_for(multiplier.type_name()) == {"MUL1"}
        assert names_for(result.type_name()) == {"OUT1"}
        assert names_for(Signal.type_name()) == {"S1", "S2", "S3", "S4", "S5", "S6", "S7"}
class TestGetPrecedenceList:
    """Checks the layers returned by ``SFG.get_precedence_list``."""

    @staticmethod
    def _port_keys(ports):
        """Return the set of ``operation.key(index, name)`` values for ``ports``."""
        return {port.operation.key(port.index, port.operation.name) for port in ports}

    @classmethod
    def _check_layers(cls, precedence_list, expected_layers):
        """Assert the number of layers, then the key set of each layer in order."""
        assert len(precedence_list) == len(expected_layers)
        for level, wanted in enumerate(expected_layers):
            assert cls._port_keys(precedence_list[level]) == wanted

    def test_inputs_registers(self, precedence_sfg_registers):
        """Seven layers are expected for the registers fixture."""
        layers = precedence_sfg_registers.get_precedence_list()

        self._check_layers(layers, [
            {"IN1", "T1", "T2"},
            {"C0", "B1", "B2", "A1", "A2"},
            {"ADD2", "ADD3"},
            {"ADD1"},
            {"Q1"},
            {"A0"},
            {"ADD4"},
        ])

    def test_inputs_constants_registers_multiple_outputs(self, precedence_sfg_registers_and_constants):
        """Seven layers are expected; the last holds both ports of BFLY1."""
        layers = precedence_sfg_registers_and_constants.get_precedence_list()

        self._check_layers(layers, [
            {"IN1", "T1", "CONST1"},
            {"C0", "B1", "B2", "A1", "A2"},
            {"ADD2", "ADD3"},
            {"ADD1"},
            {"Q1"},
            {"A0"},
            {"BFLY1.0", "BFLY1.1"},
        ])

    def test_precedence_multiple_outputs_same_precedence(self, sfg_two_inputs_two_outputs):
        """Both outputs of the nested SFG are expected in the same (third) layer."""
        nested = sfg_two_inputs_two_outputs
        nested.name = "NESTED_SFG"

        first = Input("IN1")
        nested.input(0).connect(first, "S1")

        second = Input("IN2")
        scaler = ConstantMultiplication(10, None, "CMUL1")
        scaler.input(0).connect(second, "S2")
        nested.input(1).connect(scaler, "S3")

        result_a = Output(nested.output(0), "OUT1")
        result_b = Output(nested.output(1), "OUT2")

        outer = SFG(inputs=[first, second], outputs=[result_a, result_b])
        layers = outer.get_precedence_list()

        self._check_layers(layers, [
            {"IN1", "IN2"},
            {"CMUL1"},
            {"NESTED_SFG.0", "NESTED_SFG.1"},
        ])

    def test_precedence_sfg_multiple_outputs_different_precedences(self, sfg_two_inputs_two_outputs_independent):
        """Same wiring around the independent fixture; the same three layers are expected."""
        nested = sfg_two_inputs_two_outputs_independent
        nested.name = "NESTED_SFG"

        first = Input("IN1")
        second = Input("IN2")
        nested.input(0).connect(first, "S1")

        scaler = ConstantMultiplication(10, None, "CMUL1")
        scaler.input(0).connect(second, "S2")
        nested.input(1).connect(scaler, "S3")

        result_a = Output(nested.output(0), "OUT1")
        result_b = Output(nested.output(1), "OUT2")

        outer = SFG(inputs=[first, second], outputs=[result_a, result_b])
        layers = outer.get_precedence_list()

        self._check_layers(layers, [
            {"IN1", "IN2"},
            {"CMUL1"},
            {"NESTED_SFG.0", "NESTED_SFG.1"},
        ])
class TestPrintPrecedence:
    """Checks the text that ``print_precedence_graph`` writes to standard output."""

    def test_registers(self, precedence_sfg_registers):
        """Compare the printed precedence graph of the registers fixture line by line.

        The expected text is a 120-dash divider, then for every layer one row
        per operation ("<layer>.<position> \\t<operation>") followed by another
        divider.
        """
        # Imported here so the block does not depend on a new file-level import.
        from contextlib import redirect_stdout

        sfg = precedence_sfg_registers

        # redirect_stdout puts the previous sys.stdout back on exit, even when
        # the call raises; assigning sys.__stdout__ by hand would not.
        buffer = io.StringIO()
        with redirect_stdout(buffer):
            sfg.print_precedence_graph()
        captured_output = buffer.getvalue()

        # Operation names per layer, in the order the rows are expected.
        layers = [
            ["IN1", "T1", "T2"],
            ["C0", "A1", "B1", "A2", "B2"],
            ["ADD3", "ADD2"],
            ["ADD1"],
            ["Q1"],
            ["A0"],
            ["ADD4"],
        ]

        divider = "-" * 120 + "\n"
        parts = [divider]
        for level, names in enumerate(layers, start=1):
            for position, name in enumerate(names, start=1):
                parts.append(
                    str(level) + "." + str(position) + " \t"
                    + str(sfg.find_by_name(name)[0]) + "\n")
            parts.append(divider)

        assert captured_output == "".join(parts)
class TestDepends:
    """Tests for ``SFG.inputs_required_for_output``."""

    def test_depends_sfg(self, sfg_two_inputs_two_outputs):
        """Both outputs of this fixture report inputs 0 and 1."""
        graph = sfg_two_inputs_two_outputs
        needed_by_first = set(graph.inputs_required_for_output(0))
        assert needed_by_first == {0, 1}
        needed_by_second = set(graph.inputs_required_for_output(1))
        assert needed_by_second == {0, 1}

    def test_depends_sfg_independent(self, sfg_two_inputs_two_outputs_independent):
        """In the independent fixture output 0 reports input 0, output 1 reports input 1."""
        graph = sfg_two_inputs_two_outputs_independent
        needed_by_first = set(graph.inputs_required_for_output(0))
        assert needed_by_first == {0}
        needed_by_second = set(graph.inputs_required_for_output(1))
        assert needed_by_second == {1}

Rasmus Karlsson
committed
class TestConnectExternalSignalsToComponentsSoloComp:
    """``connect_external_signals_to_components`` on an SFG that is the only inner component."""

    def test_connect_external_signals_to_components_mac(self):
        """The outer SFG evaluates to the same value before and after the call on the inner MAC."""
        first = Input("INP1")
        second = Input("INP2")
        third = Input("INP3")
        adder_a = Addition(None, None, "ADD1")
        adder_b = Addition(None, None, "ADD2")
        multiplier = Multiplication(None, None, "MUL1")
        inner_out = Output(None, "OUT1")

        adder_a.input(0).connect(first, "S1")
        adder_a.input(1).connect(second, "S2")
        adder_b.input(0).connect(adder_a, "S3")
        adder_b.input(1).connect(third, "S4")
        multiplier.input(0).connect(adder_a, "S5")
        multiplier.input(1).connect(adder_b, "S6")
        inner_out.input(0).connect(multiplier, "S7")
        inner = SFG(inputs=[first, second], outputs=[inner_out])

        outer_in_a = Input("INP4")
        outer_in_b = Input("INP5")
        outer_out = Output(None, "OUT2")

        inner.input(0).connect(outer_in_a, "S8")
        inner.input(1).connect(outer_in_b, "S9")
        outer_out.input(0).connect(inner.outputs[0], "S10")

        outer = SFG(inputs=[outer_in_a, outer_in_b], outputs=[outer_out])
        assert outer.evaluate(1, 2) == 9

        inner.connect_external_signals_to_components()

        assert outer.evaluate(1, 2) == 9
        assert not outer.connect_external_signals_to_components()

    def test_connect_external_signals_to_components_operation_tree(self, operation_tree):
        """Same check with an inner SFG wrapping the small tree fixture (value 5)."""
        inner = SFG(outputs=[Output(operation_tree)])

        outer_out = Output(None, "OUT1")
        outer_out.input(0).connect(inner.outputs[0], "S1")
        outer = SFG(outputs=[outer_out])

        assert outer.evaluate_output(0, []) == 5
        inner.connect_external_signals_to_components()
        assert outer.evaluate_output(0, []) == 5
        assert not outer.connect_external_signals_to_components()

    def test_connect_external_signals_to_components_large_operation_tree(self, large_operation_tree):
        """Same check with an inner SFG wrapping the large tree fixture (value 14)."""
        inner = SFG(outputs=[Output(large_operation_tree)])

        outer_out = Output(None, "OUT1")
        outer_out.input(0).connect(inner.outputs[0], "S1")
        outer = SFG(outputs=[outer_out])

        assert outer.evaluate_output(0, []) == 14
        inner.connect_external_signals_to_components()
        assert outer.evaluate_output(0, []) == 14
        assert not outer.connect_external_signals_to_components()

Rasmus Karlsson
committed
class TestConnectExternalSignalsToComponentsMultipleComp:
    """``connect_external_signals_to_components`` on an SFG that sits beside other operations."""

    def test_connect_external_signals_to_components_operation_tree(self, operation_tree):
        """Outer value stays 8 across the call on an inner SFG wrapping the small tree."""
        inner = SFG(outputs=[Output(operation_tree)])

        left = Input("INP1")
        right = Input("INP2")
        outer_out = Output(None, "OUT1")
        adder_a = Addition(None, None, "ADD1")
        adder_b = Addition(None, None, "ADD2")

        adder_a.input(0).connect(left, "S1")
        adder_a.input(1).connect(right, "S2")
        adder_b.input(0).connect(adder_a, "S3")
        adder_b.input(1).connect(inner.outputs[0], "S4")
        outer_out.input(0).connect(adder_b, "S5")

        outer = SFG(inputs=[left, right], outputs=[outer_out])

        assert outer.evaluate(1, 2) == 8
        inner.connect_external_signals_to_components()
        assert outer.evaluate(1, 2) == 8
        assert not outer.connect_external_signals_to_components()

    def test_connect_external_signals_to_components_large_operation_tree(self, large_operation_tree):
        """Outer value stays 17 across the call on an inner SFG wrapping the large tree."""
        inner = SFG(outputs=[Output(large_operation_tree)])

        left = Input("INP1")
        right = Input("INP2")
        outer_out = Output(None, "OUT1")
        adder_a = Addition(None, None, "ADD1")
        adder_b = Addition(None, None, "ADD2")

        adder_a.input(0).connect(left, "S1")
        adder_a.input(1).connect(right, "S2")
        adder_b.input(0).connect(adder_a, "S3")
        adder_b.input(1).connect(inner.outputs[0], "S4")
        outer_out.input(0).connect(adder_b, "S5")

        outer = SFG(inputs=[left, right], outputs=[outer_out])

        assert outer.evaluate(1, 2) == 17
        inner.connect_external_signals_to_components()
        assert outer.evaluate(1, 2) == 17
        assert not outer.connect_external_signals_to_components()

    def create_sfg(self, op_tree):
        """Return a two-input SFG that adds its inputs to the output of an SFG wrapping ``op_tree``."""
        wrapped_tree = SFG(outputs=[Output(op_tree)])

        left = Input("INP1")
        right = Input("INP2")
        outer_out = Output(None, "OUT1")
        adder_a = Addition(None, None, "ADD1")
        adder_b = Addition(None, None, "ADD2")

        adder_a.input(0).connect(left, "S1")
        adder_a.input(1).connect(right, "S2")
        adder_b.input(0).connect(adder_a, "S3")
        adder_b.input(1).connect(wrapped_tree.outputs[0], "S4")
        outer_out.input(0).connect(adder_b, "S5")

        return SFG(inputs=[left, right], outputs=[outer_out])

    def test_connect_external_signals_to_components_many_op(self, large_operation_tree):
        """Outer value stays 16 across the call on an SFG from ``create_sfg`` placed mid-graph."""
        in_a = Input("INP1")
        in_b = Input("INP2")
        in_c = Input("INP3")
        in_d = Input("INP4")
        outer_out = Output(None, "OUT1")
        adder = Addition(None, None, "ADD1")
        subtractor = Subtraction(None, None, "SUB1")

        adder.input(0).connect(in_a, "S1")
        adder.input(1).connect(in_b, "S2")

        middle = self.create_sfg(large_operation_tree)

        middle.input(0).connect(adder, "S3")
        middle.input(1).connect(in_c, "S4")
        subtractor.input(0).connect(middle.outputs[0], "S5")
        subtractor.input(1).connect(in_d, "S6")
        outer_out.input(0).connect(subtractor, "S7")

        outer = SFG(inputs=[in_a, in_b, in_c, in_d], outputs=[outer_out])

        assert outer.evaluate(1, 2, 3, 4) == 16
        middle.connect_external_signals_to_components()
        assert outer.evaluate(1, 2, 3, 4) == 16
        assert not outer.connect_external_signals_to_components()
Angus Lothian
committed
class TestTopologicalOrderOperations:
    """Tests for ``SFG.get_operations_topological_order``."""

    def test_feedback_sfg(self, simple_filter):
        """Operation names of the ``simple_filter`` fixture come back in this exact order."""
        ordered_names = [
            op.name for op in simple_filter.get_operations_topological_order()
        ]
        assert ordered_names == ["IN1", "ADD1", "REG1", "CMUL1", "OUT1"]

    def test_multiple_independent_inputs(self, sfg_two_inputs_two_outputs_independent):
        """Operation names of the independent two-input fixture come back in this exact order."""
        graph = sfg_two_inputs_two_outputs_independent
        ordered_names = [
            op.name for op in graph.get_operations_topological_order()
        ]
        assert ordered_names == ["IN1", "OUT1", "IN2", "C1", "ADD1", "OUT2"]
Angus Lothian
committed
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
class TestRemove:
    """Tests for ``SFG.remove_operation``, which returns a new SFG."""

    def test_remove_single_input_outputs(self, simple_filter):
        """Remove "cmul1" and compare REG1/ADD1 neighbours in the old and new SFG."""
        new_sfg = simple_filter.remove_operation("cmul1")

        old_reg = simple_filter.find_by_name("REG1")[0]
        new_reg = new_sfg.find_by_name("REG1")[0]
        assert {op.name for op in old_reg.subsequent_operations} == {"CMUL1", "OUT1"}
        assert {op.name for op in new_reg.subsequent_operations} == {"ADD1", "OUT1"}

        old_add = simple_filter.find_by_name("ADD1")[0]
        new_add = new_sfg.find_by_name("ADD1")[0]
        assert {op.name for op in old_add.preceding_operations} == {"CMUL1", "IN1"}
        assert {op.name for op in new_add.preceding_operations} == {"REG1", "IN1"}

        assert "S1" in {sig.name for sig in old_reg.output(0).signals}
        assert "S2" in {sig.name for sig in new_reg.output(0).signals}

    def test_remove_multiple_inputs_outputs(self, butterfly_operation_tree):
        """Remove "bfly2" and check that bfly3 and bfly1 are joined port for port."""
        out1 = Output(butterfly_operation_tree.output(0), "OUT1")
        out2 = Output(butterfly_operation_tree.output(1), "OUT2")

        sfg = SFG(outputs=[out1, out2])

        new_sfg = sfg.remove_operation(sfg.find_by_name("bfly2")[0].graph_id)

        # Output side of bfly3: each port keeps one signal, whose destination
        # operation changes from bfly2 to bfly1 while the port index stays the same.
        for index in (0, 1):
            assert sfg.find_by_name("bfly3")[0].output(index).signal_count == 1
            assert new_sfg.find_by_name("bfly3")[0].output(index).signal_count == 1

            sfg_dest = sfg.find_by_name("bfly3")[0].output(index).signals[0].destination
            new_sfg_dest = new_sfg.find_by_name("bfly3")[0].output(index).signals[0].destination

            assert sfg_dest.index == index
            assert new_sfg_dest.index == index
            assert sfg_dest.operation.name == "bfly2"
            assert new_sfg_dest.operation.name == "bfly1"

        # Input side of bfly1, port 0: one signal, source changes from bfly2 to bfly3.
        assert sfg.find_by_name("bfly1")[0].input(0).signal_count == 1
        assert new_sfg.find_by_name("bfly1")[0].input(0).signal_count == 1

        sfg_source_0 = sfg.find_by_name("bfly1")[0].input(0).signals[0].source
        new_sfg_source_0 = new_sfg.find_by_name("bfly1")[0].input(0).signals[0].source

        assert sfg_source_0.index == 0
        assert new_sfg_source_0.index == 0
        assert sfg_source_0.operation.name == "bfly2"
        assert new_sfg_source_0.operation.name == "bfly3"

        # Input side of bfly1, port 1.
        sfg_source_1 = sfg.find_by_name("bfly1")[0].input(1).signals[0].source
        new_sfg_source_1 = new_sfg.find_by_name("bfly1")[0].input(1).signals[0].source

        assert sfg_source_1.index == 1
        assert new_sfg_source_1.index == 1
        assert sfg_source_1.operation.name == "bfly2"
        assert new_sfg_source_1.operation.name == "bfly3"

        assert "bfly2" not in {op.name for op in new_sfg.operations}

    def test_remove_different_number_inputs_outputs(self, simple_filter):
        """Removing "add1" from the ``simple_filter`` fixture must raise ValueError.

        The method carries the ``test_`` prefix so that pytest's default
        discovery collects it; without the prefix it is never run.
        """
        with pytest.raises(ValueError):
            simple_filter.remove_operation("add1")