Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • da/B-ASIC
  • lukja239/B-ASIC
  • robal695/B-ASIC
3 results
Show changes
......@@ -4,92 +4,64 @@ B-ASIC test suite for Inputport
import pytest
from b_asic import InputPort, OutputPort
from b_asic import Signal
from b_asic import InputPort, OutputPort, Signal
@pytest.fixture
def inp_port():
return InputPort(0, None)
@pytest.fixture
def out_port():
return OutputPort(0, None)
@pytest.fixture
def out_port2():
return OutputPort(1, None)
def output_port2():
return OutputPort(None, 1)
@pytest.fixture
def dangling_sig():
return Signal()
@pytest.fixture
def s_w_source():
out_port = OutputPort(0, None)
return Signal(source=out_port)
def s_w_source(output_port):
return Signal(source=output_port)
@pytest.fixture
def sig_with_dest():
inp_port = InputPort(0, None)
return Signal(destination=out_port)
def sig_with_dest(inp_port):
return Signal(destination=inp_port)
@pytest.fixture
def connected_sig():
out_port = OutputPort(0, None)
inp_port = InputPort(0, None)
return Signal(source=out_port, destination=inp_port)
def connected_sig(inp_port, output_port):
return Signal(source=output_port, destination=inp_port)
def test_connect_then_disconnect(inp_port, out_port):
def test_connect_then_disconnect(input_port, output_port):
"""Test connect unused port to port."""
s1 = inp_port.connect(out_port)
assert inp_port.connected_ports == [out_port]
assert out_port.connected_ports == [inp_port]
assert inp_port.signals == [s1]
assert out_port.signals == [s1]
assert s1.source is out_port
assert s1.destination is inp_port
inp_port.remove_signal(s1)
assert inp_port.connected_ports == []
assert out_port.connected_ports == []
assert inp_port.signals == []
assert out_port.signals == [s1]
assert s1.source is out_port
s1 = input_port.connect(output_port)
assert input_port.connected_source == output_port
assert input_port.signals == [s1]
assert output_port.signals == [s1]
assert s1.source is output_port
assert s1.destination is input_port
input_port.remove_signal(s1)
assert input_port.connected_source is None
assert input_port.signals == []
assert output_port.signals == [s1]
assert s1.source is output_port
assert s1.destination is None
def test_connect_used_port_to_new_port(inp_port, out_port, out_port2):
"""Does connecting multiple ports to an inputport throw error?"""
inp_port.connect(out_port)
with pytest.raises(AssertionError):
inp_port.connect(out_port2)
def test_connect_used_port_to_new_port(input_port, output_port, output_port2):
"""Multiple connections to an input port should throw an error."""
input_port.connect(output_port)
with pytest.raises(Exception):
input_port.connect(output_port2)
def test_add_signal_then_disconnect(inp_port, s_w_source):
def test_add_signal_then_disconnect(input_port, s_w_source):
"""Can signal be connected then disconnected properly?"""
inp_port.add_signal(s_w_source)
input_port.add_signal(s_w_source)
assert inp_port.connected_ports == [s_w_source.source]
assert s_w_source.source.connected_ports == [inp_port]
assert inp_port.signals == [s_w_source]
assert input_port.connected_source == s_w_source.source
assert input_port.signals == [s_w_source]
assert s_w_source.source.signals == [s_w_source]
assert s_w_source.destination is inp_port
assert s_w_source.destination is input_port
inp_port.remove_signal(s_w_source)
input_port.remove_signal(s_w_source)
assert inp_port.connected_ports == []
assert s_w_source.source.connected_ports == []
assert inp_port.signals == []
assert input_port.connected_source is None
assert input_port.signals == []
assert s_w_source.source.signals == [s_w_source]
assert s_w_source.destination is None
def test_connect_then_disconnect(inp_port, out_port):
"""Can port be connected and then disconnected properly?"""
inp_port.connect(out_port)
inp_port.disconnect(out_port)
print("outport signals:", out_port.signals, "count:", out_port.signal_count())
assert inp_port.signal_count() == 1
assert len(inp_port.connected_ports) == 0
assert out_port.signal_count() == 0
from b_asic.core_operations import Constant, Addition
from b_asic.signal import Signal
from b_asic.port import InputPort, OutputPort
"""
B-ASIC test suite for the AbstractOperation class.
"""
import pytest
from b_asic import Addition, Subtraction, Multiplication, ConstantMultiplication, Division, Constant, Butterfly, \
MAD, SquareRoot
class TestOperationOverloading:
def test_addition_overload(self):
"""Tests addition overloading for both operation and number argument."""
add1 = Addition(None, None, "add1")
add2 = Addition(None, None, "add2")
add3 = add1 + add2
assert isinstance(add3, Addition)
assert add3.input(0).signals == add1.output(0).signals
assert add3.input(1).signals == add2.output(0).signals
add4 = add3 + 5
assert isinstance(add4, Addition)
assert add4.input(0).signals == add3.output(0).signals
assert add4.input(1).signals[0].source.operation.value == 5
add5 = 5 + add4
assert isinstance(add5, Addition)
assert add5.input(0).signals[0].source.operation.value == 5
assert add5.input(1).signals == add4.output(0).signals
def test_subtraction_overload(self):
"""Tests subtraction overloading for both operation and number argument."""
add1 = Addition(None, None, "add1")
add2 = Addition(None, None, "add2")
sub1 = add1 - add2
assert isinstance(sub1, Subtraction)
assert sub1.input(0).signals == add1.output(0).signals
assert sub1.input(1).signals == add2.output(0).signals
sub2 = sub1 - 5
assert isinstance(sub2, Subtraction)
assert sub2.input(0).signals == sub1.output(0).signals
assert sub2.input(1).signals[0].source.operation.value == 5
sub3 = 5 - sub2
assert isinstance(sub3, Subtraction)
assert sub3.input(0).signals[0].source.operation.value == 5
assert sub3.input(1).signals == sub2.output(0).signals
def test_multiplication_overload(self):
"""Tests multiplication overloading for both operation and number argument."""
add1 = Addition(None, None, "add1")
add2 = Addition(None, None, "add2")
mul1 = add1 * add2
assert isinstance(mul1, Multiplication)
assert mul1.input(0).signals == add1.output(0).signals
assert mul1.input(1).signals == add2.output(0).signals
mul2 = mul1 * 5
assert isinstance(mul2, ConstantMultiplication)
assert mul2.input(0).signals == mul1.output(0).signals
assert mul2.value == 5
mul3 = 5 * mul2
assert isinstance(mul3, ConstantMultiplication)
assert mul3.input(0).signals == mul2.output(0).signals
assert mul3.value == 5
def test_division_overload(self):
"""Tests division overloading for both operation and number argument."""
add1 = Addition(None, None, "add1")
add2 = Addition(None, None, "add2")
div1 = add1 / add2
assert isinstance(div1, Division)
assert div1.input(0).signals == add1.output(0).signals
assert div1.input(1).signals == add2.output(0).signals
div2 = div1 / 5
assert isinstance(div2, Division)
assert div2.input(0).signals == div1.output(0).signals
assert div2.input(1).signals[0].source.operation.value == 5
div3 = 5 / div2
assert isinstance(div3, Division)
assert div3.input(0).signals[0].source.operation.value == 5
assert div3.input(1).signals == div2.output(0).signals
class TestTraverse:
def test_traverse_single_tree(self, operation):
"""Traverse a tree consisting of one operation."""
......@@ -12,20 +98,89 @@ class TestTraverse:
def test_traverse_tree(self, operation_tree):
"""Traverse a basic addition tree with two constants."""
assert len(list(operation_tree.traverse())) == 3
assert len(list(operation_tree.traverse())) == 5
def test_traverse_large_tree(self, large_operation_tree):
"""Traverse a larger tree."""
assert len(list(large_operation_tree.traverse())) == 7
assert len(list(large_operation_tree.traverse())) == 13
def test_traverse_type(self, large_operation_tree):
traverse = list(large_operation_tree.traverse())
assert len(list(filter(lambda type_: isinstance(type_, Addition), traverse))) == 3
assert len(list(filter(lambda type_: isinstance(type_, Constant), traverse))) == 4
def test_traverse_loop(self, operation_tree):
add_oper_signal = Signal()
operation_tree._output_ports[0].add_signal(add_oper_signal)
operation_tree._input_ports[0].remove_signal(add_oper_signal)
operation_tree._input_ports[0].add_signal(add_oper_signal)
assert len(list(operation_tree.traverse())) == 2
result = list(large_operation_tree.traverse())
assert len(list(filter(lambda type_: isinstance(type_, Addition), result))) == 3
assert len(list(filter(lambda type_: isinstance(type_, Constant), result))) == 4
def test_traverse_loop(self, operation_graph_with_cycle):
assert len(list(operation_graph_with_cycle.traverse())) == 8
class TestToSfg:
def test_convert_mad_to_sfg(self):
mad1 = MAD()
mad1_sfg = mad1.to_sfg()
assert mad1.evaluate(1, 1, 1) == mad1_sfg.evaluate(1, 1, 1)
assert len(mad1_sfg.operations) == 6
def test_butterfly_to_sfg(self):
but1 = Butterfly()
but1_sfg = but1.to_sfg()
assert but1.evaluate(1, 1)[0] == but1_sfg.evaluate(1, 1)[0]
assert but1.evaluate(1, 1)[1] == but1_sfg.evaluate(1, 1)[1]
assert len(but1_sfg.operations) == 8
def test_add_to_sfg(self):
add1 = Addition()
add1_sfg = add1.to_sfg()
assert len(add1_sfg.operations) == 4
def test_sqrt_to_sfg(self):
sqrt1 = SquareRoot()
sqrt1_sfg = sqrt1.to_sfg()
assert len(sqrt1_sfg.operations) == 3
class TestLatency:
def test_latency_constructor(self):
bfly = Butterfly(latency=5)
assert bfly.latency == 5
assert bfly.latency_offsets == {'in0': 0, 'in1': 0, 'out0': 5, 'out1': 5}
def test_latency_offsets_constructor(self):
bfly = Butterfly(latency_offsets={'in0': 2, 'in1': 3, 'out0': 5, 'out1': 10})
assert bfly.latency == 8
assert bfly.latency_offsets == {'in0': 2, 'in1': 3, 'out0': 5, 'out1': 10}
def test_latency_and_latency_offsets_constructor(self):
bfly = Butterfly(latency=5, latency_offsets={'in1': 2, 'out0': 9})
assert bfly.latency == 9
assert bfly.latency_offsets == {"in0": 0, "in1": 2, "out0": 9, "out1": 5}
def test_set_latency(self):
bfly = Butterfly()
bfly.set_latency(9)
assert bfly.latency == 9
assert bfly.latency_offsets == {"in0": 0, "in1": 0, "out0": 9, "out1": 9}
def test_set_latency_offsets(self):
bfly = Butterfly()
bfly.set_latency_offsets({'in0': 3, 'out1': 5})
assert bfly.latency_offsets == {'in0': 3, "in1": None, "out0": None, 'out1': 5}
class TestCopyOperation:
def test_copy_buttefly_latency_offsets(self):
bfly = Butterfly(latency_offsets={'in0': 4, 'in1': 2, 'out0': 10, 'out1': 9})
bfly_copy = bfly.copy_component()
assert bfly_copy.latency_offsets == {'in0': 4, 'in1': 2, 'out0': 10, 'out1': 9}
"""
B-ASIC test suite for OutputPort.
"""
from b_asic import OutputPort, InputPort, Signal
import pytest
@pytest.fixture
def output_port():
return OutputPort(0, None)
@pytest.fixture
def input_port():
return InputPort(0, None)
@pytest.fixture
def list_of_input_ports():
return [InputPort(_, None) for _ in range(0,3)]
from b_asic import OutputPort, InputPort, Signal
class TestConnect:
def test_multiple_ports(self, output_port, list_of_input_ports):
"""Can multiple ports connect to an output port?"""
"""Multiple connections to an output port should be possible."""
for port in list_of_input_ports:
output_port.connect(port)
port.connect(output_port)
assert output_port.signal_count() == len(list_of_input_ports)
assert output_port.signal_count == len(list_of_input_ports)
def test_same_port(self, output_port, list_of_input_ports):
def test_same_port(self, output_port, input_port):
"""Check error handing."""
output_port.connect(list_of_input_ports[0])
with pytest.raises(AssertionError):
output_port.connect(list_of_input_ports[0])
input_port.connect(output_port)
with pytest.raises(Exception):
input_port.connect(output_port)
assert output_port.signal_count() == 2
assert output_port.signal_count == 1
class TestAddSignal:
def test_dangling(self, output_port):
s = Signal()
output_port.add_signal(s)
assert output_port.signal_count() == 1
def test_with_destination(self, output_port, input_port):
s = Signal(destination=input_port)
output_port.add_signal(s)
assert output_port.signal_count == 1
assert output_port.signals == [s]
assert output_port.connected_ports == [s.destination]
class TestClear:
def test_others_clear(self, output_port, list_of_input_ports):
for port in list_of_input_ports:
port.connect(output_port)
class TestDisconnect:
def test_multiple_ports(self, output_port, list_of_input_ports):
"""Can multiple ports disconnect from OutputPort?"""
for port in list_of_input_ports:
output_port.connect(port)
port.clear()
assert output_port.signal_count == 3
assert all(s.dangling() for s in output_port.signals)
def test_self_clear(self, output_port, list_of_input_ports):
for port in list_of_input_ports:
output_port.disconnect(port)
port.connect(output_port)
assert output_port.signal_count() == 3
assert output_port.connected_ports == []
output_port.clear()
assert output_port.signal_count == 0
assert output_port.signals == []
class TestRemoveSignal:
def test_one_signal(self, output_port, input_port):
s = output_port.connect(input_port)
s = input_port.connect(output_port)
output_port.remove_signal(s)
assert output_port.signal_count() == 0
assert output_port.signal_count == 0
assert output_port.signals == []
assert output_port.connected_ports == []
def test_multiple_signals(self, output_port, list_of_input_ports):
"""Can multiple signals disconnect from OutputPort?"""
sigs = []
for port in list_of_input_ports:
sigs.append(output_port.connect(port))
sigs.append(port.connect(output_port))
for sig in sigs:
output_port.remove_signal(sig)
for s in sigs:
output_port.remove_signal(s)
assert output_port.signal_count() == 0
assert output_port.signal_count == 0
assert output_port.signals == []
"""
B-ASIC test suite for the schema module and Schema class.
"""
from b_asic import Schema, Addition, ConstantMultiplication
class TestInit:
def test_simple_filter_normal_latency(self, sfg_simple_filter):
sfg_simple_filter.set_latency_of_type(Addition.type_name(), 5)
sfg_simple_filter.set_latency_of_type(ConstantMultiplication.type_name(), 4)
schema = Schema(sfg_simple_filter)
assert schema._start_times == {"add1": 4, "cmul1": 0}
def test_complicated_single_outputs_normal_latency(self, precedence_sfg_delays):
precedence_sfg_delays.set_latency_of_type(Addition.type_name(), 4)
precedence_sfg_delays.set_latency_of_type(ConstantMultiplication.type_name(), 3)
schema = Schema(precedence_sfg_delays, scheduling_alg="ASAP")
for op in schema._sfg.get_operations_topological_order():
print(op.latency_offsets)
start_times_names = dict()
for op_id, start_time in schema._start_times.items():
op_name = precedence_sfg_delays.find_by_id(op_id).name
start_times_names[op_name] = start_time
assert start_times_names == {"C0": 0, "B1": 0, "B2": 0, "ADD2": 3, "ADD1": 7, "Q1": 11,
"A0": 14, "A1": 0, "A2": 0, "ADD3": 3, "ADD4": 17}
def test_complicated_single_outputs_complex_latencies(self, precedence_sfg_delays):
precedence_sfg_delays.set_latency_offsets_of_type(ConstantMultiplication.type_name(), {'in0': 3, 'out0': 5})
precedence_sfg_delays.find_by_name("B1")[0].set_latency_offsets({'in0': 4, 'out0': 7})
precedence_sfg_delays.find_by_name("B2")[0].set_latency_offsets({'in0': 1, 'out0': 4})
precedence_sfg_delays.find_by_name("ADD2")[0].set_latency_offsets({'in0': 4, 'in1': 2, 'out0': 4})
precedence_sfg_delays.find_by_name("ADD1")[0].set_latency_offsets({'in0': 1, 'in1': 2, 'out0': 4})
precedence_sfg_delays.find_by_name("Q1")[0].set_latency_offsets({'in0': 3, 'out0': 6})
precedence_sfg_delays.find_by_name("A0")[0].set_latency_offsets({'in0': 0, 'out0': 2})
precedence_sfg_delays.find_by_name("A1")[0].set_latency_offsets({'in0': 0, 'out0': 5})
precedence_sfg_delays.find_by_name("A2")[0].set_latency_offsets({'in0': 2, 'out0': 3})
precedence_sfg_delays.find_by_name("ADD3")[0].set_latency_offsets({'in0': 2, 'in1': 1, 'out0': 4})
precedence_sfg_delays.find_by_name("ADD4")[0].set_latency_offsets({'in0': 6, 'in1': 7, 'out0': 9})
schema = Schema(precedence_sfg_delays, scheduling_alg="ASAP")
start_times_names = dict()
for op_id, start_time in schema._start_times.items():
op_name = precedence_sfg_delays.find_by_id(op_id).name
start_times_names[op_name] = start_time
assert start_times_names == {'C0': 0, 'B1': 0, 'B2': 0, 'ADD2': 3, 'ADD1': 5, 'Q1': 6, 'A0': 12,
'A1': 0, 'A2': 0, 'ADD3': 3, 'ADD4': 8}
def test_independent_sfg(self, sfg_two_inputs_two_outputs_independent_with_cmul):
schema = Schema(sfg_two_inputs_two_outputs_independent_with_cmul, scheduling_alg="ASAP")
start_times_names = dict()
for op_id, start_time in schema._start_times.items():
op_name = sfg_two_inputs_two_outputs_independent_with_cmul.find_by_id(op_id).name
start_times_names[op_name] = start_time
assert start_times_names == {'CMUL1': 0, 'CMUL2': 5, "ADD1": 0, "CMUL3": 7}
from os import path, remove
import pytest
import random
import string
import io
import sys
from b_asic import SFG, Signal, Input, Output, Constant, ConstantMultiplication, Addition, Multiplication, Delay, \
Butterfly, Subtraction, SquareRoot
from b_asic.save_load_structure import sfg_to_python, python_to_sfg
class TestInit:
def test_direct_input_to_output_sfg_construction(self):
in1 = Input("IN1")
out1 = Output(None, "OUT1")
out1.input(0).connect(in1, "S1")
sfg = SFG(inputs=[in1], outputs=[out1]) # in1 ---s1---> out1
assert len(list(sfg.components)) == 3
assert len(list(sfg.operations)) == 2
assert sfg.input_count == 1
assert sfg.output_count == 1
def test_same_signal_input_and_output_sfg_construction(self):
add1 = Addition(None, None, "ADD1")
add2 = Addition(None, None, "ADD2")
s1 = add2.input(0).connect(add1, "S1")
# in1 ---s1---> out1
sfg = SFG(input_signals=[s1], output_signals=[s1])
assert len(list(sfg.components)) == 3
assert len(list(sfg.operations)) == 2
assert sfg.input_count == 1
assert sfg.output_count == 1
def test_outputs_construction(self, operation_tree):
sfg = SFG(outputs=[Output(operation_tree)])
assert len(list(sfg.components)) == 7
assert len(list(sfg.operations)) == 4
assert sfg.input_count == 0
assert sfg.output_count == 1
def test_signals_construction(self, operation_tree):
sfg = SFG(output_signals=[Signal(source=operation_tree.output(0))])
assert len(list(sfg.components)) == 7
assert len(list(sfg.operations)) == 4
assert sfg.input_count == 0
assert sfg.output_count == 1
class TestPrintSfg:
def test_one_addition(self):
inp1 = Input("INP1")
inp2 = Input("INP2")
add1 = Addition(inp1, inp2, "ADD1")
out1 = Output(add1, "OUT1")
sfg = SFG(inputs=[inp1, inp2], outputs=[out1], name="SFG1")
assert sfg.__str__() == \
"id: no_id, \tname: SFG1, \tinputs: {0: '-'}, \toutputs: {0: '-'}\n" + \
"Internal Operations:\n" + \
"----------------------------------------------------------------------------------------------------\n" + \
str(sfg.find_by_name("INP1")[0]) + "\n" + \
str(sfg.find_by_name("INP2")[0]) + "\n" + \
str(sfg.find_by_name("ADD1")[0]) + "\n" + \
str(sfg.find_by_name("OUT1")[0]) + "\n" + \
"----------------------------------------------------------------------------------------------------\n"
def test_add_mul(self):
inp1 = Input("INP1")
inp2 = Input("INP2")
inp3 = Input("INP3")
add1 = Addition(inp1, inp2, "ADD1")
mul1 = Multiplication(add1, inp3, "MUL1")
out1 = Output(mul1, "OUT1")
sfg = SFG(inputs=[inp1, inp2, inp3], outputs=[out1], name="mac_sfg")
assert sfg.__str__() == \
"id: no_id, \tname: mac_sfg, \tinputs: {0: '-'}, \toutputs: {0: '-'}\n" + \
"Internal Operations:\n" + \
"----------------------------------------------------------------------------------------------------\n" + \
str(sfg.find_by_name("INP1")[0]) + "\n" + \
str(sfg.find_by_name("INP2")[0]) + "\n" + \
str(sfg.find_by_name("ADD1")[0]) + "\n" + \
str(sfg.find_by_name("INP3")[0]) + "\n" + \
str(sfg.find_by_name("MUL1")[0]) + "\n" + \
str(sfg.find_by_name("OUT1")[0]) + "\n" + \
"----------------------------------------------------------------------------------------------------\n"
def test_constant(self):
inp1 = Input("INP1")
const1 = Constant(3, "CONST")
add1 = Addition(const1, inp1, "ADD1")
out1 = Output(add1, "OUT1")
sfg = SFG(inputs=[inp1], outputs=[out1], name="sfg")
assert sfg.__str__() == \
"id: no_id, \tname: sfg, \tinputs: {0: '-'}, \toutputs: {0: '-'}\n" + \
"Internal Operations:\n" + \
"----------------------------------------------------------------------------------------------------\n" + \
str(sfg.find_by_name("CONST")[0]) + "\n" + \
str(sfg.find_by_name("INP1")[0]) + "\n" + \
str(sfg.find_by_name("ADD1")[0]) + "\n" + \
str(sfg.find_by_name("OUT1")[0]) + "\n" + \
"----------------------------------------------------------------------------------------------------\n"
def test_simple_filter(self, sfg_simple_filter):
assert sfg_simple_filter.__str__() == \
"id: no_id, \tname: simple_filter, \tinputs: {0: '-'}, \toutputs: {0: '-'}\n" + \
"Internal Operations:\n" + \
"----------------------------------------------------------------------------------------------------\n" + \
str(sfg_simple_filter.find_by_name("IN1")[0]) + "\n" + \
str(sfg_simple_filter.find_by_name("ADD1")[0]) + "\n" + \
str(sfg_simple_filter.find_by_name("T1")[0]) + "\n" + \
str(sfg_simple_filter.find_by_name("CMUL1")[0]) + "\n" + \
str(sfg_simple_filter.find_by_name("OUT1")[0]) + "\n" + \
"----------------------------------------------------------------------------------------------------\n"
class TestDeepCopy:
def test_deep_copy_no_duplicates(self):
inp1 = Input("INP1")
inp2 = Input("INP2")
inp3 = Input("INP3")
add1 = Addition(inp1, inp2, "ADD1")
mul1 = Multiplication(add1, inp3, "MUL1")
out1 = Output(mul1, "OUT1")
mac_sfg = SFG(inputs=[inp1, inp2], outputs=[out1], name="mac_sfg")
mac_sfg_new = mac_sfg()
assert mac_sfg.name == "mac_sfg"
assert mac_sfg_new.name == ""
for g_id, component in mac_sfg._components_by_id.items():
component_copy = mac_sfg_new.find_by_id(g_id)
assert component.name == component_copy.name
def test_deep_copy(self):
inp1 = Input("INP1")
inp2 = Input("INP2")
inp3 = Input("INP3")
add1 = Addition(None, None, "ADD1")
add2 = Addition(None, None, "ADD2")
mul1 = Multiplication(None, None, "MUL1")
out1 = Output(None, "OUT1")
add1.input(0).connect(inp1, "S1")
add1.input(1).connect(inp2, "S2")
add2.input(0).connect(add1, "S4")
add2.input(1).connect(inp3, "S3")
mul1.input(0).connect(add1, "S5")
mul1.input(1).connect(add2, "S6")
out1.input(0).connect(mul1, "S7")
mac_sfg = SFG(inputs=[inp1, inp2], outputs=[out1],
id_number_offset=100, name="mac_sfg")
mac_sfg_new = mac_sfg(name="mac_sfg2")
assert mac_sfg.name == "mac_sfg"
assert mac_sfg_new.name == "mac_sfg2"
assert mac_sfg.id_number_offset == 100
assert mac_sfg_new.id_number_offset == 100
for g_id, component in mac_sfg._components_by_id.items():
component_copy = mac_sfg_new.find_by_id(g_id)
assert component.name == component_copy.name
def test_deep_copy_with_new_sources(self):
inp1 = Input("INP1")
inp2 = Input("INP2")
inp3 = Input("INP3")
add1 = Addition(inp1, inp2, "ADD1")
mul1 = Multiplication(add1, inp3, "MUL1")
out1 = Output(mul1, "OUT1")
mac_sfg = SFG(inputs=[inp1, inp2], outputs=[out1], name="mac_sfg")
a = Addition(Constant(3), Constant(5))
b = Constant(2)
mac_sfg_new = mac_sfg(a, b)
assert mac_sfg_new.input(0).signals[0].source.operation is a
assert mac_sfg_new.input(1).signals[0].source.operation is b
class TestEvaluateOutput:
def test_evaluate_output(self, operation_tree):
sfg = SFG(outputs=[Output(operation_tree)])
assert sfg.evaluate_output(0, []) == 5
def test_evaluate_output_large(self, large_operation_tree):
sfg = SFG(outputs=[Output(large_operation_tree)])
assert sfg.evaluate_output(0, []) == 14
def test_evaluate_output_cycle(self, operation_graph_with_cycle):
sfg = SFG(outputs=[Output(operation_graph_with_cycle)])
with pytest.raises(Exception):
sfg.evaluate_output(0, [])
class TestComponents:
def test_advanced_components(self):
inp1 = Input("INP1")
inp2 = Input("INP2")
inp3 = Input("INP3")
add1 = Addition(None, None, "ADD1")
add2 = Addition(None, None, "ADD2")
mul1 = Multiplication(None, None, "MUL1")
out1 = Output(None, "OUT1")
add1.input(0).connect(inp1, "S1")
add1.input(1).connect(inp2, "S2")
add2.input(0).connect(add1, "S4")
add2.input(1).connect(inp3, "S3")
mul1.input(0).connect(add1, "S5")
mul1.input(1).connect(add2, "S6")
out1.input(0).connect(mul1, "S7")
mac_sfg = SFG(inputs=[inp1, inp2], outputs=[out1], name="mac_sfg")
assert set([comp.name for comp in mac_sfg.components]) == {
"INP1", "INP2", "INP3", "ADD1", "ADD2", "MUL1", "OUT1", "S1", "S2", "S3", "S4", "S5", "S6", "S7"}
class TestReplaceComponents:
def test_replace_addition_by_id(self, operation_tree):
sfg = SFG(outputs=[Output(operation_tree)])
component_id = "add1"
sfg = sfg.replace_component(
Multiplication(name="Multi"), _id=component_id)
assert component_id not in sfg._components_by_id.keys()
assert "Multi" in sfg._components_by_name.keys()
def test_replace_addition_large_tree(self, large_operation_tree):
sfg = SFG(outputs=[Output(large_operation_tree)])
component_id = "add3"
sfg = sfg.replace_component(
Multiplication(name="Multi"), _id=component_id)
assert "Multi" in sfg._components_by_name.keys()
assert component_id not in sfg._components_by_id.keys()
def test_replace_no_input_component(self, operation_tree):
sfg = SFG(outputs=[Output(operation_tree)])
component_id = "c1"
_const = sfg.find_by_id(component_id)
sfg = sfg.replace_component(Constant(1), _id=component_id)
assert _const is not sfg.find_by_id(component_id)
def test_no_match_on_replace(self, large_operation_tree):
sfg = SFG(outputs=[Output(large_operation_tree)])
component_id = "addd1"
try:
sfg = sfg.replace_component(
Multiplication(name="Multi"), _id=component_id)
except AssertionError:
assert True
else:
assert False
def test_not_equal_input(self, large_operation_tree):
sfg = SFG(outputs=[Output(large_operation_tree)])
component_id = "c1"
try:
sfg = sfg.replace_component(
Multiplication(name="Multi"), _id=component_id)
except AssertionError:
assert True
else:
assert False
class TestInsertComponent:
def test_insert_component_in_sfg(self, large_operation_tree_names):
sfg = SFG(outputs=[Output(large_operation_tree_names)])
sqrt = SquareRoot()
_sfg = sfg.insert_operation(sqrt, sfg.find_by_name("constant4")[0].graph_id)
assert _sfg.evaluate() != sfg.evaluate()
assert any([isinstance(comp, SquareRoot) for comp in _sfg.operations])
assert not any([isinstance(comp, SquareRoot) for comp in sfg.operations])
assert not isinstance(sfg.find_by_name("constant4")[0].output(0).signals[0].destination.operation, SquareRoot)
assert isinstance(_sfg.find_by_name("constant4")[0].output(0).signals[0].destination.operation, SquareRoot)
assert sfg.find_by_name("constant4")[0].output(0).signals[0].destination.operation is sfg.find_by_id("add3")
assert _sfg.find_by_name("constant4")[0].output(
0).signals[0].destination.operation is not _sfg.find_by_id("add3")
assert _sfg.find_by_id("sqrt1").output(0).signals[0].destination.operation is _sfg.find_by_id("add3")
def test_insert_invalid_component_in_sfg(self, large_operation_tree):
sfg = SFG(outputs=[Output(large_operation_tree)])
# Should raise an exception for not matching input count to output count.
add4 = Addition()
with pytest.raises(Exception):
sfg.insert_operation(add4, "c1")
def test_insert_at_output(self, large_operation_tree):
sfg = SFG(outputs=[Output(large_operation_tree)])
# Should raise an exception for trying to insert an operation after an output.
sqrt = SquareRoot()
with pytest.raises(Exception):
_sfg = sfg.insert_operation(sqrt, "out1")
def test_insert_multiple_output_ports(self, butterfly_operation_tree):
sfg = SFG(outputs=list(map(Output, butterfly_operation_tree.outputs)))
_sfg = sfg.insert_operation(Butterfly(name="n_bfly"), "bfly3")
assert sfg.evaluate() != _sfg.evaluate()
assert len(sfg.find_by_name("n_bfly")) == 0
assert len(_sfg.find_by_name("n_bfly")) == 1
# Correctly connected old output -> new input
assert _sfg.find_by_name("bfly3")[0].output(
0).signals[0].destination.operation is _sfg.find_by_name("n_bfly")[0]
assert _sfg.find_by_name("bfly3")[0].output(
1).signals[0].destination.operation is _sfg.find_by_name("n_bfly")[0]
# Correctly connected new input -> old output
assert _sfg.find_by_name("n_bfly")[0].input(0).signals[0].source.operation is _sfg.find_by_name("bfly3")[0]
assert _sfg.find_by_name("n_bfly")[0].input(1).signals[0].source.operation is _sfg.find_by_name("bfly3")[0]
# Correctly connected new output -> next input
assert _sfg.find_by_name("n_bfly")[0].output(
0).signals[0].destination.operation is _sfg.find_by_name("bfly2")[0]
assert _sfg.find_by_name("n_bfly")[0].output(
1).signals[0].destination.operation is _sfg.find_by_name("bfly2")[0]
# Correctly connected next input -> new output
assert _sfg.find_by_name("bfly2")[0].input(0).signals[0].source.operation is _sfg.find_by_name("n_bfly")[0]
assert _sfg.find_by_name("bfly2")[0].input(1).signals[0].source.operation is _sfg.find_by_name("n_bfly")[0]
class TestFindComponentsWithTypeName:
def test_mac_components(self):
inp1 = Input("INP1")
inp2 = Input("INP2")
inp3 = Input("INP3")
add1 = Addition(None, None, "ADD1")
add2 = Addition(None, None, "ADD2")
mul1 = Multiplication(None, None, "MUL1")
out1 = Output(None, "OUT1")
add1.input(0).connect(inp1, "S1")
add1.input(1).connect(inp2, "S2")
add2.input(0).connect(add1, "S4")
add2.input(1).connect(inp3, "S3")
mul1.input(0).connect(add1, "S5")
mul1.input(1).connect(add2, "S6")
out1.input(0).connect(mul1, "S7")
mac_sfg = SFG(inputs=[inp1, inp2], outputs=[out1], name="mac_sfg")
assert {comp.name for comp in mac_sfg.get_components_with_type_name(
inp1.type_name())} == {"INP1", "INP2", "INP3"}
assert {comp.name for comp in mac_sfg.get_components_with_type_name(
add1.type_name())} == {"ADD1", "ADD2"}
assert {comp.name for comp in mac_sfg.get_components_with_type_name(
mul1.type_name())} == {"MUL1"}
assert {comp.name for comp in mac_sfg.get_components_with_type_name(
out1.type_name())} == {"OUT1"}
assert {comp.name for comp in mac_sfg.get_components_with_type_name(
Signal.type_name())} == {"S1", "S2", "S3", "S4", "S5", "S6", "S7"}
class TestGetPrecedenceList:
def test_inputs_delays(self, precedence_sfg_delays):
precedence_list = precedence_sfg_delays.get_precedence_list()
assert len(precedence_list) == 7
assert set([port.operation.key(port.index, port.operation.name)
for port in precedence_list[0]]) == {"IN1", "T1", "T2"}
assert set([port.operation.key(port.index, port.operation.name)
for port in precedence_list[1]]) == {"C0", "B1", "B2", "A1", "A2"}
assert set([port.operation.key(port.index, port.operation.name)
for port in precedence_list[2]]) == {"ADD2", "ADD3"}
assert set([port.operation.key(port.index, port.operation.name)
for port in precedence_list[3]]) == {"ADD1"}
assert set([port.operation.key(port.index, port.operation.name)
for port in precedence_list[4]]) == {"Q1"}
assert set([port.operation.key(port.index, port.operation.name)
for port in precedence_list[5]]) == {"A0"}
assert set([port.operation.key(port.index, port.operation.name)
for port in precedence_list[6]]) == {"ADD4"}
def test_inputs_constants_delays_multiple_outputs(self, precedence_sfg_delays_and_constants):
precedence_list = precedence_sfg_delays_and_constants.get_precedence_list()
assert len(precedence_list) == 7
assert set([port.operation.key(port.index, port.operation.name)
for port in precedence_list[0]]) == {"IN1", "T1", "CONST1"}
assert set([port.operation.key(port.index, port.operation.name)
for port in precedence_list[1]]) == {"C0", "B1", "B2", "A1", "A2"}
assert set([port.operation.key(port.index, port.operation.name)
for port in precedence_list[2]]) == {"ADD2", "ADD3"}
assert set([port.operation.key(port.index, port.operation.name)
for port in precedence_list[3]]) == {"ADD1"}
assert set([port.operation.key(port.index, port.operation.name)
for port in precedence_list[4]]) == {"Q1"}
assert set([port.operation.key(port.index, port.operation.name)
for port in precedence_list[5]]) == {"A0"}
assert set([port.operation.key(port.index, port.operation.name)
for port in precedence_list[6]]) == {"BFLY1.0", "BFLY1.1"}
def test_precedence_multiple_outputs_same_precedence(self, sfg_two_inputs_two_outputs):
sfg_two_inputs_two_outputs.name = "NESTED_SFG"
in1 = Input("IN1")
sfg_two_inputs_two_outputs.input(0).connect(in1, "S1")
in2 = Input("IN2")
cmul1 = ConstantMultiplication(10, None, "CMUL1")
cmul1.input(0).connect(in2, "S2")
sfg_two_inputs_two_outputs.input(1).connect(cmul1, "S3")
out1 = Output(sfg_two_inputs_two_outputs.output(0), "OUT1")
out2 = Output(sfg_two_inputs_two_outputs.output(1), "OUT2")
sfg = SFG(inputs=[in1, in2], outputs=[out1, out2])
precedence_list = sfg.get_precedence_list()
assert len(precedence_list) == 3
assert set([port.operation.key(port.index, port.operation.name)
for port in precedence_list[0]]) == {"IN1", "IN2"}
assert set([port.operation.key(port.index, port.operation.name)
for port in precedence_list[1]]) == {"CMUL1"}
assert set([port.operation.key(port.index, port.operation.name)
for port in precedence_list[2]]) == {"NESTED_SFG.0", "NESTED_SFG.1"}
def test_precedence_sfg_multiple_outputs_different_precedences(self, sfg_two_inputs_two_outputs_independent):
sfg_two_inputs_two_outputs_independent.name = "NESTED_SFG"
in1 = Input("IN1")
in2 = Input("IN2")
sfg_two_inputs_two_outputs_independent.input(0).connect(in1, "S1")
cmul1 = ConstantMultiplication(10, None, "CMUL1")
cmul1.input(0).connect(in2, "S2")
sfg_two_inputs_two_outputs_independent.input(1).connect(cmul1, "S3")
out1 = Output(sfg_two_inputs_two_outputs_independent.output(0), "OUT1")
out2 = Output(sfg_two_inputs_two_outputs_independent.output(1), "OUT2")
sfg = SFG(inputs=[in1, in2], outputs=[out1, out2])
precedence_list = sfg.get_precedence_list()
assert len(precedence_list) == 3
assert set([port.operation.key(port.index, port.operation.name)
for port in precedence_list[0]]) == {"IN1", "IN2"}
assert set([port.operation.key(port.index, port.operation.name)
for port in precedence_list[1]]) == {"CMUL1"}
assert set([port.operation.key(port.index, port.operation.name)
for port in precedence_list[2]]) == {"NESTED_SFG.0", "NESTED_SFG.1"}
class TestPrintPrecedence:
def test_delays(self, precedence_sfg_delays):
sfg = precedence_sfg_delays
captured_output = io.StringIO()
sys.stdout = captured_output
sfg.print_precedence_graph()
sys.stdout = sys.__stdout__
captured_output = captured_output.getvalue()
assert captured_output == \
"-" * 120 + "\n" + \
"1.1 \t" + str(sfg.find_by_name("IN1")[0]) + "\n" + \
"1.2 \t" + str(sfg.find_by_name("T1")[0]) + "\n" + \
"1.3 \t" + str(sfg.find_by_name("T2")[0]) + "\n" + \
"-" * 120 + "\n" + \
"2.1 \t" + str(sfg.find_by_name("C0")[0]) + "\n" + \
"2.2 \t" + str(sfg.find_by_name("A1")[0]) + "\n" + \
"2.3 \t" + str(sfg.find_by_name("B1")[0]) + "\n" + \
"2.4 \t" + str(sfg.find_by_name("A2")[0]) + "\n" + \
"2.5 \t" + str(sfg.find_by_name("B2")[0]) + "\n" + \
"-" * 120 + "\n" + \
"3.1 \t" + str(sfg.find_by_name("ADD3")[0]) + "\n" + \
"3.2 \t" + str(sfg.find_by_name("ADD2")[0]) + "\n" + \
"-" * 120 + "\n" + \
"4.1 \t" + str(sfg.find_by_name("ADD1")[0]) + "\n" + \
"-" * 120 + "\n" + \
"5.1 \t" + str(sfg.find_by_name("Q1")[0]) + "\n" + \
"-" * 120 + "\n" + \
"6.1 \t" + str(sfg.find_by_name("A0")[0]) + "\n" + \
"-" * 120 + "\n" + \
"7.1 \t" + str(sfg.find_by_name("ADD4")[0]) + "\n" + \
"-" * 120 + "\n"
class TestDepends:
def test_depends_sfg(self, sfg_two_inputs_two_outputs):
assert set(sfg_two_inputs_two_outputs.inputs_required_for_output(0)) == {
0, 1}
assert set(sfg_two_inputs_two_outputs.inputs_required_for_output(1)) == {
0, 1}
def test_depends_sfg_independent(self, sfg_two_inputs_two_outputs_independent):
assert set(
sfg_two_inputs_two_outputs_independent.inputs_required_for_output(0)) == {0}
assert set(
sfg_two_inputs_two_outputs_independent.inputs_required_for_output(1)) == {1}
class TestConnectExternalSignalsToComponentsSoloComp:
def test_connect_external_signals_to_components_mac(self):
""" Replace a MAC with inner components in an SFG """
inp1 = Input("INP1")
inp2 = Input("INP2")
inp3 = Input("INP3")
add1 = Addition(None, None, "ADD1")
add2 = Addition(None, None, "ADD2")
mul1 = Multiplication(None, None, "MUL1")
out1 = Output(None, "OUT1")
add1.input(0).connect(inp1, "S1")
add1.input(1).connect(inp2, "S2")
add2.input(0).connect(add1, "S3")
add2.input(1).connect(inp3, "S4")
mul1.input(0).connect(add1, "S5")
mul1.input(1).connect(add2, "S6")
out1.input(0).connect(mul1, "S7")
mac_sfg = SFG(inputs=[inp1, inp2], outputs=[out1])
inp4 = Input("INP4")
inp5 = Input("INP5")
out2 = Output(None, "OUT2")
mac_sfg.input(0).connect(inp4, "S8")
mac_sfg.input(1).connect(inp5, "S9")
out2.input(0).connect(mac_sfg.outputs[0], "S10")
test_sfg = SFG(inputs=[inp4, inp5], outputs=[out2])
assert test_sfg.evaluate(1, 2) == 9
mac_sfg.connect_external_signals_to_components()
assert test_sfg.evaluate(1, 2) == 9
assert not test_sfg.connect_external_signals_to_components()
def test_connect_external_signals_to_components_operation_tree(self, operation_tree):
""" Replaces an SFG with only a operation_tree component with its inner components """
sfg1 = SFG(outputs=[Output(operation_tree)])
out1 = Output(None, "OUT1")
out1.input(0).connect(sfg1.outputs[0], "S1")
test_sfg = SFG(outputs=[out1])
assert test_sfg.evaluate_output(0, []) == 5
sfg1.connect_external_signals_to_components()
assert test_sfg.evaluate_output(0, []) == 5
assert not test_sfg.connect_external_signals_to_components()
def test_connect_external_signals_to_components_large_operation_tree(self, large_operation_tree):
""" Replaces an SFG with only a large_operation_tree component with its inner components """
sfg1 = SFG(outputs=[Output(large_operation_tree)])
out1 = Output(None, "OUT1")
out1.input(0).connect(sfg1.outputs[0], "S1")
test_sfg = SFG(outputs=[out1])
assert test_sfg.evaluate_output(0, []) == 14
sfg1.connect_external_signals_to_components()
assert test_sfg.evaluate_output(0, []) == 14
assert not test_sfg.connect_external_signals_to_components()
class TestConnectExternalSignalsToComponentsMultipleComp:
def test_connect_external_signals_to_components_operation_tree(self, operation_tree):
""" Replaces a operation_tree in an SFG with other components """
sfg1 = SFG(outputs=[Output(operation_tree)])
inp1 = Input("INP1")
inp2 = Input("INP2")
out1 = Output(None, "OUT1")
add1 = Addition(None, None, "ADD1")
add2 = Addition(None, None, "ADD2")
add1.input(0).connect(inp1, "S1")
add1.input(1).connect(inp2, "S2")
add2.input(0).connect(add1, "S3")
add2.input(1).connect(sfg1.outputs[0], "S4")
out1.input(0).connect(add2, "S5")
test_sfg = SFG(inputs=[inp1, inp2], outputs=[out1])
assert test_sfg.evaluate(1, 2) == 8
sfg1.connect_external_signals_to_components()
assert test_sfg.evaluate(1, 2) == 8
assert not test_sfg.connect_external_signals_to_components()
def test_connect_external_signals_to_components_large_operation_tree(self, large_operation_tree):
""" Replaces a large_operation_tree in an SFG with other components """
sfg1 = SFG(outputs=[Output(large_operation_tree)])
inp1 = Input("INP1")
inp2 = Input("INP2")
out1 = Output(None, "OUT1")
add1 = Addition(None, None, "ADD1")
add2 = Addition(None, None, "ADD2")
add1.input(0).connect(inp1, "S1")
add1.input(1).connect(inp2, "S2")
add2.input(0).connect(add1, "S3")
add2.input(1).connect(sfg1.outputs[0], "S4")
out1.input(0).connect(add2, "S5")
test_sfg = SFG(inputs=[inp1, inp2], outputs=[out1])
assert test_sfg.evaluate(1, 2) == 17
sfg1.connect_external_signals_to_components()
assert test_sfg.evaluate(1, 2) == 17
assert not test_sfg.connect_external_signals_to_components()
def create_sfg(self, op_tree):
""" Create a simple SFG with either operation_tree or large_operation_tree """
sfg1 = SFG(outputs=[Output(op_tree)])
inp1 = Input("INP1")
inp2 = Input("INP2")
out1 = Output(None, "OUT1")
add1 = Addition(None, None, "ADD1")
add2 = Addition(None, None, "ADD2")
add1.input(0).connect(inp1, "S1")
add1.input(1).connect(inp2, "S2")
add2.input(0).connect(add1, "S3")
add2.input(1).connect(sfg1.outputs[0], "S4")
out1.input(0).connect(add2, "S5")
return SFG(inputs=[inp1, inp2], outputs=[out1])
def test_connect_external_signals_to_components_many_op(self, large_operation_tree):
""" Replaces an sfg component in a larger SFG with several component operations """
inp1 = Input("INP1")
inp2 = Input("INP2")
inp3 = Input("INP3")
inp4 = Input("INP4")
out1 = Output(None, "OUT1")
add1 = Addition(None, None, "ADD1")
sub1 = Subtraction(None, None, "SUB1")
add1.input(0).connect(inp1, "S1")
add1.input(1).connect(inp2, "S2")
sfg1 = self.create_sfg(large_operation_tree)
sfg1.input(0).connect(add1, "S3")
sfg1.input(1).connect(inp3, "S4")
sub1.input(0).connect(sfg1.outputs[0], "S5")
sub1.input(1).connect(inp4, "S6")
out1.input(0).connect(sub1, "S7")
test_sfg = SFG(inputs=[inp1, inp2, inp3, inp4], outputs=[out1])
assert test_sfg.evaluate(1, 2, 3, 4) == 16
sfg1.connect_external_signals_to_components()
assert test_sfg.evaluate(1, 2, 3, 4) == 16
assert not test_sfg.connect_external_signals_to_components()
class TestTopologicalOrderOperations:
def test_feedback_sfg(self, sfg_simple_filter):
topological_order = sfg_simple_filter.get_operations_topological_order()
assert [comp.name for comp in topological_order] == ["IN1", "ADD1", "T1", "CMUL1", "OUT1"]
def test_multiple_independent_inputs(self, sfg_two_inputs_two_outputs_independent):
topological_order = sfg_two_inputs_two_outputs_independent.get_operations_topological_order()
assert [comp.name for comp in topological_order] == ["IN1", "OUT1", "IN2", "C1", "ADD1", "OUT2"]
def test_complex_graph(self, precedence_sfg_delays):
topological_order = precedence_sfg_delays.get_operations_topological_order()
assert [comp.name for comp in topological_order] == \
['IN1', 'C0', 'ADD1', 'Q1', 'A0', 'T1', 'B1', 'A1', 'T2', 'B2', 'ADD2', 'A2', 'ADD3', 'ADD4', 'OUT1']
class TestRemove:
def test_remove_single_input_outputs(self, sfg_simple_filter):
new_sfg = sfg_simple_filter.remove_operation("cmul1")
assert set(op.name for op in sfg_simple_filter.find_by_name("T1")[0].subsequent_operations) == {"CMUL1", "OUT1"}
assert set(op.name for op in new_sfg.find_by_name("T1")[0].subsequent_operations) == {"ADD1", "OUT1"}
assert set(op.name for op in sfg_simple_filter.find_by_name("ADD1")[0].preceding_operations) == {"CMUL1", "IN1"}
assert set(op.name for op in new_sfg.find_by_name("ADD1")[0].preceding_operations) == {"T1", "IN1"}
assert "S1" in set([sig.name for sig in sfg_simple_filter.find_by_name("T1")[0].output(0).signals])
assert "S2" in set([sig.name for sig in new_sfg.find_by_name("T1")[0].output(0).signals])
def test_remove_multiple_inputs_outputs(self, butterfly_operation_tree):
out1 = Output(butterfly_operation_tree.output(0), "OUT1")
out2 = Output(butterfly_operation_tree.output(1), "OUT2")
sfg = SFG(outputs=[out1, out2])
new_sfg = sfg.remove_operation(sfg.find_by_name("bfly2")[0].graph_id)
assert sfg.find_by_name("bfly3")[0].output(0).signal_count == 1
assert new_sfg.find_by_name("bfly3")[0].output(0).signal_count == 1
sfg_dest_0 = sfg.find_by_name("bfly3")[0].output(0).signals[0].destination
new_sfg_dest_0 = new_sfg.find_by_name("bfly3")[0].output(0).signals[0].destination
assert sfg_dest_0.index == 0
assert new_sfg_dest_0.index == 0
assert sfg_dest_0.operation.name == "bfly2"
assert new_sfg_dest_0.operation.name == "bfly1"
assert sfg.find_by_name("bfly3")[0].output(1).signal_count == 1
assert new_sfg.find_by_name("bfly3")[0].output(1).signal_count == 1
sfg_dest_1 = sfg.find_by_name("bfly3")[0].output(1).signals[0].destination
new_sfg_dest_1 = new_sfg.find_by_name("bfly3")[0].output(1).signals[0].destination
assert sfg_dest_1.index == 1
assert new_sfg_dest_1.index == 1
assert sfg_dest_1.operation.name == "bfly2"
assert new_sfg_dest_1.operation.name == "bfly1"
assert sfg.find_by_name("bfly1")[0].input(0).signal_count == 1
assert new_sfg.find_by_name("bfly1")[0].input(0).signal_count == 1
sfg_source_0 = sfg.find_by_name("bfly1")[0].input(0).signals[0].source
new_sfg_source_0 = new_sfg.find_by_name("bfly1")[0].input(0).signals[0].source
assert sfg_source_0.index == 0
assert new_sfg_source_0.index == 0
assert sfg_source_0.operation.name == "bfly2"
assert new_sfg_source_0.operation.name == "bfly3"
sfg_source_1 = sfg.find_by_name("bfly1")[0].input(1).signals[0].source
new_sfg_source_1 = new_sfg.find_by_name("bfly1")[0].input(1).signals[0].source
assert sfg_source_1.index == 1
assert new_sfg_source_1.index == 1
assert sfg_source_1.operation.name == "bfly2"
assert new_sfg_source_1.operation.name == "bfly3"
assert "bfly2" not in set(op.name for op in new_sfg.operations)
def remove_different_number_inputs_outputs(self, sfg_simple_filter):
with pytest.raises(ValueError):
sfg_simple_filter.remove_operation("add1")
class TestSaveLoadSFG:
def get_path(self, existing=False):
path_ = "".join(random.choices(string.ascii_uppercase, k=4)) + ".py"
while path.exists(path_) if not existing else not path.exists(path_):
path_ = "".join(random.choices(string.ascii_uppercase, k=4)) + ".py"
return path_
def test_save_simple_sfg(self, sfg_simple_filter):
result = sfg_to_python(sfg_simple_filter)
path_ = self.get_path()
assert not path.exists(path_)
with open(path_, "w") as file_obj:
file_obj.write(result)
assert path.exists(path_)
with open(path_, "r") as file_obj:
assert file_obj.read() == result
remove(path_)
def test_save_complex_sfg(self, precedence_sfg_delays_and_constants):
result = sfg_to_python(precedence_sfg_delays_and_constants)
path_ = self.get_path()
assert not path.exists(path_)
with open(path_, "w") as file_obj:
file_obj.write(result)
assert path.exists(path_)
with open(path_, "r") as file_obj:
assert file_obj.read() == result
remove(path_)
def test_load_simple_sfg(self, sfg_simple_filter):
result = sfg_to_python(sfg_simple_filter)
path_ = self.get_path()
assert not path.exists(path_)
with open(path_, "w") as file_obj:
file_obj.write(result)
assert path.exists(path_)
simple_filter_, _ = python_to_sfg(path_)
assert str(sfg_simple_filter) == str(simple_filter_)
assert sfg_simple_filter.evaluate([2]) == simple_filter_.evaluate([2])
remove(path_)
def test_load_complex_sfg(self, precedence_sfg_delays_and_constants):
result = sfg_to_python(precedence_sfg_delays_and_constants)
path_ = self.get_path()
assert not path.exists(path_)
with open(path_, "w") as file_obj:
file_obj.write(result)
assert path.exists(path_)
precedence_sfg_registers_and_constants_, _ = python_to_sfg(path_)
assert str(precedence_sfg_delays_and_constants) == str(precedence_sfg_registers_and_constants_)
remove(path_)
def test_load_invalid_path(self):
path_ = self.get_path(existing=False)
with pytest.raises(Exception):
python_to_sfg(path_)
class TestGetComponentsOfType:
def test_get_no_operations_of_type(self, sfg_two_inputs_two_outputs):
assert [op.name for op in sfg_two_inputs_two_outputs.get_components_with_type_name(Multiplication.type_name())] \
== []
def test_get_multple_operations_of_type(self, sfg_two_inputs_two_outputs):
assert [op.name for op in sfg_two_inputs_two_outputs.get_components_with_type_name(Addition.type_name())] \
== ["ADD1", "ADD2"]
assert [op.name for op in sfg_two_inputs_two_outputs.get_components_with_type_name(Input.type_name())] \
== ["IN1", "IN2"]
assert [op.name for op in sfg_two_inputs_two_outputs.get_components_with_type_name(Output.type_name())] \
== ["OUT1", "OUT2"]
......@@ -2,14 +2,14 @@
B-ASIC test suit for the signal module which consists of the Signal class.
"""
from b_asic.port import InputPort, OutputPort
from b_asic.signal import Signal
import pytest
from b_asic import InputPort, OutputPort, Signal
def test_signal_creation_and_disconnction_and_connection_changing():
in_port = InputPort(0, None)
out_port = OutputPort(1, None)
in_port = InputPort(None, 0)
out_port = OutputPort(None, 1)
s = Signal(out_port, in_port)
assert in_port.signals == [s]
......@@ -17,7 +17,7 @@ def test_signal_creation_and_disconnction_and_connection_changing():
assert s.source is out_port
assert s.destination is in_port
in_port1 = InputPort(0, None)
in_port1 = InputPort(None, 0)
s.set_destination(in_port1)
assert in_port.signals == []
......@@ -40,7 +40,7 @@ def test_signal_creation_and_disconnction_and_connection_changing():
assert s.source is None
assert s.destination is None
out_port1 = OutputPort(0, None)
out_port1 = OutputPort(None, 0)
s.set_source(out_port1)
assert out_port1.signals == [s]
......@@ -60,3 +60,29 @@ def test_signal_creation_and_disconnction_and_connection_changing():
assert in_port.signals == [s]
assert s.source is out_port
assert s.destination is in_port
class Bits:
def test_pos_int(self, signal):
signal.bits = 10
assert signal.bits == 10
def test_bits_zero(self, signal):
signal.bits = 0
assert signal.bits == 0
def test_bits_neg_int(self, signal):
with pytest.raises(Exception):
signal.bits = -10
def test_bits_complex(self, signal):
with pytest.raises(Exception):
signal.bits = (2+4j)
def test_bits_float(self, signal):
with pytest.raises(Exception):
signal.bits = 3.2
def test_bits_pos_then_none(self, signal):
signal.bits = 10
signal.bits = None
assert signal.bits is None
\ No newline at end of file
import pytest
import numpy as np
from b_asic import SFG, Output, Simulation
class TestRunFor:
def test_with_lambdas_as_input(self, sfg_two_inputs_two_outputs):
simulation = Simulation(sfg_two_inputs_two_outputs, [lambda n: n + 3, lambda n: 1 + n * 2])
output = simulation.run_for(101, save_results = True)
assert output[0] == 304
assert output[1] == 505
assert simulation.results["0"][100] == 304
assert simulation.results["1"][100] == 505
assert simulation.results["in1"][0] == 3
assert simulation.results["in2"][0] == 1
assert simulation.results["add1"][0] == 4
assert simulation.results["add2"][0] == 5
assert simulation.results["0"][0] == 4
assert simulation.results["1"][0] == 5
assert simulation.results["in1"][1] == 4
assert simulation.results["in2"][1] == 3
assert simulation.results["add1"][1] == 7
assert simulation.results["add2"][1] == 10
assert simulation.results["0"][1] == 7
assert simulation.results["1"][1] == 10
assert simulation.results["in1"][2] == 5
assert simulation.results["in2"][2] == 5
assert simulation.results["add1"][2] == 10
assert simulation.results["add2"][2] == 15
assert simulation.results["0"][2] == 10
assert simulation.results["1"][2] == 15
assert simulation.results["in1"][3] == 6
assert simulation.results["in2"][3] == 7
assert simulation.results["add1"][3] == 13
assert simulation.results["add2"][3] == 20
assert simulation.results["0"][3] == 13
assert simulation.results["1"][3] == 20
def test_with_numpy_arrays_as_input(self, sfg_two_inputs_two_outputs):
input0 = np.array([5, 9, 25, -5, 7])
input1 = np.array([7, 3, 3, 54, 2])
simulation = Simulation(sfg_two_inputs_two_outputs, [input0, input1])
output = simulation.run_for(5, save_results = True)
assert output[0] == 9
assert output[1] == 11
assert isinstance(simulation.results["in1"], np.ndarray)
assert isinstance(simulation.results["in2"], np.ndarray)
assert isinstance(simulation.results["add1"], np.ndarray)
assert isinstance(simulation.results["add2"], np.ndarray)
assert isinstance(simulation.results["0"], np.ndarray)
assert isinstance(simulation.results["1"], np.ndarray)
assert simulation.results["in1"][0] == 5
assert simulation.results["in2"][0] == 7
assert simulation.results["add1"][0] == 12
assert simulation.results["add2"][0] == 19
assert simulation.results["0"][0] == 12
assert simulation.results["1"][0] == 19
assert simulation.results["in1"][1] == 9
assert simulation.results["in2"][1] == 3
assert simulation.results["add1"][1] == 12
assert simulation.results["add2"][1] == 15
assert simulation.results["0"][1] == 12
assert simulation.results["1"][1] == 15
assert simulation.results["in1"][2] == 25
assert simulation.results["in2"][2] == 3
assert simulation.results["add1"][2] == 28
assert simulation.results["add2"][2] == 31
assert simulation.results["0"][2] == 28
assert simulation.results["1"][2] == 31
assert simulation.results["in1"][3] == -5
assert simulation.results["in2"][3] == 54
assert simulation.results["add1"][3] == 49
assert simulation.results["add2"][3] == 103
assert simulation.results["0"][3] == 49
assert simulation.results["1"][3] == 103
assert simulation.results["0"][4] == 9
assert simulation.results["1"][4] == 11
def test_with_numpy_array_overflow(self, sfg_two_inputs_two_outputs):
input0 = np.array([5, 9, 25, -5, 7])
input1 = np.array([7, 3, 3, 54, 2])
simulation = Simulation(sfg_two_inputs_two_outputs, [input0, input1])
simulation.run_for(5)
with pytest.raises(IndexError):
simulation.step()
def test_run_whole_numpy_array(self, sfg_two_inputs_two_outputs):
input0 = np.array([5, 9, 25, -5, 7])
input1 = np.array([7, 3, 3, 54, 2])
simulation = Simulation(sfg_two_inputs_two_outputs, [input0, input1])
simulation.run()
assert len(simulation.results["0"]) == 5
assert len(simulation.results["1"]) == 5
with pytest.raises(IndexError):
simulation.step()
def test_delay(self, sfg_delay):
simulation = Simulation(sfg_delay)
simulation.set_input(0, [5, -2, 25, -6, 7, 0])
simulation.run_for(6, save_results = True)
assert simulation.results["0"][0] == 0
assert simulation.results["0"][1] == 5
assert simulation.results["0"][2] == -2
assert simulation.results["0"][3] == 25
assert simulation.results["0"][4] == -6
assert simulation.results["0"][5] == 7
def test_find_result_key(self, precedence_sfg_delays):
sim = Simulation(precedence_sfg_delays, [[0, 4, 542, 42, 31.314, 534.123, -453415, 5431]])
sim.run()
assert sim.results[precedence_sfg_delays.find_result_keys_by_name("ADD2")[0]][4] == 31220
assert sim.results[precedence_sfg_delays.find_result_keys_by_name("A1")[0]][2] == 80
class TestRun:
def test_save_results(self, sfg_two_inputs_two_outputs):
simulation = Simulation(sfg_two_inputs_two_outputs, [2, 3])
assert not simulation.results
simulation.run_for(10, save_results = False)
assert not simulation.results
simulation.run_for(10)
assert len(simulation.results["0"]) == 10
assert len(simulation.results["1"]) == 10
simulation.run_for(10, save_results = True)
assert len(simulation.results["0"]) == 20
assert len(simulation.results["1"]) == 20
simulation.run_for(10, save_results = False)
assert len(simulation.results["0"]) == 20
assert len(simulation.results["1"]) == 20
simulation.run_for(13, save_results = True)
assert len(simulation.results["0"]) == 33
assert len(simulation.results["1"]) == 33
simulation.step(save_results = False)
assert len(simulation.results["0"]) == 33
assert len(simulation.results["1"]) == 33
simulation.step()
assert len(simulation.results["0"]) == 34
assert len(simulation.results["1"]) == 34
simulation.clear_results()
assert not simulation.results
def test_nested(self, sfg_nested):
input0 = np.array([5, 9])
input1 = np.array([7, 3])
simulation = Simulation(sfg_nested, [input0, input1])
output0 = simulation.step()
output1 = simulation.step()
assert output0[0] == 11405
assert output1[0] == 4221
def test_accumulator(self, sfg_accumulator):
data_in = np.array([5, -2, 25, -6, 7, 0])
reset = np.array([0, 0, 0, 1, 0, 0])
simulation = Simulation(sfg_accumulator, [data_in, reset])
output0 = simulation.step()
output1 = simulation.step()
output2 = simulation.step()
output3 = simulation.step()
output4 = simulation.step()
output5 = simulation.step()
assert output0[0] == 0
assert output1[0] == 5
assert output2[0] == 3
assert output3[0] == 28
assert output4[0] == 0
assert output5[0] == 7
def test_simple_accumulator(self, sfg_simple_accumulator):
data_in = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
simulation = Simulation(sfg_simple_accumulator, [data_in])
simulation.run()
assert list(simulation.results["0"]) == [0, 1, 3, 6, 10, 15, 21, 28, 36, 45]
def test_simple_filter(self, sfg_simple_filter):
input0 = np.array([1, 2, 3, 4, 5])
simulation = Simulation(sfg_simple_filter, [input0])
simulation.run_for(len(input0), save_results = True)
assert all(simulation.results["0"] == np.array([0, 1.0, 2.5, 4.25, 6.125]))
def test_custom_operation(self, sfg_custom_operation):
simulation = Simulation(sfg_custom_operation, [lambda n: n + 1])
simulation.run_for(5)
assert all(simulation.results["0"] == np.array([2, 4, 6, 8, 10]))
assert all(simulation.results["1"] == np.array([2, 4, 8, 16, 32]))