diff --git a/b_asic/core_operations.py b/b_asic/core_operations.py index df0868ee53b0b2d8b0d74706e8f3b31cad813892..70d43370aa89f53163c365ba6e93c7f4990cd8cb 100644 --- a/b_asic/core_operations.py +++ b/b_asic/core_operations.py @@ -1099,6 +1099,76 @@ class MAD(AbstractOperation): p._index = i +class MADS(AbstractOperation): + __slots__ = ( + "_is_add", + "_src0", + "_src1", + "_src2", + "_name", + "_latency", + "_latency_offsets", + "_execution_time", + ) + _is_add: Optional[bool] + _src0: Optional[SignalSourceProvider] + _src1: Optional[SignalSourceProvider] + _src2: Optional[SignalSourceProvider] + _name: Name + _latency: Optional[int] + _latency_offsets: Optional[Dict[str, int]] + _execution_time: Optional[int] + + is_swappable = True + + def __init__( + self, + is_add: Optional[bool] = False, + src0: Optional[SignalSourceProvider] = None, + src1: Optional[SignalSourceProvider] = None, + src2: Optional[SignalSourceProvider] = None, + name: Name = Name(""), + latency: Optional[int] = None, + latency_offsets: Optional[Dict[str, int]] = None, + execution_time: Optional[int] = None, + ): + """Construct a MADS operation.""" + super().__init__( + input_count=3, + output_count=1, + name=Name(name), + input_sources=[src0, src1, src2], + latency=latency, + latency_offsets=latency_offsets, + execution_time=execution_time, + ) + self._is_add = is_add + # self.set_param("is_add", is_add) + + @classmethod + def type_name(cls) -> TypeName: + return TypeName("mads") + + def evaluate(self, a, b, c): + return a + b * c if self._is_add else a - b * c + + @property + def is_linear(self) -> bool: + return ( + self.input(0).connected_source.operation.is_constant + or self.input(1).connected_source.operation.is_constant + ) + + def swap_io(self) -> None: + self._input_ports = [ + self._input_ports[1], + self._input_ports[0], + self._input_ports[2], + ] + for i, p in enumerate(self._input_ports): + p._index = i + + class SymmetricTwoportAdaptor(AbstractOperation): r""" Wave digital filter symmetric twoport-adaptor operation. diff --git a/b_asic/sfg_generators.py b/b_asic/sfg_generators.py index 737501f1650c0b4756f218fb45675a64b4ea0bcb..0ceeb4f62aeba0c89b0021350dc0e8d469374f8b 100644 --- a/b_asic/sfg_generators.py +++ b/b_asic/sfg_generators.py @@ -9,10 +9,14 @@ from typing import TYPE_CHECKING, Dict, Optional, Sequence, Union import numpy as np from b_asic.core_operations import ( + MADS, Addition, Butterfly, + ComplexConjugate, + Constant, ConstantMultiplication, Name, + Reciprocal, SymmetricTwoportAdaptor, ) from b_asic.signal import Signal @@ -432,6 +436,69 @@ def radix_2_dif_fft(points: int) -> SFG: return SFG(inputs=inputs, outputs=outputs) +def ldlt_matrix_inverse(N: int, is_complex: bool) -> SFG: + inputs = [] + A = [[None for _ in range(N)] for _ in range(N)] + for i in range(N): + for j in range(i, N): + in_op = Input() + A[i][j] = in_op + inputs.append(in_op) + + D = [None for _ in range(N)] + for i in range(N): + D[i] = A[i][i] + + D_inv = [None for _ in range(N)] + + R = [[None for _ in range(N)] for _ in range(N)] + M = [[None for _ in range(N)] for _ in range(N)] + + # R*di*R^T factorization + for i in range(N): + for k in range(1, i): + D[i] = MADS(False, D[i], M[k][i], R[k][i]) + + D_inv[i] = Reciprocal(D[i]) + + for j in range(i, N): + R[i][j] = A[i][j] + + for k in range(1, i): + R[i][j] = MADS(False, R[i][j], M[k][i], R[k][j]) + + if is_complex: + M[i][j] = ComplexConjugate(R[i][j]) + else: + M[i][j] = R[i][j] + + R[i][j] = MADS(True, Constant(0, name="0"), R[i][j], D_inv[i]) + + # back substitution + A_inv = [[None for _ in range(N)] for _ in range(N)] + for i in reversed(range(N)): + A_inv[i][i] = D_inv[i] + for j in reversed(range(i)): + for k in reversed(range(j, N)): + if k == N - 1 and i != j: + # my custom + if A_inv[k][i]: + A_inv[j][i] = MADS( + False, Constant(0, name="0"), R[j][k], A_inv[k][i] + ) + else: + A_inv[j][i] = Constant(0, name="0") + else: + A_inv[j][i] = MADS(False, A_inv[j][i], R[j][k], A_inv[k][i]) + + outputs = [] + for i in range(N): + for j in range(i, N): + outputs.append(Output(A_inv[i][j])) + + return SFG(inputs, outputs) + + def _construct_dif_fft_stage( ports_from_previous_stage: list["OutputPort"], number_of_stages: int, @@ -465,6 +532,26 @@ def _construct_dif_fft_stage( return ports +def _extract_diagonal(elems): + n = 0 + k = 0 + while k <= len(elems): + k += n + 1 + n += 1 + n -= 1 + k -= n + 1 + if k != len(elems): + return None + + diagonal = np.zeros(n) + index = 0 + for i in range(n): + diagonal[n] = elems[index] + index += i + 2 + + return diagonal + + def _get_bit_reversed_number(number: int, number_of_bits: int) -> int: reversed_number = 0 for i in range(number_of_bits):