Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • da/B-ASIC
  • lukja239/B-ASIC
  • robal695/B-ASIC
3 results
Show changes
Showing
with 777 additions and 639 deletions
......@@ -5,12 +5,8 @@ Contains the class for representing the connections between operations.
"""
from typing import TYPE_CHECKING, Iterable, Optional, Union
from b_asic.graph_component import (
AbstractGraphComponent,
GraphComponent,
Name,
TypeName,
)
from b_asic.graph_component import AbstractGraphComponent, GraphComponent
from b_asic.types import Name, TypeName
if TYPE_CHECKING:
from b_asic.operation import Operation
......@@ -48,9 +44,7 @@ class Signal(AbstractGraphComponent):
def __init__(
self,
source: Optional[Union["OutputPort", "Signal", "Operation"]] = None,
destination: Optional[
Union["InputPort", "Signal", "Operation"]
] = None,
destination: Optional[Union["InputPort", "Signal", "Operation"]] = None,
bits: Optional[int] = None,
name: Name = Name(""),
):
......@@ -70,29 +64,26 @@ class Signal(AbstractGraphComponent):
@property
def neighbors(self) -> Iterable[GraphComponent]:
return [
p.operation
for p in [self.source, self.destination]
if p is not None
]
"""Return a list of the (up to) two operations connected to this signal."""
return [p.operation for p in [self.source, self.destination] if p is not None]
@property
def source(self) -> Optional["OutputPort"]:
"""The source OutputPort of the signal."""
"""Return the source OutputPort of the signal."""
return self._source
@property
def destination(self) -> Optional["InputPort"]:
"""The destination InputPort of the signal."""
"""Return the destination InputPort of the signal."""
return self._destination
def set_source(
self, source: Union["OutputPort", "Signal", "Operation"]
) -> None:
def set_source(self, source: Union["OutputPort", "Signal", "Operation"]) -> None:
"""
Disconnect the previous source OutputPort of the signal and
connect to the entered source OutputPort. Also connect the entered
source port to the signal if it has not already been connected.
Connect to the entered source OutputPort.
Also, disconnect the previous source OutputPort of the signal and
connect the entered source port to the signal if it has not
already been connected.
Parameters
==========
......@@ -123,9 +114,11 @@ class Signal(AbstractGraphComponent):
self, destination: Union["InputPort", "Signal", "Operation"]
) -> None:
"""
Disconnect the previous destination InputPort of the signal and
connect to the entered destination InputPort. Also connect the entered
destination port to the signal if it has not already been connected.
Connect to the entered *destination* InputPort.
Also, disconnect the previous destination InputPort of the signal and
connect the entered destination port to the signal if it has not already
been connected.
Parameters
==========
......@@ -155,8 +148,10 @@ class Signal(AbstractGraphComponent):
def remove_source(self) -> None:
"""
Disconnect the source OutputPort of the signal. If the source port
still is connected to this signal then also disconnect the source port.
Disconnect the source OutputPort of the signal.
If the source port still is connected to this signal then also disconnect
the source port.
"""
source = self._source
if source is not None:
......@@ -174,8 +169,7 @@ class Signal(AbstractGraphComponent):
def dangling(self) -> bool:
"""
Returns True if the signal is missing either a source or a destination,
else False.
Return True if the signal is missing either a source or a destination.
"""
return self._source is None or self._destination is None
......@@ -197,9 +191,7 @@ class Signal(AbstractGraphComponent):
"""
if bits is not None:
if not isinstance(bits, int):
raise TypeError(
f"Bits must be an int, not {type(bits)}: {bits!r}"
)
raise TypeError(f"Bits must be an int, not {type(bits)}: {bits!r}")
if bits < 0:
raise ValueError("Bits cannot be negative")
self.set_param("bits", bits)
This diff is collapsed.
"""
B-ASIC signal generators
B-ASIC Signal Generator Module.
These can be used as input to Simulation to algorithmically provide signal values.
Especially, all classes defined here will act as a callable which accepts an integer
......@@ -22,7 +22,7 @@ class SignalGenerator:
"""
Base class for signal generators.
Handles operator overloading and defined the ``__call__`` method that should
Handles operator overloading and defines the ``__call__`` method that should
be overridden.
"""
......@@ -94,7 +94,7 @@ class Impulse(SignalGenerator):
def __call__(self, time: int) -> complex:
return 1 if time == self._delay else 0
def __repr__(self):
def __repr__(self) -> str:
return f"Impulse({self._delay})" if self._delay else "Impulse()"
......@@ -114,7 +114,7 @@ class Step(SignalGenerator):
def __call__(self, time: int) -> complex:
return 1 if time >= self._delay else 0
def __repr__(self):
def __repr__(self) -> str:
return f"Step({self._delay})" if self._delay else "Step()"
......@@ -134,7 +134,7 @@ class Constant(SignalGenerator):
def __call__(self, time: int) -> complex:
return self._constant
def __str__(self):
def __str__(self) -> str:
return f"{self._constant}"
......@@ -157,7 +157,7 @@ class ZeroPad(SignalGenerator):
return self._data[time]
return 0.0
def __repr__(self):
def __repr__(self) -> str:
return f"ZeroPad({self._data})"
......@@ -181,7 +181,7 @@ class Sinusoid(SignalGenerator):
def __call__(self, time: int) -> complex:
return sin(pi * (self._frequency * time + self._phase))
def __repr__(self):
def __repr__(self) -> str:
return (
f"Sinusoid({self._frequency}, {self._phase})"
if self._phase
......@@ -216,7 +216,7 @@ class Gaussian(SignalGenerator):
def __call__(self, time: int) -> complex:
return self._rng.normal(self._loc, self._scale)
def __repr__(self):
def __repr__(self) -> str:
ret_list = []
if self._seed is not None:
ret_list.append(f"seed={self._seed}")
......@@ -256,7 +256,7 @@ class Uniform(SignalGenerator):
def __call__(self, time: int) -> complex:
return self._rng.uniform(self._low, self._high)
def __repr__(self):
def __repr__(self) -> str:
ret_list = []
if self._seed is not None:
ret_list.append(f"seed={self._seed}")
......@@ -280,7 +280,7 @@ class _AddGenerator(SignalGenerator):
def __call__(self, time: int) -> complex:
return self._a(time) + self._b(time)
def __repr__(self):
def __repr__(self) -> str:
return f"{self._a} + {self._b}"
......@@ -296,7 +296,7 @@ class _SubGenerator(SignalGenerator):
def __call__(self, time: int) -> complex:
return self._a(time) - self._b(time)
def __repr__(self):
def __repr__(self) -> str:
return f"{self._a} - {self._b}"
......@@ -312,7 +312,7 @@ class _MulGenerator(SignalGenerator):
def __call__(self, time: int) -> complex:
return self._a(time) * self._b(time)
def __repr__(self):
def __repr__(self) -> str:
a = (
f"({self._a})"
if isinstance(self._a, (_AddGenerator, _SubGenerator))
......@@ -338,7 +338,7 @@ class _DivGenerator(SignalGenerator):
def __call__(self, time: int) -> complex:
return self._a(time) / self._b(time)
def __repr__(self):
def __repr__(self) -> str:
a = (
f"({self._a})"
if isinstance(self._a, (_AddGenerator, _SubGenerator))
......
......@@ -35,6 +35,15 @@ class Simulation:
Use FastSimulation (from the C++ extension module) for a more effective
simulation when running many iterations.
Parameters
----------
sfg : SFG
The signal flow graph to simulate.
input_providers : list, optional
Input values, one list item per input. Each list item can be an array of values,
a callable taking a time index and returning the value, or a
number (constant input). If a value is not provided for an input, it will be 0.
"""
_sfg: SFG
......@@ -50,23 +59,28 @@ class Simulation:
input_providers: Optional[Sequence[Optional[InputProvider]]] = None,
):
"""Construct a Simulation of an SFG."""
self._sfg = (
sfg()
) # Copy the SFG to make sure it's not modified from the outside.
# Copy the SFG to make sure it's not modified from the outside.
self._sfg = sfg()
self._results = defaultdict(list)
self._delays = {}
self._iteration = 0
self._input_functions = [
lambda _: 0 for _ in range(self._sfg.input_count)
]
self._input_functions = [lambda _: 0 for _ in range(self._sfg.input_count)]
self._input_length = None
if input_providers is not None:
self.set_inputs(input_providers)
def set_input(self, index: int, input_provider: InputProvider) -> None:
"""
Set the input function used to get values for the specific input at the
given index to the internal SFG.
Set the input used to get values for the specific input at the given index of\
the internal SFG.
Parameters
----------
index : int
The input index.
input_provider : list, callable, or number
Can be an array of values, a callable taking a time index and returning
the value, or a number (constant input).
"""
if index < 0 or index >= len(self._input_functions):
raise IndexError(
......@@ -87,9 +101,7 @@ class Simulation:
)
self._input_functions[index] = lambda n: input_provider[n]
def set_inputs(
self, input_providers: Sequence[Optional[InputProvider]]
) -> None:
def set_inputs(self, input_providers: Sequence[Optional[InputProvider]]) -> None:
"""
Set the input functions used to get values for the inputs to the internal SFG.
"""
......@@ -108,8 +120,7 @@ class Simulation:
bits_override: Optional[int] = None,
truncate: bool = True,
) -> Sequence[Num]:
"""
Run one iteration of the simulation and return the resulting output values.
"""Run one iteration of the simulation and return the resulting output values.
"""
return self.run_for(1, save_results, bits_override, truncate)
......@@ -121,8 +132,8 @@ class Simulation:
truncate: bool = True,
) -> Sequence[Num]:
"""
Run the simulation until its iteration is greater than or equal to the given
iteration and return the output values of the last iteration.
Run the simulation until its iteration is greater than or equal to the given\
iteration and return the output values of the last iteration.
"""
result: Sequence[Num] = []
while self._iteration < iteration:
......@@ -153,8 +164,8 @@ class Simulation:
truncate: bool = True,
) -> Sequence[Num]:
"""
Run a given number of iterations of the simulation and return the output
values of the last iteration.
Run a given number of iterations of the simulation and return the output\
values of the last iteration.
"""
return self.run_until(
self._iteration + iterations, save_results, bits_override, truncate
......@@ -167,14 +178,12 @@ class Simulation:
truncate: bool = True,
) -> Sequence[Num]:
"""
Run the simulation until the end of its input arrays and return the output
values of the last iteration.
Run the simulation until the end of its input arrays and return the output\
values of the last iteration.
"""
if self._input_length is None:
raise IndexError("Tried to run unlimited simulation")
return self.run_until(
self._input_length, save_results, bits_override, truncate
)
return self.run_until(self._input_length, save_results, bits_override, truncate)
@property
def iteration(self) -> int:
......@@ -184,11 +193,12 @@ class Simulation:
@property
def results(self) -> ResultArrayMap:
"""
Get a mapping from result keys to numpy arrays containing all results, including
intermediate values, calculated for each iteration up until now that was run
with save_results enabled.
The mapping is indexed using the key() method of Operation with the appropriate
output index.
Get a mapping from result keys to numpy arrays containing all results.
This includes intermediate values, calculated for each iteration up until now
that was run with *save_results* enabled.
The mapping is indexed using the ``key()`` method of Operation with the
appropriate output index.
Example result after 3 iterations::
{"c1": [3, 6, 7], "c2": [4, 5, 5], "bfly1.0": [7, 0, 0], "bfly1.1": [-1, 0, 2], "0": [7, -2, -1]}
......
......@@ -44,6 +44,10 @@ class Input(AbstractOperation):
def evaluate(self):
return self.param("value")
@property
def latency(self) -> int:
return self.latency_offsets["out0"]
@property
def value(self) -> Num:
"""Get the current value of this input."""
......@@ -56,9 +60,7 @@ class Input(AbstractOperation):
def get_plot_coordinates(
self,
) -> Tuple[
Tuple[Tuple[float, float], ...], Tuple[Tuple[float, float], ...]
]:
) -> Tuple[Tuple[Tuple[float, float], ...], Tuple[Tuple[float, float], ...]]:
# Doc-string inherited
return (
(
......@@ -87,6 +89,14 @@ class Input(AbstractOperation):
# doc-string inherited
return ((0, 0.5),)
@property
def is_constant(self) -> bool:
return False
@property
def is_linear(self) -> bool:
return True
class Output(AbstractOperation):
"""
......@@ -122,9 +132,7 @@ class Output(AbstractOperation):
def get_plot_coordinates(
self,
) -> Tuple[
Tuple[Tuple[float, float], ...], Tuple[Tuple[float, float], ...]
]:
) -> Tuple[Tuple[Tuple[float, float], ...], Tuple[Tuple[float, float], ...]]:
# Doc-string inherited
return (
((0, 0), (0, 1), (0.25, 1), (0.5, 0.5), (0.25, 0), (0, 0)),
......@@ -139,6 +147,14 @@ class Output(AbstractOperation):
# doc-string inherited
return tuple()
@property
def latency(self) -> int:
return self.latency_offsets["in0"]
@property
def is_linear(self) -> bool:
return True
class Delay(AbstractOperation):
"""
......@@ -174,9 +190,7 @@ class Delay(AbstractOperation):
self, index: int, delays: Optional[DelayMap] = None, prefix: str = ""
) -> Optional[Num]:
if delays is not None:
return delays.get(
self.key(index, prefix), self.param("initial_value")
)
return delays.get(self.key(index, prefix), self.param("initial_value"))
return self.param("initial_value")
def evaluate_output(
......@@ -190,9 +204,7 @@ class Delay(AbstractOperation):
truncate: bool = True,
) -> Num:
if index != 0:
raise IndexError(
f"Output index out of range (expected 0-0, got {index})"
)
raise IndexError(f"Output index out of range (expected 0-0, got {index})")
if len(input_values) != 1:
raise ValueError(
"Wrong number of inputs supplied to SFG for evaluation"
......@@ -221,3 +233,7 @@ class Delay(AbstractOperation):
def initial_value(self, value: Num) -> None:
"""Set the initial value of this delay."""
self.set_param("initial_value", value)
@property
def is_linear(self) -> bool:
return True
......@@ -54,3 +54,5 @@ sfg.set_execution_time_of_type(Subtraction.type_name(), 1)
# Generate schedule
schedule = Schedule(sfg, cyclic=True)
schedule.plot()
pc = schedule.get_memory_variables()
......@@ -14,21 +14,21 @@ from b_asic import (
)
# Inputs:
in1 = Input(name="in1")
in1 = Input(name="in_1")
# Outputs:
out1 = Output(name="")
out1 = Output(name="out1")
# Operations:
t1 = Delay(initial_value=0, name="")
t1 = Delay(initial_value=0, name="t1")
cmul1 = ConstantMultiplication(
value=0.5, name="cmul2", latency_offsets={'in0': None, 'out0': None}
value=0.5, name="cmul1", latency_offsets={'in0': None, 'out0': None}
)
add1 = Addition(
name="", latency_offsets={'in0': None, 'in1': None, 'out0': None}
name="add1", latency_offsets={'in0': None, 'in1': None, 'out0': None}
)
cmul2 = ConstantMultiplication(
value=0.5, name="cmul", latency_offsets={'in0': None, 'out0': None}
value=0.5, name="cmul2", latency_offsets={'in0': None, 'out0': None}
)
# Signals:
......
......@@ -3,7 +3,7 @@ name = "b-asic"
description = "Better ASIC Toolbox"
readme = "README.md"
maintainers = [
{ name = "Oscar Gustafsson", email = "oscar.gustafsson@gmail.com" },
{ name = "Oscar Gustafsson", email = "oscar.gustafsson@liu.se" },
]
license = { file = "LICENSE" }
requires-python = ">=3.8"
......@@ -54,14 +54,14 @@ documentation = "https://da.gitlab-pages.liu.se/B-ASIC/"
[tool.black]
skip-string-normalization = true
preview = true
line-length = 79
line-length = 88
exclude = [
"test/test_gui", "b_asic/scheduler_gui/ui_main_window.py"
]
[tool.isort]
profile = "black"
line_length = 79
line_length = 88
src_paths = ["b_asic", "test"]
skip = [
"test/test_gui", "b_asic/scheduler_gui/ui_main_window.py"
......
......@@ -57,9 +57,7 @@ class CMakeBuild(build_ext):
print(f"=== Configuring {ext.name} ===")
print(f"Temp dir: {self.build_temp}")
print(f"Output dir: {cmake_output_dir}")
subprocess.check_call(
cmake_configure_argv, cwd=self.build_temp, env=env
)
subprocess.check_call(cmake_configure_argv, cwd=self.build_temp, env=env)
print(f"=== Building {ext.name} ===")
print(f"Temp dir: {self.build_temp}")
......@@ -72,16 +70,16 @@ class CMakeBuild(build_ext):
setuptools.setup(
author=(
"Adam Jakobsson, Angus Lothian, Arvid Westerlund, Felix Goding, "
"Ivar Härnqvist, Jacob Wahlman, Kevin Scott, Rasmus Karlsson, "
"Oscar Gustafsson, Andreas Bolin"
"Adam Jakobsson, Angus Lothian, Arvid Westerlund, Felix Goding, Ivar Härnqvist,"
" Jacob Wahlman, Kevin Scott, Rasmus Karlsson, Oscar Gustafsson, Andreas Bolin,"
" Mikael Henriksson, Frans Skarman, Petter Källström, Olle Hansson"
),
author_email=(
"adaja901@student.liu.se, anglo547@student.liu.se, "
"arvwe160@student.liu.se, felgo673@student.liu.se, "
"ivaha717@student.liu.se, jacwa448@student.liu.se, "
"kevsc634@student.liu.se, raska119@student.liu.se, "
"oscar.gustafsson@liu.se, andbo467@student.liu.se"
"adaja901@student.liu.se, anglo547@student.liu.se, arvwe160@student.liu.se,"
" felgo673@student.liu.se, ivaha717@student.liu.se, jacwa448@student.liu.se,"
" kevsc634@student.liu.se, raska119@student.liu.se, oscar.gustafsson@liu.se,"
" andbo467@student.liu.se, mikael.henriksson@liu.se, frans.skarman@liu.se,"
" petter.kallstrom@liu.se, olle.hansson@liu.se"
),
ext_modules=[CMakeExtension("_b_asic")],
cmdclass={"build_ext": CMakeBuild},
......
test/baseline/test_draw_matrix_transposer_4.png

21 KiB | W: | H:

test/baseline/test_draw_matrix_transposer_4.png

26.8 KiB | W: | H:

test/baseline/test_draw_matrix_transposer_4.png
test/baseline/test_draw_matrix_transposer_4.png
test/baseline/test_draw_matrix_transposer_4.png
test/baseline/test_draw_matrix_transposer_4.png
  • 2-up
  • Swipe
  • Onion skin
test/baseline/test_draw_process_collection.png

14.7 KiB | W: | H:

test/baseline/test_draw_process_collection.png

14.7 KiB | W: | H:

test/baseline/test_draw_process_collection.png
test/baseline/test_draw_process_collection.png
test/baseline/test_draw_process_collection.png
test/baseline/test_draw_process_collection.png
  • 2-up
  • Swipe
  • Onion skin
File added
......@@ -9,13 +9,13 @@ def simple_collection():
NO_PORT = 0
return ProcessCollection(
{
PlainMemoryVariable(4, NO_PORT, {NO_PORT: 2}),
PlainMemoryVariable(2, NO_PORT, {NO_PORT: 6}),
PlainMemoryVariable(3, NO_PORT, {NO_PORT: 5}),
PlainMemoryVariable(6, NO_PORT, {NO_PORT: 2}),
PlainMemoryVariable(0, NO_PORT, {NO_PORT: 3}),
PlainMemoryVariable(0, NO_PORT, {NO_PORT: 2}),
PlainMemoryVariable(0, NO_PORT, {NO_PORT: 6}),
PlainMemoryVariable(4, NO_PORT, {NO_PORT: 2}, "Proc. 1"),
PlainMemoryVariable(2, NO_PORT, {NO_PORT: 6}, "Proc. 2"),
PlainMemoryVariable(3, NO_PORT, {NO_PORT: 5}, "Proc. 3"),
PlainMemoryVariable(6, NO_PORT, {NO_PORT: 2}, "Proc. 4"),
PlainMemoryVariable(0, NO_PORT, {NO_PORT: 3}, "Proc. 5"),
PlainMemoryVariable(0, NO_PORT, {NO_PORT: 2}, "Proc. 6"),
PlainMemoryVariable(0, NO_PORT, {NO_PORT: 6}, "Proc. 7"),
},
8,
)
......
......@@ -13,6 +13,7 @@ from b_asic import (
Input,
Name,
Output,
Signal,
SignalSourceProvider,
TypeName,
)
......@@ -93,10 +94,10 @@ def sfg_two_inputs_two_outputs_independent_with_cmul():
in1 = Input("IN1")
in2 = Input("IN2")
c1 = Constant(3, "C1")
add1 = Addition(in2, c1, "ADD1", 7)
cmul3 = ConstantMultiplication(2, add1, "CMUL3", 3)
cmul1 = ConstantMultiplication(5, in1, "CMUL1", 5)
cmul2 = ConstantMultiplication(4, cmul1, "CMUL2", 4)
add1 = Addition(in2, c1, "ADD1", 7, execution_time=2)
cmul3 = ConstantMultiplication(2, add1, "CMUL3", 3, execution_time=1)
cmul1 = ConstantMultiplication(5, in1, "CMUL1", 5, execution_time=3)
cmul2 = ConstantMultiplication(4, cmul1, "CMUL2", 4, execution_time=1)
out1 = Output(cmul2, "OUT1")
out2 = Output(cmul3, "OUT2")
return SFG(inputs=[in1, in2], outputs=[out1, out2])
......@@ -274,3 +275,34 @@ def precedence_sfg_delays_and_constants():
Output(bfly1.output(1), "OUT2")
return SFG(inputs=[in1], outputs=[out1], name="SFG")
@pytest.fixture
def sfg_two_tap_fir():
# Inputs:
in1 = Input(name="in1")
# Outputs:
out1 = Output(name="out1")
# Operations:
t1 = Delay(initial_value=0, name="t1")
cmul1 = ConstantMultiplication(
value=0.5, name="cmul1", latency_offsets={'in0': None, 'out0': None}
)
add1 = Addition(
name="add1", latency_offsets={'in0': None, 'in1': None, 'out0': None}
)
cmul2 = ConstantMultiplication(
value=0.5, name="cmul2", latency_offsets={'in0': None, 'out0': None}
)
# Signals:
Signal(source=t1.output(0), destination=cmul1.input(0))
Signal(source=in1.output(0), destination=t1.input(0))
Signal(source=in1.output(0), destination=cmul2.input(0))
Signal(source=cmul1.output(0), destination=add1.input(0))
Signal(source=add1.output(0), destination=out1.input(0))
Signal(source=cmul2.output(0), destination=add1.input(1))
return SFG(inputs=[in1], outputs=[out1], name='twotapfir')
......@@ -21,8 +21,7 @@ from b_asic import (
class TestOperationOverloading:
def test_addition_overload(self):
"""Tests addition overloading for both operation and number argument.
"""
"""Tests addition overloading for both operation and number argument."""
add1 = Addition(None, None, "add1")
add2 = Addition(None, None, "add2")
......@@ -42,8 +41,7 @@ class TestOperationOverloading:
assert add5.input(1).signals == add4.output(0).signals
def test_subtraction_overload(self):
"""Tests subtraction overloading for both operation and number argument.
"""
"""Tests subtraction overloading for both operation and number argument."""
add1 = Addition(None, None, "add1")
add2 = Addition(None, None, "add2")
......@@ -63,8 +61,7 @@ class TestOperationOverloading:
assert sub3.input(1).signals == sub2.output(0).signals
def test_multiplication_overload(self):
"""Tests multiplication overloading for both operation and number argument.
"""
"""Tests multiplication overloading for both operation and number argument."""
add1 = Addition(None, None, "add1")
add2 = Addition(None, None, "add2")
......@@ -84,8 +81,7 @@ class TestOperationOverloading:
assert mul3.value == 5
def test_division_overload(self):
"""Tests division overloading for both operation and number argument.
"""
"""Tests division overloading for both operation and number argument."""
add1 = Addition(None, None, "add1")
add2 = Addition(None, None, "add2")
......@@ -125,18 +121,8 @@ class TestTraverse:
def test_traverse_type(self, large_operation_tree):
result = list(large_operation_tree.traverse())
assert (
len(
list(filter(lambda type_: isinstance(type_, Addition), result))
)
== 3
)
assert (
len(
list(filter(lambda type_: isinstance(type_, Constant), result))
)
== 4
)
assert len(list(filter(lambda type_: isinstance(type_, Addition), result))) == 3
assert len(list(filter(lambda type_: isinstance(type_, Constant), result))) == 4
def test_traverse_loop(self, operation_graph_with_cycle):
assert len(list(operation_graph_with_cycle.traverse())) == 8
......@@ -184,9 +170,7 @@ class TestLatency:
}
def test_latency_offsets_constructor(self):
bfly = Butterfly(
latency_offsets={"in0": 2, "in1": 3, "out0": 5, "out1": 10}
)
bfly = Butterfly(latency_offsets={"in0": 2, "in1": 3, "out0": 5, "out1": 10})
assert bfly.latency == 8
assert bfly.latency_offsets == {
......@@ -220,6 +204,10 @@ class TestLatency:
"out1": 9,
}
def test_set_latency_negative(self):
with pytest.raises(ValueError, match="Latency cannot be negative"):
Butterfly(latency=-1)
class TestExecutionTime:
def test_execution_time_constructor(self):
......@@ -233,17 +221,13 @@ class TestExecutionTime:
def test_set_execution_time_negative(self):
bfly = Butterfly()
with pytest.raises(
ValueError, match="Execution time cannot be negative"
):
with pytest.raises(ValueError, match="Execution time cannot be negative"):
bfly.execution_time = -1
class TestCopyOperation:
def test_copy_butterfly_latency_offsets(self):
bfly = Butterfly(
latency_offsets={"in0": 4, "in1": 2, "out0": 10, "out1": 9}
)
bfly = Butterfly(latency_offsets={"in0": 4, "in1": 2, "out0": 10, "out1": 9})
bfly_copy = bfly.copy_component()
......@@ -274,9 +258,7 @@ class TestPlotCoordinates:
assert exe == ((0, 0), (0, 1), (1, 1), (1, 0), (0, 0))
def test_complicated_case(self):
bfly = Butterfly(
latency_offsets={"in0": 2, "in1": 3, "out0": 5, "out1": 10}
)
bfly = Butterfly(latency_offsets={"in0": 2, "in1": 3, "out0": 5, "out1": 10})
bfly.execution_time = 7
lat, exe = bfly.get_plot_coordinates()
......@@ -300,28 +282,29 @@ class TestIOCoordinates:
cmult.execution_time = 1
cmult.set_latency(3)
i_c, o_c = cmult.get_io_coordinates()
assert i_c == ((0, 0.5),)
assert o_c == ((3, 0.5),)
assert cmult.get_input_coordinates() == ((0, 0.5),)
assert cmult.get_output_coordinates() == ((3, 0.5),)
def test_complicated_case(self):
bfly = Butterfly(
latency_offsets={"in0": 2, "in1": 3, "out0": 5, "out1": 10}
)
bfly = Butterfly(latency_offsets={"in0": 2, "in1": 3, "out0": 5, "out1": 10})
bfly.execution_time = 7
i_c, o_c = bfly.get_io_coordinates()
assert i_c == ((2, 0.25), (3, 0.75))
assert o_c == ((5, 0.25), (10, 0.75))
assert bfly.get_input_coordinates() == ((2, 0.25), (3, 0.75))
assert bfly.get_output_coordinates() == ((5, 0.25), (10, 0.75))
def test_io_coordinates_error(self):
bfly = Butterfly()
bfly.set_latency_offsets({"in0": 3, "out1": 5})
with pytest.raises(
ValueError, match="Missing latencies for inputs \\[1\\]"
ValueError, match="Missing latencies for input\\(s\\) \\[1\\]"
):
bfly.get_input_coordinates()
with pytest.raises(
ValueError, match="Missing latencies for output\\(s\\) \\[0\\]"
):
bfly.get_io_coordinates()
bfly.get_output_coordinates()
class TestSplit:
......
import re
import pytest
from b_asic.process import PlainMemoryVariable
......@@ -10,3 +12,25 @@ def test_PlainMemoryVariable():
assert mem.execution_time == 2
assert mem.life_times == (1, 2)
assert mem.read_ports == (4, 5)
assert repr(mem) == "PlainMemoryVariable(3, 0, {4: 1, 5: 2}, 'Proc. 0')"
mem2 = PlainMemoryVariable(2, 0, {4: 2, 5: 3}, 'foo')
assert repr(mem2) == "PlainMemoryVariable(2, 0, {4: 2, 5: 3}, 'foo')"
assert mem2 < mem
mem3 = PlainMemoryVariable(2, 0, {4: 1, 5: 2})
assert mem2 < mem3
def test_MemoryVariables(secondorder_iir_schedule):
pc = secondorder_iir_schedule.get_memory_variables()
mem_vars = pc.collection
pattern = re.compile(
"MemoryVariable\\(3, <b_asic.port.OutputPort object at 0x[a-f0-9]+>,"
" {<b_asic.port.InputPort object at 0x[a-f0-9]+>: 4}, 'cmul1.0'\\)"
)
mem_var = [m for m in mem_vars if m.name == 'cmul1.0'][0]
assert pattern.match(repr(mem_var))
assert mem_var.execution_time == 4
assert mem_var.start_time == 3
import pickle
import matplotlib.pyplot as plt
import networkx as nx
import pytest
from b_asic.process import Process
from b_asic.research.interleaver import (
generate_matrix_transposer,
generate_random_interleaver,
)
from b_asic.resources import draw_exclusion_graph_coloring
from b_asic.resources import ProcessCollection, draw_exclusion_graph_coloring
class TestProcessCollectionPlainMemoryVariable:
@pytest.mark.mpl_image_compare(style='mpl20')
def test_draw_process_collection(self, simple_collection):
fig, ax = plt.subplots()
simple_collection.draw_lifetime_chart(ax=ax)
simple_collection.draw_lifetime_chart(ax=ax, show_markers=False)
return fig
def test_draw_proces_collection(self, simple_collection):
_, ax = plt.subplots(1, 2)
simple_collection.draw_lifetime_chart(ax=ax[0])
exclusion_graph = (
simple_collection.create_exclusion_graph_from_overlap()
)
color_dict = nx.coloring.greedy_color(exclusion_graph)
draw_exclusion_graph_coloring(exclusion_graph, color_dict, ax=ax[1])
def test_split_memory_variable(self, simple_collection):
collection_split = simple_collection.split(
read_ports=1, write_ports=1, total_ports=2
)
assert len(collection_split) == 3
@pytest.mark.mpl_image_compare(style='mpl20')
def test_draw_matrix_transposer_4(self):
fig, ax = plt.subplots()
generate_matrix_transposer(4).draw_lifetime_chart(ax=ax)
return fig
def test_split_memory_variable(self, simple_collection: ProcessCollection):
collection_split = simple_collection.split_ports(
heuristic="graph_color", read_ports=1, write_ports=1, total_ports=2
)
assert len(collection_split) == 3
# Issue: #175
def test_interleaver_issue175(self):
with open('test/fixtures/interleaver-two-port-issue175.p', 'rb') as f:
interleaver_collection: ProcessCollection = pickle.load(f)
assert len(interleaver_collection.split_ports(total_ports=1)) == 2
def test_generate_random_interleaver(self):
return
for _ in range(10):
for size in range(5, 20, 5):
assert (
len(
generate_random_interleaver(size).split(
read_ports=1, write_ports=1
)
)
== 1
)
assert (
len(generate_random_interleaver(size).split(total_ports=1))
== 2
)
collection = generate_random_interleaver(size)
assert len(collection.split_ports(read_ports=1, write_ports=1)) == 1
if any(var.execution_time for var in collection.collection):
assert len(collection.split_ports(total_ports=1)) == 2
......@@ -8,15 +8,13 @@ import pytest
from b_asic.core_operations import Addition, Butterfly, ConstantMultiplication
from b_asic.schedule import Schedule
from b_asic.signal_flow_graph import SFG
from b_asic.special_operations import Input, Output
from b_asic.special_operations import Delay, Input, Output
class TestInit:
def test_simple_filter_normal_latency(self, sfg_simple_filter):
sfg_simple_filter.set_latency_of_type(Addition.type_name(), 5)
sfg_simple_filter.set_latency_of_type(
ConstantMultiplication.type_name(), 4
)
sfg_simple_filter.set_latency_of_type(ConstantMultiplication.type_name(), 4)
schedule = Schedule(sfg_simple_filter)
......@@ -28,13 +26,9 @@ class TestInit:
}
assert schedule.schedule_time == 9
def test_complicated_single_outputs_normal_latency(
self, precedence_sfg_delays
):
def test_complicated_single_outputs_normal_latency(self, precedence_sfg_delays):
precedence_sfg_delays.set_latency_of_type(Addition.type_name(), 4)
precedence_sfg_delays.set_latency_of_type(
ConstantMultiplication.type_name(), 3
)
precedence_sfg_delays.set_latency_of_type(ConstantMultiplication.type_name(), 3)
schedule = Schedule(precedence_sfg_delays, scheduling_algorithm="ASAP")
......@@ -88,9 +82,7 @@ class TestInit:
}
assert secondorder_iir_schedule.schedule_time == 21
def test_complicated_single_outputs_complex_latencies(
self, precedence_sfg_delays
):
def test_complicated_single_outputs_complex_latencies(self, precedence_sfg_delays):
precedence_sfg_delays.set_latency_offsets_of_type(
ConstantMultiplication.type_name(), {"in0": 3, "out0": 5}
)
......@@ -152,9 +144,7 @@ class TestInit:
assert schedule.schedule_time == 17
def test_independent_sfg(
self, sfg_two_inputs_two_outputs_independent_with_cmul
):
def test_independent_sfg(self, sfg_two_inputs_two_outputs_independent_with_cmul):
schedule = Schedule(
sfg_two_inputs_two_outputs_independent_with_cmul,
scheduling_algorithm="ASAP",
......@@ -162,11 +152,9 @@ class TestInit:
start_times_names = {}
for op_id, start_time in schedule._start_times.items():
op_name = (
sfg_two_inputs_two_outputs_independent_with_cmul.find_by_id(
op_id
).name
)
op_name = sfg_two_inputs_two_outputs_independent_with_cmul.find_by_id(
op_id
).name
start_times_names[op_name] = start_time
assert start_times_names == {
......@@ -184,13 +172,9 @@ class TestInit:
class TestSlacks:
def test_forward_backward_slack_normal_latency(
self, precedence_sfg_delays
):
def test_forward_backward_slack_normal_latency(self, precedence_sfg_delays):
precedence_sfg_delays.set_latency_of_type(Addition.type_name(), 1)
precedence_sfg_delays.set_latency_of_type(
ConstantMultiplication.type_name(), 3
)
precedence_sfg_delays.set_latency_of_type(ConstantMultiplication.type_name(), 3)
schedule = Schedule(precedence_sfg_delays, scheduling_algorithm="ASAP")
assert (
......@@ -207,9 +191,7 @@ class TestSlacks:
)
assert (
schedule.forward_slack(
precedence_sfg_delays.find_by_name("A2")[0].graph_id
)
schedule.forward_slack(precedence_sfg_delays.find_by_name("A2")[0].graph_id)
== 0
)
assert (
......@@ -221,9 +203,7 @@ class TestSlacks:
def test_slacks_normal_latency(self, precedence_sfg_delays):
precedence_sfg_delays.set_latency_of_type(Addition.type_name(), 1)
precedence_sfg_delays.set_latency_of_type(
ConstantMultiplication.type_name(), 3
)
precedence_sfg_delays.set_latency_of_type(ConstantMultiplication.type_name(), 3)
schedule = Schedule(precedence_sfg_delays, scheduling_algorithm="ASAP")
assert schedule.slacks(
......@@ -237,18 +217,14 @@ class TestSlacks:
class TestRescheduling:
def test_move_operation(self, precedence_sfg_delays):
precedence_sfg_delays.set_latency_of_type(Addition.type_name(), 4)
precedence_sfg_delays.set_latency_of_type(
ConstantMultiplication.type_name(), 3
)
precedence_sfg_delays.set_latency_of_type(ConstantMultiplication.type_name(), 3)
schedule = Schedule(precedence_sfg_delays, scheduling_algorithm="ASAP")
schedule.move_operation(
precedence_sfg_delays.find_by_name("ADD3")[0].graph_id, 4
)
schedule.move_operation(
precedence_sfg_delays.find_by_name("A2")[0].graph_id, 2
)
schedule.move_operation(precedence_sfg_delays.find_by_name("A2")[0].graph_id, 2)
start_times_names = {}
for op_id, start_time in schedule._start_times.items():
......@@ -271,13 +247,9 @@ class TestRescheduling:
"OUT1": 21,
}
def test_move_operation_slack_after_rescheduling(
self, precedence_sfg_delays
):
def test_move_operation_slack_after_rescheduling(self, precedence_sfg_delays):
precedence_sfg_delays.set_latency_of_type(Addition.type_name(), 1)
precedence_sfg_delays.set_latency_of_type(
ConstantMultiplication.type_name(), 3
)
precedence_sfg_delays.set_latency_of_type(ConstantMultiplication.type_name(), 3)
schedule = Schedule(precedence_sfg_delays, scheduling_algorithm="ASAP")
add3_id = precedence_sfg_delays.find_by_name("ADD3")[0].graph_id
......@@ -297,34 +269,97 @@ class TestRescheduling:
assert schedule.forward_slack(a2_id) == 2
assert schedule.backward_slack(a2_id) == 18
def test_move_operation_incorrect_move_backward(
self, precedence_sfg_delays
):
def test_move_operation_incorrect_move_backward(self, precedence_sfg_delays):
precedence_sfg_delays.set_latency_of_type(Addition.type_name(), 1)
precedence_sfg_delays.set_latency_of_type(
ConstantMultiplication.type_name(), 3
)
precedence_sfg_delays.set_latency_of_type(ConstantMultiplication.type_name(), 3)
schedule = Schedule(precedence_sfg_delays, scheduling_algorithm="ASAP")
with pytest.raises(ValueError):
with pytest.raises(
ValueError,
match="Operation add4 got incorrect move: -4. Must be between 0 and 7.",
):
schedule.move_operation(
precedence_sfg_delays.find_by_name("ADD3")[0].graph_id, -4
)
def test_move_operation_incorrect_move_forward(
self, precedence_sfg_delays
):
def test_move_operation_incorrect_move_forward(self, precedence_sfg_delays):
precedence_sfg_delays.set_latency_of_type(Addition.type_name(), 1)
precedence_sfg_delays.set_latency_of_type(
ConstantMultiplication.type_name(), 3
)
precedence_sfg_delays.set_latency_of_type(ConstantMultiplication.type_name(), 3)
schedule = Schedule(precedence_sfg_delays, scheduling_algorithm="ASAP")
with pytest.raises(ValueError):
with pytest.raises(
ValueError,
match="Operation add4 got incorrect move: 10. Must be between 0 and 7.",
):
schedule.move_operation(
precedence_sfg_delays.find_by_name("ADD3")[0].graph_id, 10
)
def test_move_operation_acc(self):
in0 = Input()
d = Delay()
a = d + in0
out0 = Output(a)
d << a
sfg = SFG([in0], [out0])
sfg.set_latency_of_type(Addition.type_name(), 1)
schedule = Schedule(sfg, cyclic=True)
# Check initial conditions
assert schedule.laps[sfg.find_by_id("add1").input(0).signals[0].graph_id] == 1
assert schedule.laps[sfg.find_by_id("add1").input(1).signals[0].graph_id] == 0
assert schedule._start_times["add1"] == 0
assert schedule.laps[sfg.find_by_id("out1").input(0).signals[0].graph_id] == 0
assert schedule._start_times["out1"] == 1
# Move and scheduling algorithm behaves differently
schedule.move_operation("out1", 0)
assert schedule.laps[sfg.find_by_id("out1").input(0).signals[0].graph_id] == 1
assert schedule.laps[sfg.find_by_id("add1").input(0).signals[0].graph_id] == 1
assert schedule.laps[sfg.find_by_id("add1").input(1).signals[0].graph_id] == 0
assert schedule._start_times["out1"] == 0
assert schedule._start_times["add1"] == 0
# Increase schedule time
schedule.set_schedule_time(2)
assert schedule.laps[sfg.find_by_id("out1").input(0).signals[0].graph_id] == 1
assert schedule.laps[sfg.find_by_id("add1").input(0).signals[0].graph_id] == 1
assert schedule.laps[sfg.find_by_id("add1").input(1).signals[0].graph_id] == 0
assert schedule._start_times["out1"] == 0
assert schedule._start_times["add1"] == 0
# Move out one time unit
schedule.move_operation("out1", 1)
assert schedule.laps[sfg.find_by_id("out1").input(0).signals[0].graph_id] == 1
assert schedule.laps[sfg.find_by_id("add1").input(0).signals[0].graph_id] == 1
assert schedule.laps[sfg.find_by_id("add1").input(1).signals[0].graph_id] == 0
assert schedule._start_times["out1"] == 1
assert schedule._start_times["add1"] == 0
# Move add one time unit
schedule.move_operation("add1", 1)
assert schedule.laps[sfg.find_by_id("add1").input(0).signals[0].graph_id] == 1
assert schedule.laps[sfg.find_by_id("add1").input(1).signals[0].graph_id] == 0
assert schedule.laps[sfg.find_by_id("out1").input(0).signals[0].graph_id] == 1
assert schedule._start_times["add1"] == 1
assert schedule._start_times["out1"] == 1
# Move out back one time unit
schedule.move_operation("out1", -1)
assert schedule.laps[sfg.find_by_id("out1").input(0).signals[0].graph_id] == 1
assert schedule._start_times["out1"] == 0
assert schedule.laps[sfg.find_by_id("add1").input(0).signals[0].graph_id] == 1
assert schedule.laps[sfg.find_by_id("add1").input(1).signals[0].graph_id] == 0
assert schedule._start_times["add1"] == 1
# Move add back one time unit
schedule.move_operation("add1", -1)
assert schedule.laps[sfg.find_by_id("add1").input(0).signals[0].graph_id] == 1
assert schedule.laps[sfg.find_by_id("add1").input(1).signals[0].graph_id] == 0
assert schedule.laps[sfg.find_by_id("out1").input(0).signals[0].graph_id] == 1
assert schedule._start_times["add1"] == 0
assert schedule._start_times["out1"] == 0
class TestTimeResolution:
def test_increase_time_resolution(
......@@ -341,11 +376,9 @@ class TestTimeResolution:
start_times_names = {}
for op_id, start_time in schedule._start_times.items():
op_name = (
sfg_two_inputs_two_outputs_independent_with_cmul.find_by_id(
op_id
).name
)
op_name = sfg_two_inputs_two_outputs_independent_with_cmul.find_by_id(
op_id
).name
start_times_names[op_name] = start_time
assert start_times_names == {
......@@ -377,23 +410,19 @@ class TestTimeResolution:
start_times_names = {}
for op_id, start_time in schedule._start_times.items():
op_name = (
sfg_two_inputs_two_outputs_independent_with_cmul.find_by_id(
op_id
).name
)
start_times_names[op_name] = start_time
op = sfg_two_inputs_two_outputs_independent_with_cmul.find_by_id(op_id)
start_times_names[op.name] = (start_time, op.latency, op.execution_time)
assert start_times_names == {
"C1": 0,
"IN1": 0,
"IN2": 0,
"CMUL1": 0,
"CMUL2": 30,
"ADD1": 0,
"CMUL3": 42,
"OUT1": 54,
"OUT2": 60,
"C1": (0, 0, None),
"IN1": (0, 0, None),
"IN2": (0, 0, None),
"CMUL1": (0, 30, 18),
"CMUL2": (30, 24, 6),
"ADD1": (0, 42, 12),
"CMUL3": (42, 18, 6),
"OUT1": (54, 0, None),
"OUT2": (60, 0, None),
}
assert 6 * old_schedule_time == schedule.schedule_time
......@@ -418,11 +447,9 @@ class TestTimeResolution:
start_times_names = {}
for op_id, start_time in schedule._start_times.items():
op_name = (
sfg_two_inputs_two_outputs_independent_with_cmul.find_by_id(
op_id
).name
)
op_name = sfg_two_inputs_two_outputs_independent_with_cmul.find_by_id(
op_id
).name
start_times_names[op_name] = start_time
assert start_times_names == {
......@@ -437,19 +464,15 @@ class TestTimeResolution:
"OUT2": 60,
}
with pytest.raises(
ValueError, match="Not possible to decrease resolution"
):
with pytest.raises(ValueError, match="Not possible to decrease resolution"):
schedule.decrease_time_resolution(4)
schedule.decrease_time_resolution(3)
start_times_names = {}
for op_id, start_time in schedule._start_times.items():
op_name = (
sfg_two_inputs_two_outputs_independent_with_cmul.find_by_id(
op_id
).name
)
op_name = sfg_two_inputs_two_outputs_independent_with_cmul.find_by_id(
op_id
).name
start_times_names[op_name] = start_time
assert start_times_names == {
......@@ -491,9 +514,7 @@ class TestErrors:
def test_no_output_latency(self):
in1 = Input()
in2 = Input()
bfly = Butterfly(
in1, in2, latency_offsets={"in0": 4, "in1": 2, "out0": 10}
)
bfly = Butterfly(in1, in2, latency_offsets={"in0": 4, "in1": 2, "out0": 10})
out1 = Output(bfly.output(0))
out2 = Output(bfly.output(1))
sfg = SFG([in1, in2], [out1, out2])
......@@ -504,9 +525,7 @@ class TestErrors:
Schedule(sfg)
in1 = Input()
in2 = Input()
bfly1 = Butterfly(
in1, in2, latency_offsets={"in0": 4, "in1": 2, "out1": 10}
)
bfly1 = Butterfly(in1, in2, latency_offsets={"in0": 4, "in1": 2, "out1": 10})
bfly2 = Butterfly(
bfly1.output(0),
bfly1.output(1),
......@@ -523,12 +542,8 @@ class TestErrors:
def test_too_short_schedule_time(self, sfg_simple_filter):
sfg_simple_filter.set_latency_of_type(Addition.type_name(), 5)
sfg_simple_filter.set_latency_of_type(
ConstantMultiplication.type_name(), 4
)
with pytest.raises(
ValueError, match="Too short schedule time. Minimum is 9."
):
sfg_simple_filter.set_latency_of_type(ConstantMultiplication.type_name(), 4)
with pytest.raises(ValueError, match="Too short schedule time. Minimum is 9."):
Schedule(sfg_simple_filter, schedule_time=3)
schedule = Schedule(sfg_simple_filter)
......@@ -540,9 +555,7 @@ class TestErrors:
def test_incorrect_scheduling_algorithm(self, sfg_simple_filter):
sfg_simple_filter.set_latency_of_type(Addition.type_name(), 1)
sfg_simple_filter.set_latency_of_type(
ConstantMultiplication.type_name(), 2
)
sfg_simple_filter.set_latency_of_type(ConstantMultiplication.type_name(), 2)
with pytest.raises(
NotImplementedError, match="No algorithm with name: foo defined."
):
......
This diff is collapsed.
......@@ -49,20 +49,15 @@ def test_direct_form_fir():
)
== 3
)
assert (
len([comp for comp in sfg.components if isinstance(comp, Addition)])
== 2
)
assert (
len([comp for comp in sfg.components if isinstance(comp, Delay)]) == 2
)
assert len([comp for comp in sfg.components if isinstance(comp, Addition)]) == 2
assert len([comp for comp in sfg.components if isinstance(comp, Delay)]) == 2
sfg = direct_form_fir(
(0.3, 0.4, 0.5, 0.6, 0.3),
mult_properties={'latency': 2, 'execution_time': 1},
add_properties={'latency': 1, 'execution_time': 1},
)
assert sfg.critical_path() == 6
assert sfg.critical_path_time() == 6
def test_transposed_direct_form_fir():
......@@ -77,17 +72,12 @@ def test_transposed_direct_form_fir():
)
== 3
)
assert (
len([comp for comp in sfg.components if isinstance(comp, Addition)])
== 2
)
assert (
len([comp for comp in sfg.components if isinstance(comp, Delay)]) == 2
)
assert len([comp for comp in sfg.components if isinstance(comp, Addition)]) == 2
assert len([comp for comp in sfg.components if isinstance(comp, Delay)]) == 2
sfg = transposed_direct_form_fir(
(0.3, 0.4, 0.5, 0.6, 0.3),
mult_properties={'latency': 2, 'execution_time': 1},
add_properties={'latency': 1, 'execution_time': 1},
)
assert sfg.critical_path() == 3
assert sfg.critical_path_time() == 3