Skip to content
Snippets Groups Projects
Commit 4b7a4caf authored by Simon Bjurek's avatar Simon Bjurek Committed by Oscar Gustafsson
Browse files

Add matrix inversion sfg generator

parent ff218ffa
No related branches found
No related tags found
1 merge request!469Add matrix inversion sfg generator
Pipeline #156551 passed
......@@ -116,6 +116,7 @@ TODO.txt
b_asic/_version.py
docs_sphinx/_build/
docs_sphinx/examples
docs_sphinx/sg_execution_times.rst
result_images/
.coverage
Digraph.gv
......
......@@ -1099,6 +1099,108 @@ class MAD(AbstractOperation):
p._index = i
class MADS(AbstractOperation):
__slots__ = (
"_is_add",
"_override_zero_on_src0",
"_src0",
"_src1",
"_src2",
"_name",
"_latency",
"_latency_offsets",
"_execution_time",
)
_is_add: Optional[bool]
_override_zero_on_src0: Optional[bool]
_src0: Optional[SignalSourceProvider]
_src1: Optional[SignalSourceProvider]
_src2: Optional[SignalSourceProvider]
_name: Name
_latency: Optional[int]
_latency_offsets: Optional[Dict[str, int]]
_execution_time: Optional[int]
is_swappable = True
def __init__(
self,
is_add: Optional[bool] = True,
override_zero_on_src0: Optional[bool] = False,
src0: Optional[SignalSourceProvider] = None,
src1: Optional[SignalSourceProvider] = None,
src2: Optional[SignalSourceProvider] = None,
name: Name = Name(""),
latency: Optional[int] = None,
latency_offsets: Optional[Dict[str, int]] = None,
execution_time: Optional[int] = None,
):
"""Construct a MADS operation."""
super().__init__(
input_count=3,
output_count=1,
name=Name(name),
input_sources=[src0, src1, src2],
latency=latency,
latency_offsets=latency_offsets,
execution_time=execution_time,
)
self.set_param("is_add", is_add)
self.set_param("override_zero_on_src0", override_zero_on_src0)
@classmethod
def type_name(cls) -> TypeName:
return TypeName("mads")
def evaluate(self, a, b, c):
if self.is_add:
if self.override_zero_on_src0:
return b * c
else:
return a + b * c
else:
if self.override_zero_on_src0:
return -b * c
else:
return a - b * c
@property
def is_add(self) -> bool:
"""Get if operation is an addition."""
return self.param("is_add")
@is_add.setter
def is_add(self, is_add: bool) -> None:
"""Set if operation is an addition."""
self.set_param("is_add", is_add)
@property
def override_zero_on_src0(self) -> bool:
"""Get if operation is overriding a zero on port src0."""
return self.param("override_zero_on_src0")
@override_zero_on_src0.setter
def override_zero_on_src0(self, override_zero_on_src0: bool) -> None:
"""Set if operation is overriding a zero on port src0."""
self.set_param("override_zero_on_src0", override_zero_on_src0)
@property
def is_linear(self) -> bool:
return (
self.input(1).connected_source.operation.is_constant
or self.input(2).connected_source.operation.is_constant
)
def swap_io(self) -> None:
self._input_ports = [
self._input_ports[0],
self._input_ports[2],
self._input_ports[1],
]
for i, p in enumerate(self._input_ports):
p._index = i
class SymmetricTwoportAdaptor(AbstractOperation):
r"""
Wave digital filter symmetric twoport-adaptor operation.
......@@ -1519,6 +1621,51 @@ class Shift(AbstractOperation):
self.set_param("value", value)
class DontCare(AbstractOperation):
r"""
Dont-care operation
Used for ignoring the input to another operation and thus avoiding dangling input nodes.
Parameters
----------
name : Name, optional
Operation name.
"""
__slots__ = "_name"
_name: Name
is_linear = True
def __init__(self, name: Name = ""):
"""Construct a DontCare operation."""
super().__init__(
input_count=0,
output_count=1,
name=name,
latency_offsets={"out0": 0},
)
@classmethod
def type_name(cls) -> TypeName:
return TypeName("dontcare")
def evaluate(self):
return 0
@property
def latency(self) -> int:
return self.latency_offsets["out0"]
def __repr__(self) -> str:
return "DontCare()"
def __str__(self) -> str:
return "dontcare"
class Sink(AbstractOperation):
r"""
Sink operation.
......
......@@ -9,10 +9,13 @@ from typing import TYPE_CHECKING, Dict, Optional, Sequence, Union
import numpy as np
from b_asic.core_operations import (
MADS,
Addition,
Butterfly,
ConstantMultiplication,
DontCare,
Name,
Reciprocal,
SymmetricTwoportAdaptor,
)
from b_asic.signal import Signal
......@@ -432,6 +435,82 @@ def radix_2_dif_fft(points: int) -> SFG:
return SFG(inputs=inputs, outputs=outputs)
def ldlt_matrix_inverse(N: int) -> SFG:
"""Generates an SFG for the LDLT matrix inverse algorithm.
Parameters
----------
N : int
Dimension of the square input matrix.
Returns
-------
SFG
Signal Flow Graph
"""
inputs = []
A = [[None for _ in range(N)] for _ in range(N)]
for i in range(N):
for j in range(i, N):
in_op = Input()
A[i][j] = in_op
inputs.append(in_op)
D = [None for _ in range(N)]
for i in range(N):
D[i] = A[i][i]
D_inv = [None for _ in range(N)]
R = [[None for _ in range(N)] for _ in range(N)]
M = [[None for _ in range(N)] for _ in range(N)]
# R*di*R^T factorization
for i in range(N):
for k in range(i):
D[i] = MADS(False, False, D[i], M[k][i], R[k][i])
D_inv[i] = Reciprocal(D[i])
for j in range(i + 1, N):
R[i][j] = A[i][j]
for k in range(i):
R[i][j] = MADS(False, False, R[i][j], M[k][i], R[k][j])
# if is_complex:
# M[i][j] = ComplexConjugate(R[i][j])
# else:
M[i][j] = R[i][j]
R[i][j] = MADS(True, True, DontCare(), R[i][j], D_inv[i])
# back substitution
A_inv = [[None for _ in range(N)] for _ in range(N)]
for i in reversed(range(N)):
A_inv[i][i] = D_inv[i]
for j in reversed(range(i + 1)):
for k in reversed(range(j + 1, N)):
if k == N - 1 and i != j:
A_inv[j][i] = MADS(False, True, DontCare(), R[j][k], A_inv[i][k])
else:
if A_inv[i][k]:
A_inv[j][i] = MADS(
False, False, A_inv[j][i], R[j][k], A_inv[i][k]
)
else:
A_inv[j][i] = MADS(
False, False, A_inv[j][i], R[j][k], A_inv[k][i]
)
outputs = []
for i in range(N):
for j in range(i, N):
outputs.append(Output(A_inv[i][j]))
return SFG(inputs, outputs)
def _construct_dif_fft_stage(
ports_from_previous_stage: list["OutputPort"],
number_of_stages: int,
......
......@@ -3,6 +3,9 @@
import pytest
from b_asic import (
MAD,
MADS,
SFG,
Absolute,
Addition,
AddSub,
......@@ -11,10 +14,13 @@ from b_asic import (
Constant,
ConstantMultiplication,
Division,
DontCare,
Input,
LeftShift,
Max,
Min,
Multiplication,
Output,
Reciprocal,
RightShift,
Shift,
......@@ -47,6 +53,14 @@ class TestConstant:
test_operation.value = 4
assert test_operation.value == 4
def test_constant_repr(self):
test_operation = Constant(3)
assert repr(test_operation) == "Constant(3)"
def test_constant_str(self):
test_operation = Constant(3)
assert str(test_operation) == "3"
class TestAddition:
"""Tests for Addition class."""
......@@ -83,16 +97,16 @@ class TestSubtraction:
class TestAddSub:
"""Tests for AddSub class."""
def test_addition_positive(self):
def test_addsub_positive(self):
test_operation = AddSub(is_add=True)
assert test_operation.evaluate_output(0, [3, 5]) == 8
def test_addition_negative(self):
def test_addsub_negative(self):
test_operation = AddSub(is_add=True)
assert test_operation.evaluate_output(0, [-3, -5]) == -8
assert test_operation.is_add
def test_addition_complex(self):
def test_addsub_complex(self):
test_operation = AddSub(is_add=True)
assert test_operation.evaluate_output(0, [3 + 5j, 4 + 6j]) == 7 + 11j
......@@ -116,6 +130,22 @@ class TestAddSub:
test_operation = AddSub(is_add=True)
assert test_operation.is_swappable
def test_addsub_is_add_getter(self):
test_operation = AddSub(is_add=False)
assert not test_operation.is_add
test_operation = AddSub(is_add=True)
assert test_operation.is_add
def test_addsub_is_add_setter(self):
test_operation = AddSub(is_add=False)
test_operation.is_add = True
assert test_operation.is_add
test_operation = AddSub(is_add=True)
test_operation.is_add = False
assert not test_operation.is_add
class TestMultiplication:
"""Tests for Multiplication class."""
......@@ -148,6 +178,13 @@ class TestDivision:
test_operation = Division()
assert test_operation.evaluate_output(0, [60 + 40j, 10 + 20j]) == 2.8 - 1.6j
def test_mads_is_linear(self):
test_operation = Division(Constant(3), Addition(Input(), Constant(3)))
assert not test_operation.is_linear
test_operation = Division(Addition(Input(), Constant(3)), Constant(3))
assert test_operation.is_linear
class TestSquareRoot:
"""Tests for SquareRoot class."""
......@@ -200,6 +237,13 @@ class TestMin:
test_operation = Min()
assert test_operation.evaluate_output(0, [-30, -5]) == -30
def test_min_complex(self):
test_operation = Min()
with pytest.raises(
ValueError, match="core_operations.Min does not support complex numbers."
):
test_operation.evaluate_output(0, [-1 - 1j, 2 + 2j])
class TestAbsolute:
"""Tests for Absolute class."""
......@@ -216,6 +260,13 @@ class TestAbsolute:
test_operation = Absolute()
assert test_operation.evaluate_output(0, [3 + 4j]) == 5.0
def test_max_complex(self):
test_operation = Max()
with pytest.raises(
ValueError, match="core_operations.Max does not support complex numbers."
):
test_operation.evaluate_output(0, [-1 - 1j, 2 + 2j])
class TestConstantMultiplication:
"""Tests for ConstantMultiplication class."""
......@@ -234,6 +285,136 @@ class TestConstantMultiplication:
assert test_operation.evaluate_output(0, [3 + 4j]) == 1 + 18j
class TestMAD:
def test_mad_positive(self):
test_operation = MAD()
assert test_operation.evaluate_output(0, [1, 2, 3]) == 5
def test_mad_negative(self):
test_operation = MAD()
assert test_operation.evaluate_output(0, [-3, -5, -8]) == 7
def test_mad_complex(self):
test_operation = MAD()
assert test_operation.evaluate_output(0, [3 + 6j, 2 + 6j, 1 + 1j]) == -29 + 31j
def test_mad_is_linear(self):
test_operation = MAD(
Constant(3), Addition(Input(), Constant(3)), Addition(Input(), Constant(3))
)
assert test_operation.is_linear
test_operation = MAD(
Addition(Input(), Constant(3)), Constant(3), Addition(Input(), Constant(3))
)
assert test_operation.is_linear
test_operation = MAD(
Addition(Input(), Constant(3)), Addition(Input(), Constant(3)), Constant(3)
)
assert not test_operation.is_linear
def test_mad_swap_io(self):
test_operation = MAD()
assert test_operation.evaluate_output(0, [1, 2, 3]) == 5
test_operation.swap_io()
assert test_operation.evaluate_output(0, [1, 2, 3]) == 5
class TestMADS:
def test_mads_positive(self):
test_operation = MADS(is_add=False)
assert test_operation.evaluate_output(0, [1, 2, 3]) == -5
def test_mads_negative(self):
test_operation = MADS(is_add=False)
assert test_operation.evaluate_output(0, [-3, -5, -8]) == -43
def test_mads_complex(self):
test_operation = MADS(is_add=False)
assert test_operation.evaluate_output(0, [3 + 6j, 2 + 6j, 1 + 1j]) == 7 - 2j
def test_mads_positive_add(self):
test_operation = MADS(is_add=True)
assert test_operation.evaluate_output(0, [1, 2, 3]) == 7
def test_mads_negative_add(self):
test_operation = MADS(is_add=True)
assert test_operation.evaluate_output(0, [-3, -5, -8]) == 37
def test_mads_complex_add(self):
test_operation = MADS(is_add=True)
assert test_operation.evaluate_output(0, [3 + 6j, 2 + 6j, 1 + 1j]) == -1 + 14j
def test_mads_zero_override(self):
test_operation = MADS(is_add=True, override_zero_on_src0=True)
assert test_operation.evaluate_output(0, [1, 1, 1]) == 1
def test_mads_sub_zero_override(self):
test_operation = MADS(is_add=False, override_zero_on_src0=True)
assert test_operation.evaluate_output(0, [1, 1, 1]) == -1
def test_mads_is_linear(self):
test_operation = MADS(
src0=Constant(3),
src1=Addition(Input(), Constant(3)),
src2=Addition(Input(), Constant(3)),
)
assert not test_operation.is_linear
test_operation = MADS(
src0=Addition(Input(), Constant(3)),
src1=Constant(3),
src2=Addition(Input(), Constant(3)),
)
assert test_operation.is_linear
test_operation = MADS(
src0=Addition(Input(), Constant(3)),
src1=Addition(Input(), Constant(3)),
src2=Constant(3),
)
assert test_operation.is_linear
def test_mads_swap_io(self):
test_operation = MADS(is_add=False)
assert test_operation.evaluate_output(0, [1, 2, 3]) == -5
test_operation.swap_io()
assert test_operation.evaluate_output(0, [1, 2, 3]) == -5
def test_mads_is_add_getter(self):
test_operation = MADS(is_add=False)
assert not test_operation.is_add
test_operation = MADS(is_add=True)
assert test_operation.is_add
def test_mads_is_add_setter(self):
test_operation = MADS(is_add=False)
test_operation.is_add = True
assert test_operation.is_add
test_operation = MADS(is_add=True)
test_operation.is_add = False
assert not test_operation.is_add
def test_mads_override_zero_on_src0_getter(self):
test_operation = MADS(override_zero_on_src0=False)
assert not test_operation.override_zero_on_src0
test_operation = MADS(override_zero_on_src0=True)
assert test_operation.override_zero_on_src0
def test_mads_override_zero_on_src0_setter(self):
test_operation = MADS(override_zero_on_src0=False)
test_operation.override_zero_on_src0 = True
assert test_operation.override_zero_on_src0
test_operation = MADS(override_zero_on_src0=True)
test_operation.override_zero_on_src0 = False
assert not test_operation.override_zero_on_src0
class TestRightShift:
"""Tests for RightShift class."""
......@@ -408,6 +589,33 @@ class TestDepends:
assert set(bfly1.inputs_required_for_output(1)) == {0, 1}
class TestDontCare:
def test_create_sfg_with_dontcare(self):
i1 = Input()
dc = DontCare()
a = Addition(i1, dc)
o = Output(a)
sfg = SFG([i1], [o])
assert sfg.output_count == 1
assert sfg.input_count == 1
assert sfg.evaluate_output(0, [0]) == 0
assert sfg.evaluate_output(0, [1]) == 1
def test_dontcare_latency_getter(self):
test_operation = DontCare()
assert test_operation.latency == 0
def test_dontcare_repr(self):
test_operation = DontCare()
assert repr(test_operation) == "DontCare()"
def test_dontcare_str(self):
test_operation = DontCare()
assert str(test_operation) == "dontcare"
class TestSink:
def test_create_sfg_with_sink(self):
bfly = Butterfly()
......@@ -420,3 +628,15 @@ class TestSink:
assert sfg1.input_count == 2
assert sfg.evaluate_output(1, [0, 1]) == sfg1.evaluate_output(0, [0, 1])
def test_sink_latency_getter(self):
test_operation = Sink()
assert test_operation.latency == 0
def test_sink_repr(self):
test_operation = Sink()
assert repr(test_operation) == "Sink()"
def test_sink_str(self):
test_operation = Sink()
assert str(test_operation) == "sink"
......@@ -3,15 +3,18 @@ import pytest
from scipy import signal
from b_asic.core_operations import (
MADS,
Addition,
Butterfly,
ConstantMultiplication,
Reciprocal,
SymmetricTwoportAdaptor,
)
from b_asic.sfg_generators import (
direct_form_1_iir,
direct_form_2_iir,
direct_form_fir,
ldlt_matrix_inverse,
radix_2_dif_fft,
transposed_direct_form_fir,
wdf_allpass,
......@@ -637,3 +640,176 @@ class TestRadix2FFT:
POINTS = 5
with pytest.raises(ValueError, match="Points must be a power of two."):
radix_2_dif_fft(points=POINTS)
class TestLdltMatrixInverse:
def test_1x1(self):
sfg = ldlt_matrix_inverse(N=1)
assert len(sfg.inputs) == 1
assert len(sfg.outputs) == 1
assert len(sfg.find_by_type_name(MADS.type_name())) == 0
assert len(sfg.find_by_type_name(Reciprocal.type_name())) == 1
A_input = [Constant(5)]
sim = Simulation(sfg, A_input)
sim.run_for(1)
res = sim.results
assert np.isclose(res["0"], 0.2)
def test_2x2_simple_spd(self):
sfg = ldlt_matrix_inverse(N=2)
assert len(sfg.inputs) == 3
assert len(sfg.outputs) == 3
assert len(sfg.find_by_type_name(MADS.type_name())) == 4
assert len(sfg.find_by_type_name(Reciprocal.type_name())) == 2
A = np.array([[1, 2], [2, 1]])
A_input = [Constant(1), Constant(2), Constant(1)]
A_inv = np.linalg.inv(A)
sim = Simulation(sfg, A_input)
sim.run_for(1)
res = sim.results
assert np.isclose(res["0"], A_inv[0, 0])
assert np.isclose(res["1"], A_inv[0, 1])
assert np.isclose(res["2"], A_inv[1, 1])
def test_3x3_simple_spd(self):
sfg = ldlt_matrix_inverse(N=3)
assert len(sfg.inputs) == 6
assert len(sfg.outputs) == 6
assert len(sfg.find_by_type_name(MADS.type_name())) == 15
assert len(sfg.find_by_type_name(Reciprocal.type_name())) == 3
A = np.array([[2, -1, 0], [-1, 3, -1], [0, -1, 2]])
A_input = [
Constant(2),
Constant(-1),
Constant(0),
Constant(3),
Constant(-1),
Constant(2),
]
A_inv = np.linalg.inv(A)
sim = Simulation(sfg, A_input)
sim.run_for(1)
res = sim.results
assert np.isclose(res["0"], A_inv[0, 0])
assert np.isclose(res["1"], A_inv[0, 1])
assert np.isclose(res["2"], A_inv[0, 2])
assert np.isclose(res["3"], A_inv[1, 1])
assert np.isclose(res["4"], A_inv[1, 2])
assert np.isclose(res["5"], A_inv[2, 2])
def test_5x5_random_spd(self):
N = 5
sfg = ldlt_matrix_inverse(N=N)
assert len(sfg.inputs) == 15
assert len(sfg.outputs) == 15
assert len(sfg.find_by_type_name(MADS.type_name())) == 70
assert len(sfg.find_by_type_name(Reciprocal.type_name())) == N
A = self._generate_random_spd_matrix(N)
upper_tridiag = A[np.triu_indices_from(A)]
A_input = [Constant(num) for num in upper_tridiag]
A_inv = np.linalg.inv(A)
sim = Simulation(sfg, A_input)
sim.run_for(1)
res = sim.results
row_indices, col_indices = np.triu_indices(N)
expected_values = A_inv[row_indices, col_indices]
actual_values = [res[str(i)] for i in range(len(expected_values))]
for i in range(len(expected_values)):
assert np.isclose(actual_values[i], expected_values[i])
def test_20x20_random_spd(self):
N = 20
sfg = ldlt_matrix_inverse(N=N)
A = self._generate_random_spd_matrix(N)
assert len(sfg.inputs) == len(A[np.triu_indices_from(A)])
assert len(sfg.outputs) == len(A[np.triu_indices_from(A)])
assert len(sfg.find_by_type_name(Reciprocal.type_name())) == N
upper_tridiag = A[np.triu_indices_from(A)]
A_input = [Constant(num) for num in upper_tridiag]
A_inv = np.linalg.inv(A)
sim = Simulation(sfg, A_input)
sim.run_for(1)
res = sim.results
row_indices, col_indices = np.triu_indices(N)
expected_values = A_inv[row_indices, col_indices]
actual_values = [res[str(i)] for i in range(len(expected_values))]
for i in range(len(expected_values)):
assert np.isclose(actual_values[i], expected_values[i])
# def test_2x2_random_complex_spd(self):
# N = 2
# sfg = ldlt_matrix_inverse(N=N, is_complex=True)
# # A = self._generate_random_complex_spd_matrix(N)
# A = np.array([[2, 1+1j],[1-1j, 3]])
# assert len(sfg.inputs) == len(A[np.triu_indices_from(A)])
# assert len(sfg.outputs) == len(A[np.triu_indices_from(A)])
# assert len(sfg.find_by_type_name(Reciprocal.type_name())) == N
# upper_tridiag = A[np.triu_indices_from(A)]
# A_input = [Constant(num) for num in upper_tridiag]
# A_inv = np.linalg.inv(A)
# sim = Simulation(sfg, A_input)
# sim.run_for(1)
# res = sim.results
# row_indices, col_indices = np.triu_indices(N)
# expected_values = A_inv[row_indices, col_indices]
# actual_values = [res[str(i)] for i in range(len(expected_values))]
# for i in range(len(expected_values)):
# assert np.isclose(actual_values[i], expected_values[i])
def _generate_random_spd_matrix(self, N: int) -> np.ndarray:
A = np.random.rand(N, N)
A = (A + A.T) / 2 # ensure symmetric
min_eig = np.min(np.linalg.eigvals(A))
A += (np.abs(min_eig) + 0.1) * np.eye(N) # ensure positive definiteness
return A
# def _generate_random_complex_spd_matrix(self, N: int) -> np.ndarray:
# A = np.random.randn(N, N) + 1j * np.random.randn(N, N)
# A = (A + A.conj().T) / 2 # ensure symmetric
# min_eig = np.min(np.linalg.eigvals(A))
# A += (np.abs(min_eig) + 0.1) * np.eye(N) # ensure positive definiteness
# return A
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment