From d2eeb347d15e2faecd0a4cc147af0c6cb98ce79b Mon Sep 17 00:00:00 2001 From: Simon Bjurek <simbj106@student.liu.se> Date: Tue, 11 Feb 2025 18:08:29 +0100 Subject: [PATCH 01/15] started work on the generator and added a MADS operation --- b_asic/core_operations.py | 70 +++++++++++++++++++++++++++++++ b_asic/sfg_generators.py | 87 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 157 insertions(+) diff --git a/b_asic/core_operations.py b/b_asic/core_operations.py index df0868ee..70d43370 100644 --- a/b_asic/core_operations.py +++ b/b_asic/core_operations.py @@ -1099,6 +1099,76 @@ class MAD(AbstractOperation): p._index = i +class MADS(AbstractOperation): + __slots__ = ( + "_is_add", + "_src0", + "_src1", + "_src2", + "_name", + "_latency", + "_latency_offsets", + "_execution_time", + ) + _is_add: Optional[bool] + _src0: Optional[SignalSourceProvider] + _src1: Optional[SignalSourceProvider] + _src2: Optional[SignalSourceProvider] + _name: Name + _latency: Optional[int] + _latency_offsets: Optional[Dict[str, int]] + _execution_time: Optional[int] + + is_swappable = True + + def __init__( + self, + is_add: Optional[bool] = False, + src0: Optional[SignalSourceProvider] = None, + src1: Optional[SignalSourceProvider] = None, + src2: Optional[SignalSourceProvider] = None, + name: Name = Name(""), + latency: Optional[int] = None, + latency_offsets: Optional[Dict[str, int]] = None, + execution_time: Optional[int] = None, + ): + """Construct a MADS operation.""" + super().__init__( + input_count=3, + output_count=1, + name=Name(name), + input_sources=[src0, src1, src2], + latency=latency, + latency_offsets=latency_offsets, + execution_time=execution_time, + ) + self._is_add = is_add + # self.set_param("is_add", is_add) + + @classmethod + def type_name(cls) -> TypeName: + return TypeName("mads") + + def evaluate(self, a, b, c): + return a + b * c if self._is_add else a - b * c + + @property + def is_linear(self) -> bool: + return ( + self.input(0).connected_source.operation.is_constant + or self.input(1).connected_source.operation.is_constant + ) + + def swap_io(self) -> None: + self._input_ports = [ + self._input_ports[1], + self._input_ports[0], + self._input_ports[2], + ] + for i, p in enumerate(self._input_ports): + p._index = i + + class SymmetricTwoportAdaptor(AbstractOperation): r""" Wave digital filter symmetric twoport-adaptor operation. diff --git a/b_asic/sfg_generators.py b/b_asic/sfg_generators.py index 737501f1..0ceeb4f6 100644 --- a/b_asic/sfg_generators.py +++ b/b_asic/sfg_generators.py @@ -9,10 +9,14 @@ from typing import TYPE_CHECKING, Dict, Optional, Sequence, Union import numpy as np from b_asic.core_operations import ( + MADS, Addition, Butterfly, + ComplexConjugate, + Constant, ConstantMultiplication, Name, + Reciprocal, SymmetricTwoportAdaptor, ) from b_asic.signal import Signal @@ -432,6 +436,69 @@ def radix_2_dif_fft(points: int) -> SFG: return SFG(inputs=inputs, outputs=outputs) +def ldlt_matrix_inverse(N: int, is_complex: bool) -> SFG: + inputs = [] + A = [[None for _ in range(N)] for _ in range(N)] + for i in range(N): + for j in range(i, N): + in_op = Input() + A[i][j] = in_op + inputs.append(in_op) + + D = [None for _ in range(N)] + for i in range(N): + D[i] = A[i][i] + + D_inv = [None for _ in range(N)] + + R = [[None for _ in range(N)] for _ in range(N)] + M = [[None for _ in range(N)] for _ in range(N)] + + # R*di*R^T factorization + for i in range(N): + for k in range(1, i): + D[i] = MADS(False, D[i], M[k][i], R[k][i]) + + D_inv[i] = Reciprocal(D[i]) + + for j in range(i, N): + R[i][j] = A[i][j] + + for k in range(1, i): + R[i][j] = MADS(False, R[i][j], M[k][i], R[k][j]) + + if is_complex: + M[i][j] = ComplexConjugate(R[i][j]) + else: + M[i][j] = R[i][j] + + R[i][j] = MADS(True, Constant(0, name="0"), R[i][j], D_inv[i]) + + # back substitution + A_inv = [[None for _ in range(N)] for _ in range(N)] + for i in reversed(range(N)): + A_inv[i][i] = D_inv[i] + for j in reversed(range(i)): + for k in reversed(range(j, N)): + if k == N - 1 and i != j: + # my custom + if A_inv[k][i]: + A_inv[j][i] = MADS( + False, Constant(0, name="0"), R[j][k], A_inv[k][i] + ) + else: + A_inv[j][i] = Constant(0, name="0") + else: + A_inv[j][i] = MADS(False, A_inv[j][i], R[j][k], A_inv[k][i]) + + outputs = [] + for i in range(N): + for j in range(i, N): + outputs.append(Output(A_inv[i][j])) + + return SFG(inputs, outputs) + + def _construct_dif_fft_stage( ports_from_previous_stage: list["OutputPort"], number_of_stages: int, @@ -465,6 +532,26 @@ def _construct_dif_fft_stage( return ports +def _extract_diagonal(elems): + n = 0 + k = 0 + while k <= len(elems): + k += n + 1 + n += 1 + n -= 1 + k -= n + 1 + if k != len(elems): + return None + + diagonal = np.zeros(n) + index = 0 + for i in range(n): + diagonal[n] = elems[index] + index += i + 2 + + return diagonal + + def _get_bit_reversed_number(number: int, number_of_bits: int) -> int: reversed_number = 0 for i in range(number_of_bits): -- GitLab From e3b15d3c121bb4ca2bdfaf681ca79bf5231a6384 Mon Sep 17 00:00:00 2001 From: Simon Bjurek <simbj106@student.liu.se> Date: Wed, 12 Feb 2025 11:17:21 +0100 Subject: [PATCH 02/15] ldlt inverse now verified for matrices up to and including size 3x3 --- .gitignore | 1 + b_asic/core_operations.py | 17 ++++++++--- b_asic/sfg_generators.py | 29 +++++++++---------- test/test_sfg_generators.py | 56 +++++++++++++++++++++++++++++++++++++ 4 files changed, 85 insertions(+), 18 deletions(-) diff --git a/.gitignore b/.gitignore index c5b2148a..d240bbd1 100644 --- a/.gitignore +++ b/.gitignore @@ -116,6 +116,7 @@ TODO.txt b_asic/_version.py docs_sphinx/_build/ docs_sphinx/examples +docs_sphinx/sg_execution_times.rst result_images/ .coverage Digraph.gv diff --git a/b_asic/core_operations.py b/b_asic/core_operations.py index 70d43370..67d6b07f 100644 --- a/b_asic/core_operations.py +++ b/b_asic/core_operations.py @@ -1123,7 +1123,7 @@ class MADS(AbstractOperation): def __init__( self, - is_add: Optional[bool] = False, + is_add: Optional[bool] = True, src0: Optional[SignalSourceProvider] = None, src1: Optional[SignalSourceProvider] = None, src2: Optional[SignalSourceProvider] = None, @@ -1142,15 +1142,24 @@ class MADS(AbstractOperation): latency_offsets=latency_offsets, execution_time=execution_time, ) - self._is_add = is_add - # self.set_param("is_add", is_add) + self.set_param("is_add", is_add) @classmethod def type_name(cls) -> TypeName: return TypeName("mads") def evaluate(self, a, b, c): - return a + b * c if self._is_add else a - b * c + return a + b * c if self.is_add else a - b * c + + @property + def is_add(self) -> bool: + """Get if operation is an addition.""" + return self.param("is_add") + + @is_add.setter + def is_add(self, is_add: bool) -> None: + """Set if operation is an addition.""" + self.set_param("is_add", is_add) @property def is_linear(self) -> bool: diff --git a/b_asic/sfg_generators.py b/b_asic/sfg_generators.py index 0ceeb4f6..a0847de4 100644 --- a/b_asic/sfg_generators.py +++ b/b_asic/sfg_generators.py @@ -456,40 +456,41 @@ def ldlt_matrix_inverse(N: int, is_complex: bool) -> SFG: # R*di*R^T factorization for i in range(N): - for k in range(1, i): - D[i] = MADS(False, D[i], M[k][i], R[k][i]) + for k in range(i): + D[i] = MADS(False, D[i], M[k][i], R[k][i], name="M1") D_inv[i] = Reciprocal(D[i]) - for j in range(i, N): + for j in range(i + 1, N): R[i][j] = A[i][j] - for k in range(1, i): - R[i][j] = MADS(False, R[i][j], M[k][i], R[k][j]) + for k in range(i): + R[i][j] = MADS(False, R[i][j], M[k][i], R[k][j], name="M2") if is_complex: M[i][j] = ComplexConjugate(R[i][j]) else: M[i][j] = R[i][j] - R[i][j] = MADS(True, Constant(0, name="0"), R[i][j], D_inv[i]) + R[i][j] = MADS(True, Constant(0, name="0"), R[i][j], D_inv[i], name="M3") # back substitution A_inv = [[None for _ in range(N)] for _ in range(N)] for i in reversed(range(N)): A_inv[i][i] = D_inv[i] - for j in reversed(range(i)): - for k in reversed(range(j, N)): + for j in reversed(range(i + 1)): + for k in reversed(range(j + 1, N)): if k == N - 1 and i != j: - # my custom - if A_inv[k][i]: + A_inv[j][i] = MADS( + False, Constant(0, name="0"), R[j][k], A_inv[i][k], name="M4" + ) + else: + if A_inv[i][k]: A_inv[j][i] = MADS( - False, Constant(0, name="0"), R[j][k], A_inv[k][i] + False, A_inv[j][i], R[j][k], A_inv[i][k], name="M5" ) else: - A_inv[j][i] = Constant(0, name="0") - else: - A_inv[j][i] = MADS(False, A_inv[j][i], R[j][k], A_inv[k][i]) + A_inv[j][i] = MADS(False, A_inv[j][i], R[j][k], A_inv[k][i]) outputs = [] for i in range(N): diff --git a/test/test_sfg_generators.py b/test/test_sfg_generators.py index 7064658c..e3f56ec0 100644 --- a/test/test_sfg_generators.py +++ b/test/test_sfg_generators.py @@ -12,6 +12,7 @@ from b_asic.sfg_generators import ( direct_form_1_iir, direct_form_2_iir, direct_form_fir, + ldlt_matrix_inverse, radix_2_dif_fft, transposed_direct_form_fir, wdf_allpass, @@ -637,3 +638,58 @@ class TestRadix2FFT: POINTS = 5 with pytest.raises(ValueError, match="Points must be a power of two."): radix_2_dif_fft(points=POINTS) + + +class TestLdltMatrixInverse: + def test_1x1(self): + sfg = ldlt_matrix_inverse(N=1, is_complex=False) + + A_input = [Constant(5)] + + sim = Simulation(sfg, A_input) + sim.run_for(1) + + res = sim.results + assert np.isclose(res["0"], 0.2) + + def test_2x2_simple_spd(self): + sfg = ldlt_matrix_inverse(N=2, is_complex=False) + + A = np.array([[1, 2], [2, 1]]) + A_input = [Constant(1), Constant(2), Constant(1)] + + A_inv = np.linalg.inv(A) + + sim = Simulation(sfg, A_input) + sim.run_for(1) + + res = sim.results + assert np.isclose(res["0"], A_inv[0, 0]) + assert np.isclose(res["1"], A_inv[0, 1]) + assert np.isclose(res["2"], A_inv[1, 1]) + + def test_3x3_simple_spd(self): + sfg = ldlt_matrix_inverse(N=3, is_complex=False) + + A = np.array([[2, -1, 0], [-1, 3, -1], [0, -1, 2]]) + A_input = [ + Constant(2), + Constant(-1), + Constant(0), + Constant(3), + Constant(-1), + Constant(2), + ] + + A_inv = np.linalg.inv(A) + + sim = Simulation(sfg, A_input) + sim.run_for(1) + + res = sim.results + assert np.isclose(res["0"], A_inv[0, 0]) + assert np.isclose(res["1"], A_inv[0, 1]) + assert np.isclose(res["2"], A_inv[0, 2]) + assert np.isclose(res["3"], A_inv[1, 1]) + assert np.isclose(res["4"], A_inv[1, 2]) + assert np.isclose(res["5"], A_inv[2, 2]) -- GitLab From d7aeec30247ce4389d086b1c56226ff37298cced Mon Sep 17 00:00:00 2001 From: Simon Bjurek <simbj106@student.liu.se> Date: Wed, 12 Feb 2025 18:01:38 +0100 Subject: [PATCH 03/15] added more tests and one commented out test for complex matrices --- test/test_sfg_generators.py | 120 ++++++++++++++++++++++++++++++++++++ 1 file changed, 120 insertions(+) diff --git a/test/test_sfg_generators.py b/test/test_sfg_generators.py index e3f56ec0..0a57b116 100644 --- a/test/test_sfg_generators.py +++ b/test/test_sfg_generators.py @@ -3,9 +3,11 @@ import pytest from scipy import signal from b_asic.core_operations import ( + MADS, Addition, Butterfly, ConstantMultiplication, + Reciprocal, SymmetricTwoportAdaptor, ) from b_asic.sfg_generators import ( @@ -644,6 +646,12 @@ class TestLdltMatrixInverse: def test_1x1(self): sfg = ldlt_matrix_inverse(N=1, is_complex=False) + assert len(sfg.inputs) == 1 + assert len(sfg.outputs) == 1 + + assert len(sfg.find_by_type_name(MADS.type_name())) == 0 + assert len(sfg.find_by_type_name(Reciprocal.type_name())) == 1 + A_input = [Constant(5)] sim = Simulation(sfg, A_input) @@ -655,6 +663,12 @@ class TestLdltMatrixInverse: def test_2x2_simple_spd(self): sfg = ldlt_matrix_inverse(N=2, is_complex=False) + assert len(sfg.inputs) == 3 + assert len(sfg.outputs) == 3 + + assert len(sfg.find_by_type_name(MADS.type_name())) == 4 + assert len(sfg.find_by_type_name(Reciprocal.type_name())) == 2 + A = np.array([[1, 2], [2, 1]]) A_input = [Constant(1), Constant(2), Constant(1)] @@ -671,6 +685,12 @@ class TestLdltMatrixInverse: def test_3x3_simple_spd(self): sfg = ldlt_matrix_inverse(N=3, is_complex=False) + assert len(sfg.inputs) == 6 + assert len(sfg.outputs) == 6 + + assert len(sfg.find_by_type_name(MADS.type_name())) == 15 + assert len(sfg.find_by_type_name(Reciprocal.type_name())) == 3 + A = np.array([[2, -1, 0], [-1, 3, -1], [0, -1, 2]]) A_input = [ Constant(2), @@ -693,3 +713,103 @@ class TestLdltMatrixInverse: assert np.isclose(res["3"], A_inv[1, 1]) assert np.isclose(res["4"], A_inv[1, 2]) assert np.isclose(res["5"], A_inv[2, 2]) + + def test_5x5_random_spd(self): + N = 5 + + sfg = ldlt_matrix_inverse(N=N, is_complex=False) + + assert len(sfg.inputs) == 15 + assert len(sfg.outputs) == 15 + + assert len(sfg.find_by_type_name(MADS.type_name())) == 70 + assert len(sfg.find_by_type_name(Reciprocal.type_name())) == N + + A = self._generate_random_spd_matrix(N) + + upper_tridiag = A[np.triu_indices_from(A)] + + A_input = [Constant(num) for num in upper_tridiag] + A_inv = np.linalg.inv(A) + + sim = Simulation(sfg, A_input) + sim.run_for(1) + res = sim.results + + row_indices, col_indices = np.triu_indices(N) + expected_values = A_inv[row_indices, col_indices] + actual_values = [res[str(i)] for i in range(len(expected_values))] + + for i in range(len(expected_values)): + assert np.isclose(actual_values[i], expected_values[i]) + + def test_30x30_random_spd(self): + N = 30 + + sfg = ldlt_matrix_inverse(N=N, is_complex=False) + + A = self._generate_random_spd_matrix(N) + + assert len(sfg.inputs) == len(A[np.triu_indices_from(A)]) + assert len(sfg.outputs) == len(A[np.triu_indices_from(A)]) + + assert len(sfg.find_by_type_name(Reciprocal.type_name())) == N + + upper_tridiag = A[np.triu_indices_from(A)] + + A_input = [Constant(num) for num in upper_tridiag] + A_inv = np.linalg.inv(A) + + sim = Simulation(sfg, A_input) + sim.run_for(1) + res = sim.results + + row_indices, col_indices = np.triu_indices(N) + expected_values = A_inv[row_indices, col_indices] + actual_values = [res[str(i)] for i in range(len(expected_values))] + + for i in range(len(expected_values)): + assert np.isclose(actual_values[i], expected_values[i]) + + # def test_2x2_random_complex_spd(self): + # N = 2 + + # sfg = ldlt_matrix_inverse(N=N, is_complex=True) + + # # A = self._generate_random_complex_spd_matrix(N) + # A = np.array([[2, 1+1j],[1-1j, 3]]) + + # assert len(sfg.inputs) == len(A[np.triu_indices_from(A)]) + # assert len(sfg.outputs) == len(A[np.triu_indices_from(A)]) + + # assert len(sfg.find_by_type_name(Reciprocal.type_name())) == N + + # upper_tridiag = A[np.triu_indices_from(A)] + + # A_input = [Constant(num) for num in upper_tridiag] + # A_inv = np.linalg.inv(A) + + # sim = Simulation(sfg, A_input) + # sim.run_for(1) + # res = sim.results + + # row_indices, col_indices = np.triu_indices(N) + # expected_values = A_inv[row_indices, col_indices] + # actual_values = [res[str(i)] for i in range(len(expected_values))] + + # for i in range(len(expected_values)): + # assert np.isclose(actual_values[i], expected_values[i]) + + def _generate_random_spd_matrix(self, N: int) -> np.ndarray: + A = np.random.rand(N, N) + A = (A + A.T) / 2 # ensure symmetric + min_eig = np.min(np.linalg.eigvals(A)) + A += (np.abs(min_eig) + 0.1) * np.eye(N) # ensure positive definiteness + return A + + def _generate_random_complex_spd_matrix(self, N: int) -> np.ndarray: + A = np.random.randn(N, N) + 1j * np.random.randn(N, N) + A = (A + A.conj().T) / 2 # ensure symmetric + min_eig = np.min(np.linalg.eigvals(A)) + A += (np.abs(min_eig) + 0.1) * np.eye(N) # ensure positive definiteness + return A -- GitLab From a633cfd57cdaa9ab8e487d31c3ec6d7c9666e938 Mon Sep 17 00:00:00 2001 From: Simon Bjurek <simbj106@student.liu.se> Date: Wed, 12 Feb 2025 20:14:11 +0100 Subject: [PATCH 04/15] changed test that took too long --- test/test_sfg_generators.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/test_sfg_generators.py b/test/test_sfg_generators.py index 0a57b116..40dd973f 100644 --- a/test/test_sfg_generators.py +++ b/test/test_sfg_generators.py @@ -743,7 +743,7 @@ class TestLdltMatrixInverse: for i in range(len(expected_values)): assert np.isclose(actual_values[i], expected_values[i]) - def test_30x30_random_spd(self): + def test_20x20_random_spd(self): N = 30 sfg = ldlt_matrix_inverse(N=N, is_complex=False) -- GitLab From 01534e6ec2d1a51ef347e2483c74aca00fb5a8be Mon Sep 17 00:00:00 2001 From: Simon Bjurek <simbj106@student.liu.se> Date: Wed, 12 Feb 2025 20:15:33 +0100 Subject: [PATCH 05/15] fix from last commit --- test/test_sfg_generators.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/test_sfg_generators.py b/test/test_sfg_generators.py index 40dd973f..e3430bc4 100644 --- a/test/test_sfg_generators.py +++ b/test/test_sfg_generators.py @@ -744,7 +744,7 @@ class TestLdltMatrixInverse: assert np.isclose(actual_values[i], expected_values[i]) def test_20x20_random_spd(self): - N = 30 + N = 20 sfg = ldlt_matrix_inverse(N=N, is_complex=False) -- GitLab From 13c190a110d693035976f4df685491b6e7da71c1 Mon Sep 17 00:00:00 2001 From: Simon Bjurek <simbj106@student.liu.se> Date: Thu, 13 Feb 2025 12:35:29 +0100 Subject: [PATCH 06/15] fixes from mr comments and added tests for 100 percent test coverage in core_operations --- b_asic/core_operations.py | 2 +- b_asic/sfg_generators.py | 32 ++----- test/test_core_operations.py | 166 ++++++++++++++++++++++++++++++++++- test/test_sfg_generators.py | 12 +-- 4 files changed, 175 insertions(+), 37 deletions(-) diff --git a/b_asic/core_operations.py b/b_asic/core_operations.py index 67d6b07f..fd75359d 100644 --- a/b_asic/core_operations.py +++ b/b_asic/core_operations.py @@ -1170,9 +1170,9 @@ class MADS(AbstractOperation): def swap_io(self) -> None: self._input_ports = [ - self._input_ports[1], self._input_ports[0], self._input_ports[2], + self._input_ports[1], ] for i, p in enumerate(self._input_ports): p._index = i diff --git a/b_asic/sfg_generators.py b/b_asic/sfg_generators.py index a0847de4..0f7d26c5 100644 --- a/b_asic/sfg_generators.py +++ b/b_asic/sfg_generators.py @@ -457,7 +457,7 @@ def ldlt_matrix_inverse(N: int, is_complex: bool) -> SFG: # R*di*R^T factorization for i in range(N): for k in range(i): - D[i] = MADS(False, D[i], M[k][i], R[k][i], name="M1") + D[i] = MADS(False, D[i], M[k][i], R[k][i]) D_inv[i] = Reciprocal(D[i]) @@ -465,14 +465,14 @@ def ldlt_matrix_inverse(N: int, is_complex: bool) -> SFG: R[i][j] = A[i][j] for k in range(i): - R[i][j] = MADS(False, R[i][j], M[k][i], R[k][j], name="M2") + R[i][j] = MADS(False, R[i][j], M[k][i], R[k][j]) if is_complex: M[i][j] = ComplexConjugate(R[i][j]) else: M[i][j] = R[i][j] - R[i][j] = MADS(True, Constant(0, name="0"), R[i][j], D_inv[i], name="M3") + R[i][j] = MADS(True, Constant(0, name="0"), R[i][j], D_inv[i]) # back substitution A_inv = [[None for _ in range(N)] for _ in range(N)] @@ -482,13 +482,11 @@ def ldlt_matrix_inverse(N: int, is_complex: bool) -> SFG: for k in reversed(range(j + 1, N)): if k == N - 1 and i != j: A_inv[j][i] = MADS( - False, Constant(0, name="0"), R[j][k], A_inv[i][k], name="M4" + False, Constant(0, name="0"), R[j][k], A_inv[i][k] ) else: if A_inv[i][k]: - A_inv[j][i] = MADS( - False, A_inv[j][i], R[j][k], A_inv[i][k], name="M5" - ) + A_inv[j][i] = MADS(False, A_inv[j][i], R[j][k], A_inv[i][k]) else: A_inv[j][i] = MADS(False, A_inv[j][i], R[j][k], A_inv[k][i]) @@ -533,26 +531,6 @@ def _construct_dif_fft_stage( return ports -def _extract_diagonal(elems): - n = 0 - k = 0 - while k <= len(elems): - k += n + 1 - n += 1 - n -= 1 - k -= n + 1 - if k != len(elems): - return None - - diagonal = np.zeros(n) - index = 0 - for i in range(n): - diagonal[n] = elems[index] - index += i + 2 - - return diagonal - - def _get_bit_reversed_number(number: int, number_of_bits: int) -> int: reversed_number = 0 for i in range(number_of_bits): diff --git a/test/test_core_operations.py b/test/test_core_operations.py index 1a5f367b..328acad2 100644 --- a/test/test_core_operations.py +++ b/test/test_core_operations.py @@ -3,6 +3,8 @@ import pytest from b_asic import ( + MAD, + MADS, Absolute, Addition, AddSub, @@ -11,6 +13,7 @@ from b_asic import ( Constant, ConstantMultiplication, Division, + Input, LeftShift, Max, Min, @@ -47,6 +50,14 @@ class TestConstant: test_operation.value = 4 assert test_operation.value == 4 + def test_constant_repr(self): + test_operation = Constant(3) + assert repr(test_operation) == "Constant(3)" + + def test_constant_str(self): + test_operation = Constant(3) + assert str(test_operation) == "3" + class TestAddition: """Tests for Addition class.""" @@ -83,16 +94,16 @@ class TestSubtraction: class TestAddSub: """Tests for AddSub class.""" - def test_addition_positive(self): + def test_addsub_positive(self): test_operation = AddSub(is_add=True) assert test_operation.evaluate_output(0, [3, 5]) == 8 - def test_addition_negative(self): + def test_addsub_negative(self): test_operation = AddSub(is_add=True) assert test_operation.evaluate_output(0, [-3, -5]) == -8 assert test_operation.is_add - def test_addition_complex(self): + def test_addsub_complex(self): test_operation = AddSub(is_add=True) assert test_operation.evaluate_output(0, [3 + 5j, 4 + 6j]) == 7 + 11j @@ -116,6 +127,22 @@ class TestAddSub: test_operation = AddSub(is_add=True) assert test_operation.is_swappable + def test_addsub_is_add_getter(self): + test_operation = AddSub(is_add=False) + assert not test_operation.is_add + + test_operation = AddSub(is_add=True) + assert test_operation.is_add + + def test_addsub_is_add_setter(self): + test_operation = AddSub(is_add=False) + test_operation.is_add = True + assert test_operation.is_add + + test_operation = AddSub(is_add=True) + test_operation.is_add = False + assert not test_operation.is_add + class TestMultiplication: """Tests for Multiplication class.""" @@ -148,6 +175,13 @@ class TestDivision: test_operation = Division() assert test_operation.evaluate_output(0, [60 + 40j, 10 + 20j]) == 2.8 - 1.6j + def test_mads_is_linear(self): + test_operation = Division(Constant(3), Addition(Input(), Constant(3))) + assert not test_operation.is_linear + + test_operation = Division(Addition(Input(), Constant(3)), Constant(3)) + assert test_operation.is_linear + class TestSquareRoot: """Tests for SquareRoot class.""" @@ -200,6 +234,13 @@ class TestMin: test_operation = Min() assert test_operation.evaluate_output(0, [-30, -5]) == -30 + def test_min_complex(self): + test_operation = Min() + with pytest.raises( + ValueError, match="core_operations.Min does not support complex numbers." + ): + test_operation.evaluate_output(0, [-1 - 1j, 2 + 2j]) + class TestAbsolute: """Tests for Absolute class.""" @@ -216,6 +257,13 @@ class TestAbsolute: test_operation = Absolute() assert test_operation.evaluate_output(0, [3 + 4j]) == 5.0 + def test_max_complex(self): + test_operation = Max() + with pytest.raises( + ValueError, match="core_operations.Max does not support complex numbers." + ): + test_operation.evaluate_output(0, [-1 - 1j, 2 + 2j]) + class TestConstantMultiplication: """Tests for ConstantMultiplication class.""" @@ -234,6 +282,106 @@ class TestConstantMultiplication: assert test_operation.evaluate_output(0, [3 + 4j]) == 1 + 18j +class TestMAD: + def test_mad_positive(self): + test_operation = MAD() + assert test_operation.evaluate_output(0, [1, 2, 3]) == 5 + + def test_mad_negative(self): + test_operation = MAD() + assert test_operation.evaluate_output(0, [-3, -5, -8]) == 7 + + def test_mad_complex(self): + test_operation = MAD() + assert test_operation.evaluate_output(0, [3 + 6j, 2 + 6j, 1 + 1j]) == -29 + 31j + + def test_mad_is_linear(self): + test_operation = MAD( + Constant(3), Addition(Input(), Constant(3)), Addition(Input(), Constant(3)) + ) + assert test_operation.is_linear + + test_operation = MAD( + Addition(Input(), Constant(3)), Constant(3), Addition(Input(), Constant(3)) + ) + assert test_operation.is_linear + + test_operation = MAD( + Addition(Input(), Constant(3)), Addition(Input(), Constant(3)), Constant(3) + ) + assert not test_operation.is_linear + + def test_mad_swap_io(self): + test_operation = MAD() + assert test_operation.evaluate_output(0, [1, 2, 3]) == 5 + test_operation.swap_io() + assert test_operation.evaluate_output(0, [1, 2, 3]) == 5 + + +class TestMADS: + def test_mads_positive(self): + test_operation = MADS(is_add=False) + assert test_operation.evaluate_output(0, [1, 2, 3]) == -5 + + def test_mads_negative(self): + test_operation = MADS(is_add=False) + assert test_operation.evaluate_output(0, [-3, -5, -8]) == -43 + + def test_mads_complex(self): + test_operation = MADS(is_add=False) + assert test_operation.evaluate_output(0, [3 + 6j, 2 + 6j, 1 + 1j]) == 7 - 2j + + def test_mads_positive_add(self): + test_operation = MADS(is_add=True) + assert test_operation.evaluate_output(0, [1, 2, 3]) == 7 + + def test_mads_negative_add(self): + test_operation = MADS(is_add=True) + assert test_operation.evaluate_output(0, [-3, -5, -8]) == 37 + + def test_mads_complex_add(self): + test_operation = MADS(is_add=True) + assert test_operation.evaluate_output(0, [3 + 6j, 2 + 6j, 1 + 1j]) == -1 + 14j + + def test_mads_is_linear(self): + test_operation = MADS( + Constant(3), Addition(Input(), Constant(3)), Addition(Input(), Constant(3)) + ) + assert not test_operation.is_linear + + test_operation = MADS( + Addition(Input(), Constant(3)), Constant(3), Addition(Input(), Constant(3)) + ) + assert test_operation.is_linear + + test_operation = MADS( + Addition(Input(), Constant(3)), Addition(Input(), Constant(3)), Constant(3) + ) + assert test_operation.is_linear + + def test_mads_swap_io(self): + test_operation = MADS(is_add=False) + assert test_operation.evaluate_output(0, [1, 2, 3]) == -5 + test_operation.swap_io() + assert test_operation.evaluate_output(0, [1, 2, 3]) == -5 + + def test_mads_is_add_getter(self): + test_operation = MADS(is_add=False) + assert not test_operation.is_add + + test_operation = MADS(is_add=True) + assert test_operation.is_add + + def test_mads_is_add_setter(self): + test_operation = MADS(is_add=False) + test_operation.is_add = True + assert test_operation.is_add + + test_operation = MADS(is_add=True) + test_operation.is_add = False + assert not test_operation.is_add + + class TestRightShift: """Tests for RightShift class.""" @@ -420,3 +568,15 @@ class TestSink: assert sfg1.input_count == 2 assert sfg.evaluate_output(1, [0, 1]) == sfg1.evaluate_output(0, [0, 1]) + + def test_sink_latency_getter(self): + test_operation = Sink() + assert test_operation.latency == 0 + + def test_sink_repr(self): + test_operation = Sink() + assert repr(test_operation) == "Sink()" + + def test_sink_str(self): + test_operation = Sink() + assert str(test_operation) == "sink" diff --git a/test/test_sfg_generators.py b/test/test_sfg_generators.py index e3430bc4..3d876d75 100644 --- a/test/test_sfg_generators.py +++ b/test/test_sfg_generators.py @@ -807,9 +807,9 @@ class TestLdltMatrixInverse: A += (np.abs(min_eig) + 0.1) * np.eye(N) # ensure positive definiteness return A - def _generate_random_complex_spd_matrix(self, N: int) -> np.ndarray: - A = np.random.randn(N, N) + 1j * np.random.randn(N, N) - A = (A + A.conj().T) / 2 # ensure symmetric - min_eig = np.min(np.linalg.eigvals(A)) - A += (np.abs(min_eig) + 0.1) * np.eye(N) # ensure positive definiteness - return A + # def _generate_random_complex_spd_matrix(self, N: int) -> np.ndarray: + # A = np.random.randn(N, N) + 1j * np.random.randn(N, N) + # A = (A + A.conj().T) / 2 # ensure symmetric + # min_eig = np.min(np.linalg.eigvals(A)) + # A += (np.abs(min_eig) + 0.1) * np.eye(N) # ensure positive definiteness + # return A -- GitLab From 76dc87372f5890179a1932b67ee6eab1ab31da97 Mon Sep 17 00:00:00 2001 From: Simon Bjurek <simbj106@student.liu.se> Date: Fri, 14 Feb 2025 15:55:27 +0100 Subject: [PATCH 07/15] Changed sfg generator to use DontCare. --- b_asic/core_operations.py | 74 ++++++++++++++++++++++++++++++++++-- b_asic/sfg_generators.py | 43 +++++++++++++-------- test/test_core_operations.py | 66 ++++++++++++++++++++++++++++++-- test/test_sfg_generators.py | 10 ++--- 4 files changed, 167 insertions(+), 26 deletions(-) diff --git a/b_asic/core_operations.py b/b_asic/core_operations.py index fd75359d..3a1474cc 100644 --- a/b_asic/core_operations.py +++ b/b_asic/core_operations.py @@ -1102,6 +1102,7 @@ class MAD(AbstractOperation): class MADS(AbstractOperation): __slots__ = ( "_is_add", + "_override_zero_on_src0", "_src0", "_src1", "_src2", @@ -1111,6 +1112,7 @@ class MADS(AbstractOperation): "_execution_time", ) _is_add: Optional[bool] + _override_zero_on_src0: Optional[bool] _src0: Optional[SignalSourceProvider] _src1: Optional[SignalSourceProvider] _src2: Optional[SignalSourceProvider] @@ -1124,6 +1126,7 @@ class MADS(AbstractOperation): def __init__( self, is_add: Optional[bool] = True, + override_zero_on_src0: Optional[bool] = False, src0: Optional[SignalSourceProvider] = None, src1: Optional[SignalSourceProvider] = None, src2: Optional[SignalSourceProvider] = None, @@ -1143,13 +1146,23 @@ class MADS(AbstractOperation): execution_time=execution_time, ) self.set_param("is_add", is_add) + self.set_param("override_zero_on_src0", override_zero_on_src0) @classmethod def type_name(cls) -> TypeName: return TypeName("mads") def evaluate(self, a, b, c): - return a + b * c if self.is_add else a - b * c + if self.is_add: + if self.override_zero_on_src0: + return b * c + else: + return a + b * c + else: + if self.override_zero_on_src0: + return -b * c + else: + return a - b * c @property def is_add(self) -> bool: @@ -1161,11 +1174,21 @@ class MADS(AbstractOperation): """Set if operation is an addition.""" self.set_param("is_add", is_add) + @property + def override_zero_on_src0(self) -> bool: + """Get if operation is overriding a zero on port src0.""" + return self.param("override_zero_on_src0") + + @override_zero_on_src0.setter + def override_zero_on_src0(self, override_zero_on_src0: bool) -> None: + """Set if operation is overriding a zero on port src0.""" + self.set_param("override_zero_on_src0", override_zero_on_src0) + @property def is_linear(self) -> bool: return ( - self.input(0).connected_source.operation.is_constant - or self.input(1).connected_source.operation.is_constant + self.input(1).connected_source.operation.is_constant + or self.input(2).connected_source.operation.is_constant ) def swap_io(self) -> None: @@ -1598,6 +1621,51 @@ class Shift(AbstractOperation): self.set_param("value", value) +class DontCare(AbstractOperation): + r""" + Dont-care operation + + Used for ignoring the input to another operation and thus avoiding dangling input nodes. + + Parameters + ---------- + name : Name, optional + Operation name. + + """ + + __slots__ = "_name" + _name: Name + + is_linear = True + + def __init__(self, name: Name = ""): + """Construct a DontCare operation.""" + super().__init__( + input_count=0, + output_count=1, + name=name, + latency_offsets={"out0": 0}, + ) + + @classmethod + def type_name(cls) -> TypeName: + return TypeName("dontcare") + + def evaluate(self): + return 0 + + @property + def latency(self) -> int: + return self.latency_offsets["out0"] + + def __repr__(self) -> str: + return "DontCare()" + + def __str__(self) -> str: + return "dontcare" + + class Sink(AbstractOperation): r""" Sink operation. diff --git a/b_asic/sfg_generators.py b/b_asic/sfg_generators.py index 0f7d26c5..3e76f159 100644 --- a/b_asic/sfg_generators.py +++ b/b_asic/sfg_generators.py @@ -12,9 +12,8 @@ from b_asic.core_operations import ( MADS, Addition, Butterfly, - ComplexConjugate, - Constant, ConstantMultiplication, + DontCare, Name, Reciprocal, SymmetricTwoportAdaptor, @@ -436,7 +435,19 @@ def radix_2_dif_fft(points: int) -> SFG: return SFG(inputs=inputs, outputs=outputs) -def ldlt_matrix_inverse(N: int, is_complex: bool) -> SFG: +def ldlt_matrix_inverse(N: int) -> SFG: + """Generates an SFG for the LDLT matrix inverse algorithm. + + Parameters + ---------- + N : int + Dimension of the square input matrix. + + Returns + ------- + SFG + Signal Flow Graph + """ inputs = [] A = [[None for _ in range(N)] for _ in range(N)] for i in range(N): @@ -457,7 +468,7 @@ def ldlt_matrix_inverse(N: int, is_complex: bool) -> SFG: # R*di*R^T factorization for i in range(N): for k in range(i): - D[i] = MADS(False, D[i], M[k][i], R[k][i]) + D[i] = MADS(False, False, D[i], M[k][i], R[k][i]) D_inv[i] = Reciprocal(D[i]) @@ -465,14 +476,14 @@ def ldlt_matrix_inverse(N: int, is_complex: bool) -> SFG: R[i][j] = A[i][j] for k in range(i): - R[i][j] = MADS(False, R[i][j], M[k][i], R[k][j]) + R[i][j] = MADS(False, False, R[i][j], M[k][i], R[k][j]) - if is_complex: - M[i][j] = ComplexConjugate(R[i][j]) - else: - M[i][j] = R[i][j] + # if is_complex: + # M[i][j] = ComplexConjugate(R[i][j]) + # else: + M[i][j] = R[i][j] - R[i][j] = MADS(True, Constant(0, name="0"), R[i][j], D_inv[i]) + R[i][j] = MADS(True, True, DontCare(), R[i][j], D_inv[i]) # back substitution A_inv = [[None for _ in range(N)] for _ in range(N)] @@ -481,14 +492,16 @@ def ldlt_matrix_inverse(N: int, is_complex: bool) -> SFG: for j in reversed(range(i + 1)): for k in reversed(range(j + 1, N)): if k == N - 1 and i != j: - A_inv[j][i] = MADS( - False, Constant(0, name="0"), R[j][k], A_inv[i][k] - ) + A_inv[j][i] = MADS(False, True, DontCare(), R[j][k], A_inv[i][k]) else: if A_inv[i][k]: - A_inv[j][i] = MADS(False, A_inv[j][i], R[j][k], A_inv[i][k]) + A_inv[j][i] = MADS( + False, False, A_inv[j][i], R[j][k], A_inv[i][k] + ) else: - A_inv[j][i] = MADS(False, A_inv[j][i], R[j][k], A_inv[k][i]) + A_inv[j][i] = MADS( + False, False, A_inv[j][i], R[j][k], A_inv[k][i] + ) outputs = [] for i in range(N): diff --git a/test/test_core_operations.py b/test/test_core_operations.py index 328acad2..4b34ac58 100644 --- a/test/test_core_operations.py +++ b/test/test_core_operations.py @@ -5,6 +5,7 @@ import pytest from b_asic import ( MAD, MADS, + SFG, Absolute, Addition, AddSub, @@ -13,11 +14,13 @@ from b_asic import ( Constant, ConstantMultiplication, Division, + DontCare, Input, LeftShift, Max, Min, Multiplication, + Output, Reciprocal, RightShift, Shift, @@ -343,19 +346,33 @@ class TestMADS: test_operation = MADS(is_add=True) assert test_operation.evaluate_output(0, [3 + 6j, 2 + 6j, 1 + 1j]) == -1 + 14j + def test_mads_zero_override(self): + test_operation = MADS(is_add=True, override_zero_on_src0=True) + assert test_operation.evaluate_output(0, [1, 1, 1]) == 1 + + def test_mads_sub_zero_override(self): + test_operation = MADS(is_add=False, override_zero_on_src0=True) + assert test_operation.evaluate_output(0, [1, 1, 1]) == -1 + def test_mads_is_linear(self): test_operation = MADS( - Constant(3), Addition(Input(), Constant(3)), Addition(Input(), Constant(3)) + src0=Constant(3), + src1=Addition(Input(), Constant(3)), + src2=Addition(Input(), Constant(3)), ) assert not test_operation.is_linear test_operation = MADS( - Addition(Input(), Constant(3)), Constant(3), Addition(Input(), Constant(3)) + src0=Addition(Input(), Constant(3)), + src1=Constant(3), + src2=Addition(Input(), Constant(3)), ) assert test_operation.is_linear test_operation = MADS( - Addition(Input(), Constant(3)), Addition(Input(), Constant(3)), Constant(3) + src0=Addition(Input(), Constant(3)), + src1=Addition(Input(), Constant(3)), + src2=Constant(3), ) assert test_operation.is_linear @@ -381,6 +398,22 @@ class TestMADS: test_operation.is_add = False assert not test_operation.is_add + def test_mads_override_zero_on_src0_getter(self): + test_operation = MADS(override_zero_on_src0=False) + assert not test_operation.override_zero_on_src0 + + test_operation = MADS(override_zero_on_src0=True) + assert test_operation.override_zero_on_src0 + + def test_mads_override_zero_on_src0_setter(self): + test_operation = MADS(override_zero_on_src0=False) + test_operation.override_zero_on_src0 = True + assert test_operation.override_zero_on_src0 + + test_operation = MADS(override_zero_on_src0=True) + test_operation.override_zero_on_src0 = False + assert not test_operation.override_zero_on_src0 + class TestRightShift: """Tests for RightShift class.""" @@ -556,6 +589,33 @@ class TestDepends: assert set(bfly1.inputs_required_for_output(1)) == {0, 1} +class TestDontCare: + def test_create_sfg_with_dontcare(self): + i1 = Input() + dc = DontCare() + a = Addition(i1, dc) + o = Output(a) + sfg = SFG([i1], [o]) + + assert sfg.output_count == 1 + assert sfg.input_count == 1 + + assert sfg.evaluate_output(0, [0]) == 0 + assert sfg.evaluate_output(0, [1]) == 1 + + def test_dontcare_latency_getter(self): + test_operation = DontCare() + assert test_operation.latency == 0 + + def test_dontcare_repr(self): + test_operation = DontCare() + assert repr(test_operation) == "DontCare()" + + def test_dontcare_str(self): + test_operation = DontCare() + assert str(test_operation) == "dontcare" + + class TestSink: def test_create_sfg_with_sink(self): bfly = Butterfly() diff --git a/test/test_sfg_generators.py b/test/test_sfg_generators.py index 3d876d75..bbd8916e 100644 --- a/test/test_sfg_generators.py +++ b/test/test_sfg_generators.py @@ -644,7 +644,7 @@ class TestRadix2FFT: class TestLdltMatrixInverse: def test_1x1(self): - sfg = ldlt_matrix_inverse(N=1, is_complex=False) + sfg = ldlt_matrix_inverse(N=1) assert len(sfg.inputs) == 1 assert len(sfg.outputs) == 1 @@ -661,7 +661,7 @@ class TestLdltMatrixInverse: assert np.isclose(res["0"], 0.2) def test_2x2_simple_spd(self): - sfg = ldlt_matrix_inverse(N=2, is_complex=False) + sfg = ldlt_matrix_inverse(N=2) assert len(sfg.inputs) == 3 assert len(sfg.outputs) == 3 @@ -683,7 +683,7 @@ class TestLdltMatrixInverse: assert np.isclose(res["2"], A_inv[1, 1]) def test_3x3_simple_spd(self): - sfg = ldlt_matrix_inverse(N=3, is_complex=False) + sfg = ldlt_matrix_inverse(N=3) assert len(sfg.inputs) == 6 assert len(sfg.outputs) == 6 @@ -717,7 +717,7 @@ class TestLdltMatrixInverse: def test_5x5_random_spd(self): N = 5 - sfg = ldlt_matrix_inverse(N=N, is_complex=False) + sfg = ldlt_matrix_inverse(N=N) assert len(sfg.inputs) == 15 assert len(sfg.outputs) == 15 @@ -746,7 +746,7 @@ class TestLdltMatrixInverse: def test_20x20_random_spd(self): N = 20 - sfg = ldlt_matrix_inverse(N=N, is_complex=False) + sfg = ldlt_matrix_inverse(N=N) A = self._generate_random_spd_matrix(N) -- GitLab From c864f80390ec08e43db35e918225a504158ba7f9 Mon Sep 17 00:00:00 2001 From: Simon Bjurek <simbj106@student.liu.se> Date: Tue, 11 Feb 2025 18:08:29 +0100 Subject: [PATCH 08/15] started work on the generator and added a MADS operation --- b_asic/core_operations.py | 70 +++++++++++++++++++++++++++++++ b_asic/sfg_generators.py | 87 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 157 insertions(+) diff --git a/b_asic/core_operations.py b/b_asic/core_operations.py index df0868ee..70d43370 100644 --- a/b_asic/core_operations.py +++ b/b_asic/core_operations.py @@ -1099,6 +1099,76 @@ class MAD(AbstractOperation): p._index = i +class MADS(AbstractOperation): + __slots__ = ( + "_is_add", + "_src0", + "_src1", + "_src2", + "_name", + "_latency", + "_latency_offsets", + "_execution_time", + ) + _is_add: Optional[bool] + _src0: Optional[SignalSourceProvider] + _src1: Optional[SignalSourceProvider] + _src2: Optional[SignalSourceProvider] + _name: Name + _latency: Optional[int] + _latency_offsets: Optional[Dict[str, int]] + _execution_time: Optional[int] + + is_swappable = True + + def __init__( + self, + is_add: Optional[bool] = False, + src0: Optional[SignalSourceProvider] = None, + src1: Optional[SignalSourceProvider] = None, + src2: Optional[SignalSourceProvider] = None, + name: Name = Name(""), + latency: Optional[int] = None, + latency_offsets: Optional[Dict[str, int]] = None, + execution_time: Optional[int] = None, + ): + """Construct a MADS operation.""" + super().__init__( + input_count=3, + output_count=1, + name=Name(name), + input_sources=[src0, src1, src2], + latency=latency, + latency_offsets=latency_offsets, + execution_time=execution_time, + ) + self._is_add = is_add + # self.set_param("is_add", is_add) + + @classmethod + def type_name(cls) -> TypeName: + return TypeName("mads") + + def evaluate(self, a, b, c): + return a + b * c if self._is_add else a - b * c + + @property + def is_linear(self) -> bool: + return ( + self.input(0).connected_source.operation.is_constant + or self.input(1).connected_source.operation.is_constant + ) + + def swap_io(self) -> None: + self._input_ports = [ + self._input_ports[1], + self._input_ports[0], + self._input_ports[2], + ] + for i, p in enumerate(self._input_ports): + p._index = i + + class SymmetricTwoportAdaptor(AbstractOperation): r""" Wave digital filter symmetric twoport-adaptor operation. diff --git a/b_asic/sfg_generators.py b/b_asic/sfg_generators.py index 737501f1..0ceeb4f6 100644 --- a/b_asic/sfg_generators.py +++ b/b_asic/sfg_generators.py @@ -9,10 +9,14 @@ from typing import TYPE_CHECKING, Dict, Optional, Sequence, Union import numpy as np from b_asic.core_operations import ( + MADS, Addition, Butterfly, + ComplexConjugate, + Constant, ConstantMultiplication, Name, + Reciprocal, SymmetricTwoportAdaptor, ) from b_asic.signal import Signal @@ -432,6 +436,69 @@ def radix_2_dif_fft(points: int) -> SFG: return SFG(inputs=inputs, outputs=outputs) +def ldlt_matrix_inverse(N: int, is_complex: bool) -> SFG: + inputs = [] + A = [[None for _ in range(N)] for _ in range(N)] + for i in range(N): + for j in range(i, N): + in_op = Input() + A[i][j] = in_op + inputs.append(in_op) + + D = [None for _ in range(N)] + for i in range(N): + D[i] = A[i][i] + + D_inv = [None for _ in range(N)] + + R = [[None for _ in range(N)] for _ in range(N)] + M = [[None for _ in range(N)] for _ in range(N)] + + # R*di*R^T factorization + for i in range(N): + for k in range(1, i): + D[i] = MADS(False, D[i], M[k][i], R[k][i]) + + D_inv[i] = Reciprocal(D[i]) + + for j in range(i, N): + R[i][j] = A[i][j] + + for k in range(1, i): + R[i][j] = MADS(False, R[i][j], M[k][i], R[k][j]) + + if is_complex: + M[i][j] = ComplexConjugate(R[i][j]) + else: + M[i][j] = R[i][j] + + R[i][j] = MADS(True, Constant(0, name="0"), R[i][j], D_inv[i]) + + # back substitution + A_inv = [[None for _ in range(N)] for _ in range(N)] + for i in reversed(range(N)): + A_inv[i][i] = D_inv[i] + for j in reversed(range(i)): + for k in reversed(range(j, N)): + if k == N - 1 and i != j: + # my custom + if A_inv[k][i]: + A_inv[j][i] = MADS( + False, Constant(0, name="0"), R[j][k], A_inv[k][i] + ) + else: + A_inv[j][i] = Constant(0, name="0") + else: + A_inv[j][i] = MADS(False, A_inv[j][i], R[j][k], A_inv[k][i]) + + outputs = [] + for i in range(N): + for j in range(i, N): + outputs.append(Output(A_inv[i][j])) + + return SFG(inputs, outputs) + + def _construct_dif_fft_stage( ports_from_previous_stage: list["OutputPort"], number_of_stages: int, @@ -465,6 +532,26 @@ def _construct_dif_fft_stage( return ports +def _extract_diagonal(elems): + n = 0 + k = 0 + while k <= len(elems): + k += n + 1 + n += 1 + n -= 1 + k -= n + 1 + if k != len(elems): + return None + + diagonal = np.zeros(n) + index = 0 + for i in range(n): + diagonal[n] = elems[index] + index += i + 2 + + return diagonal + + def _get_bit_reversed_number(number: int, number_of_bits: int) -> int: reversed_number = 0 for i in range(number_of_bits): -- GitLab From a79cfd9998215da296f5e87bc5bd9a12c33e3425 Mon Sep 17 00:00:00 2001 From: Simon Bjurek <simbj106@student.liu.se> Date: Wed, 12 Feb 2025 11:17:21 +0100 Subject: [PATCH 09/15] ldlt inverse now verified for matrices up to and including size 3x3 --- .gitignore | 1 + b_asic/core_operations.py | 17 ++++++++--- b_asic/sfg_generators.py | 29 +++++++++---------- test/test_sfg_generators.py | 56 +++++++++++++++++++++++++++++++++++++ 4 files changed, 85 insertions(+), 18 deletions(-) diff --git a/.gitignore b/.gitignore index c5b2148a..d240bbd1 100644 --- a/.gitignore +++ b/.gitignore @@ -116,6 +116,7 @@ TODO.txt b_asic/_version.py docs_sphinx/_build/ docs_sphinx/examples +docs_sphinx/sg_execution_times.rst result_images/ .coverage Digraph.gv diff --git a/b_asic/core_operations.py b/b_asic/core_operations.py index 70d43370..67d6b07f 100644 --- a/b_asic/core_operations.py +++ b/b_asic/core_operations.py @@ -1123,7 +1123,7 @@ class MADS(AbstractOperation): def __init__( self, - is_add: Optional[bool] = False, + is_add: Optional[bool] = True, src0: Optional[SignalSourceProvider] = None, src1: Optional[SignalSourceProvider] = None, src2: Optional[SignalSourceProvider] = None, @@ -1142,15 +1142,24 @@ class MADS(AbstractOperation): latency_offsets=latency_offsets, execution_time=execution_time, ) - self._is_add = is_add - # self.set_param("is_add", is_add) + self.set_param("is_add", is_add) @classmethod def type_name(cls) -> TypeName: return TypeName("mads") def evaluate(self, a, b, c): - return a + b * c if self._is_add else a - b * c + return a + b * c if self.is_add else a - b * c + + @property + def is_add(self) -> bool: + """Get if operation is an addition.""" + return self.param("is_add") + + @is_add.setter + def is_add(self, is_add: bool) -> None: + """Set if operation is an addition.""" + self.set_param("is_add", is_add) @property def is_linear(self) -> bool: diff --git a/b_asic/sfg_generators.py b/b_asic/sfg_generators.py index 0ceeb4f6..a0847de4 100644 --- a/b_asic/sfg_generators.py +++ b/b_asic/sfg_generators.py @@ -456,40 +456,41 @@ def ldlt_matrix_inverse(N: int, is_complex: bool) -> SFG: # R*di*R^T factorization for i in range(N): - for k in range(1, i): - D[i] = MADS(False, D[i], M[k][i], R[k][i]) + for k in range(i): + D[i] = MADS(False, D[i], M[k][i], R[k][i], name="M1") D_inv[i] = Reciprocal(D[i]) - for j in range(i, N): + for j in range(i + 1, N): R[i][j] = A[i][j] - for k in range(1, i): - R[i][j] = MADS(False, R[i][j], M[k][i], R[k][j]) + for k in range(i): + R[i][j] = MADS(False, R[i][j], M[k][i], R[k][j], name="M2") if is_complex: M[i][j] = ComplexConjugate(R[i][j]) else: M[i][j] = R[i][j] - R[i][j] = MADS(True, Constant(0, name="0"), R[i][j], D_inv[i]) + R[i][j] = MADS(True, Constant(0, name="0"), R[i][j], D_inv[i], name="M3") # back substitution A_inv = [[None for _ in range(N)] for _ in range(N)] for i in reversed(range(N)): A_inv[i][i] = D_inv[i] - for j in reversed(range(i)): - for k in reversed(range(j, N)): + for j in reversed(range(i + 1)): + for k in reversed(range(j + 1, N)): if k == N - 1 and i != j: - # my custom - if A_inv[k][i]: + A_inv[j][i] = MADS( + False, Constant(0, name="0"), R[j][k], A_inv[i][k], name="M4" + ) + else: + if A_inv[i][k]: A_inv[j][i] = MADS( - False, Constant(0, name="0"), R[j][k], A_inv[k][i] + False, A_inv[j][i], R[j][k], A_inv[i][k], name="M5" ) else: - A_inv[j][i] = Constant(0, name="0") - else: - A_inv[j][i] = MADS(False, A_inv[j][i], R[j][k], A_inv[k][i]) + A_inv[j][i] = MADS(False, A_inv[j][i], R[j][k], A_inv[k][i]) outputs = [] for i in range(N): diff --git a/test/test_sfg_generators.py b/test/test_sfg_generators.py index 7064658c..e3f56ec0 100644 --- a/test/test_sfg_generators.py +++ b/test/test_sfg_generators.py @@ -12,6 +12,7 @@ from b_asic.sfg_generators import ( direct_form_1_iir, direct_form_2_iir, direct_form_fir, + ldlt_matrix_inverse, radix_2_dif_fft, transposed_direct_form_fir, wdf_allpass, @@ -637,3 +638,58 @@ class TestRadix2FFT: POINTS = 5 with pytest.raises(ValueError, match="Points must be a power of two."): radix_2_dif_fft(points=POINTS) + + +class TestLdltMatrixInverse: + def test_1x1(self): + sfg = ldlt_matrix_inverse(N=1, is_complex=False) + + A_input = [Constant(5)] + + sim = Simulation(sfg, A_input) + sim.run_for(1) + + res = sim.results + assert np.isclose(res["0"], 0.2) + + def test_2x2_simple_spd(self): + sfg = ldlt_matrix_inverse(N=2, is_complex=False) + + A = np.array([[1, 2], [2, 1]]) + A_input = [Constant(1), Constant(2), Constant(1)] + + A_inv = np.linalg.inv(A) + + sim = Simulation(sfg, A_input) + sim.run_for(1) + + res = sim.results + assert np.isclose(res["0"], A_inv[0, 0]) + assert np.isclose(res["1"], A_inv[0, 1]) + assert np.isclose(res["2"], A_inv[1, 1]) + + def test_3x3_simple_spd(self): + sfg = ldlt_matrix_inverse(N=3, is_complex=False) + + A = np.array([[2, -1, 0], [-1, 3, -1], [0, -1, 2]]) + A_input = [ + Constant(2), + Constant(-1), + Constant(0), + Constant(3), + Constant(-1), + Constant(2), + ] + + A_inv = np.linalg.inv(A) + + sim = Simulation(sfg, A_input) + sim.run_for(1) + + res = sim.results + assert np.isclose(res["0"], A_inv[0, 0]) + assert np.isclose(res["1"], A_inv[0, 1]) + assert np.isclose(res["2"], A_inv[0, 2]) + assert np.isclose(res["3"], A_inv[1, 1]) + assert np.isclose(res["4"], A_inv[1, 2]) + assert np.isclose(res["5"], A_inv[2, 2]) -- GitLab From 8af39d1f6a5fadc454be8ecaeceaeba0b38624bb Mon Sep 17 00:00:00 2001 From: Simon Bjurek <simbj106@student.liu.se> Date: Wed, 12 Feb 2025 18:01:38 +0100 Subject: [PATCH 10/15] added more tests and one commented out test for complex matrices --- test/test_sfg_generators.py | 120 ++++++++++++++++++++++++++++++++++++ 1 file changed, 120 insertions(+) diff --git a/test/test_sfg_generators.py b/test/test_sfg_generators.py index e3f56ec0..0a57b116 100644 --- a/test/test_sfg_generators.py +++ b/test/test_sfg_generators.py @@ -3,9 +3,11 @@ import pytest from scipy import signal from b_asic.core_operations import ( + MADS, Addition, Butterfly, ConstantMultiplication, + Reciprocal, SymmetricTwoportAdaptor, ) from b_asic.sfg_generators import ( @@ -644,6 +646,12 @@ class TestLdltMatrixInverse: def test_1x1(self): sfg = ldlt_matrix_inverse(N=1, is_complex=False) + assert len(sfg.inputs) == 1 + assert len(sfg.outputs) == 1 + + assert len(sfg.find_by_type_name(MADS.type_name())) == 0 + assert len(sfg.find_by_type_name(Reciprocal.type_name())) == 1 + A_input = [Constant(5)] sim = Simulation(sfg, A_input) @@ -655,6 +663,12 @@ class TestLdltMatrixInverse: def test_2x2_simple_spd(self): sfg = ldlt_matrix_inverse(N=2, is_complex=False) + assert len(sfg.inputs) == 3 + assert len(sfg.outputs) == 3 + + assert len(sfg.find_by_type_name(MADS.type_name())) == 4 + assert len(sfg.find_by_type_name(Reciprocal.type_name())) == 2 + A = np.array([[1, 2], [2, 1]]) A_input = [Constant(1), Constant(2), Constant(1)] @@ -671,6 +685,12 @@ class TestLdltMatrixInverse: def test_3x3_simple_spd(self): sfg = ldlt_matrix_inverse(N=3, is_complex=False) + assert len(sfg.inputs) == 6 + assert len(sfg.outputs) == 6 + + assert len(sfg.find_by_type_name(MADS.type_name())) == 15 + assert len(sfg.find_by_type_name(Reciprocal.type_name())) == 3 + A = np.array([[2, -1, 0], [-1, 3, -1], [0, -1, 2]]) A_input = [ Constant(2), @@ -693,3 +713,103 @@ class TestLdltMatrixInverse: assert np.isclose(res["3"], A_inv[1, 1]) assert np.isclose(res["4"], A_inv[1, 2]) assert np.isclose(res["5"], A_inv[2, 2]) + + def test_5x5_random_spd(self): + N = 5 + + sfg = ldlt_matrix_inverse(N=N, is_complex=False) + + assert len(sfg.inputs) == 15 + assert len(sfg.outputs) == 15 + + assert len(sfg.find_by_type_name(MADS.type_name())) == 70 + assert len(sfg.find_by_type_name(Reciprocal.type_name())) == N + + A = self._generate_random_spd_matrix(N) + + upper_tridiag = A[np.triu_indices_from(A)] + + A_input = [Constant(num) for num in upper_tridiag] + A_inv = np.linalg.inv(A) + + sim = Simulation(sfg, A_input) + sim.run_for(1) + res = sim.results + + row_indices, col_indices = np.triu_indices(N) + expected_values = A_inv[row_indices, col_indices] + actual_values = [res[str(i)] for i in range(len(expected_values))] + + for i in range(len(expected_values)): + assert np.isclose(actual_values[i], expected_values[i]) + + def test_30x30_random_spd(self): + N = 30 + + sfg = ldlt_matrix_inverse(N=N, is_complex=False) + + A = self._generate_random_spd_matrix(N) + + assert len(sfg.inputs) == len(A[np.triu_indices_from(A)]) + assert len(sfg.outputs) == len(A[np.triu_indices_from(A)]) + + assert len(sfg.find_by_type_name(Reciprocal.type_name())) == N + + upper_tridiag = A[np.triu_indices_from(A)] + + A_input = [Constant(num) for num in upper_tridiag] + A_inv = np.linalg.inv(A) + + sim = Simulation(sfg, A_input) + sim.run_for(1) + res = sim.results + + row_indices, col_indices = np.triu_indices(N) + expected_values = A_inv[row_indices, col_indices] + actual_values = [res[str(i)] for i in range(len(expected_values))] + + for i in range(len(expected_values)): + assert np.isclose(actual_values[i], expected_values[i]) + + # def test_2x2_random_complex_spd(self): + # N = 2 + + # sfg = ldlt_matrix_inverse(N=N, is_complex=True) + + # # A = self._generate_random_complex_spd_matrix(N) + # A = np.array([[2, 1+1j],[1-1j, 3]]) + + # assert len(sfg.inputs) == len(A[np.triu_indices_from(A)]) + # assert len(sfg.outputs) == len(A[np.triu_indices_from(A)]) + + # assert len(sfg.find_by_type_name(Reciprocal.type_name())) == N + + # upper_tridiag = A[np.triu_indices_from(A)] + + # A_input = [Constant(num) for num in upper_tridiag] + # A_inv = np.linalg.inv(A) + + # sim = Simulation(sfg, A_input) + # sim.run_for(1) + # res = sim.results + + # row_indices, col_indices = np.triu_indices(N) + # expected_values = A_inv[row_indices, col_indices] + # actual_values = [res[str(i)] for i in range(len(expected_values))] + + # for i in range(len(expected_values)): + # assert np.isclose(actual_values[i], expected_values[i]) + + def _generate_random_spd_matrix(self, N: int) -> np.ndarray: + A = np.random.rand(N, N) + A = (A + A.T) / 2 # ensure symmetric + min_eig = np.min(np.linalg.eigvals(A)) + A += (np.abs(min_eig) + 0.1) * np.eye(N) # ensure positive definiteness + return A + + def _generate_random_complex_spd_matrix(self, N: int) -> np.ndarray: + A = np.random.randn(N, N) + 1j * np.random.randn(N, N) + A = (A + A.conj().T) / 2 # ensure symmetric + min_eig = np.min(np.linalg.eigvals(A)) + A += (np.abs(min_eig) + 0.1) * np.eye(N) # ensure positive definiteness + return A -- GitLab From 3f133d6a72af064f754f425b0b804dcb54d244dd Mon Sep 17 00:00:00 2001 From: Simon Bjurek <simbj106@student.liu.se> Date: Wed, 12 Feb 2025 20:14:11 +0100 Subject: [PATCH 11/15] changed test that took too long --- test/test_sfg_generators.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/test_sfg_generators.py b/test/test_sfg_generators.py index 0a57b116..40dd973f 100644 --- a/test/test_sfg_generators.py +++ b/test/test_sfg_generators.py @@ -743,7 +743,7 @@ class TestLdltMatrixInverse: for i in range(len(expected_values)): assert np.isclose(actual_values[i], expected_values[i]) - def test_30x30_random_spd(self): + def test_20x20_random_spd(self): N = 30 sfg = ldlt_matrix_inverse(N=N, is_complex=False) -- GitLab From 2963486dde9c833274913bd984aae568940c7d68 Mon Sep 17 00:00:00 2001 From: Simon Bjurek <simbj106@student.liu.se> Date: Wed, 12 Feb 2025 20:15:33 +0100 Subject: [PATCH 12/15] fix from last commit --- test/test_sfg_generators.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/test_sfg_generators.py b/test/test_sfg_generators.py index 40dd973f..e3430bc4 100644 --- a/test/test_sfg_generators.py +++ b/test/test_sfg_generators.py @@ -744,7 +744,7 @@ class TestLdltMatrixInverse: assert np.isclose(actual_values[i], expected_values[i]) def test_20x20_random_spd(self): - N = 30 + N = 20 sfg = ldlt_matrix_inverse(N=N, is_complex=False) -- GitLab From f85cd36a50a572105c1c6f07baac4f15374b3d21 Mon Sep 17 00:00:00 2001 From: Simon Bjurek <simbj106@student.liu.se> Date: Thu, 13 Feb 2025 12:35:29 +0100 Subject: [PATCH 13/15] fixes from mr comments and added tests for 100 percent test coverage in core_operations --- b_asic/core_operations.py | 2 +- b_asic/sfg_generators.py | 32 ++----- test/test_core_operations.py | 166 ++++++++++++++++++++++++++++++++++- test/test_sfg_generators.py | 12 +-- 4 files changed, 175 insertions(+), 37 deletions(-) diff --git a/b_asic/core_operations.py b/b_asic/core_operations.py index 67d6b07f..fd75359d 100644 --- a/b_asic/core_operations.py +++ b/b_asic/core_operations.py @@ -1170,9 +1170,9 @@ class MADS(AbstractOperation): def swap_io(self) -> None: self._input_ports = [ - self._input_ports[1], self._input_ports[0], self._input_ports[2], + self._input_ports[1], ] for i, p in enumerate(self._input_ports): p._index = i diff --git a/b_asic/sfg_generators.py b/b_asic/sfg_generators.py index a0847de4..0f7d26c5 100644 --- a/b_asic/sfg_generators.py +++ b/b_asic/sfg_generators.py @@ -457,7 +457,7 @@ def ldlt_matrix_inverse(N: int, is_complex: bool) -> SFG: # R*di*R^T factorization for i in range(N): for k in range(i): - D[i] = MADS(False, D[i], M[k][i], R[k][i], name="M1") + D[i] = MADS(False, D[i], M[k][i], R[k][i]) D_inv[i] = Reciprocal(D[i]) @@ -465,14 +465,14 @@ def ldlt_matrix_inverse(N: int, is_complex: bool) -> SFG: R[i][j] = A[i][j] for k in range(i): - R[i][j] = MADS(False, R[i][j], M[k][i], R[k][j], name="M2") + R[i][j] = MADS(False, R[i][j], M[k][i], R[k][j]) if is_complex: M[i][j] = ComplexConjugate(R[i][j]) else: M[i][j] = R[i][j] - R[i][j] = MADS(True, Constant(0, name="0"), R[i][j], D_inv[i], name="M3") + R[i][j] = MADS(True, Constant(0, name="0"), R[i][j], D_inv[i]) # back substitution A_inv = [[None for _ in range(N)] for _ in range(N)] @@ -482,13 +482,11 @@ def ldlt_matrix_inverse(N: int, is_complex: bool) -> SFG: for k in reversed(range(j + 1, N)): if k == N - 1 and i != j: A_inv[j][i] = MADS( - False, Constant(0, name="0"), R[j][k], A_inv[i][k], name="M4" + False, Constant(0, name="0"), R[j][k], A_inv[i][k] ) else: if A_inv[i][k]: - A_inv[j][i] = MADS( - False, A_inv[j][i], R[j][k], A_inv[i][k], name="M5" - ) + A_inv[j][i] = MADS(False, A_inv[j][i], R[j][k], A_inv[i][k]) else: A_inv[j][i] = MADS(False, A_inv[j][i], R[j][k], A_inv[k][i]) @@ -533,26 +531,6 @@ def _construct_dif_fft_stage( return ports -def _extract_diagonal(elems): - n = 0 - k = 0 - while k <= len(elems): - k += n + 1 - n += 1 - n -= 1 - k -= n + 1 - if k != len(elems): - return None - - diagonal = np.zeros(n) - index = 0 - for i in range(n): - diagonal[n] = elems[index] - index += i + 2 - - return diagonal - - def _get_bit_reversed_number(number: int, number_of_bits: int) -> int: reversed_number = 0 for i in range(number_of_bits): diff --git a/test/test_core_operations.py b/test/test_core_operations.py index 1a5f367b..328acad2 100644 --- a/test/test_core_operations.py +++ b/test/test_core_operations.py @@ -3,6 +3,8 @@ import pytest from b_asic import ( + MAD, + MADS, Absolute, Addition, AddSub, @@ -11,6 +13,7 @@ from b_asic import ( Constant, ConstantMultiplication, Division, + Input, LeftShift, Max, Min, @@ -47,6 +50,14 @@ class TestConstant: test_operation.value = 4 assert test_operation.value == 4 + def test_constant_repr(self): + test_operation = Constant(3) + assert repr(test_operation) == "Constant(3)" + + def test_constant_str(self): + test_operation = Constant(3) + assert str(test_operation) == "3" + class TestAddition: """Tests for Addition class.""" @@ -83,16 +94,16 @@ class TestSubtraction: class TestAddSub: """Tests for AddSub class.""" - def test_addition_positive(self): + def test_addsub_positive(self): test_operation = AddSub(is_add=True) assert test_operation.evaluate_output(0, [3, 5]) == 8 - def test_addition_negative(self): + def test_addsub_negative(self): test_operation = AddSub(is_add=True) assert test_operation.evaluate_output(0, [-3, -5]) == -8 assert test_operation.is_add - def test_addition_complex(self): + def test_addsub_complex(self): test_operation = AddSub(is_add=True) assert test_operation.evaluate_output(0, [3 + 5j, 4 + 6j]) == 7 + 11j @@ -116,6 +127,22 @@ class TestAddSub: test_operation = AddSub(is_add=True) assert test_operation.is_swappable + def test_addsub_is_add_getter(self): + test_operation = AddSub(is_add=False) + assert not test_operation.is_add + + test_operation = AddSub(is_add=True) + assert test_operation.is_add + + def test_addsub_is_add_setter(self): + test_operation = AddSub(is_add=False) + test_operation.is_add = True + assert test_operation.is_add + + test_operation = AddSub(is_add=True) + test_operation.is_add = False + assert not test_operation.is_add + class TestMultiplication: """Tests for Multiplication class.""" @@ -148,6 +175,13 @@ class TestDivision: test_operation = Division() assert test_operation.evaluate_output(0, [60 + 40j, 10 + 20j]) == 2.8 - 1.6j + def test_mads_is_linear(self): + test_operation = Division(Constant(3), Addition(Input(), Constant(3))) + assert not test_operation.is_linear + + test_operation = Division(Addition(Input(), Constant(3)), Constant(3)) + assert test_operation.is_linear + class TestSquareRoot: """Tests for SquareRoot class.""" @@ -200,6 +234,13 @@ class TestMin: test_operation = Min() assert test_operation.evaluate_output(0, [-30, -5]) == -30 + def test_min_complex(self): + test_operation = Min() + with pytest.raises( + ValueError, match="core_operations.Min does not support complex numbers." + ): + test_operation.evaluate_output(0, [-1 - 1j, 2 + 2j]) + class TestAbsolute: """Tests for Absolute class.""" @@ -216,6 +257,13 @@ class TestAbsolute: test_operation = Absolute() assert test_operation.evaluate_output(0, [3 + 4j]) == 5.0 + def test_max_complex(self): + test_operation = Max() + with pytest.raises( + ValueError, match="core_operations.Max does not support complex numbers." + ): + test_operation.evaluate_output(0, [-1 - 1j, 2 + 2j]) + class TestConstantMultiplication: """Tests for ConstantMultiplication class.""" @@ -234,6 +282,106 @@ class TestConstantMultiplication: assert test_operation.evaluate_output(0, [3 + 4j]) == 1 + 18j +class TestMAD: + def test_mad_positive(self): + test_operation = MAD() + assert test_operation.evaluate_output(0, [1, 2, 3]) == 5 + + def test_mad_negative(self): + test_operation = MAD() + assert test_operation.evaluate_output(0, [-3, -5, -8]) == 7 + + def test_mad_complex(self): + test_operation = MAD() + assert test_operation.evaluate_output(0, [3 + 6j, 2 + 6j, 1 + 1j]) == -29 + 31j + + def test_mad_is_linear(self): + test_operation = MAD( + Constant(3), Addition(Input(), Constant(3)), Addition(Input(), Constant(3)) + ) + assert test_operation.is_linear + + test_operation = MAD( + Addition(Input(), Constant(3)), Constant(3), Addition(Input(), Constant(3)) + ) + assert test_operation.is_linear + + test_operation = MAD( + Addition(Input(), Constant(3)), Addition(Input(), Constant(3)), Constant(3) + ) + assert not test_operation.is_linear + + def test_mad_swap_io(self): + test_operation = MAD() + assert test_operation.evaluate_output(0, [1, 2, 3]) == 5 + test_operation.swap_io() + assert test_operation.evaluate_output(0, [1, 2, 3]) == 5 + + +class TestMADS: + def test_mads_positive(self): + test_operation = MADS(is_add=False) + assert test_operation.evaluate_output(0, [1, 2, 3]) == -5 + + def test_mads_negative(self): + test_operation = MADS(is_add=False) + assert test_operation.evaluate_output(0, [-3, -5, -8]) == -43 + + def test_mads_complex(self): + test_operation = MADS(is_add=False) + assert test_operation.evaluate_output(0, [3 + 6j, 2 + 6j, 1 + 1j]) == 7 - 2j + + def test_mads_positive_add(self): + test_operation = MADS(is_add=True) + assert test_operation.evaluate_output(0, [1, 2, 3]) == 7 + + def test_mads_negative_add(self): + test_operation = MADS(is_add=True) + assert test_operation.evaluate_output(0, [-3, -5, -8]) == 37 + + def test_mads_complex_add(self): + test_operation = MADS(is_add=True) + assert test_operation.evaluate_output(0, [3 + 6j, 2 + 6j, 1 + 1j]) == -1 + 14j + + def test_mads_is_linear(self): + test_operation = MADS( + Constant(3), Addition(Input(), Constant(3)), Addition(Input(), Constant(3)) + ) + assert not test_operation.is_linear + + test_operation = MADS( + Addition(Input(), Constant(3)), Constant(3), Addition(Input(), Constant(3)) + ) + assert test_operation.is_linear + + test_operation = MADS( + Addition(Input(), Constant(3)), Addition(Input(), Constant(3)), Constant(3) + ) + assert test_operation.is_linear + + def test_mads_swap_io(self): + test_operation = MADS(is_add=False) + assert test_operation.evaluate_output(0, [1, 2, 3]) == -5 + test_operation.swap_io() + assert test_operation.evaluate_output(0, [1, 2, 3]) == -5 + + def test_mads_is_add_getter(self): + test_operation = MADS(is_add=False) + assert not test_operation.is_add + + test_operation = MADS(is_add=True) + assert test_operation.is_add + + def test_mads_is_add_setter(self): + test_operation = MADS(is_add=False) + test_operation.is_add = True + assert test_operation.is_add + + test_operation = MADS(is_add=True) + test_operation.is_add = False + assert not test_operation.is_add + + class TestRightShift: """Tests for RightShift class.""" @@ -420,3 +568,15 @@ class TestSink: assert sfg1.input_count == 2 assert sfg.evaluate_output(1, [0, 1]) == sfg1.evaluate_output(0, [0, 1]) + + def test_sink_latency_getter(self): + test_operation = Sink() + assert test_operation.latency == 0 + + def test_sink_repr(self): + test_operation = Sink() + assert repr(test_operation) == "Sink()" + + def test_sink_str(self): + test_operation = Sink() + assert str(test_operation) == "sink" diff --git a/test/test_sfg_generators.py b/test/test_sfg_generators.py index e3430bc4..3d876d75 100644 --- a/test/test_sfg_generators.py +++ b/test/test_sfg_generators.py @@ -807,9 +807,9 @@ class TestLdltMatrixInverse: A += (np.abs(min_eig) + 0.1) * np.eye(N) # ensure positive definiteness return A - def _generate_random_complex_spd_matrix(self, N: int) -> np.ndarray: - A = np.random.randn(N, N) + 1j * np.random.randn(N, N) - A = (A + A.conj().T) / 2 # ensure symmetric - min_eig = np.min(np.linalg.eigvals(A)) - A += (np.abs(min_eig) + 0.1) * np.eye(N) # ensure positive definiteness - return A + # def _generate_random_complex_spd_matrix(self, N: int) -> np.ndarray: + # A = np.random.randn(N, N) + 1j * np.random.randn(N, N) + # A = (A + A.conj().T) / 2 # ensure symmetric + # min_eig = np.min(np.linalg.eigvals(A)) + # A += (np.abs(min_eig) + 0.1) * np.eye(N) # ensure positive definiteness + # return A -- GitLab From 58084c2a869ea4c98bfbbb29db0876db3985fc91 Mon Sep 17 00:00:00 2001 From: Simon Bjurek <simbj106@student.liu.se> Date: Fri, 14 Feb 2025 15:55:27 +0100 Subject: [PATCH 14/15] Changed sfg generator to use DontCare. --- b_asic/core_operations.py | 74 ++++++++++++++++++++++++++++++++++-- b_asic/sfg_generators.py | 43 +++++++++++++-------- test/test_core_operations.py | 66 ++++++++++++++++++++++++++++++-- test/test_sfg_generators.py | 10 ++--- 4 files changed, 167 insertions(+), 26 deletions(-) diff --git a/b_asic/core_operations.py b/b_asic/core_operations.py index fd75359d..3a1474cc 100644 --- a/b_asic/core_operations.py +++ b/b_asic/core_operations.py @@ -1102,6 +1102,7 @@ class MAD(AbstractOperation): class MADS(AbstractOperation): __slots__ = ( "_is_add", + "_override_zero_on_src0", "_src0", "_src1", "_src2", @@ -1111,6 +1112,7 @@ class MADS(AbstractOperation): "_execution_time", ) _is_add: Optional[bool] + _override_zero_on_src0: Optional[bool] _src0: Optional[SignalSourceProvider] _src1: Optional[SignalSourceProvider] _src2: Optional[SignalSourceProvider] @@ -1124,6 +1126,7 @@ class MADS(AbstractOperation): def __init__( self, is_add: Optional[bool] = True, + override_zero_on_src0: Optional[bool] = False, src0: Optional[SignalSourceProvider] = None, src1: Optional[SignalSourceProvider] = None, src2: Optional[SignalSourceProvider] = None, @@ -1143,13 +1146,23 @@ class MADS(AbstractOperation): execution_time=execution_time, ) self.set_param("is_add", is_add) + self.set_param("override_zero_on_src0", override_zero_on_src0) @classmethod def type_name(cls) -> TypeName: return TypeName("mads") def evaluate(self, a, b, c): - return a + b * c if self.is_add else a - b * c + if self.is_add: + if self.override_zero_on_src0: + return b * c + else: + return a + b * c + else: + if self.override_zero_on_src0: + return -b * c + else: + return a - b * c @property def is_add(self) -> bool: @@ -1161,11 +1174,21 @@ class MADS(AbstractOperation): """Set if operation is an addition.""" self.set_param("is_add", is_add) + @property + def override_zero_on_src0(self) -> bool: + """Get if operation is overriding a zero on port src0.""" + return self.param("override_zero_on_src0") + + @override_zero_on_src0.setter + def override_zero_on_src0(self, override_zero_on_src0: bool) -> None: + """Set if operation is overriding a zero on port src0.""" + self.set_param("override_zero_on_src0", override_zero_on_src0) + @property def is_linear(self) -> bool: return ( - self.input(0).connected_source.operation.is_constant - or self.input(1).connected_source.operation.is_constant + self.input(1).connected_source.operation.is_constant + or self.input(2).connected_source.operation.is_constant ) def swap_io(self) -> None: @@ -1598,6 +1621,51 @@ class Shift(AbstractOperation): self.set_param("value", value) +class DontCare(AbstractOperation): + r""" + Dont-care operation + + Used for ignoring the input to another operation and thus avoiding dangling input nodes. + + Parameters + ---------- + name : Name, optional + Operation name. + + """ + + __slots__ = "_name" + _name: Name + + is_linear = True + + def __init__(self, name: Name = ""): + """Construct a DontCare operation.""" + super().__init__( + input_count=0, + output_count=1, + name=name, + latency_offsets={"out0": 0}, + ) + + @classmethod + def type_name(cls) -> TypeName: + return TypeName("dontcare") + + def evaluate(self): + return 0 + + @property + def latency(self) -> int: + return self.latency_offsets["out0"] + + def __repr__(self) -> str: + return "DontCare()" + + def __str__(self) -> str: + return "dontcare" + + class Sink(AbstractOperation): r""" Sink operation. diff --git a/b_asic/sfg_generators.py b/b_asic/sfg_generators.py index 0f7d26c5..3e76f159 100644 --- a/b_asic/sfg_generators.py +++ b/b_asic/sfg_generators.py @@ -12,9 +12,8 @@ from b_asic.core_operations import ( MADS, Addition, Butterfly, - ComplexConjugate, - Constant, ConstantMultiplication, + DontCare, Name, Reciprocal, SymmetricTwoportAdaptor, @@ -436,7 +435,19 @@ def radix_2_dif_fft(points: int) -> SFG: return SFG(inputs=inputs, outputs=outputs) -def ldlt_matrix_inverse(N: int, is_complex: bool) -> SFG: +def ldlt_matrix_inverse(N: int) -> SFG: + """Generates an SFG for the LDLT matrix inverse algorithm. + + Parameters + ---------- + N : int + Dimension of the square input matrix. + + Returns + ------- + SFG + Signal Flow Graph + """ inputs = [] A = [[None for _ in range(N)] for _ in range(N)] for i in range(N): @@ -457,7 +468,7 @@ def ldlt_matrix_inverse(N: int, is_complex: bool) -> SFG: # R*di*R^T factorization for i in range(N): for k in range(i): - D[i] = MADS(False, D[i], M[k][i], R[k][i]) + D[i] = MADS(False, False, D[i], M[k][i], R[k][i]) D_inv[i] = Reciprocal(D[i]) @@ -465,14 +476,14 @@ def ldlt_matrix_inverse(N: int, is_complex: bool) -> SFG: R[i][j] = A[i][j] for k in range(i): - R[i][j] = MADS(False, R[i][j], M[k][i], R[k][j]) + R[i][j] = MADS(False, False, R[i][j], M[k][i], R[k][j]) - if is_complex: - M[i][j] = ComplexConjugate(R[i][j]) - else: - M[i][j] = R[i][j] + # if is_complex: + # M[i][j] = ComplexConjugate(R[i][j]) + # else: + M[i][j] = R[i][j] - R[i][j] = MADS(True, Constant(0, name="0"), R[i][j], D_inv[i]) + R[i][j] = MADS(True, True, DontCare(), R[i][j], D_inv[i]) # back substitution A_inv = [[None for _ in range(N)] for _ in range(N)] @@ -481,14 +492,16 @@ def ldlt_matrix_inverse(N: int, is_complex: bool) -> SFG: for j in reversed(range(i + 1)): for k in reversed(range(j + 1, N)): if k == N - 1 and i != j: - A_inv[j][i] = MADS( - False, Constant(0, name="0"), R[j][k], A_inv[i][k] - ) + A_inv[j][i] = MADS(False, True, DontCare(), R[j][k], A_inv[i][k]) else: if A_inv[i][k]: - A_inv[j][i] = MADS(False, A_inv[j][i], R[j][k], A_inv[i][k]) + A_inv[j][i] = MADS( + False, False, A_inv[j][i], R[j][k], A_inv[i][k] + ) else: - A_inv[j][i] = MADS(False, A_inv[j][i], R[j][k], A_inv[k][i]) + A_inv[j][i] = MADS( + False, False, A_inv[j][i], R[j][k], A_inv[k][i] + ) outputs = [] for i in range(N): diff --git a/test/test_core_operations.py b/test/test_core_operations.py index 328acad2..4b34ac58 100644 --- a/test/test_core_operations.py +++ b/test/test_core_operations.py @@ -5,6 +5,7 @@ import pytest from b_asic import ( MAD, MADS, + SFG, Absolute, Addition, AddSub, @@ -13,11 +14,13 @@ from b_asic import ( Constant, ConstantMultiplication, Division, + DontCare, Input, LeftShift, Max, Min, Multiplication, + Output, Reciprocal, RightShift, Shift, @@ -343,19 +346,33 @@ class TestMADS: test_operation = MADS(is_add=True) assert test_operation.evaluate_output(0, [3 + 6j, 2 + 6j, 1 + 1j]) == -1 + 14j + def test_mads_zero_override(self): + test_operation = MADS(is_add=True, override_zero_on_src0=True) + assert test_operation.evaluate_output(0, [1, 1, 1]) == 1 + + def test_mads_sub_zero_override(self): + test_operation = MADS(is_add=False, override_zero_on_src0=True) + assert test_operation.evaluate_output(0, [1, 1, 1]) == -1 + def test_mads_is_linear(self): test_operation = MADS( - Constant(3), Addition(Input(), Constant(3)), Addition(Input(), Constant(3)) + src0=Constant(3), + src1=Addition(Input(), Constant(3)), + src2=Addition(Input(), Constant(3)), ) assert not test_operation.is_linear test_operation = MADS( - Addition(Input(), Constant(3)), Constant(3), Addition(Input(), Constant(3)) + src0=Addition(Input(), Constant(3)), + src1=Constant(3), + src2=Addition(Input(), Constant(3)), ) assert test_operation.is_linear test_operation = MADS( - Addition(Input(), Constant(3)), Addition(Input(), Constant(3)), Constant(3) + src0=Addition(Input(), Constant(3)), + src1=Addition(Input(), Constant(3)), + src2=Constant(3), ) assert test_operation.is_linear @@ -381,6 +398,22 @@ class TestMADS: test_operation.is_add = False assert not test_operation.is_add + def test_mads_override_zero_on_src0_getter(self): + test_operation = MADS(override_zero_on_src0=False) + assert not test_operation.override_zero_on_src0 + + test_operation = MADS(override_zero_on_src0=True) + assert test_operation.override_zero_on_src0 + + def test_mads_override_zero_on_src0_setter(self): + test_operation = MADS(override_zero_on_src0=False) + test_operation.override_zero_on_src0 = True + assert test_operation.override_zero_on_src0 + + test_operation = MADS(override_zero_on_src0=True) + test_operation.override_zero_on_src0 = False + assert not test_operation.override_zero_on_src0 + class TestRightShift: """Tests for RightShift class.""" @@ -556,6 +589,33 @@ class TestDepends: assert set(bfly1.inputs_required_for_output(1)) == {0, 1} +class TestDontCare: + def test_create_sfg_with_dontcare(self): + i1 = Input() + dc = DontCare() + a = Addition(i1, dc) + o = Output(a) + sfg = SFG([i1], [o]) + + assert sfg.output_count == 1 + assert sfg.input_count == 1 + + assert sfg.evaluate_output(0, [0]) == 0 + assert sfg.evaluate_output(0, [1]) == 1 + + def test_dontcare_latency_getter(self): + test_operation = DontCare() + assert test_operation.latency == 0 + + def test_dontcare_repr(self): + test_operation = DontCare() + assert repr(test_operation) == "DontCare()" + + def test_dontcare_str(self): + test_operation = DontCare() + assert str(test_operation) == "dontcare" + + class TestSink: def test_create_sfg_with_sink(self): bfly = Butterfly() diff --git a/test/test_sfg_generators.py b/test/test_sfg_generators.py index 3d876d75..bbd8916e 100644 --- a/test/test_sfg_generators.py +++ b/test/test_sfg_generators.py @@ -644,7 +644,7 @@ class TestRadix2FFT: class TestLdltMatrixInverse: def test_1x1(self): - sfg = ldlt_matrix_inverse(N=1, is_complex=False) + sfg = ldlt_matrix_inverse(N=1) assert len(sfg.inputs) == 1 assert len(sfg.outputs) == 1 @@ -661,7 +661,7 @@ class TestLdltMatrixInverse: assert np.isclose(res["0"], 0.2) def test_2x2_simple_spd(self): - sfg = ldlt_matrix_inverse(N=2, is_complex=False) + sfg = ldlt_matrix_inverse(N=2) assert len(sfg.inputs) == 3 assert len(sfg.outputs) == 3 @@ -683,7 +683,7 @@ class TestLdltMatrixInverse: assert np.isclose(res["2"], A_inv[1, 1]) def test_3x3_simple_spd(self): - sfg = ldlt_matrix_inverse(N=3, is_complex=False) + sfg = ldlt_matrix_inverse(N=3) assert len(sfg.inputs) == 6 assert len(sfg.outputs) == 6 @@ -717,7 +717,7 @@ class TestLdltMatrixInverse: def test_5x5_random_spd(self): N = 5 - sfg = ldlt_matrix_inverse(N=N, is_complex=False) + sfg = ldlt_matrix_inverse(N=N) assert len(sfg.inputs) == 15 assert len(sfg.outputs) == 15 @@ -746,7 +746,7 @@ class TestLdltMatrixInverse: def test_20x20_random_spd(self): N = 20 - sfg = ldlt_matrix_inverse(N=N, is_complex=False) + sfg = ldlt_matrix_inverse(N=N) A = self._generate_random_spd_matrix(N) -- GitLab From 1f22409fe1a49c4fe3623cdbdd9d468b69a783e3 Mon Sep 17 00:00:00 2001 From: Simon Bjurek <simbj106@student.liu.se> Date: Mon, 17 Feb 2025 10:18:50 +0100 Subject: [PATCH 15/15] added schedule __str__ function that prints id, start time, backward slack and forward slack --- b_asic/schedule.py | 34 ++++++++++++++++++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/b_asic/schedule.py b/b_asic/schedule.py index 922f3dbc..6cfabb17 100644 --- a/b_asic/schedule.py +++ b/b_asic/schedule.py @@ -124,6 +124,40 @@ class Schedule: elif schedule_time < max_end_time: raise ValueError(f"Too short schedule time. Minimum is {max_end_time}.") + def __str__(self) -> str: + """Return a string representation of this Schedule.""" + res: List[Tuple[GraphID, int, int, int]] = [ + ( + op.graph_id, + self.start_time_of_operation(op.graph_id), + cast(int, self.backward_slack(op.graph_id)), + self.forward_slack(op.graph_id), + ) + for op in self._sfg.operations + ] + res.sort(key=lambda tup: tup[0]) + res_str = [ + ( + r[0], + r[1], + f"{r[2]}".rjust(8) if r[2] < sys.maxsize else "oo".rjust(8), + f"{r[3]}".rjust(8) if r[3] < sys.maxsize else "oo".rjust(8), + ) + for r in res + ] + + string_io = io.StringIO() + + header = ["Graph ID", "Start time", "Backward slack", "Forward slack"] + string_io.write("|".join(f"{col:^15}" for i, col in enumerate(header)) + "\n") + string_io.write("-" * (15 * len(header) + len(header) - 1) + "\n") + + for r in res_str: + row_str = "|".join(f"{str(item):^15}" for i, item in enumerate(r)) + string_io.write(row_str + "\n") + + return string_io.getvalue() + def start_time_of_operation(self, graph_id: GraphID) -> int: """ Return the start time of the operation with the specified by *graph_id*. -- GitLab