Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • da/B-ASIC
  • lukja239/B-ASIC
  • robal695/B-ASIC
3 results
Show changes
......@@ -25,6 +25,7 @@ from typing import (
cast,
)
import numpy as np
from graphviz import Digraph
from b_asic.graph_component import GraphComponent
......@@ -660,27 +661,28 @@ class SFG(AbstractOperation):
# Preserve the original SFG by creating a copy.
sfg_copy = self()
comp = sfg_copy._add_component_unconnected_copy(component)
output_comp = cast(Operation, sfg_copy.find_by_id(output_comp_id))
if output_comp is None:
return None
if isinstance(output_comp, Output):
raise TypeError("Source operation cannot be an output operation.")
if len(output_comp.output_signals) != component.input_count:
if len(output_comp.output_signals) != comp.input_count:
raise TypeError(
"Source operation output count"
f" ({len(output_comp.output_signals)}) does not match input"
f" count for component ({component.input_count})."
f" count for component ({comp.input_count})."
)
if len(output_comp.output_signals) != component.output_count:
if len(output_comp.output_signals) != comp.output_count:
raise TypeError(
"Destination operation input count does not match output for component."
)
for index, signal_in in enumerate(output_comp.output_signals):
destination = cast(InputPort, signal_in.destination)
signal_in.set_destination(component.input(index))
destination.connect(component.output(index))
signal_in.set_destination(comp.input(index))
destination.connect(comp.output(index))
# Recreate the newly coupled SFG so that all attributes are correct.
return sfg_copy()
......@@ -725,15 +727,16 @@ class SFG(AbstractOperation):
if output_comp is None:
raise ValueError(f"Unknown component: {output_comp_id!r}")
if isinstance(output_comp, Operation):
comp = sfg_copy._add_component_unconnected_copy(new_operation)
if port_id is None:
sfg_copy._insert_operation_after_operation(output_comp, new_operation)
sfg_copy._insert_operation_after_operation(output_comp, comp)
else:
sfg_copy._insert_operation_after_outputport(
output_comp.output(port_id), new_operation
output_comp.output(port_id), comp
)
elif isinstance(output_comp, Signal):
sfg_copy._insert_operation_before_signal(output_comp, new_operation)
sfg_copy._insert_operation_before_signal(output_comp, comp)
# Recreate the newly coupled SFG so that all attributes are correct.
return sfg_copy()
......@@ -776,14 +779,15 @@ class SFG(AbstractOperation):
if input_comp is None:
raise ValueError(f"Unknown component: {input_comp_id!r}")
if isinstance(input_comp, Operation):
comp = sfg_copy._add_component_unconnected_copy(new_operation)
if port is None:
sfg_copy._insert_operation_before_operation(input_comp, new_operation)
sfg_copy._insert_operation_before_operation(input_comp, comp)
else:
sfg_copy._insert_operation_before_inputport(
input_comp.input(port), new_operation
input_comp.input(port), comp
)
elif isinstance(input_comp, Signal):
sfg_copy._insert_operation_after_signal(input_comp, new_operation)
sfg_copy._insert_operation_after_signal(input_comp, comp)
# Recreate the newly coupled SFG so that all attributes are correct.
return sfg_copy()
......@@ -1688,13 +1692,284 @@ class SFG(AbstractOperation):
return Schedule(self, algorithm="ASAP").schedule_time
def _dfs(self, graph, start, end):
"""
Find loop(s) in graph
Parameters
----------
graph : dictionary
The dictionary that are to be searched for loops.
start : key in dictionary graph
The "node" in the dictionary that are set as the start point.
end : key in dictionary graph
The "node" in the dictionary that are set as the end point.
"""
fringe = [(start, [])]
while fringe:
state, path = fringe.pop()
if path and state == end:
yield path
continue
for next_state in graph[state]:
if next_state in path:
continue
fringe.append((next_state, path + [next_state]))
def iteration_period_bound(self) -> int:
"""
Return the iteration period bound of the SFG.
If -1, the SFG does not have any loops and therefore no iteration period bound.
Returns
-------
The iteration period bound.
"""
inputs_used = []
for used_input in self._used_ids:
if 'in' in str(used_input):
used_input = used_input.replace("in", "")
inputs_used.append(int(used_input))
if inputs_used == []:
raise ValueError("No inputs to sfg")
for input in inputs_used:
input_op = self._input_operations[input]
queue: Deque[Operation] = deque([input_op])
visited: Set[Operation] = {input_op}
dict_of_sfg = {}
while queue:
op = queue.popleft()
for output_port in op.outputs:
if not (isinstance(op, Input) or isinstance(op, Output)):
dict_of_sfg[op.graph_id] = []
for signal in output_port.signals:
if signal.destination is not None:
new_op = signal.destination.operation
if not (isinstance(op, Input) or isinstance(op, Output)):
if not isinstance(new_op, Output):
dict_of_sfg[op.graph_id] += [new_op.graph_id]
if new_op not in visited:
queue.append(new_op)
visited.add(new_op)
else:
raise ValueError("Destination does not exist")
if not dict_of_sfg:
raise ValueError(
"the SFG does not have any loops and therefore no iteration period bound."
)
cycles = [
[node] + path
for node in dict_of_sfg
for path in self._dfs(dict_of_sfg, node, node)
]
if not cycles:
return -1
op_and_latency = {}
for op in self.operations:
for lista in cycles:
for element in lista:
if op.type_name() not in op_and_latency:
op_and_latency[op.type_name()] = op.latency
t_l_values = []
for loop in cycles:
loop.pop()
time_of_loop = 0
number_of_t_in_loop = 0
for element in loop:
if ''.join([i for i in element if not i.isdigit()]) == 't':
number_of_t_in_loop += 1
for key, item in op_and_latency.items():
if key in element:
time_of_loop += item
if number_of_t_in_loop in (0, 1):
t_l_values.append(time_of_loop)
else:
t_l_values.append(time_of_loop / number_of_t_in_loop)
return max(t_l_values)
def state_space_representation(self):
"""
Find the state-space representation of the SFG.
Returns
-------
The state-space representation.
"""
delay_element_used = []
for delay_element in self._used_ids:
if ''.join([i for i in delay_element if not i.isdigit()]) == 't':
delay_element_used.append(delay_element)
delay_element_used.sort()
input_index_used = []
inputs_used = []
output_index_used = []
outputs_used = []
for used_input in self._used_ids:
if 'in' in str(used_input):
inputs_used.append(used_input)
input_index_used.append(int(used_input.replace("in", "")))
for used_output in self._used_ids:
if 'out' in str(used_output):
outputs_used.append(used_output)
output_index_used.append(int(used_output.replace("out", "")))
if input_index_used == []:
raise ValueError("No input(s) to sfg")
if output_index_used == []:
raise ValueError("No output(s) to sfg")
for input in input_index_used:
input_op = self._input_operations[input]
queue: Deque[Operation] = deque([input_op])
visited: Set[Operation] = {input_op}
dict_of_sfg = {}
while queue:
op = queue.popleft()
if isinstance(op, Output):
dict_of_sfg[op.graph_id] = []
for output_port in op.outputs:
dict_of_sfg[op.graph_id] = []
for signal in output_port.signals:
if signal.destination is not None:
new_op = signal.destination.operation
dict_of_sfg[op.graph_id] += [new_op.graph_id]
if new_op not in visited:
queue.append(new_op)
visited.add(new_op)
else:
raise ValueError("Destination does not exist")
if not dict_of_sfg:
raise ValueError("Empty SFG")
cycles = [
[node] + path
for node in dict_of_sfg
if node[0] == 't'
for path in self._dfs(dict_of_sfg, node, node)
]
delay_loop_list = []
for lista in cycles:
if not len(lista) < 2:
temp_list = []
for element in lista:
temp_list.append(element)
if element[0] == 't' and len(temp_list) > 2:
delay_loop_list.append(temp_list)
temp_list = [element]
state_space_lista = []
[
state_space_lista.append(x)
for x in delay_loop_list
if x not in state_space_lista
]
mat_row = len(delay_element_used) + len(output_index_used)
mat_col = len(delay_element_used) + len(input_index_used)
mat_content = np.zeros((mat_row, mat_col))
matrix_in = [0] * mat_col
matrix_answer = [0] * mat_row
for in_signal in inputs_used:
matrix_in[len(delay_element_used) + int(in_signal.replace('in', ''))] = (
in_signal.replace('in', 'x')
)
for delay_element in delay_element_used:
matrix_answer[delay_element_used.index(delay_element)] = (
delay_element.replace('t', 'v')
)
matrix_in[delay_element_used.index(delay_element)] = (
delay_element.replace('t', 'v')
)
paths = self.find_all_paths(dict_of_sfg, in_signal, delay_element)
for lista in paths:
temp_list = []
for element in lista:
temp_list.append(element)
if element[0] == 't':
state_space_lista.append(temp_list)
temp_list = [element]
for out_signal in outputs_used:
paths = self.find_all_paths(dict_of_sfg, in_signal, out_signal)
matrix_answer[
len(delay_element_used) + int(out_signal.replace('out', ''))
] = out_signal.replace('out', 'y')
for lista in paths:
temp_list1 = []
for element in lista:
temp_list1.append(element)
if element[0] == 't':
state_space_lista.append(temp_list1)
temp_list1 = [element]
if "out" in element:
state_space_lista.append(temp_list1)
temp_list1 = []
state_space_list_no_dup = []
[
state_space_list_no_dup.append(x)
for x in state_space_lista
if x not in state_space_list_no_dup
]
for lista in state_space_list_no_dup:
if "in" in lista[0] and lista[-1][0] == 't':
row = int(lista[-1].replace("t", ""))
column = len(delay_element_used) + int(lista[0].replace("in", ""))
temp_value = 1
for element in lista:
if "cmul" in element:
temp_value *= self.find_by_id(element).value
mat_content[row, column] += temp_value
elif "in" in lista[0] and "out" in lista[-1]:
row = len(delay_element_used) + int(lista[-1].replace("out", ""))
column = len(delay_element_used) + int(lista[0].replace("in", ""))
temp_value = 1
for element in lista:
if "cmul" in element:
temp_value *= self.find_by_id(element).value
mat_content[row, column] += temp_value
elif lista[0][0] == 't' and lista[-1][0] == 't':
row = int(lista[-1].replace("t", ""))
column = int(lista[0].replace("t", ""))
temp_value = 1
for element in lista:
if "cmul" in element:
temp_value *= self.find_by_id(element).value
mat_content[row, column] += temp_value
elif lista[0][0] == 't' and "out" in lista[-1]:
row = len(delay_element_used) + int(lista[-1].replace("out", ""))
column = int(lista[0].replace("t", ""))
temp_value = 1
for element in lista:
if "cmul" in element:
temp_value *= self.find_by_id(element).value
mat_content[row, column] += temp_value
return matrix_answer, mat_content, matrix_in
def find_all_paths(self, graph: dict, start: str, end: str, path=[]) -> list:
"""
Returns all paths in graph from node start to node end
Parameters
----------
graph : dictionary
The dictionary that are to be searched for loops.
start : key in dictionary graph
The "node" in the dictionary that are set as the start point.
end : key in dictionary graph
The "node" in the dictionary that are set as the end point.
Returns
-------
The state-space representation of the SFG.
"""
raise NotImplementedError()
path = path + [start]
if start == end:
return [path]
if start not in graph:
return []
paths = []
for node in graph[start]:
if node not in path:
newpaths = self.find_all_paths(graph, node, end, path)
for newpath in newpaths:
paths.append(newpath)
return paths
def edit(self) -> Dict[str, "SFG"]:
"""Edit SFG in GUI."""
......
......@@ -90,6 +90,10 @@ class Impulse(SignalGenerator):
The delay before the signal goes to 1 for one sample.
"""
__slots__ = ("_delay",)
_delay: int
def __init__(self, delay: int = 0) -> None:
self._delay = delay
......@@ -110,6 +114,10 @@ class Step(SignalGenerator):
The delay before the signal goes to 1.
"""
__slots__ = ("_delay",)
_delay: int
def __init__(self, delay: int = 0) -> None:
self._delay = delay
......@@ -130,6 +138,10 @@ class Constant(SignalGenerator):
The constant.
"""
__slots__ = ("_constant",)
_constant: complex
def __init__(self, constant: complex = 1.0) -> None:
self._constant = constant
......@@ -150,6 +162,11 @@ class ZeroPad(SignalGenerator):
The data that should be padded.
"""
__slots__ = ("_data", "_len")
_data: Sequence[complex]
_len: int
def __init__(self, data: Sequence[complex]) -> None:
self._data = data
self._len = len(data)
......@@ -175,7 +192,13 @@ class FromFile(SignalGenerator):
Path to input file.
"""
def __init__(self, path) -> None:
__slots__ = ("_data", "_len", "_path")
_path: str
_data: List[Num]
_len: int
def __init__(self, path: str) -> None:
self._path = path
data: List[Num] = np.loadtxt(path, dtype=complex).tolist()
self._data = data
......@@ -203,6 +226,11 @@ class Sinusoid(SignalGenerator):
The normalized phase offset.
"""
__slots__ = ("_frequency", "_phase")
_frequency: float
_phase: float
def __init__(self, frequency: float, phase: float = 0.0) -> None:
self._frequency = frequency
self._phase = phase
......@@ -234,6 +262,12 @@ class Gaussian(SignalGenerator):
The standard deviation of the noise.
"""
__slots__ = ("_rng", "_seed", "_loc", "_scale")
_seed: Optional[int]
_loc: float
_scale: float
def __init__(
self, seed: Optional[int] = None, loc: float = 0.0, scale: float = 1.0
) -> None:
......@@ -273,6 +307,12 @@ class Uniform(SignalGenerator):
The upper value of the uniform range.
"""
__slots__ = ("_rng", "_seed", "_low", "_high")
_seed: Optional[int]
_low: float
_high: float
def __init__(
self, seed: Optional[int] = None, low: float = -1.0, high: float = 1.0
) -> None:
......@@ -313,6 +353,11 @@ class Delay(SignalGenerator):
The number of time units to delay the generated signal.
"""
__slots__ = ("_delay", "_generator")
_generator: SignalGenerator
_delay: int
def __init__(self, generator: SignalGenerator, delay: int = 1) -> None:
self._generator = generator
self._delay = delay
......@@ -329,6 +374,11 @@ class _AddGenerator(SignalGenerator):
Signal generator that adds two signals.
"""
__slots__ = ("_a", "_b")
_a: SignalGenerator
_b: SignalGenerator
def __init__(self, a: SignalGenerator, b: SignalGenerator) -> None:
self._a = a
self._b = b
......@@ -345,6 +395,11 @@ class _SubGenerator(SignalGenerator):
Signal generator that subtracts two signals.
"""
__slots__ = ("_a", "_b")
_a: SignalGenerator
_b: SignalGenerator
def __init__(self, a: SignalGenerator, b: SignalGenerator) -> None:
self._a = a
self._b = b
......@@ -361,6 +416,11 @@ class _MulGenerator(SignalGenerator):
Signal generator that multiplies two signals.
"""
__slots__ = ("_a", "_b")
_a: SignalGenerator
_b: SignalGenerator
def __init__(self, a: SignalGenerator, b: SignalGenerator) -> None:
self._a = a
self._b = b
......@@ -387,6 +447,11 @@ class _DivGenerator(SignalGenerator):
Signal generator that divides two signals.
"""
__slots__ = ("_a", "_b")
_a: SignalGenerator
_b: SignalGenerator
def __init__(self, a: SignalGenerator, b: SignalGenerator) -> None:
self._a = a
self._b = b
......@@ -428,6 +493,12 @@ class Upsample(SignalGenerator):
The phase of the upsampling.
"""
__slots__ = ("_generator", "_factor", "_phase")
_generator: Union[SignalGenerator, Sequence[complex]]
_factor: int
_phase: int
def __init__(
self,
data: Union[SignalGenerator, Sequence[complex]],
......@@ -467,6 +538,12 @@ class Downsample(SignalGenerator):
The phase of the downsampling.
"""
__slots__ = ("_generator", "_factor", "_phase")
_generator: Union[SignalGenerator, Sequence[complex]]
_factor: int
_phase: int
def __init__(
self,
data: Union[SignalGenerator, Sequence[complex]],
......
......@@ -191,6 +191,7 @@ class Delay(AbstractOperation):
output_count=1,
name=Name(name),
input_sources=[src0],
latency=0,
)
self.set_param("initial_value", initial_value)
......
......@@ -3,6 +3,7 @@
Introduction example for the TSTE87 course
==========================================
"""
from b_asic.core_operations import Addition, ConstantMultiplication
from b_asic.signal_flow_graph import SFG
from b_asic.special_operations import Delay, Input, Output
......
......@@ -14,6 +14,7 @@ Node numbering from the original SFG used with the Matlab toolbox::
sfg=addoperand(sfg,'delay',1,3,4);
sfg=addoperand(sfg,'out',1,7);
"""
from b_asic.signal_flow_graph import SFG
from b_asic.special_operations import Delay, Input, Output
......
......@@ -5,6 +5,7 @@ Third-order Bireciprocal LWDF
Small bireciprocal lattice wave digital filter.
"""
import numpy as np
from mplsignal.freq_plots import freqz_fir
......
......@@ -9,9 +9,9 @@ dependencies = [
"numpy",
"qtpy",
"graphviz>=0.19",
"matplotlib",
"matplotlib>=3.7",
"setuptools_scm[toml]>=6.2",
"networkx",
"networkx>=3",
"qtawesome"
]
classifiers = [
......
test/baseline_images/test_schedule/test__get_figure_no_execution_times.png

36.6 KiB | W: 0px | H: 0px

test/baseline_images/test_schedule/test__get_figure_no_execution_times.png

49.4 KiB | W: 0px | H: 0px

test/baseline_images/test_schedule/test__get_figure_no_execution_times.png
test/baseline_images/test_schedule/test__get_figure_no_execution_times.png
test/baseline_images/test_schedule/test__get_figure_no_execution_times.png
test/baseline_images/test_schedule/test__get_figure_no_execution_times.png
  • 2-up
  • Swipe
  • Onion skin
......@@ -87,10 +87,10 @@ def butterfly_operation_tree():
*(
Butterfly(
*(Butterfly(Constant(2), Constant(4), name="bfly3").outputs),
name="bfly2"
name="bfly2",
).outputs
),
name="bfly1"
name="bfly1",
)
......
......@@ -3,15 +3,8 @@ B-ASIC automatically generated SFG file.
Name: twotapfir
Last saved: 2023-01-24 14:38:17.654639.
"""
from b_asic import (
SFG,
Addition,
ConstantMultiplication,
Delay,
Input,
Output,
Signal,
)
from b_asic import SFG, Addition, ConstantMultiplication, Delay, Input, Output, Signal
# Inputs:
in1 = Input(name="in1")
......@@ -24,9 +17,7 @@ t1 = Delay(initial_value=0, name="")
cmul1 = ConstantMultiplication(
value=-0.5, name="cmul1", latency_offsets={'in0': None, 'out0': None}
)
add1 = Addition(
name="add1", latency_offsets={'in0': None, 'in1': None, 'out0': None}
)
add1 = Addition(name="add1", latency_offsets={'in0': None, 'in1': None, 'out0': None})
cmul2 = ConstantMultiplication(
value=0.5, name="cmul2", latency_offsets={'in0': None, 'out0': None}
)
......
"""
B-ASIC test suite for the AbstractOperation class.
"""
import re
import pytest
......
"""
B-ASIC test suite for OutputPort.
"""
import pytest
from b_asic import Signal
......
......@@ -7,6 +7,7 @@ import sys
from os import path, remove
from typing import Counter, Dict, Type
import numpy as np
import pytest
from b_asic import Input, Output, Signal
......@@ -1772,3 +1773,48 @@ class TestInsertDelays:
assert source2.type_name() == d_type_name
assert source3.type_name() == d_type_name
assert source4.type_name() == bfly.type_name()
class TestIterationPeriodBound:
def test_accumulator(self, sfg_simple_accumulator):
sfg_simple_accumulator.set_latency_of_type('add', 2)
assert sfg_simple_accumulator.iteration_period_bound() == 2
def test_no_latency(self, sfg_simple_accumulator):
with pytest.raises(
ValueError,
match="All native offsets have to set to a non-negative value to",
):
sfg_simple_accumulator.iteration_period_bound()
def test_secondorder_iir(self, precedence_sfg_delays):
precedence_sfg_delays.set_latency_of_type('add', 2)
precedence_sfg_delays.set_latency_of_type('cmul', 3)
assert precedence_sfg_delays.iteration_period_bound() == 10
def test_no_delays(self, sfg_two_inputs_two_outputs):
assert sfg_two_inputs_two_outputs.iteration_period_bound() == -1
class TestStateSpace:
def test_accumulator(self, sfg_simple_accumulator):
ss = sfg_simple_accumulator.state_space_representation()
assert ss[0] == ['v0', 'y0']
assert (ss[1] == np.array([[1.0, 1.0], [0.0, 1.0]])).all()
assert ss[2] == ['v0', 'x0']
def test_secondorder_iir(self, precedence_sfg_delays):
ss = precedence_sfg_delays.state_space_representation()
assert ss[0] == ['v0', 'v1', 'y0']
mat = np.array([[5.0, 2.0, 5.0], [1.0, 0.0, 0.0], [4.0, 6.0, 35.0]])
assert (ss[1] == mat).all()
assert ss[2] == ['v0', 'v1', 'x0']
@pytest.mark.xfail()
def test_no_delays(self, sfg_two_inputs_two_outputs):
ss = sfg_two_inputs_two_outputs.state_space_representation()
assert ss[0] == ['y0', 'y1']
assert (ss[1] == np.array([[1.0, 1.0], [1.0, 2.0]])).all()
assert ss[2] == ['x0', 'x1']