Skip to content
Snippets Groups Projects
Commit 7b732af1 authored by Jacob Wahlman's avatar Jacob Wahlman :ok_hand:
Browse files

Merge branch '26-print-pg' into 'develop'

Resolve "Print PG"

See merge request PUM_TDDD96/B-ASIC!47
parents 6653c16f 9c9e2af6
No related branches found
No related tags found
3 merge requests!67WIP: B-ASIC version 1.0.0 hotfix,!65B-ASIC version 1.0.0,!47Resolve "Print PG"
Pipeline #15079 passed
......@@ -105,6 +105,10 @@ class AbstractGraphComponent(GraphComponent):
self._graph_id = ""
self._parameters = {}
def __str__(self):
return f"id: {self.graph_id if self.graph_id else 'no_id'}, \tname: {self.name if self.name else 'no_name'}" + \
"".join((f", \t{key}: {str(param)}" for key, param in self._parameters.items()))
@property
def name(self) -> Name:
return self._name
......
......@@ -262,6 +262,47 @@ class AbstractOperation(Operation, AbstractGraphComponent):
from b_asic.core_operations import Constant, Division
return Division(Constant(src) if isinstance(src, Number) else src, self)
def __str__(self):
inputs_dict = dict()
for i, port in enumerate(self.inputs):
if port.signal_count == 0:
inputs_dict[i] = '-'
break
dict_ele = []
for signal in port.signals:
if signal.source:
if signal.source.operation.graph_id:
dict_ele.append(signal.source.operation.graph_id)
else:
dict_ele.append("no_id")
else:
if signal.graph_id:
dict_ele.append(signal.graph_id)
else:
dict_ele.append("no_id")
inputs_dict[i] = dict_ele
outputs_dict = dict()
for i, port in enumerate(self.outputs):
if port.signal_count == 0:
outputs_dict[i] = '-'
break
dict_ele = []
for signal in port.signals:
if signal.destination:
if signal.destination.operation.graph_id:
dict_ele.append(signal.destination.operation.graph_id)
else:
dict_ele.append("no_id")
else:
if signal.graph_id:
dict_ele.append(signal.graph_id)
else:
dict_ele.append("no_id")
outputs_dict[i] = dict_ele
return super().__str__() + f", \tinputs: {str(inputs_dict)}, \toutputs: {str(outputs_dict)}"
@property
def input_count(self) -> int:
return len(self._input_ports)
......@@ -400,6 +441,16 @@ class AbstractOperation(Operation, AbstractGraphComponent):
def neighbors(self) -> Iterable[GraphComponent]:
return list(self.input_signals) + list(self.output_signals)
@property
def preceding_operations(self) -> Iterable[Operation]:
"""Returns an Iterable of all Operations that are connected to this Operations input ports."""
return [signal.source.operation for signal in self.input_signals if signal.source]
@property
def subsequent_operations(self) -> Iterable[Operation]:
"""Returns an Iterable of all Operations that are connected to this Operations output ports."""
return [signal.destination.operation for signal in self.output_signals if signal.destination]
@property
def source(self) -> OutputPort:
if self.output_count != 1:
......
This diff is collapsed.
import pytest
from b_asic import SFG, Input, Output, Constant, Register, ConstantMultiplication
from b_asic import SFG, Input, Output, Constant, Register, ConstantMultiplication, Addition, Butterfly
@pytest.fixture
def sfg_two_inputs_two_outputs():
"""Valid SFG with two inputs and two outputs.
. .
. .
in1-------+ +--------->out1
. | | .
. v | .
......@@ -17,9 +17,9 @@ def sfg_two_inputs_two_outputs():
| . ^ .
| . | .
+------------+ .
. .
. .
out1 = in1 + in2
out2 = in1 + 2 * in2
out2 = in1 + 2 * in2
"""
in1 = Input()
in2 = Input()
......@@ -27,13 +27,14 @@ def sfg_two_inputs_two_outputs():
add2 = add1 + in2
out1 = Output(add1)
out2 = Output(add2)
return SFG(inputs = [in1, in2], outputs = [out1, out2])
return SFG(inputs=[in1, in2], outputs=[out1, out2])
@pytest.fixture
def sfg_two_inputs_two_outputs_independent():
"""Valid SFG with two inputs and two outputs, where the first output only depends
on the first input and the second output only depends on the second input.
. .
. .
in1-------------------->out1
. .
. .
......@@ -44,17 +45,18 @@ def sfg_two_inputs_two_outputs_independent():
. | ^ .
. | | .
. +------+ .
. .
. .
out1 = in1
out2 = in2 + 3
out2 = in2 + 3
"""
in1 = Input()
in2 = Input()
c1 = Constant(3)
add1 = in2 + c1
out1 = Output(in1)
out2 = Output(add1)
return SFG(inputs = [in1, in2], outputs = [out1, out2])
in1 = Input("IN1")
in2 = Input("IN2")
c1 = Constant(3, "C1")
add1 = Addition(in2, c1, "ADD1")
out1 = Output(in1, "OUT1")
out2 = Output(add1, "OUT2")
return SFG(inputs=[in1, in2], outputs=[out1, out2])
@pytest.fixture
def sfg_nested():
......@@ -65,7 +67,7 @@ def sfg_nested():
mac_in2 = Input()
mac_in3 = Input()
mac_out1 = Output(mac_in1 + mac_in2 * mac_in3)
MAC = SFG(inputs = [mac_in1, mac_in2, mac_in3], outputs = [mac_out1])
MAC = SFG(inputs=[mac_in1, mac_in2, mac_in3], outputs=[mac_out1])
in1 = Input()
in2 = Input()
......@@ -73,7 +75,8 @@ def sfg_nested():
mac2 = MAC(in1, in2, mac1)
mac3 = MAC(in1, mac1, mac2)
out1 = Output(mac3)
return SFG(inputs = [in1, in2], outputs = [out1])
return SFG(inputs=[in1, in2], outputs=[out1])
@pytest.fixture
def sfg_delay():
......@@ -83,7 +86,8 @@ def sfg_delay():
in1 = Input()
reg1 = Register(in1)
out1 = Output(reg1)
return SFG(inputs = [in1], outputs = [out1])
return SFG(inputs=[in1], outputs=[out1])
@pytest.fixture
def sfg_accumulator():
......@@ -95,7 +99,8 @@ def sfg_accumulator():
reg = Register()
reg.input(0).connect((reg + data_in) * (1 - reset))
data_out = Output(reg)
return SFG(inputs = [data_in, reset], outputs = [data_out])
return SFG(inputs=[data_in, reset], outputs=[data_out])
@pytest.fixture
def simple_filter():
......@@ -105,11 +110,70 @@ def simple_filter():
| |
in1>------add1>------reg>------+------out1>
"""
in1 = Input()
reg = Register()
constmul1 = ConstantMultiplication(0.5)
add1 = in1 + constmul1
reg.input(0).connect(add1)
in1 = Input("IN1")
constmul1 = ConstantMultiplication(0.5, name="CMUL1")
add1 = Addition(in1, constmul1, "ADD1")
reg = Register(add1, name="REG1")
constmul1.input(0).connect(reg)
out1 = Output(reg)
return SFG(inputs=[in1], outputs=[out1])
out1 = Output(reg, "OUT1")
return SFG(inputs=[in1], outputs=[out1], name="simple_filter")
@pytest.fixture
def precedence_sfg_registers():
"""A sfg with registers and interesting layout for precednce list generation.
IN1>--->C0>--->ADD1>--->Q1>---+--->A0>--->ADD4>--->OUT1
^ | ^
| T1 |
| | |
ADD2<---<B1<---+--->A1>--->ADD3
^ | ^
| T2 |
| | |
+-----<B2<---+--->A2>-----+
"""
in1 = Input("IN1")
c0 = ConstantMultiplication(5, in1, "C0")
add1 = Addition(c0, None, "ADD1")
# Not sure what operation "Q" is supposed to be in the example
Q1 = ConstantMultiplication(1, add1, "Q1")
T1 = Register(Q1, 0, "T1")
T2 = Register(T1, 0, "T2")
b2 = ConstantMultiplication(2, T2, "B2")
b1 = ConstantMultiplication(3, T1, "B1")
add2 = Addition(b1, b2, "ADD2")
add1.input(1).connect(add2)
a1 = ConstantMultiplication(4, T1, "A1")
a2 = ConstantMultiplication(6, T2, "A2")
add3 = Addition(a1, a2, "ADD3")
a0 = ConstantMultiplication(7, Q1, "A0")
add4 = Addition(a0, add3, "ADD4")
out1 = Output(add4, "OUT1")
return SFG(inputs=[in1], outputs=[out1], name="SFG")
@pytest.fixture
def precedence_sfg_registers_and_constants():
in1 = Input("IN1")
c0 = ConstantMultiplication(5, in1, "C0")
add1 = Addition(c0, None, "ADD1")
# Not sure what operation "Q" is supposed to be in the example
Q1 = ConstantMultiplication(1, add1, "Q1")
T1 = Register(Q1, 0, "T1")
const1 = Constant(10, "CONST1") # Replace T2 register with a constant
b2 = ConstantMultiplication(2, const1, "B2")
b1 = ConstantMultiplication(3, T1, "B1")
add2 = Addition(b1, b2, "ADD2")
add1.input(1).connect(add2)
a1 = ConstantMultiplication(4, T1, "A1")
a2 = ConstantMultiplication(10, const1, "A2")
add3 = Addition(a1, a2, "ADD3")
a0 = ConstantMultiplication(7, Q1, "A0")
# Replace ADD4 with a butterfly to test multiple output ports
bfly1 = Butterfly(a0, add3, "BFLY1")
out1 = Output(bfly1.output(0), "OUT1")
out2 = Output(bfly1.output(1), "OUT2")
return SFG(inputs=[in1], outputs=[out1], name="SFG")
import pytest
import io
import sys
from b_asic import SFG, Signal, Input, Output, Constant, ConstantMultiplication, Addition, Multiplication, Register, \
Butterfly, Subtraction, SquareRoot
......@@ -54,13 +57,17 @@ class TestPrintSfg:
inp2 = Input("INP2")
add1 = Addition(inp1, inp2, "ADD1")
out1 = Output(add1, "OUT1")
sfg = SFG(inputs=[inp1, inp2], outputs=[out1], name="sf1")
sfg = SFG(inputs=[inp1, inp2], outputs=[out1], name="SFG1")
assert sfg.__str__() == \
"id: add1, name: ADD1, input: [s1, s2], output: [s3]\n" + \
"id: in1, name: INP1, input: [], output: [s1]\n" + \
"id: in2, name: INP2, input: [], output: [s2]\n" + \
"id: out1, name: OUT1, input: [s3], output: []\n"
"id: no_id, \tname: SFG1, \tinputs: {0: '-'}, \toutputs: {0: '-'}\n" + \
"Internal Operations:\n" + \
"----------------------------------------------------------------------------------------------------\n" + \
str(sfg.find_by_name("INP1")[0]) + "\n" + \
str(sfg.find_by_name("INP2")[0]) + "\n" + \
str(sfg.find_by_name("ADD1")[0]) + "\n" + \
str(sfg.find_by_name("OUT1")[0]) + "\n" + \
"----------------------------------------------------------------------------------------------------\n"
def test_add_mul(self):
inp1 = Input("INP1")
......@@ -72,12 +79,16 @@ class TestPrintSfg:
sfg = SFG(inputs=[inp1, inp2, inp3], outputs=[out1], name="mac_sfg")
assert sfg.__str__() == \
"id: add1, name: ADD1, input: [s1, s2], output: [s5]\n" + \
"id: in1, name: INP1, input: [], output: [s1]\n" + \
"id: in2, name: INP2, input: [], output: [s2]\n" + \
"id: mul1, name: MUL1, input: [s5, s3], output: [s4]\n" + \
"id: in3, name: INP3, input: [], output: [s3]\n" + \
"id: out1, name: OUT1, input: [s4], output: []\n"
"id: no_id, \tname: mac_sfg, \tinputs: {0: '-'}, \toutputs: {0: '-'}\n" + \
"Internal Operations:\n" + \
"----------------------------------------------------------------------------------------------------\n" + \
str(sfg.find_by_name("INP1")[0]) + "\n" + \
str(sfg.find_by_name("INP2")[0]) + "\n" + \
str(sfg.find_by_name("ADD1")[0]) + "\n" + \
str(sfg.find_by_name("INP3")[0]) + "\n" + \
str(sfg.find_by_name("MUL1")[0]) + "\n" + \
str(sfg.find_by_name("OUT1")[0]) + "\n" + \
"----------------------------------------------------------------------------------------------------\n"
def test_constant(self):
inp1 = Input("INP1")
......@@ -88,18 +99,27 @@ class TestPrintSfg:
sfg = SFG(inputs=[inp1], outputs=[out1], name="sfg")
assert sfg.__str__() == \
"id: add1, name: ADD1, input: [s3, s1], output: [s2]\n" + \
"id: c1, name: CONST, value: 3, input: [], output: [s3]\n" + \
"id: in1, name: INP1, input: [], output: [s1]\n" + \
"id: out1, name: OUT1, input: [s2], output: []\n"
"id: no_id, \tname: sfg, \tinputs: {0: '-'}, \toutputs: {0: '-'}\n" + \
"Internal Operations:\n" + \
"----------------------------------------------------------------------------------------------------\n" + \
str(sfg.find_by_name("CONST")[0]) + "\n" + \
str(sfg.find_by_name("INP1")[0]) + "\n" + \
str(sfg.find_by_name("ADD1")[0]) + "\n" + \
str(sfg.find_by_name("OUT1")[0]) + "\n" + \
"----------------------------------------------------------------------------------------------------\n"
def test_simple_filter(self, simple_filter):
assert simple_filter.__str__() == \
'id: add1, name: , input: [s1, s3], output: [s4]\n' + \
'id: in1, name: , input: [], output: [s1]\n' + \
'id: cmul1, name: , input: [s5], output: [s3]\n' + \
'id: reg1, name: , input: [s4], output: [s5, s2]\n' + \
'id: out1, name: , input: [s2], output: []\n'
"id: no_id, \tname: simple_filter, \tinputs: {0: '-'}, \toutputs: {0: '-'}\n" + \
"Internal Operations:\n" + \
"----------------------------------------------------------------------------------------------------\n" + \
str(simple_filter.find_by_name("IN1")[0]) + "\n" + \
str(simple_filter.find_by_name("ADD1")[0]) + "\n" + \
str(simple_filter.find_by_name("REG1")[0]) + "\n" + \
str(simple_filter.find_by_name("CMUL1")[0]) + "\n" + \
str(simple_filter.find_by_name("OUT1")[0]) + "\n" + \
"----------------------------------------------------------------------------------------------------\n"
class TestDeepCopy:
......@@ -267,7 +287,7 @@ class TestInsertComponent:
_sfg = sfg.insert_operation(sqrt, sfg.find_by_name("constant4")[0].graph_id)
assert _sfg.evaluate() != sfg.evaluate()
assert any([isinstance(comp, SquareRoot) for comp in _sfg.operations])
assert not any([isinstance(comp, SquareRoot) for comp in sfg.operations])
......@@ -275,7 +295,8 @@ class TestInsertComponent:
assert isinstance(_sfg.find_by_name("constant4")[0].output(0).signals[0].destination.operation, SquareRoot)
assert sfg.find_by_name("constant4")[0].output(0).signals[0].destination.operation is sfg.find_by_id("add3")
assert _sfg.find_by_name("constant4")[0].output(0).signals[0].destination.operation is not _sfg.find_by_id("add3")
assert _sfg.find_by_name("constant4")[0].output(
0).signals[0].destination.operation is not _sfg.find_by_id("add3")
assert _sfg.find_by_id("sqrt1").output(0).signals[0].destination.operation is _sfg.find_by_id("add3")
def test_insert_invalid_component_in_sfg(self, large_operation_tree):
......@@ -304,22 +325,26 @@ class TestInsertComponent:
assert len(_sfg.find_by_name("n_bfly")) == 1
# Correctly connected old output -> new input
assert _sfg.find_by_name("bfly3")[0].output(0).signals[0].destination.operation is _sfg.find_by_name("n_bfly")[0]
assert _sfg.find_by_name("bfly3")[0].output(1).signals[0].destination.operation is _sfg.find_by_name("n_bfly")[0]
assert _sfg.find_by_name("bfly3")[0].output(
0).signals[0].destination.operation is _sfg.find_by_name("n_bfly")[0]
assert _sfg.find_by_name("bfly3")[0].output(
1).signals[0].destination.operation is _sfg.find_by_name("n_bfly")[0]
# Correctly connected new input -> old output
assert _sfg.find_by_name("n_bfly")[0].input(0).signals[0].source.operation is _sfg.find_by_name("bfly3")[0]
assert _sfg.find_by_name("n_bfly")[0].input(1).signals[0].source.operation is _sfg.find_by_name("bfly3")[0]
# Correctly connected new output -> next input
assert _sfg.find_by_name("n_bfly")[0].output(0).signals[0].destination.operation is _sfg.find_by_name("bfly2")[0]
assert _sfg.find_by_name("n_bfly")[0].output(1).signals[0].destination.operation is _sfg.find_by_name("bfly2")[0]
assert _sfg.find_by_name("n_bfly")[0].output(
0).signals[0].destination.operation is _sfg.find_by_name("bfly2")[0]
assert _sfg.find_by_name("n_bfly")[0].output(
1).signals[0].destination.operation is _sfg.find_by_name("bfly2")[0]
# Correctly connected next input -> new output
assert _sfg.find_by_name("bfly2")[0].input(0).signals[0].source.operation is _sfg.find_by_name("n_bfly")[0]
assert _sfg.find_by_name("bfly2")[0].input(1).signals[0].source.operation is _sfg.find_by_name("n_bfly")[0]
class TestFindComponentsWithTypeName:
def test_mac_components(self):
inp1 = Input("INP1")
......@@ -358,28 +383,9 @@ class TestFindComponentsWithTypeName:
class TestGetPrecedenceList:
def test_inputs_registers(self):
in1 = Input("IN1")
c0 = ConstantMultiplication(5, in1, "C0")
add1 = Addition(c0, None, "ADD1")
# Not sure what operation "Q" is supposed to be in the example
Q1 = ConstantMultiplication(1, add1, "Q1")
T1 = Register(Q1, 0, "T1")
T2 = Register(T1, 0, "T2")
b2 = ConstantMultiplication(2, T2, "B2")
b1 = ConstantMultiplication(3, T1, "B1")
add2 = Addition(b1, b2, "ADD2")
add1.input(1).connect(add2)
a1 = ConstantMultiplication(4, T1, "A1")
a2 = ConstantMultiplication(6, T2, "A2")
add3 = Addition(a1, a2, "ADD3")
a0 = ConstantMultiplication(7, Q1, "A0")
add4 = Addition(a0, add3, "ADD4")
out1 = Output(add4, "OUT1")
sfg = SFG(inputs=[in1], outputs=[out1], name="SFG")
def test_inputs_registers(self, precedence_sfg_registers):
precedence_list = sfg.get_precedence_list()
precedence_list = precedence_sfg_registers.get_precedence_list()
assert len(precedence_list) == 7
......@@ -404,30 +410,9 @@ class TestGetPrecedenceList:
assert set([port.operation.key(port.index, port.operation.name)
for port in precedence_list[6]]) == {"ADD4"}
def test_inputs_constants_registers_multiple_outputs(self):
in1 = Input("IN1")
c0 = ConstantMultiplication(5, in1, "C0")
add1 = Addition(c0, None, "ADD1")
# Not sure what operation "Q" is supposed to be in the example
Q1 = ConstantMultiplication(1, add1, "Q1")
T1 = Register(Q1, 0, "T1")
const1 = Constant(10, "CONST1") # Replace T2 register with a constant
b2 = ConstantMultiplication(2, const1, "B2")
b1 = ConstantMultiplication(3, T1, "B1")
add2 = Addition(b1, b2, "ADD2")
add1.input(1).connect(add2)
a1 = ConstantMultiplication(4, T1, "A1")
a2 = ConstantMultiplication(10, const1, "A2")
add3 = Addition(a1, a2, "ADD3")
a0 = ConstantMultiplication(7, Q1, "A0")
# Replace ADD4 with a butterfly to test multiple output ports
bfly1 = Butterfly(a0, add3, "BFLY1")
out1 = Output(bfly1.output(0), "OUT1")
out2 = Output(bfly1.output(1), "OUT2")
sfg = SFG(inputs=[in1], outputs=[out1], name="SFG")
def test_inputs_constants_registers_multiple_outputs(self, precedence_sfg_registers_and_constants):
precedence_list = sfg.get_precedence_list()
precedence_list = precedence_sfg_registers_and_constants.get_precedence_list()
assert len(precedence_list) == 7
......@@ -502,10 +487,48 @@ class TestGetPrecedenceList:
for port in precedence_list[0]]) == {"IN1", "IN2"}
assert set([port.operation.key(port.index, port.operation.name)
for port in precedence_list[1]]) == {"NESTED_SFG.0", "CMUL1"}
for port in precedence_list[1]]) == {"CMUL1"}
assert set([port.operation.key(port.index, port.operation.name)
for port in precedence_list[2]]) == {"NESTED_SFG.1"}
for port in precedence_list[2]]) == {"NESTED_SFG.0", "NESTED_SFG.1"}
class TestPrintPrecedence:
def test_registers(self, precedence_sfg_registers):
sfg = precedence_sfg_registers
captured_output = io.StringIO()
sys.stdout = captured_output
sfg.print_precedence_graph()
sys.stdout = sys.__stdout__
captured_output = captured_output.getvalue()
assert captured_output == \
"-" * 120 + "\n" + \
"1.1 \t" + str(sfg.find_by_name("IN1")[0]) + "\n" + \
"1.2 \t" + str(sfg.find_by_name("T1")[0]) + "\n" + \
"1.3 \t" + str(sfg.find_by_name("T2")[0]) + "\n" + \
"-" * 120 + "\n" + \
"2.1 \t" + str(sfg.find_by_name("C0")[0]) + "\n" + \
"2.2 \t" + str(sfg.find_by_name("A1")[0]) + "\n" + \
"2.3 \t" + str(sfg.find_by_name("B1")[0]) + "\n" + \
"2.4 \t" + str(sfg.find_by_name("A2")[0]) + "\n" + \
"2.5 \t" + str(sfg.find_by_name("B2")[0]) + "\n" + \
"-" * 120 + "\n" + \
"3.1 \t" + str(sfg.find_by_name("ADD3")[0]) + "\n" + \
"3.2 \t" + str(sfg.find_by_name("ADD2")[0]) + "\n" + \
"-" * 120 + "\n" + \
"4.1 \t" + str(sfg.find_by_name("ADD1")[0]) + "\n" + \
"-" * 120 + "\n" + \
"5.1 \t" + str(sfg.find_by_name("Q1")[0]) + "\n" + \
"-" * 120 + "\n" + \
"6.1 \t" + str(sfg.find_by_name("A0")[0]) + "\n" + \
"-" * 120 + "\n" + \
"7.1 \t" + str(sfg.find_by_name("ADD4")[0]) + "\n" + \
"-" * 120 + "\n"
class TestDepends:
......@@ -672,3 +695,15 @@ class TestConnectExternalSignalsToComponentsMultipleComp:
sfg1.connect_external_signals_to_components()
assert test_sfg.evaluate(1, 2, 3, 4) == 16
assert not test_sfg.connect_external_signals_to_components()
class TestTopologicalOrderOperations:
def test_feedback_sfg(self, simple_filter):
topological_order = simple_filter.get_operations_topological_order()
assert [comp.name for comp in topological_order] == ["IN1", "ADD1", "REG1", "CMUL1", "OUT1"]
def test_multiple_independent_inputs(self, sfg_two_inputs_two_outputs_independent):
topological_order = sfg_two_inputs_two_outputs_independent.get_operations_topological_order()
assert [comp.name for comp in topological_order] == ["IN1", "OUT1", "IN2", "C1", "ADD1", "OUT2"]
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment