Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • da/B-ASIC
  • lukja239/B-ASIC
  • robal695/B-ASIC
3 results
Show changes
import pytest
from b_asic.port import InputPort, OutputPort
from b_asic import InputPort, OutputPort
@pytest.fixture
def input_port():
return InputPort(0, None)
return InputPort(None, 0)
@pytest.fixture
def output_port():
return OutputPort(0, None)
return OutputPort(None, 0)
@pytest.fixture
def list_of_input_ports():
return [InputPort(None, i) for i in range(0, 3)]
@pytest.fixture
def list_of_output_ports():
return [OutputPort(None, i) for i in range(0, 3)]
import pytest
from b_asic import Signal
@pytest.fixture
def signal():
"""Return a signal with no connections."""
......@@ -9,4 +11,4 @@ def signal():
@pytest.fixture
def signals():
"""Return 3 signals with no connections."""
return [Signal() for _ in range(0,3)]
return [Signal() for _ in range(0, 3)]
import pytest
from b_asic import SFG, Input, Output, Constant, Register, ConstantMultiplication, Addition, Butterfly
@pytest.fixture
def sfg_two_inputs_two_outputs():
"""Valid SFG with two inputs and two outputs.
. .
in1-------+ +--------->out1
. | | .
. v | .
. add1+--+ .
. ^ | .
. | v .
in2+------+ add2---->out2
| . ^ .
| . | .
+------------+ .
. .
out1 = in1 + in2
out2 = in1 + 2 * in2
"""
in1 = Input()
in2 = Input()
add1 = in1 + in2
add2 = add1 + in2
out1 = Output(add1)
out2 = Output(add2)
return SFG(inputs=[in1, in2], outputs=[out1, out2])
@pytest.fixture
def sfg_two_inputs_two_outputs_independent():
"""Valid SFG with two inputs and two outputs, where the first output only depends
on the first input and the second output only depends on the second input.
. .
in1-------------------->out1
. .
. .
. c1--+ .
. | .
. v .
in2------+ add1---->out2
. | ^ .
. | | .
. +------+ .
. .
out1 = in1
out2 = in2 + 3
"""
in1 = Input("IN1")
in2 = Input("IN2")
c1 = Constant(3, "C1")
add1 = Addition(in2, c1, "ADD1")
out1 = Output(in1, "OUT1")
out2 = Output(add1, "OUT2")
return SFG(inputs=[in1, in2], outputs=[out1, out2])
@pytest.fixture
def sfg_nested():
"""Valid SFG with two inputs and one output.
out1 = in1 + (in1 + in1 * in2) * (in1 + in2 * (in1 + in1 * in2))
"""
mac_in1 = Input()
mac_in2 = Input()
mac_in3 = Input()
mac_out1 = Output(mac_in1 + mac_in2 * mac_in3)
MAC = SFG(inputs=[mac_in1, mac_in2, mac_in3], outputs=[mac_out1])
in1 = Input()
in2 = Input()
mac1 = MAC(in1, in1, in2)
mac2 = MAC(in1, in2, mac1)
mac3 = MAC(in1, mac1, mac2)
out1 = Output(mac3)
return SFG(inputs=[in1, in2], outputs=[out1])
@pytest.fixture
def sfg_delay():
"""Valid SFG with one input and one output.
out1 = in1'
"""
in1 = Input()
reg1 = Register(in1)
out1 = Output(reg1)
return SFG(inputs=[in1], outputs=[out1])
@pytest.fixture
def sfg_simple_accumulator():
"""Valid SFG with one input and one output. With feedback. Explained in lecture 5.
in>--->add>-------o--->out
^ |
| |
|--<T<-----
"""
data_in = Input()
reg = Register()
add = data_in + reg
reg.input(0).connect(add)
data_out = Output(add)
return SFG(inputs=[data_in], outputs=[data_out], name="simple_accumulator")
@pytest.fixture
def sfg_unfolded_simple_accumulator():
"""Valid unfolded simple accumulator SFG with two inputs and two outputs. With feedback. Explained in lecture 5.
in(2n)>----->add>---o------->out(2n)
^ |
| |
^ |
T<----|----
| |
|------ |
v ^
in(2n+1)>--->add> ------o--->out(2n+1)
"""
in_2n = Input()
in_2n1 = Input()
add_upper = Addition()
add_lower = in_2n1 + add_upper
reg = Register(add_lower)
add_upper.input(0).connect(in_2n)
add_upper.input(1).connect(reg)
out_2n = Output(add_upper)
out_2n1 = Output(add_lower)
return SFG(inputs=[in_2n, in_2n1], outputs=[out_2n, out_2n1], name="unfolded_simple_accumulator")
@pytest.fixture
def sfg_accumulator():
"""Valid SFG with two inputs and one output.
data_out = (data_in' + data_in) * (1 - reset)
"""
data_in = Input()
reset = Input()
reg = Register()
reg.input(0).connect((reg + data_in) * (1 - reset))
data_out = Output(reg)
return SFG(inputs=[data_in, reset], outputs=[data_out])
@pytest.fixture
def simple_filter():
"""A valid SFG that is used as a filter in the first lab for TSTE87.
+----<constmul1----+
| |
| |
in1>------add1>------reg>------+------out1>
"""
in1 = Input("IN1")
constmul1 = ConstantMultiplication(0.5, name="CMUL1")
add1 = Addition(in1, constmul1, "ADD1")
add1.input(1).signals[0].name = "S2"
reg = Register(add1, name="REG1")
constmul1.input(0).connect(reg, "S1")
out1 = Output(reg, "OUT1")
return SFG(inputs=[in1], outputs=[out1], name="simple_filter")
@pytest.fixture
def precedence_sfg_registers():
"""A sfg with registers and interesting layout for precednce list generation.
IN1>--->C0>--->ADD1>--->Q1>---+--->A0>--->ADD4>--->OUT1
^ | ^
| T1 |
| | |
ADD2<---<B1<---+--->A1>--->ADD3
^ | ^
| T2 |
| | |
+-----<B2<---+--->A2>-----+
"""
in1 = Input("IN1")
c0 = ConstantMultiplication(5, in1, "C0")
add1 = Addition(c0, None, "ADD1")
# Not sure what operation "Q" is supposed to be in the example
Q1 = ConstantMultiplication(1, add1, "Q1")
T1 = Register(Q1, 0, "T1")
T2 = Register(T1, 0, "T2")
b2 = ConstantMultiplication(2, T2, "B2")
b1 = ConstantMultiplication(3, T1, "B1")
add2 = Addition(b1, b2, "ADD2")
add1.input(1).connect(add2)
a1 = ConstantMultiplication(4, T1, "A1")
a2 = ConstantMultiplication(6, T2, "A2")
add3 = Addition(a1, a2, "ADD3")
a0 = ConstantMultiplication(7, Q1, "A0")
add4 = Addition(a0, add3, "ADD4")
out1 = Output(add4, "OUT1")
return SFG(inputs=[in1], outputs=[out1], name="SFG")
@pytest.fixture
def precedence_sfg_registers_and_constants():
in1 = Input("IN1")
c0 = ConstantMultiplication(5, in1, "C0")
add1 = Addition(c0, None, "ADD1")
# Not sure what operation "Q" is supposed to be in the example
Q1 = ConstantMultiplication(1, add1, "Q1")
T1 = Register(Q1, 0, "T1")
const1 = Constant(10, "CONST1") # Replace T2 register with a constant
b2 = ConstantMultiplication(2, const1, "B2")
b1 = ConstantMultiplication(3, T1, "B1")
add2 = Addition(b1, b2, "ADD2")
add1.input(1).connect(add2)
a1 = ConstantMultiplication(4, T1, "A1")
a2 = ConstantMultiplication(10, const1, "A2")
add3 = Addition(a1, a2, "ADD3")
a0 = ConstantMultiplication(7, Q1, "A0")
# Replace ADD4 with a butterfly to test multiple output ports
bfly1 = Butterfly(a0, add3, "BFLY1")
out1 = Output(bfly1.output(0), "OUT1")
out2 = Output(bfly1.output(1), "OUT2")
return SFG(inputs=[in1], outputs=[out1, out2], name="SFG")
......@@ -2,11 +2,10 @@
B-ASIC test suite for the AbstractOperation class.
"""
from b_asic.core_operations import Addition, ConstantAddition, Subtraction, ConstantSubtraction, \
Multiplication, ConstantMultiplication, Division, ConstantDivision
import pytest
from b_asic import Addition, Subtraction, Multiplication, ConstantMultiplication, Division
def test_addition_overload():
"""Tests addition overloading for both operation and number argument."""
......@@ -14,15 +13,19 @@ def test_addition_overload():
add2 = Addition(None, None, "add2")
add3 = add1 + add2
assert isinstance(add3, Addition)
assert add3.input(0).signals == add1.output(0).signals
assert add3.input(1).signals == add2.output(0).signals
add4 = add3 + 5
assert isinstance(add4, ConstantAddition)
assert isinstance(add4, Addition)
assert add4.input(0).signals == add3.output(0).signals
assert add4.input(1).signals[0].source.operation.value == 5
add5 = 5 + add4
assert isinstance(add5, Addition)
assert add5.input(0).signals[0].source.operation.value == 5
assert add5.input(1).signals == add4.output(0).signals
def test_subtraction_overload():
......@@ -31,15 +34,19 @@ def test_subtraction_overload():
add2 = Addition(None, None, "add2")
sub1 = add1 - add2
assert isinstance(sub1, Subtraction)
assert sub1.input(0).signals == add1.output(0).signals
assert sub1.input(1).signals == add2.output(0).signals
sub2 = sub1 - 5
assert isinstance(sub2, ConstantSubtraction)
assert isinstance(sub2, Subtraction)
assert sub2.input(0).signals == sub1.output(0).signals
assert sub2.input(1).signals[0].source.operation.value == 5
sub3 = 5 - sub2
assert isinstance(sub3, Subtraction)
assert sub3.input(0).signals[0].source.operation.value == 5
assert sub3.input(1).signals == sub2.output(0).signals
def test_multiplication_overload():
......@@ -48,15 +55,19 @@ def test_multiplication_overload():
add2 = Addition(None, None, "add2")
mul1 = add1 * add2
assert isinstance(mul1, Multiplication)
assert mul1.input(0).signals == add1.output(0).signals
assert mul1.input(1).signals == add2.output(0).signals
mul2 = mul1 * 5
assert isinstance(mul2, ConstantMultiplication)
assert mul2.input(0).signals == mul1.output(0).signals
assert mul2.value == 5
mul3 = 5 * mul2
assert isinstance(mul3, ConstantMultiplication)
assert mul3.input(0).signals == mul2.output(0).signals
assert mul3.value == 5
def test_division_overload():
......@@ -65,13 +76,16 @@ def test_division_overload():
add2 = Addition(None, None, "add2")
div1 = add1 / add2
assert isinstance(div1, Division)
assert div1.input(0).signals == add1.output(0).signals
assert div1.input(1).signals == add2.output(0).signals
div2 = div1 / 5
assert isinstance(div2, ConstantDivision)
assert isinstance(div2, Division)
assert div2.input(0).signals == div1.output(0).signals
assert div2.input(1).signals[0].source.operation.value == 5
div3 = 5 / div2
assert isinstance(div3, Division)
assert div3.input(0).signals[0].source.operation.value == 5
assert div3.input(1).signals == div2.output(0).signals
......@@ -2,226 +2,175 @@
B-ASIC test suite for the core operations.
"""
from b_asic.core_operations import Constant, Addition, Subtraction, Multiplication, Division, SquareRoot, ComplexConjugate, Max, Min, Absolute, ConstantMultiplication, ConstantAddition, ConstantSubtraction, ConstantDivision
# Constant tests.
def test_constant():
constant_operation = Constant(3)
assert constant_operation.evaluate() == 3
def test_constant_negative():
constant_operation = Constant(-3)
assert constant_operation.evaluate() == -3
def test_constant_complex():
constant_operation = Constant(3+4j)
assert constant_operation.evaluate() == 3+4j
# Addition tests.
def test_addition():
test_operation = Addition()
constant_operation = Constant(3)
constant_operation_2 = Constant(5)
assert test_operation.evaluate(constant_operation.evaluate(), constant_operation_2.evaluate()) == 8
def test_addition_negative():
test_operation = Addition()
constant_operation = Constant(-3)
constant_operation_2 = Constant(-5)
assert test_operation.evaluate(constant_operation.evaluate(), constant_operation_2.evaluate()) == -8
def test_addition_complex():
test_operation = Addition()
constant_operation = Constant((3+5j))
constant_operation_2 = Constant((4+6j))
assert test_operation.evaluate(constant_operation.evaluate(), constant_operation_2.evaluate()) == (7+11j)
# Subtraction tests.
def test_subtraction():
test_operation = Subtraction()
constant_operation = Constant(5)
constant_operation_2 = Constant(3)
assert test_operation.evaluate(constant_operation.evaluate(), constant_operation_2.evaluate()) == 2
def test_subtraction_negative():
test_operation = Subtraction()
constant_operation = Constant(-5)
constant_operation_2 = Constant(-3)
assert test_operation.evaluate(constant_operation.evaluate(), constant_operation_2.evaluate()) == -2
def test_subtraction_complex():
test_operation = Subtraction()
constant_operation = Constant((3+5j))
constant_operation_2 = Constant((4+6j))
assert test_operation.evaluate(constant_operation.evaluate(), constant_operation_2.evaluate()) == (-1-1j)
# Multiplication tests.
def test_multiplication():
test_operation = Multiplication()
constant_operation = Constant(5)
constant_operation_2 = Constant(3)
assert test_operation.evaluate(constant_operation.evaluate(), constant_operation_2.evaluate()) == 15
def test_multiplication_negative():
test_operation = Multiplication()
constant_operation = Constant(-5)
constant_operation_2 = Constant(-3)
assert test_operation.evaluate(constant_operation.evaluate(), constant_operation_2.evaluate()) == 15
def test_multiplication_complex():
test_operation = Multiplication()
constant_operation = Constant((3+5j))
constant_operation_2 = Constant((4+6j))
assert test_operation.evaluate(constant_operation.evaluate(), constant_operation_2.evaluate()) == (-18+38j)
# Division tests.
def test_division():
test_operation = Division()
constant_operation = Constant(30)
constant_operation_2 = Constant(5)
assert test_operation.evaluate(constant_operation.evaluate(), constant_operation_2.evaluate()) == 6
def test_division_negative():
test_operation = Division()
constant_operation = Constant(-30)
constant_operation_2 = Constant(-5)
assert test_operation.evaluate(constant_operation.evaluate(), constant_operation_2.evaluate()) == 6
def test_division_complex():
test_operation = Division()
constant_operation = Constant((60+40j))
constant_operation_2 = Constant((10+20j))
assert test_operation.evaluate(constant_operation.evaluate(), constant_operation_2.evaluate()) == (2.8-1.6j)
# SquareRoot tests.
def test_squareroot():
test_operation = SquareRoot()
constant_operation = Constant(36)
assert test_operation.evaluate(constant_operation.evaluate()) == 6
def test_squareroot_negative():
test_operation = SquareRoot()
constant_operation = Constant(-36)
assert test_operation.evaluate(constant_operation.evaluate()) == 6j
def test_squareroot_complex():
test_operation = SquareRoot()
constant_operation = Constant((48+64j))
assert test_operation.evaluate(constant_operation.evaluate()) == (8+4j)
# ComplexConjugate tests.
def test_complexconjugate():
test_operation = ComplexConjugate()
constant_operation = Constant(3+4j)
assert test_operation.evaluate(constant_operation.evaluate()) == (3-4j)
def test_test_complexconjugate_negative():
test_operation = ComplexConjugate()
constant_operation = Constant(-3-4j)
assert test_operation.evaluate(constant_operation.evaluate()) == (-3+4j)
# Max tests.
def test_max():
test_operation = Max()
constant_operation = Constant(30)
constant_operation_2 = Constant(5)
assert test_operation.evaluate(constant_operation.evaluate(), constant_operation_2.evaluate()) == 30
def test_max_negative():
test_operation = Max()
constant_operation = Constant(-30)
constant_operation_2 = Constant(-5)
assert test_operation.evaluate(constant_operation.evaluate(), constant_operation_2.evaluate()) == -5
# Min tests.
def test_min():
test_operation = Min()
constant_operation = Constant(30)
constant_operation_2 = Constant(5)
assert test_operation.evaluate(constant_operation.evaluate(), constant_operation_2.evaluate()) == 5
def test_min_negative():
test_operation = Min()
constant_operation = Constant(-30)
constant_operation_2 = Constant(-5)
assert test_operation.evaluate(constant_operation.evaluate(), constant_operation_2.evaluate()) == -30
# Absolute tests.
def test_absolute():
test_operation = Absolute()
constant_operation = Constant(30)
assert test_operation.evaluate(constant_operation.evaluate()) == 30
def test_absolute_negative():
test_operation = Absolute()
constant_operation = Constant(-5)
assert test_operation.evaluate(constant_operation.evaluate()) == 5
def test_absolute_complex():
test_operation = Absolute()
constant_operation = Constant((3+4j))
assert test_operation.evaluate(constant_operation.evaluate()) == 5.0
# ConstantMultiplication tests.
def test_constantmultiplication():
test_operation = ConstantMultiplication(5)
constant_operation = Constant(20)
assert test_operation.evaluate(constant_operation.evaluate()) == 100
def test_constantmultiplication_negative():
test_operation = ConstantMultiplication(5)
constant_operation = Constant(-5)
assert test_operation.evaluate(constant_operation.evaluate()) == -25
def test_constantmultiplication_complex():
test_operation = ConstantMultiplication(3+2j)
constant_operation = Constant((3+4j))
assert test_operation.evaluate(constant_operation.evaluate()) == (1+18j)
# ConstantAddition tests.
def test_constantaddition():
test_operation = ConstantAddition(5)
constant_operation = Constant(20)
assert test_operation.evaluate(constant_operation.evaluate()) == 25
def test_constantaddition_negative():
test_operation = ConstantAddition(4)
constant_operation = Constant(-5)
assert test_operation.evaluate(constant_operation.evaluate()) == -1
def test_constantaddition_complex():
test_operation = ConstantAddition(3+2j)
constant_operation = Constant((3+4j))
assert test_operation.evaluate(constant_operation.evaluate()) == (6+6j)
# ConstantSubtraction tests.
def test_constantsubtraction():
test_operation = ConstantSubtraction(5)
constant_operation = Constant(20)
assert test_operation.evaluate(constant_operation.evaluate()) == 15
def test_constantsubtraction_negative():
test_operation = ConstantSubtraction(4)
constant_operation = Constant(-5)
assert test_operation.evaluate(constant_operation.evaluate()) == -9
def test_constantsubtraction_complex():
test_operation = ConstantSubtraction(4+6j)
constant_operation = Constant((3+4j))
assert test_operation.evaluate(constant_operation.evaluate()) == (-1-2j)
# ConstantDivision tests.
def test_constantdivision():
test_operation = ConstantDivision(5)
constant_operation = Constant(20)
assert test_operation.evaluate(constant_operation.evaluate()) == 4
def test_constantdivision_negative():
test_operation = ConstantDivision(4)
constant_operation = Constant(-20)
assert test_operation.evaluate(constant_operation.evaluate()) == -5
def test_constantdivision_complex():
test_operation = ConstantDivision(2+2j)
constant_operation = Constant((10+10j))
assert test_operation.evaluate(constant_operation.evaluate()) == (5+0j)
from b_asic import \
Constant, Addition, Subtraction, Multiplication, ConstantMultiplication, Division, \
SquareRoot, ComplexConjugate, Max, Min, Absolute, Butterfly
class TestConstant:
def test_constant_positive(self):
test_operation = Constant(3)
assert test_operation.evaluate_output(0, []) == 3
def test_constant_negative(self):
test_operation = Constant(-3)
assert test_operation.evaluate_output(0, []) == -3
def test_constant_complex(self):
test_operation = Constant(3+4j)
assert test_operation.evaluate_output(0, []) == 3+4j
class TestAddition:
def test_addition_positive(self):
test_operation = Addition()
assert test_operation.evaluate_output(0, [3, 5]) == 8
def test_addition_negative(self):
test_operation = Addition()
assert test_operation.evaluate_output(0, [-3, -5]) == -8
def test_addition_complex(self):
test_operation = Addition()
assert test_operation.evaluate_output(0, [3+5j, 4+6j]) == 7+11j
class TestSubtraction:
def test_subtraction_positive(self):
test_operation = Subtraction()
assert test_operation.evaluate_output(0, [5, 3]) == 2
def test_subtraction_negative(self):
test_operation = Subtraction()
assert test_operation.evaluate_output(0, [-5, -3]) == -2
def test_subtraction_complex(self):
test_operation = Subtraction()
assert test_operation.evaluate_output(0, [3+5j, 4+6j]) == -1-1j
class TestMultiplication:
def test_multiplication_positive(self):
test_operation = Multiplication()
assert test_operation.evaluate_output(0, [5, 3]) == 15
def test_multiplication_negative(self):
test_operation = Multiplication()
assert test_operation.evaluate_output(0, [-5, -3]) == 15
def test_multiplication_complex(self):
test_operation = Multiplication()
assert test_operation.evaluate_output(0, [3+5j, 4+6j]) == -18+38j
class TestDivision:
def test_division_positive(self):
test_operation = Division()
assert test_operation.evaluate_output(0, [30, 5]) == 6
def test_division_negative(self):
test_operation = Division()
assert test_operation.evaluate_output(0, [-30, -5]) == 6
def test_division_complex(self):
test_operation = Division()
assert test_operation.evaluate_output(0, [60+40j, 10+20j]) == 2.8-1.6j
class TestSquareRoot:
def test_squareroot_positive(self):
test_operation = SquareRoot()
assert test_operation.evaluate_output(0, [36]) == 6
def test_squareroot_negative(self):
test_operation = SquareRoot()
assert test_operation.evaluate_output(0, [-36]) == 6j
def test_squareroot_complex(self):
test_operation = SquareRoot()
assert test_operation.evaluate_output(0, [48+64j]) == 8+4j
class TestComplexConjugate:
def test_complexconjugate_positive(self):
test_operation = ComplexConjugate()
assert test_operation.evaluate_output(0, [3+4j]) == 3-4j
def test_test_complexconjugate_negative(self):
test_operation = ComplexConjugate()
assert test_operation.evaluate_output(0, [-3-4j]) == -3+4j
class TestMax:
def test_max_positive(self):
test_operation = Max()
assert test_operation.evaluate_output(0, [30, 5]) == 30
def test_max_negative(self):
test_operation = Max()
assert test_operation.evaluate_output(0, [-30, -5]) == -5
class TestMin:
def test_min_positive(self):
test_operation = Min()
assert test_operation.evaluate_output(0, [30, 5]) == 5
def test_min_negative(self):
test_operation = Min()
assert test_operation.evaluate_output(0, [-30, -5]) == -30
class TestAbsolute:
def test_absolute_positive(self):
test_operation = Absolute()
assert test_operation.evaluate_output(0, [30]) == 30
def test_absolute_negative(self):
test_operation = Absolute()
assert test_operation.evaluate_output(0, [-5]) == 5
def test_absolute_complex(self):
test_operation = Absolute()
assert test_operation.evaluate_output(0, [3+4j]) == 5.0
class TestConstantMultiplication:
def test_constantmultiplication_positive(self):
test_operation = ConstantMultiplication(5)
assert test_operation.evaluate_output(0, [20]) == 100
def test_constantmultiplication_negative(self):
test_operation = ConstantMultiplication(5)
assert test_operation.evaluate_output(0, [-5]) == -25
def test_constantmultiplication_complex(self):
test_operation = ConstantMultiplication(3+2j)
assert test_operation.evaluate_output(0, [3+4j]) == 1+18j
class TestButterfly:
def test_butterfly_positive(self):
test_operation = Butterfly()
assert test_operation.evaluate_output(0, [2, 3]) == 5
assert test_operation.evaluate_output(1, [2, 3]) == -1
def test_butterfly_negative(self):
test_operation = Butterfly()
assert test_operation.evaluate_output(0, [-2, -3]) == -5
assert test_operation.evaluate_output(1, [-2, -3]) == 1
def test_buttefly_complex(self):
test_operation = Butterfly()
assert test_operation.evaluate_output(0, [2+1j, 3-2j]) == 5-1j
assert test_operation.evaluate_output(1, [2+1j, 3-2j]) == -1+3j
class TestDepends:
def test_depends_addition(self):
add1 = Addition()
assert set(add1.inputs_required_for_output(0)) == {0, 1}
def test_depends_butterfly(self):
bfly1 = Butterfly()
assert set(bfly1.inputs_required_for_output(0)) == {0, 1}
assert set(bfly1.inputs_required_for_output(1)) == {0, 1}
......@@ -2,9 +2,10 @@
B-ASIC test suite for graph id generator.
"""
from b_asic.graph_id import GraphIDGenerator, GraphID
import pytest
from b_asic import GraphIDGenerator, GraphID
@pytest.fixture
def graph_id_generator():
return GraphIDGenerator()
......@@ -12,17 +13,17 @@ def graph_id_generator():
class TestGetNextId:
def test_empty_string_generator(self, graph_id_generator):
"""Test the graph id generator for an empty string type."""
assert graph_id_generator.get_next_id("") == "1"
assert graph_id_generator.get_next_id("") == "2"
assert graph_id_generator.next_id("") == "1"
assert graph_id_generator.next_id("") == "2"
def test_normal_string_generator(self, graph_id_generator):
""""Test the graph id generator for a normal string type."""
assert graph_id_generator.get_next_id("add") == "add1"
assert graph_id_generator.get_next_id("add") == "add2"
assert graph_id_generator.next_id("add") == "add1"
assert graph_id_generator.next_id("add") == "add2"
def test_different_strings_generator(self, graph_id_generator):
"""Test the graph id generator for different strings."""
assert graph_id_generator.get_next_id("sub") == "sub1"
assert graph_id_generator.get_next_id("mul") == "mul1"
assert graph_id_generator.get_next_id("sub") == "sub2"
assert graph_id_generator.get_next_id("mul") == "mul2"
assert graph_id_generator.next_id("sub") == "sub1"
assert graph_id_generator.next_id("mul") == "mul1"
assert graph_id_generator.next_id("sub") == "sub2"
assert graph_id_generator.next_id("mul") == "mul2"
......@@ -4,92 +4,64 @@ B-ASIC test suite for Inputport
import pytest
from b_asic import InputPort, OutputPort
from b_asic import Signal
from b_asic import InputPort, OutputPort, Signal
@pytest.fixture
def inp_port():
return InputPort(0, None)
@pytest.fixture
def out_port():
return OutputPort(0, None)
@pytest.fixture
def out_port2():
return OutputPort(1, None)
def output_port2():
return OutputPort(None, 1)
@pytest.fixture
def dangling_sig():
return Signal()
@pytest.fixture
def s_w_source():
out_port = OutputPort(0, None)
return Signal(source=out_port)
def s_w_source(output_port):
return Signal(source=output_port)
@pytest.fixture
def sig_with_dest():
inp_port = InputPort(0, None)
return Signal(destination=out_port)
def sig_with_dest(inp_port):
return Signal(destination=inp_port)
@pytest.fixture
def connected_sig():
out_port = OutputPort(0, None)
inp_port = InputPort(0, None)
return Signal(source=out_port, destination=inp_port)
def connected_sig(inp_port, output_port):
return Signal(source=output_port, destination=inp_port)
def test_connect_then_disconnect(inp_port, out_port):
def test_connect_then_disconnect(input_port, output_port):
"""Test connect unused port to port."""
s1 = inp_port.connect(out_port)
assert inp_port.connected_ports == [out_port]
assert out_port.connected_ports == [inp_port]
assert inp_port.signals == [s1]
assert out_port.signals == [s1]
assert s1.source is out_port
assert s1.destination is inp_port
inp_port.remove_signal(s1)
assert inp_port.connected_ports == []
assert out_port.connected_ports == []
assert inp_port.signals == []
assert out_port.signals == [s1]
assert s1.source is out_port
s1 = input_port.connect(output_port)
assert input_port.connected_source == output_port
assert input_port.signals == [s1]
assert output_port.signals == [s1]
assert s1.source is output_port
assert s1.destination is input_port
input_port.remove_signal(s1)
assert input_port.connected_source is None
assert input_port.signals == []
assert output_port.signals == [s1]
assert s1.source is output_port
assert s1.destination is None
def test_connect_used_port_to_new_port(inp_port, out_port, out_port2):
"""Does connecting multiple ports to an inputport throw error?"""
inp_port.connect(out_port)
with pytest.raises(AssertionError):
inp_port.connect(out_port2)
def test_connect_used_port_to_new_port(input_port, output_port, output_port2):
"""Multiple connections to an input port should throw an error."""
input_port.connect(output_port)
with pytest.raises(Exception):
input_port.connect(output_port2)
def test_add_signal_then_disconnect(inp_port, s_w_source):
def test_add_signal_then_disconnect(input_port, s_w_source):
"""Can signal be connected then disconnected properly?"""
inp_port.add_signal(s_w_source)
input_port.add_signal(s_w_source)
assert inp_port.connected_ports == [s_w_source.source]
assert s_w_source.source.connected_ports == [inp_port]
assert inp_port.signals == [s_w_source]
assert input_port.connected_source == s_w_source.source
assert input_port.signals == [s_w_source]
assert s_w_source.source.signals == [s_w_source]
assert s_w_source.destination is inp_port
assert s_w_source.destination is input_port
inp_port.remove_signal(s_w_source)
input_port.remove_signal(s_w_source)
assert inp_port.connected_ports == []
assert s_w_source.source.connected_ports == []
assert inp_port.signals == []
assert input_port.connected_source is None
assert input_port.signals == []
assert s_w_source.source.signals == [s_w_source]
assert s_w_source.destination is None
def test_connect_then_disconnect(inp_port, out_port):
"""Can port be connected and then disconnected properly?"""
inp_port.connect(out_port)
inp_port.disconnect(out_port)
print("outport signals:", out_port.signals, "count:", out_port.signal_count())
assert inp_port.signal_count() == 1
assert len(inp_port.connected_ports) == 0
assert out_port.signal_count() == 0
from b_asic.core_operations import Constant, Addition
from b_asic.signal import Signal
from b_asic.port import InputPort, OutputPort
import pytest
from b_asic import Constant, Addition, MAD, Butterfly, SquareRoot
class TestTraverse:
def test_traverse_single_tree(self, operation):
"""Traverse a tree consisting of one operation."""
......@@ -12,20 +10,44 @@ class TestTraverse:
def test_traverse_tree(self, operation_tree):
"""Traverse a basic addition tree with two constants."""
assert len(list(operation_tree.traverse())) == 3
assert len(list(operation_tree.traverse())) == 5
def test_traverse_large_tree(self, large_operation_tree):
"""Traverse a larger tree."""
assert len(list(large_operation_tree.traverse())) == 7
assert len(list(large_operation_tree.traverse())) == 13
def test_traverse_type(self, large_operation_tree):
traverse = list(large_operation_tree.traverse())
assert len(list(filter(lambda type_: isinstance(type_, Addition), traverse))) == 3
assert len(list(filter(lambda type_: isinstance(type_, Constant), traverse))) == 4
def test_traverse_loop(self, operation_tree):
add_oper_signal = Signal()
operation_tree._output_ports[0].add_signal(add_oper_signal)
operation_tree._input_ports[0].remove_signal(add_oper_signal)
operation_tree._input_ports[0].add_signal(add_oper_signal)
assert len(list(operation_tree.traverse())) == 2
result = list(large_operation_tree.traverse())
assert len(list(filter(lambda type_: isinstance(type_, Addition), result))) == 3
assert len(list(filter(lambda type_: isinstance(type_, Constant), result))) == 4
def test_traverse_loop(self, operation_graph_with_cycle):
assert len(list(operation_graph_with_cycle.traverse())) == 8
class TestToSfg:
def test_convert_mad_to_sfg(self):
mad1 = MAD()
mad1_sfg = mad1.to_sfg()
assert mad1.evaluate(1,1,1) == mad1_sfg.evaluate(1,1,1)
assert len(mad1_sfg.operations) == 6
def test_butterfly_to_sfg(self):
but1 = Butterfly()
but1_sfg = but1.to_sfg()
assert but1.evaluate(1,1)[0] == but1_sfg.evaluate(1,1)[0]
assert but1.evaluate(1,1)[1] == but1_sfg.evaluate(1,1)[1]
assert len(but1_sfg.operations) == 8
def test_add_to_sfg(self):
add1 = Addition()
add1_sfg = add1.to_sfg()
assert len(add1_sfg.operations) == 4
def test_sqrt_to_sfg(self):
sqrt1 = SquareRoot()
sqrt1_sfg = sqrt1.to_sfg()
assert len(sqrt1_sfg.operations) == 3
"""
B-ASIC test suite for OutputPort.
"""
from b_asic import OutputPort, InputPort, Signal
import pytest
@pytest.fixture
def output_port():
return OutputPort(0, None)
@pytest.fixture
def input_port():
return InputPort(0, None)
@pytest.fixture
def list_of_input_ports():
return [InputPort(_, None) for _ in range(0,3)]
from b_asic import OutputPort, InputPort, Signal
class TestConnect:
def test_multiple_ports(self, output_port, list_of_input_ports):
"""Can multiple ports connect to an output port?"""
"""Multiple connections to an output port should be possible."""
for port in list_of_input_ports:
output_port.connect(port)
port.connect(output_port)
assert output_port.signal_count() == len(list_of_input_ports)
assert output_port.signal_count == len(list_of_input_ports)
def test_same_port(self, output_port, list_of_input_ports):
def test_same_port(self, output_port, input_port):
"""Check error handing."""
output_port.connect(list_of_input_ports[0])
with pytest.raises(AssertionError):
output_port.connect(list_of_input_ports[0])
input_port.connect(output_port)
with pytest.raises(Exception):
input_port.connect(output_port)
assert output_port.signal_count() == 2
assert output_port.signal_count == 1
class TestAddSignal:
def test_dangling(self, output_port):
s = Signal()
output_port.add_signal(s)
assert output_port.signal_count() == 1
def test_with_destination(self, output_port, input_port):
s = Signal(destination=input_port)
output_port.add_signal(s)
assert output_port.signal_count == 1
assert output_port.signals == [s]
assert output_port.connected_ports == [s.destination]
class TestClear:
def test_others_clear(self, output_port, list_of_input_ports):
for port in list_of_input_ports:
port.connect(output_port)
class TestDisconnect:
def test_multiple_ports(self, output_port, list_of_input_ports):
"""Can multiple ports disconnect from OutputPort?"""
for port in list_of_input_ports:
output_port.connect(port)
port.clear()
assert output_port.signal_count == 3
assert all(s.dangling() for s in output_port.signals)
def test_self_clear(self, output_port, list_of_input_ports):
for port in list_of_input_ports:
output_port.disconnect(port)
port.connect(output_port)
assert output_port.signal_count() == 3
assert output_port.connected_ports == []
output_port.clear()
assert output_port.signal_count == 0
assert output_port.signals == []
class TestRemoveSignal:
def test_one_signal(self, output_port, input_port):
s = output_port.connect(input_port)
s = input_port.connect(output_port)
output_port.remove_signal(s)
assert output_port.signal_count() == 0
assert output_port.signal_count == 0
assert output_port.signals == []
assert output_port.connected_ports == []
def test_multiple_signals(self, output_port, list_of_input_ports):
"""Can multiple signals disconnect from OutputPort?"""
sigs = []
for port in list_of_input_ports:
sigs.append(output_port.connect(port))
sigs.append(port.connect(output_port))
for sig in sigs:
output_port.remove_signal(sig)
for s in sigs:
output_port.remove_signal(s)
assert output_port.signal_count() == 0
assert output_port.signal_count == 0
assert output_port.signals == []
import pytest
import io
import sys
from b_asic import SFG, Signal, Input, Output, Constant, ConstantMultiplication, Addition, Multiplication, Register, \
Butterfly, Subtraction, SquareRoot
class TestInit:
def test_direct_input_to_output_sfg_construction(self):
in1 = Input("IN1")
out1 = Output(None, "OUT1")
out1.input(0).connect(in1, "S1")
sfg = SFG(inputs=[in1], outputs=[out1]) # in1 ---s1---> out1
assert len(list(sfg.components)) == 3
assert len(list(sfg.operations)) == 2
assert sfg.input_count == 1
assert sfg.output_count == 1
def test_same_signal_input_and_output_sfg_construction(self):
add1 = Addition(None, None, "ADD1")
add2 = Addition(None, None, "ADD2")
s1 = add2.input(0).connect(add1, "S1")
# in1 ---s1---> out1
sfg = SFG(input_signals=[s1], output_signals=[s1])
assert len(list(sfg.components)) == 3
assert len(list(sfg.operations)) == 2
assert sfg.input_count == 1
assert sfg.output_count == 1
def test_outputs_construction(self, operation_tree):
sfg = SFG(outputs=[Output(operation_tree)])
assert len(list(sfg.components)) == 7
assert len(list(sfg.operations)) == 4
assert sfg.input_count == 0
assert sfg.output_count == 1
def test_signals_construction(self, operation_tree):
sfg = SFG(output_signals=[Signal(source=operation_tree.output(0))])
assert len(list(sfg.components)) == 7
assert len(list(sfg.operations)) == 4
assert sfg.input_count == 0
assert sfg.output_count == 1
class TestPrintSfg:
def test_one_addition(self):
inp1 = Input("INP1")
inp2 = Input("INP2")
add1 = Addition(inp1, inp2, "ADD1")
out1 = Output(add1, "OUT1")
sfg = SFG(inputs=[inp1, inp2], outputs=[out1], name="SFG1")
assert sfg.__str__() == \
"id: no_id, \tname: SFG1, \tinputs: {0: '-'}, \toutputs: {0: '-'}\n" + \
"Internal Operations:\n" + \
"----------------------------------------------------------------------------------------------------\n" + \
str(sfg.find_by_name("INP1")[0]) + "\n" + \
str(sfg.find_by_name("INP2")[0]) + "\n" + \
str(sfg.find_by_name("ADD1")[0]) + "\n" + \
str(sfg.find_by_name("OUT1")[0]) + "\n" + \
"----------------------------------------------------------------------------------------------------\n"
def test_add_mul(self):
inp1 = Input("INP1")
inp2 = Input("INP2")
inp3 = Input("INP3")
add1 = Addition(inp1, inp2, "ADD1")
mul1 = Multiplication(add1, inp3, "MUL1")
out1 = Output(mul1, "OUT1")
sfg = SFG(inputs=[inp1, inp2, inp3], outputs=[out1], name="mac_sfg")
assert sfg.__str__() == \
"id: no_id, \tname: mac_sfg, \tinputs: {0: '-'}, \toutputs: {0: '-'}\n" + \
"Internal Operations:\n" + \
"----------------------------------------------------------------------------------------------------\n" + \
str(sfg.find_by_name("INP1")[0]) + "\n" + \
str(sfg.find_by_name("INP2")[0]) + "\n" + \
str(sfg.find_by_name("ADD1")[0]) + "\n" + \
str(sfg.find_by_name("INP3")[0]) + "\n" + \
str(sfg.find_by_name("MUL1")[0]) + "\n" + \
str(sfg.find_by_name("OUT1")[0]) + "\n" + \
"----------------------------------------------------------------------------------------------------\n"
def test_constant(self):
inp1 = Input("INP1")
const1 = Constant(3, "CONST")
add1 = Addition(const1, inp1, "ADD1")
out1 = Output(add1, "OUT1")
sfg = SFG(inputs=[inp1], outputs=[out1], name="sfg")
assert sfg.__str__() == \
"id: no_id, \tname: sfg, \tinputs: {0: '-'}, \toutputs: {0: '-'}\n" + \
"Internal Operations:\n" + \
"----------------------------------------------------------------------------------------------------\n" + \
str(sfg.find_by_name("CONST")[0]) + "\n" + \
str(sfg.find_by_name("INP1")[0]) + "\n" + \
str(sfg.find_by_name("ADD1")[0]) + "\n" + \
str(sfg.find_by_name("OUT1")[0]) + "\n" + \
"----------------------------------------------------------------------------------------------------\n"
def test_simple_filter(self, simple_filter):
assert simple_filter.__str__() == \
"id: no_id, \tname: simple_filter, \tinputs: {0: '-'}, \toutputs: {0: '-'}\n" + \
"Internal Operations:\n" + \
"----------------------------------------------------------------------------------------------------\n" + \
str(simple_filter.find_by_name("IN1")[0]) + "\n" + \
str(simple_filter.find_by_name("ADD1")[0]) + "\n" + \
str(simple_filter.find_by_name("REG1")[0]) + "\n" + \
str(simple_filter.find_by_name("CMUL1")[0]) + "\n" + \
str(simple_filter.find_by_name("OUT1")[0]) + "\n" + \
"----------------------------------------------------------------------------------------------------\n"
class TestDeepCopy:
def test_deep_copy_no_duplicates(self):
inp1 = Input("INP1")
inp2 = Input("INP2")
inp3 = Input("INP3")
add1 = Addition(inp1, inp2, "ADD1")
mul1 = Multiplication(add1, inp3, "MUL1")
out1 = Output(mul1, "OUT1")
mac_sfg = SFG(inputs=[inp1, inp2], outputs=[out1], name="mac_sfg")
mac_sfg_new = mac_sfg()
assert mac_sfg.name == "mac_sfg"
assert mac_sfg_new.name == ""
for g_id, component in mac_sfg._components_by_id.items():
component_copy = mac_sfg_new.find_by_id(g_id)
assert component.name == component_copy.name
def test_deep_copy(self):
inp1 = Input("INP1")
inp2 = Input("INP2")
inp3 = Input("INP3")
add1 = Addition(None, None, "ADD1")
add2 = Addition(None, None, "ADD2")
mul1 = Multiplication(None, None, "MUL1")
out1 = Output(None, "OUT1")
add1.input(0).connect(inp1, "S1")
add1.input(1).connect(inp2, "S2")
add2.input(0).connect(add1, "S4")
add2.input(1).connect(inp3, "S3")
mul1.input(0).connect(add1, "S5")
mul1.input(1).connect(add2, "S6")
out1.input(0).connect(mul1, "S7")
mac_sfg = SFG(inputs=[inp1, inp2], outputs=[out1],
id_number_offset=100, name="mac_sfg")
mac_sfg_new = mac_sfg(name="mac_sfg2")
assert mac_sfg.name == "mac_sfg"
assert mac_sfg_new.name == "mac_sfg2"
assert mac_sfg.id_number_offset == 100
assert mac_sfg_new.id_number_offset == 100
for g_id, component in mac_sfg._components_by_id.items():
component_copy = mac_sfg_new.find_by_id(g_id)
assert component.name == component_copy.name
def test_deep_copy_with_new_sources(self):
inp1 = Input("INP1")
inp2 = Input("INP2")
inp3 = Input("INP3")
add1 = Addition(inp1, inp2, "ADD1")
mul1 = Multiplication(add1, inp3, "MUL1")
out1 = Output(mul1, "OUT1")
mac_sfg = SFG(inputs=[inp1, inp2], outputs=[out1], name="mac_sfg")
a = Addition(Constant(3), Constant(5))
b = Constant(2)
mac_sfg_new = mac_sfg(a, b)
assert mac_sfg_new.input(0).signals[0].source.operation is a
assert mac_sfg_new.input(1).signals[0].source.operation is b
class TestEvaluateOutput:
def test_evaluate_output(self, operation_tree):
sfg = SFG(outputs=[Output(operation_tree)])
assert sfg.evaluate_output(0, []) == 5
def test_evaluate_output_large(self, large_operation_tree):
sfg = SFG(outputs=[Output(large_operation_tree)])
assert sfg.evaluate_output(0, []) == 14
def test_evaluate_output_cycle(self, operation_graph_with_cycle):
sfg = SFG(outputs=[Output(operation_graph_with_cycle)])
with pytest.raises(Exception):
sfg.evaluate_output(0, [])
class TestComponents:
def test_advanced_components(self):
inp1 = Input("INP1")
inp2 = Input("INP2")
inp3 = Input("INP3")
add1 = Addition(None, None, "ADD1")
add2 = Addition(None, None, "ADD2")
mul1 = Multiplication(None, None, "MUL1")
out1 = Output(None, "OUT1")
add1.input(0).connect(inp1, "S1")
add1.input(1).connect(inp2, "S2")
add2.input(0).connect(add1, "S4")
add2.input(1).connect(inp3, "S3")
mul1.input(0).connect(add1, "S5")
mul1.input(1).connect(add2, "S6")
out1.input(0).connect(mul1, "S7")
mac_sfg = SFG(inputs=[inp1, inp2], outputs=[out1], name="mac_sfg")
assert set([comp.name for comp in mac_sfg.components]) == {
"INP1", "INP2", "INP3", "ADD1", "ADD2", "MUL1", "OUT1", "S1", "S2", "S3", "S4", "S5", "S6", "S7"}
class TestReplaceComponents:
def test_replace_addition_by_id(self, operation_tree):
sfg = SFG(outputs=[Output(operation_tree)])
component_id = "add1"
sfg = sfg.replace_component(
Multiplication(name="Multi"), _id=component_id)
assert component_id not in sfg._components_by_id.keys()
assert "Multi" in sfg._components_by_name.keys()
def test_replace_addition_large_tree(self, large_operation_tree):
sfg = SFG(outputs=[Output(large_operation_tree)])
component_id = "add3"
sfg = sfg.replace_component(
Multiplication(name="Multi"), _id=component_id)
assert "Multi" in sfg._components_by_name.keys()
assert component_id not in sfg._components_by_id.keys()
def test_replace_no_input_component(self, operation_tree):
sfg = SFG(outputs=[Output(operation_tree)])
component_id = "c1"
_const = sfg.find_by_id(component_id)
sfg = sfg.replace_component(Constant(1), _id=component_id)
assert _const is not sfg.find_by_id(component_id)
def test_no_match_on_replace(self, large_operation_tree):
sfg = SFG(outputs=[Output(large_operation_tree)])
component_id = "addd1"
try:
sfg = sfg.replace_component(
Multiplication(name="Multi"), _id=component_id)
except AssertionError:
assert True
else:
assert False
def test_not_equal_input(self, large_operation_tree):
sfg = SFG(outputs=[Output(large_operation_tree)])
component_id = "c1"
try:
sfg = sfg.replace_component(
Multiplication(name="Multi"), _id=component_id)
except AssertionError:
assert True
else:
assert False
class TestInsertComponent:
def test_insert_component_in_sfg(self, large_operation_tree_names):
sfg = SFG(outputs=[Output(large_operation_tree_names)])
sqrt = SquareRoot()
_sfg = sfg.insert_operation(sqrt, sfg.find_by_name("constant4")[0].graph_id)
assert _sfg.evaluate() != sfg.evaluate()
assert any([isinstance(comp, SquareRoot) for comp in _sfg.operations])
assert not any([isinstance(comp, SquareRoot) for comp in sfg.operations])
assert not isinstance(sfg.find_by_name("constant4")[0].output(0).signals[0].destination.operation, SquareRoot)
assert isinstance(_sfg.find_by_name("constant4")[0].output(0).signals[0].destination.operation, SquareRoot)
assert sfg.find_by_name("constant4")[0].output(0).signals[0].destination.operation is sfg.find_by_id("add3")
assert _sfg.find_by_name("constant4")[0].output(
0).signals[0].destination.operation is not _sfg.find_by_id("add3")
assert _sfg.find_by_id("sqrt1").output(0).signals[0].destination.operation is _sfg.find_by_id("add3")
def test_insert_invalid_component_in_sfg(self, large_operation_tree):
sfg = SFG(outputs=[Output(large_operation_tree)])
# Should raise an exception for not matching input count to output count.
add4 = Addition()
with pytest.raises(Exception):
sfg.insert_operation(add4, "c1")
def test_insert_at_output(self, large_operation_tree):
sfg = SFG(outputs=[Output(large_operation_tree)])
# Should raise an exception for trying to insert an operation after an output.
sqrt = SquareRoot()
with pytest.raises(Exception):
_sfg = sfg.insert_operation(sqrt, "out1")
def test_insert_multiple_output_ports(self, butterfly_operation_tree):
sfg = SFG(outputs=list(map(Output, butterfly_operation_tree.outputs)))
_sfg = sfg.insert_operation(Butterfly(name="n_bfly"), "bfly3")
assert sfg.evaluate() != _sfg.evaluate()
assert len(sfg.find_by_name("n_bfly")) == 0
assert len(_sfg.find_by_name("n_bfly")) == 1
# Correctly connected old output -> new input
assert _sfg.find_by_name("bfly3")[0].output(
0).signals[0].destination.operation is _sfg.find_by_name("n_bfly")[0]
assert _sfg.find_by_name("bfly3")[0].output(
1).signals[0].destination.operation is _sfg.find_by_name("n_bfly")[0]
# Correctly connected new input -> old output
assert _sfg.find_by_name("n_bfly")[0].input(0).signals[0].source.operation is _sfg.find_by_name("bfly3")[0]
assert _sfg.find_by_name("n_bfly")[0].input(1).signals[0].source.operation is _sfg.find_by_name("bfly3")[0]
# Correctly connected new output -> next input
assert _sfg.find_by_name("n_bfly")[0].output(
0).signals[0].destination.operation is _sfg.find_by_name("bfly2")[0]
assert _sfg.find_by_name("n_bfly")[0].output(
1).signals[0].destination.operation is _sfg.find_by_name("bfly2")[0]
# Correctly connected next input -> new output
assert _sfg.find_by_name("bfly2")[0].input(0).signals[0].source.operation is _sfg.find_by_name("n_bfly")[0]
assert _sfg.find_by_name("bfly2")[0].input(1).signals[0].source.operation is _sfg.find_by_name("n_bfly")[0]
class TestFindComponentsWithTypeName:
def test_mac_components(self):
inp1 = Input("INP1")
inp2 = Input("INP2")
inp3 = Input("INP3")
add1 = Addition(None, None, "ADD1")
add2 = Addition(None, None, "ADD2")
mul1 = Multiplication(None, None, "MUL1")
out1 = Output(None, "OUT1")
add1.input(0).connect(inp1, "S1")
add1.input(1).connect(inp2, "S2")
add2.input(0).connect(add1, "S4")
add2.input(1).connect(inp3, "S3")
mul1.input(0).connect(add1, "S5")
mul1.input(1).connect(add2, "S6")
out1.input(0).connect(mul1, "S7")
mac_sfg = SFG(inputs=[inp1, inp2], outputs=[out1], name="mac_sfg")
assert {comp.name for comp in mac_sfg.get_components_with_type_name(
inp1.type_name())} == {"INP1", "INP2", "INP3"}
assert {comp.name for comp in mac_sfg.get_components_with_type_name(
add1.type_name())} == {"ADD1", "ADD2"}
assert {comp.name for comp in mac_sfg.get_components_with_type_name(
mul1.type_name())} == {"MUL1"}
assert {comp.name for comp in mac_sfg.get_components_with_type_name(
out1.type_name())} == {"OUT1"}
assert {comp.name for comp in mac_sfg.get_components_with_type_name(
Signal.type_name())} == {"S1", "S2", "S3", "S4", "S5", "S6", "S7"}
class TestGetPrecedenceList:
def test_inputs_registers(self, precedence_sfg_registers):
precedence_list = precedence_sfg_registers.get_precedence_list()
assert len(precedence_list) == 7
assert set([port.operation.key(port.index, port.operation.name)
for port in precedence_list[0]]) == {"IN1", "T1", "T2"}
assert set([port.operation.key(port.index, port.operation.name)
for port in precedence_list[1]]) == {"C0", "B1", "B2", "A1", "A2"}
assert set([port.operation.key(port.index, port.operation.name)
for port in precedence_list[2]]) == {"ADD2", "ADD3"}
assert set([port.operation.key(port.index, port.operation.name)
for port in precedence_list[3]]) == {"ADD1"}
assert set([port.operation.key(port.index, port.operation.name)
for port in precedence_list[4]]) == {"Q1"}
assert set([port.operation.key(port.index, port.operation.name)
for port in precedence_list[5]]) == {"A0"}
assert set([port.operation.key(port.index, port.operation.name)
for port in precedence_list[6]]) == {"ADD4"}
def test_inputs_constants_registers_multiple_outputs(self, precedence_sfg_registers_and_constants):
precedence_list = precedence_sfg_registers_and_constants.get_precedence_list()
assert len(precedence_list) == 7
assert set([port.operation.key(port.index, port.operation.name)
for port in precedence_list[0]]) == {"IN1", "T1", "CONST1"}
assert set([port.operation.key(port.index, port.operation.name)
for port in precedence_list[1]]) == {"C0", "B1", "B2", "A1", "A2"}
assert set([port.operation.key(port.index, port.operation.name)
for port in precedence_list[2]]) == {"ADD2", "ADD3"}
assert set([port.operation.key(port.index, port.operation.name)
for port in precedence_list[3]]) == {"ADD1"}
assert set([port.operation.key(port.index, port.operation.name)
for port in precedence_list[4]]) == {"Q1"}
assert set([port.operation.key(port.index, port.operation.name)
for port in precedence_list[5]]) == {"A0"}
assert set([port.operation.key(port.index, port.operation.name)
for port in precedence_list[6]]) == {"BFLY1.0", "BFLY1.1"}
def test_precedence_multiple_outputs_same_precedence(self, sfg_two_inputs_two_outputs):
sfg_two_inputs_two_outputs.name = "NESTED_SFG"
in1 = Input("IN1")
sfg_two_inputs_two_outputs.input(0).connect(in1, "S1")
in2 = Input("IN2")
cmul1 = ConstantMultiplication(10, None, "CMUL1")
cmul1.input(0).connect(in2, "S2")
sfg_two_inputs_two_outputs.input(1).connect(cmul1, "S3")
out1 = Output(sfg_two_inputs_two_outputs.output(0), "OUT1")
out2 = Output(sfg_two_inputs_two_outputs.output(1), "OUT2")
sfg = SFG(inputs=[in1, in2], outputs=[out1, out2])
precedence_list = sfg.get_precedence_list()
assert len(precedence_list) == 3
assert set([port.operation.key(port.index, port.operation.name)
for port in precedence_list[0]]) == {"IN1", "IN2"}
assert set([port.operation.key(port.index, port.operation.name)
for port in precedence_list[1]]) == {"CMUL1"}
assert set([port.operation.key(port.index, port.operation.name)
for port in precedence_list[2]]) == {"NESTED_SFG.0", "NESTED_SFG.1"}
def test_precedence_sfg_multiple_outputs_different_precedences(self, sfg_two_inputs_two_outputs_independent):
sfg_two_inputs_two_outputs_independent.name = "NESTED_SFG"
in1 = Input("IN1")
in2 = Input("IN2")
sfg_two_inputs_two_outputs_independent.input(0).connect(in1, "S1")
cmul1 = ConstantMultiplication(10, None, "CMUL1")
cmul1.input(0).connect(in2, "S2")
sfg_two_inputs_two_outputs_independent.input(1).connect(cmul1, "S3")
out1 = Output(sfg_two_inputs_two_outputs_independent.output(0), "OUT1")
out2 = Output(sfg_two_inputs_two_outputs_independent.output(1), "OUT2")
sfg = SFG(inputs=[in1, in2], outputs=[out1, out2])
precedence_list = sfg.get_precedence_list()
assert len(precedence_list) == 3
assert set([port.operation.key(port.index, port.operation.name)
for port in precedence_list[0]]) == {"IN1", "IN2"}
assert set([port.operation.key(port.index, port.operation.name)
for port in precedence_list[1]]) == {"CMUL1"}
assert set([port.operation.key(port.index, port.operation.name)
for port in precedence_list[2]]) == {"NESTED_SFG.0", "NESTED_SFG.1"}
class TestPrintPrecedence:
def test_registers(self, precedence_sfg_registers):
sfg = precedence_sfg_registers
captured_output = io.StringIO()
sys.stdout = captured_output
sfg.print_precedence_graph()
sys.stdout = sys.__stdout__
captured_output = captured_output.getvalue()
assert captured_output == \
"-" * 120 + "\n" + \
"1.1 \t" + str(sfg.find_by_name("IN1")[0]) + "\n" + \
"1.2 \t" + str(sfg.find_by_name("T1")[0]) + "\n" + \
"1.3 \t" + str(sfg.find_by_name("T2")[0]) + "\n" + \
"-" * 120 + "\n" + \
"2.1 \t" + str(sfg.find_by_name("C0")[0]) + "\n" + \
"2.2 \t" + str(sfg.find_by_name("A1")[0]) + "\n" + \
"2.3 \t" + str(sfg.find_by_name("B1")[0]) + "\n" + \
"2.4 \t" + str(sfg.find_by_name("A2")[0]) + "\n" + \
"2.5 \t" + str(sfg.find_by_name("B2")[0]) + "\n" + \
"-" * 120 + "\n" + \
"3.1 \t" + str(sfg.find_by_name("ADD3")[0]) + "\n" + \
"3.2 \t" + str(sfg.find_by_name("ADD2")[0]) + "\n" + \
"-" * 120 + "\n" + \
"4.1 \t" + str(sfg.find_by_name("ADD1")[0]) + "\n" + \
"-" * 120 + "\n" + \
"5.1 \t" + str(sfg.find_by_name("Q1")[0]) + "\n" + \
"-" * 120 + "\n" + \
"6.1 \t" + str(sfg.find_by_name("A0")[0]) + "\n" + \
"-" * 120 + "\n" + \
"7.1 \t" + str(sfg.find_by_name("ADD4")[0]) + "\n" + \
"-" * 120 + "\n"
class TestDepends:
def test_depends_sfg(self, sfg_two_inputs_two_outputs):
assert set(sfg_two_inputs_two_outputs.inputs_required_for_output(0)) == {
0, 1}
assert set(sfg_two_inputs_two_outputs.inputs_required_for_output(1)) == {
0, 1}
def test_depends_sfg_independent(self, sfg_two_inputs_two_outputs_independent):
assert set(
sfg_two_inputs_two_outputs_independent.inputs_required_for_output(0)) == {0}
assert set(
sfg_two_inputs_two_outputs_independent.inputs_required_for_output(1)) == {1}
class TestConnectExternalSignalsToComponentsSoloComp:
def test_connect_external_signals_to_components_mac(self):
""" Replace a MAC with inner components in an SFG """
inp1 = Input("INP1")
inp2 = Input("INP2")
inp3 = Input("INP3")
add1 = Addition(None, None, "ADD1")
add2 = Addition(None, None, "ADD2")
mul1 = Multiplication(None, None, "MUL1")
out1 = Output(None, "OUT1")
add1.input(0).connect(inp1, "S1")
add1.input(1).connect(inp2, "S2")
add2.input(0).connect(add1, "S3")
add2.input(1).connect(inp3, "S4")
mul1.input(0).connect(add1, "S5")
mul1.input(1).connect(add2, "S6")
out1.input(0).connect(mul1, "S7")
mac_sfg = SFG(inputs=[inp1, inp2], outputs=[out1])
inp4 = Input("INP4")
inp5 = Input("INP5")
out2 = Output(None, "OUT2")
mac_sfg.input(0).connect(inp4, "S8")
mac_sfg.input(1).connect(inp5, "S9")
out2.input(0).connect(mac_sfg.outputs[0], "S10")
test_sfg = SFG(inputs=[inp4, inp5], outputs=[out2])
assert test_sfg.evaluate(1, 2) == 9
mac_sfg.connect_external_signals_to_components()
assert test_sfg.evaluate(1, 2) == 9
assert not test_sfg.connect_external_signals_to_components()
def test_connect_external_signals_to_components_operation_tree(self, operation_tree):
""" Replaces an SFG with only a operation_tree component with its inner components """
sfg1 = SFG(outputs=[Output(operation_tree)])
out1 = Output(None, "OUT1")
out1.input(0).connect(sfg1.outputs[0], "S1")
test_sfg = SFG(outputs=[out1])
assert test_sfg.evaluate_output(0, []) == 5
sfg1.connect_external_signals_to_components()
assert test_sfg.evaluate_output(0, []) == 5
assert not test_sfg.connect_external_signals_to_components()
def test_connect_external_signals_to_components_large_operation_tree(self, large_operation_tree):
""" Replaces an SFG with only a large_operation_tree component with its inner components """
sfg1 = SFG(outputs=[Output(large_operation_tree)])
out1 = Output(None, "OUT1")
out1.input(0).connect(sfg1.outputs[0], "S1")
test_sfg = SFG(outputs=[out1])
assert test_sfg.evaluate_output(0, []) == 14
sfg1.connect_external_signals_to_components()
assert test_sfg.evaluate_output(0, []) == 14
assert not test_sfg.connect_external_signals_to_components()
class TestConnectExternalSignalsToComponentsMultipleComp:
def test_connect_external_signals_to_components_operation_tree(self, operation_tree):
""" Replaces a operation_tree in an SFG with other components """
sfg1 = SFG(outputs=[Output(operation_tree)])
inp1 = Input("INP1")
inp2 = Input("INP2")
out1 = Output(None, "OUT1")
add1 = Addition(None, None, "ADD1")
add2 = Addition(None, None, "ADD2")
add1.input(0).connect(inp1, "S1")
add1.input(1).connect(inp2, "S2")
add2.input(0).connect(add1, "S3")
add2.input(1).connect(sfg1.outputs[0], "S4")
out1.input(0).connect(add2, "S5")
test_sfg = SFG(inputs=[inp1, inp2], outputs=[out1])
assert test_sfg.evaluate(1, 2) == 8
sfg1.connect_external_signals_to_components()
assert test_sfg.evaluate(1, 2) == 8
assert not test_sfg.connect_external_signals_to_components()
def test_connect_external_signals_to_components_large_operation_tree(self, large_operation_tree):
""" Replaces a large_operation_tree in an SFG with other components """
sfg1 = SFG(outputs=[Output(large_operation_tree)])
inp1 = Input("INP1")
inp2 = Input("INP2")
out1 = Output(None, "OUT1")
add1 = Addition(None, None, "ADD1")
add2 = Addition(None, None, "ADD2")
add1.input(0).connect(inp1, "S1")
add1.input(1).connect(inp2, "S2")
add2.input(0).connect(add1, "S3")
add2.input(1).connect(sfg1.outputs[0], "S4")
out1.input(0).connect(add2, "S5")
test_sfg = SFG(inputs=[inp1, inp2], outputs=[out1])
assert test_sfg.evaluate(1, 2) == 17
sfg1.connect_external_signals_to_components()
assert test_sfg.evaluate(1, 2) == 17
assert not test_sfg.connect_external_signals_to_components()
def create_sfg(self, op_tree):
""" Create a simple SFG with either operation_tree or large_operation_tree """
sfg1 = SFG(outputs=[Output(op_tree)])
inp1 = Input("INP1")
inp2 = Input("INP2")
out1 = Output(None, "OUT1")
add1 = Addition(None, None, "ADD1")
add2 = Addition(None, None, "ADD2")
add1.input(0).connect(inp1, "S1")
add1.input(1).connect(inp2, "S2")
add2.input(0).connect(add1, "S3")
add2.input(1).connect(sfg1.outputs[0], "S4")
out1.input(0).connect(add2, "S5")
return SFG(inputs=[inp1, inp2], outputs=[out1])
def test_connect_external_signals_to_components_many_op(self, large_operation_tree):
""" Replaces an sfg component in a larger SFG with several component operations """
inp1 = Input("INP1")
inp2 = Input("INP2")
inp3 = Input("INP3")
inp4 = Input("INP4")
out1 = Output(None, "OUT1")
add1 = Addition(None, None, "ADD1")
sub1 = Subtraction(None, None, "SUB1")
add1.input(0).connect(inp1, "S1")
add1.input(1).connect(inp2, "S2")
sfg1 = self.create_sfg(large_operation_tree)
sfg1.input(0).connect(add1, "S3")
sfg1.input(1).connect(inp3, "S4")
sub1.input(0).connect(sfg1.outputs[0], "S5")
sub1.input(1).connect(inp4, "S6")
out1.input(0).connect(sub1, "S7")
test_sfg = SFG(inputs=[inp1, inp2, inp3, inp4], outputs=[out1])
assert test_sfg.evaluate(1, 2, 3, 4) == 16
sfg1.connect_external_signals_to_components()
assert test_sfg.evaluate(1, 2, 3, 4) == 16
assert not test_sfg.connect_external_signals_to_components()
class TestTopologicalOrderOperations:
def test_feedback_sfg(self, simple_filter):
topological_order = simple_filter.get_operations_topological_order()
assert [comp.name for comp in topological_order] == ["IN1", "ADD1", "REG1", "CMUL1", "OUT1"]
def test_multiple_independent_inputs(self, sfg_two_inputs_two_outputs_independent):
topological_order = sfg_two_inputs_two_outputs_independent.get_operations_topological_order()
assert [comp.name for comp in topological_order] == ["IN1", "OUT1", "IN2", "C1", "ADD1", "OUT2"]
class TestRemove:
def test_remove_single_input_outputs(self, simple_filter):
new_sfg = simple_filter.remove_operation("cmul1")
assert set(op.name for op in simple_filter.find_by_name("REG1")[0].subsequent_operations) == {"CMUL1", "OUT1"}
assert set(op.name for op in new_sfg.find_by_name("REG1")[0].subsequent_operations) == {"ADD1", "OUT1"}
assert set(op.name for op in simple_filter.find_by_name("ADD1")[0].preceding_operations) == {"CMUL1", "IN1"}
assert set(op.name for op in new_sfg.find_by_name("ADD1")[0].preceding_operations) == {"REG1", "IN1"}
assert "S1" in set([sig.name for sig in simple_filter.find_by_name("REG1")[0].output(0).signals])
assert "S2" in set([sig.name for sig in new_sfg.find_by_name("REG1")[0].output(0).signals])
def test_remove_multiple_inputs_outputs(self, butterfly_operation_tree):
out1 = Output(butterfly_operation_tree.output(0), "OUT1")
out2 = Output(butterfly_operation_tree.output(1), "OUT2")
sfg = SFG(outputs=[out1, out2])
new_sfg = sfg.remove_operation(sfg.find_by_name("bfly2")[0].graph_id)
assert sfg.find_by_name("bfly3")[0].output(0).signal_count == 1
assert new_sfg.find_by_name("bfly3")[0].output(0).signal_count == 1
sfg_dest_0 = sfg.find_by_name("bfly3")[0].output(0).signals[0].destination
new_sfg_dest_0 = new_sfg.find_by_name("bfly3")[0].output(0).signals[0].destination
assert sfg_dest_0.index == 0
assert new_sfg_dest_0.index == 0
assert sfg_dest_0.operation.name == "bfly2"
assert new_sfg_dest_0.operation.name == "bfly1"
assert sfg.find_by_name("bfly3")[0].output(1).signal_count == 1
assert new_sfg.find_by_name("bfly3")[0].output(1).signal_count == 1
sfg_dest_1 = sfg.find_by_name("bfly3")[0].output(1).signals[0].destination
new_sfg_dest_1 = new_sfg.find_by_name("bfly3")[0].output(1).signals[0].destination
assert sfg_dest_1.index == 1
assert new_sfg_dest_1.index == 1
assert sfg_dest_1.operation.name == "bfly2"
assert new_sfg_dest_1.operation.name == "bfly1"
assert sfg.find_by_name("bfly1")[0].input(0).signal_count == 1
assert new_sfg.find_by_name("bfly1")[0].input(0).signal_count == 1
sfg_source_0 = sfg.find_by_name("bfly1")[0].input(0).signals[0].source
new_sfg_source_0 = new_sfg.find_by_name("bfly1")[0].input(0).signals[0].source
assert sfg_source_0.index == 0
assert new_sfg_source_0.index == 0
assert sfg_source_0.operation.name == "bfly2"
assert new_sfg_source_0.operation.name == "bfly3"
sfg_source_1 = sfg.find_by_name("bfly1")[0].input(1).signals[0].source
new_sfg_source_1 = new_sfg.find_by_name("bfly1")[0].input(1).signals[0].source
assert sfg_source_1.index == 1
assert new_sfg_source_1.index == 1
assert sfg_source_1.operation.name == "bfly2"
assert new_sfg_source_1.operation.name == "bfly3"
assert "bfly2" not in set(op.name for op in new_sfg.operations)
def remove_different_number_inputs_outputs(self, simple_filter):
with pytest.raises(ValueError):
simple_filter.remove_operation("add1")
......@@ -2,14 +2,14 @@
B-ASIC test suit for the signal module which consists of the Signal class.
"""
from b_asic.port import InputPort, OutputPort
from b_asic.signal import Signal
import pytest
from b_asic import InputPort, OutputPort, Signal
def test_signal_creation_and_disconnction_and_connection_changing():
in_port = InputPort(0, None)
out_port = OutputPort(1, None)
in_port = InputPort(None, 0)
out_port = OutputPort(None, 1)
s = Signal(out_port, in_port)
assert in_port.signals == [s]
......@@ -17,7 +17,7 @@ def test_signal_creation_and_disconnction_and_connection_changing():
assert s.source is out_port
assert s.destination is in_port
in_port1 = InputPort(0, None)
in_port1 = InputPort(None, 0)
s.set_destination(in_port1)
assert in_port.signals == []
......@@ -40,7 +40,7 @@ def test_signal_creation_and_disconnction_and_connection_changing():
assert s.source is None
assert s.destination is None
out_port1 = OutputPort(0, None)
out_port1 = OutputPort(None, 0)
s.set_source(out_port1)
assert out_port1.signals == [s]
......@@ -60,3 +60,29 @@ def test_signal_creation_and_disconnction_and_connection_changing():
assert in_port.signals == [s]
assert s.source is out_port
assert s.destination is in_port
class Bits:
def test_pos_int(self, signal):
signal.bits = 10
assert signal.bits == 10
def test_bits_zero(self, signal):
signal.bits = 0
assert signal.bits == 0
def test_bits_neg_int(self, signal):
with pytest.raises(Exception):
signal.bits = -10
def test_bits_complex(self, signal):
with pytest.raises(Exception):
signal.bits = (2+4j)
def test_bits_float(self, signal):
with pytest.raises(Exception):
signal.bits = 3.2
def test_bits_pos_then_none(self, signal):
signal.bits = 10
signal.bits = None
assert signal.bits is None
\ No newline at end of file
import pytest
import numpy as np
from b_asic import SFG, Output, Simulation
class TestRunFor:
def test_with_lambdas_as_input(self, sfg_two_inputs_two_outputs):
simulation = Simulation(sfg_two_inputs_two_outputs, [lambda n: n + 3, lambda n: 1 + n * 2], save_results = True)
output = simulation.run_for(101)
assert output[0] == 304
assert output[1] == 505
assert simulation.results[100]["0"] == 304
assert simulation.results[100]["1"] == 505
assert simulation.results[0]["in1"] == 3
assert simulation.results[0]["in2"] == 1
assert simulation.results[0]["add1"] == 4
assert simulation.results[0]["add2"] == 5
assert simulation.results[0]["0"] == 4
assert simulation.results[0]["1"] == 5
assert simulation.results[1]["in1"] == 4
assert simulation.results[1]["in2"] == 3
assert simulation.results[1]["add1"] == 7
assert simulation.results[1]["add2"] == 10
assert simulation.results[1]["0"] == 7
assert simulation.results[1]["1"] == 10
assert simulation.results[2]["in1"] == 5
assert simulation.results[2]["in2"] == 5
assert simulation.results[2]["add1"] == 10
assert simulation.results[2]["add2"] == 15
assert simulation.results[2]["0"] == 10
assert simulation.results[2]["1"] == 15
assert simulation.results[3]["in1"] == 6
assert simulation.results[3]["in2"] == 7
assert simulation.results[3]["add1"] == 13
assert simulation.results[3]["add2"] == 20
assert simulation.results[3]["0"] == 13
assert simulation.results[3]["1"] == 20
def test_with_numpy_arrays_as_input(self, sfg_two_inputs_two_outputs):
input0 = np.array([5, 9, 25, -5, 7])
input1 = np.array([7, 3, 3, 54, 2])
simulation = Simulation(sfg_two_inputs_two_outputs, [input0, input1])
simulation.save_results = True
output = simulation.run_for(5)
assert output[0] == 9
assert output[1] == 11
assert simulation.results[0]["in1"] == 5
assert simulation.results[0]["in2"] == 7
assert simulation.results[0]["add1"] == 12
assert simulation.results[0]["add2"] == 19
assert simulation.results[0]["0"] == 12
assert simulation.results[0]["1"] == 19
assert simulation.results[1]["in1"] == 9
assert simulation.results[1]["in2"] == 3
assert simulation.results[1]["add1"] == 12
assert simulation.results[1]["add2"] == 15
assert simulation.results[1]["0"] == 12
assert simulation.results[1]["1"] == 15
assert simulation.results[2]["in1"] == 25
assert simulation.results[2]["in2"] == 3
assert simulation.results[2]["add1"] == 28
assert simulation.results[2]["add2"] == 31
assert simulation.results[2]["0"] == 28
assert simulation.results[2]["1"] == 31
assert simulation.results[3]["in1"] == -5
assert simulation.results[3]["in2"] == 54
assert simulation.results[3]["add1"] == 49
assert simulation.results[3]["add2"] == 103
assert simulation.results[3]["0"] == 49
assert simulation.results[3]["1"] == 103
assert simulation.results[4]["0"] == 9
assert simulation.results[4]["1"] == 11
def test_with_numpy_array_overflow(self, sfg_two_inputs_two_outputs):
input0 = np.array([5, 9, 25, -5, 7])
input1 = np.array([7, 3, 3, 54, 2])
simulation = Simulation(sfg_two_inputs_two_outputs, [input0, input1])
simulation.run_for(5)
with pytest.raises(IndexError):
simulation.run_for(1)
def test_delay(self, sfg_delay):
simulation = Simulation(sfg_delay, save_results = True)
simulation.set_input(0, [5, -2, 25, -6, 7, 0])
simulation.run_for(6)
assert simulation.results[0]["0"] == 0
assert simulation.results[1]["0"] == 5
assert simulation.results[2]["0"] == -2
assert simulation.results[3]["0"] == 25
assert simulation.results[4]["0"] == -6
assert simulation.results[5]["0"] == 7
class TestRun:
def test_nested(self, sfg_nested):
input0 = np.array([5, 9])
input1 = np.array([7, 3])
simulation = Simulation(sfg_nested, [input0, input1])
output0 = simulation.run()
output1 = simulation.run()
assert output0[0] == 11405
assert output1[0] == 4221
def test_accumulator(self, sfg_accumulator):
data_in = np.array([5, -2, 25, -6, 7, 0])
reset = np.array([0, 0, 0, 1, 0, 0])
simulation = Simulation(sfg_accumulator, [data_in, reset])
output0 = simulation.run()
output1 = simulation.run()
output2 = simulation.run()
output3 = simulation.run()
output4 = simulation.run()
output5 = simulation.run()
assert output0[0] == 0
assert output1[0] == 5
assert output2[0] == 3
assert output3[0] == 28
assert output4[0] == 0
assert output5[0] == 7
def test_simple_filter(self, simple_filter):
input0 = np.array([1, 2, 3, 4, 5])
simulation = Simulation(simple_filter, [input0], save_results=True)
output0 = [simulation.run()[0] for _ in range(len(input0))]
assert output0 == [0, 1.0, 2.5, 4.25, 6.125]