Skip to content
Snippets Groups Projects
sfg_generators.py 17.8 KiB
Newer Older
  • Learn to ignore specific revisions
  • Oscar Gustafsson's avatar
    Oscar Gustafsson committed
    """
    B-ASIC signal flow graph generators.
    
    This module contains a number of functions generating SFGs for specific functions.
    """
    
    Samuel Fagerlund's avatar
    Samuel Fagerlund committed
    
    
    from typing import TYPE_CHECKING, Dict, Optional, Sequence, Union
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
    
    import numpy as np
    
    from b_asic.core_operations import (
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
        Addition,
    
        Butterfly,
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
        ConstantMultiplication,
    
        DontCare,
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
        Name,
    
        Reciprocal,
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
        SymmetricTwoportAdaptor,
    )
    from b_asic.signal import Signal
    from b_asic.signal_flow_graph import SFG
    from b_asic.special_operations import Delay, Input, Output
    
    
    if TYPE_CHECKING:
        from b_asic.port import OutputPort
    
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
    
    def wdf_allpass(
        coefficients: Sequence[float],
        name: Optional[str] = None,
        latency: Optional[int] = None,
        latency_offsets: Optional[Dict[str, int]] = None,
        execution_time: Optional[int] = None,
    ) -> SFG:
        """
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
        Generate a signal flow graph of a WDF allpass section based on symmetric two-port\
     adaptors.
    
        Simplifies the SFG in case an adaptor operation is 0.
    
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
        Parameters
        ----------
        coefficients : 1D-array
    
            Coefficients to use for the allpass section.
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
    
        name : Name, optional
            The name of the SFG. If None, "WDF allpass section".
    
        latency : int, optional
            Latency of the symmetric two-port adaptors.
    
        latency_offsets : optional
            Latency offsets of the symmetric two-port adaptors.
    
        execution_time : int, optional
            Execution time of the symmetric two-port adaptors.
    
        Returns
        -------
            Signal flow graph
        """
    
        np_coefficients = np.atleast_1d(np.squeeze(np.asarray(coefficients)))
    
        order = len(np_coefficients)
        if not order:
            raise ValueError("Coefficients cannot be empty")
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
        if np_coefficients.ndim != 1:
            raise TypeError("coefficients must be a 1D-array")
        if name is None:
            name = "WDF allpass section"
    
        input_op = Input()
        output = Output()
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
        odd_order = order % 2
        if odd_order:
    
            if np_coefficients[0]:
                # First-order section
                adaptor0 = SymmetricTwoportAdaptor(
                    np_coefficients[0],
                    input_op,
                    latency=latency,
                    latency_offsets=latency_offsets,
                    execution_time=execution_time,
                )
                signal_out = Signal(adaptor0.output(0))
                delay = Delay(adaptor0.output(1))
                Signal(delay, adaptor0.input(1))
            else:
                signal_out = Delay(input_op)
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
        else:
            signal_out = Signal(input_op)
    
        # Second-order sections
        sos_count = (order - 1) // 2 if odd_order else order // 2
        offset1, offset2 = (1, 2) if odd_order else (0, 1)
        for n in range(sos_count):
    
            if np_coefficients[2 * n + offset1]:
                adaptor1 = SymmetricTwoportAdaptor(
                    np_coefficients[2 * n + offset1],
                    signal_out,
                    latency=latency,
                    latency_offsets=latency_offsets,
                    execution_time=execution_time,
                )
                # Signal(prev_adaptor., adaptor1.input(0), name="Previous-stage to next")
                delay1 = Delay(adaptor1.output(1))
            else:
                delay1 = Delay(signal_out)
            if np_coefficients[2 * n + offset2]:
                delay2 = Delay()
                adaptor2 = SymmetricTwoportAdaptor(
                    np_coefficients[2 * n + offset2],
                    delay1,
                    delay2,
                    latency=latency,
                    latency_offsets=latency_offsets,
                    execution_time=execution_time,
                )
                Signal(adaptor2.output(0), adaptor1.input(1))
                Signal(adaptor2.output(1), delay2)
                signal_out = Signal(adaptor1.output(0))
            else:
                delay2 = Delay(delay1)
                if np_coefficients[2 * n + offset1]:
                    Signal(delay2, adaptor1.input(1))
                    signal_out = Signal(adaptor1.output(0))
                else:
                    signal_out = Signal(delay2)
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
        return SFG([input_op], [output], name=Name(name))
    
    
    def direct_form_fir(
        coefficients: Sequence[complex],
        name: Optional[str] = None,
    
        mult_properties: Optional[Union[Dict[str, int], Dict[str, Dict[str, int]]]] = None,
        add_properties: Optional[Union[Dict[str, int], Dict[str, Dict[str, int]]]] = None,
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
    ) -> SFG:
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
        r"""
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
        Generate a signal flow graph of a direct form FIR filter.
    
        The *coefficients* parameter is a sequence of impulse response values::
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
    
            coefficients = [h0, h1, h2, ..., hN]
    
        Leading to the transfer function:
    
        .. math:: \sum_{i=0}^N h_iz^{-i}
    
        Parameters
        ----------
        coefficients : 1D-array
    
            Coefficients to use for the FIR filter section.
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
        name : Name, optional
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
            The name of the SFG. If None, "Direct-form FIR filter".
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
        mult_properties : dictionary, optional
            Properties passed to :class:`~b_asic.core_operations.ConstantMultiplication`.
        add_properties : dictionary, optional
            Properties passed to :class:`~b_asic.core_operations.Addition`.
    
        Returns
        -------
    
        Signal flow graph
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
        --------
        transposed_direct_form_fir
        """
    
        np_coefficients = np.atleast_1d(np.squeeze(np.asarray(coefficients)))
    
        taps = len(np_coefficients)
        if not taps:
            raise ValueError("Coefficients cannot be empty")
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
        if np_coefficients.ndim != 1:
            raise TypeError("coefficients must be a 1D-array")
        if name is None:
            name = "Direct-form FIR filter"
        if mult_properties is None:
            mult_properties = {}
        if add_properties is None:
            add_properties = {}
    
        input_op = Input()
        output = Output()
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
    
        prev_delay = input_op
        prev_add = None
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
        for i, coefficient in enumerate(np_coefficients):
            tmp_mul = ConstantMultiplication(coefficient, prev_delay, **mult_properties)
    
            prev_add = (
                tmp_mul
                if prev_add is None
                else Addition(tmp_mul, prev_add, **add_properties)
            )
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
            if i < taps - 1:
                prev_delay = Delay(prev_delay)
    
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
    
        return SFG([input_op], [output], name=Name(name))
    
    
    def transposed_direct_form_fir(
        coefficients: Sequence[complex],
        name: Optional[str] = None,
    
        mult_properties: Optional[Union[Dict[str, int], Dict[str, Dict[str, int]]]] = None,
        add_properties: Optional[Union[Dict[str, int], Dict[str, Dict[str, int]]]] = None,
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
    ) -> SFG:
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
        r"""
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
        Generate a signal flow graph of a transposed direct form FIR filter.
    
        The *coefficients* parameter is a sequence of impulse response values::
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
    
            coefficients = [h0, h1, h2, ..., hN]
    
        Leading to the transfer function:
    
        .. math:: \sum_{i=0}^N h_iz^{-i}
    
        Parameters
        ----------
        coefficients : 1D-array
    
            Coefficients to use for the FIR filter section.
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
        name : Name, optional
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
            The name of the SFG. If None, "Transposed direct-form FIR filter".
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
        mult_properties : dictionary, optional
            Properties passed to :class:`~b_asic.core_operations.ConstantMultiplication`.
        add_properties : dictionary, optional
            Properties passed to :class:`~b_asic.core_operations.Addition`.
    
        Returns
        -------
    
        Signal flow graph
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
        --------
        direct_form_fir
        """
    
        np_coefficients = np.atleast_1d(np.squeeze(np.asarray(coefficients)))
    
        taps = len(np_coefficients)
        if not taps:
            raise ValueError("Coefficients cannot be empty")
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
        if np_coefficients.ndim != 1:
            raise TypeError("coefficients must be a 1D-array")
        if name is None:
            name = "Transposed direct-form FIR filter"
        if mult_properties is None:
            mult_properties = {}
        if add_properties is None:
            add_properties = {}
    
        input_op = Input()
        output = Output()
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
    
        prev_delay = None
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
        for i, coefficient in enumerate(reversed(np_coefficients)):
            tmp_mul = ConstantMultiplication(coefficient, input_op, **mult_properties)
    
            tmp_add = (
                tmp_mul
                if prev_delay is None
                else Addition(tmp_mul, prev_delay, **add_properties)
            )
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
            if i < taps - 1:
                prev_delay = Delay(tmp_add)
    
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
    
        return SFG([input_op], [output], name=Name(name))
    
    
    
    def direct_form_1_iir(
        b: Sequence[complex],
        a: Sequence[complex],
        name: Optional[str] = None,
        mult_properties: Optional[Union[Dict[str, int], Dict[str, Dict[str, int]]]] = None,
        add_properties: Optional[Union[Dict[str, int], Dict[str, Dict[str, int]]]] = None,
    ) -> SFG:
        """Generates a direct-form IIR filter of type I with coefficients a and b."""
    
        if len(a) < 2 or len(b) < 2:
            raise ValueError(
                "Size of coefficient lists a and b needs to contain at least 2 element."
            )
    
        if len(a) != len(b):
    
            raise ValueError("Size of coefficient lists a and b are not the same.")
        if a[0] != 1:
            raise ValueError("The value of a[0] must be 1.")
    
        if name is None:
            name = "Direct-form I IIR filter"
        if mult_properties is None:
            mult_properties = {}
        if add_properties is None:
            add_properties = {}
    
        # construct the feed-forward part
        input_op = Input()
        muls = [ConstantMultiplication(b[0], input_op, **mult_properties)]
        delays = []
        prev_delay = input_op
        for i, coeff in enumerate(b[1:]):
            prev_delay = Delay(prev_delay)
            delays.append(prev_delay)
            if i < len(b) - 1:
                muls.append(ConstantMultiplication(coeff, prev_delay, **mult_properties))
    
        op_a = muls[-1]
        for i in range(len(muls) - 1):
            op_a = Addition(op_a, muls[-i - 2], **add_properties)
    
        # construct the feedback part
        tmp_add = Addition(op_a, None, **add_properties)
        muls = []
        output = Output()
        output <<= tmp_add
    
        delays = []
        prev_delay = tmp_add
        for i, coeff in enumerate(a[1:]):
            prev_delay = Delay(prev_delay)
            delays.append(prev_delay)
            if i < len(a) - 1:
                muls.append(ConstantMultiplication(-coeff, prev_delay, **mult_properties))
    
        op_a = muls[-1]
        for i in range(len(muls) - 1):
            op_a = Addition(op_a, muls[-i - 2], **add_properties)
    
        tmp_add.input(1).connect(op_a)
    
        return SFG([input_op], [output], name=Name(name))
    
    
    def direct_form_2_iir(
        b: Sequence[complex],
        a: Sequence[complex],
        name: Optional[str] = None,
        mult_properties: Optional[Union[Dict[str, int], Dict[str, Dict[str, int]]]] = None,
        add_properties: Optional[Union[Dict[str, int], Dict[str, Dict[str, int]]]] = None,
    ) -> SFG:
        """Generates a direct-form IIR filter of type II with coefficients a and b."""
    
        if len(a) < 2 or len(b) < 2:
            raise ValueError(
                "Size of coefficient lists a and b needs to contain at least 2 element."
            )
    
        if len(a) != len(b):
    
            raise ValueError("Size of coefficient lists a and b are not the same.")
        if a[0] != 1:
            raise ValueError("The value of a[0] must be 1.")
    
        if name is None:
            name = "Direct-form II IIR filter"
        if mult_properties is None:
            mult_properties = {}
        if add_properties is None:
            add_properties = {}
    
        # construct the repeated part of the SFG
        left_adds = []
        right_adds = []
        left_muls = []
        right_muls = []
        delays = [Delay()]
        op_a_left = None
        op_a_right = None
        for i in range(len(a) - 1):
            a_coeff = a[-i - 1]
            b_coeff = b[-i - 1]
            if len(left_muls) != 0:  # not first iteration
                new_delay = Delay()
                delays[-1] <<= new_delay
                delays.append(new_delay)
            left_muls.append(
                ConstantMultiplication(-a_coeff, delays[-1], **mult_properties)
            )
            right_muls.append(
                ConstantMultiplication(b_coeff, delays[-1], **mult_properties)
            )
            if len(left_muls) > 1:  # not first iteration
                left_adds.append(Addition(op_a_left, left_muls[-1], **add_properties))
                right_adds.append(Addition(op_a_right, right_muls[-1], **add_properties))
                op_a_left = left_adds[-1]
                op_a_right = right_adds[-1]
            else:
                op_a_left = left_muls[-1]
                op_a_right = right_muls[-1]
    
        # finalize the SFG
        input_op = Input()
        if left_adds:
            left_adds.append(Addition(input_op, left_adds[-1], **add_properties))
        else:
            left_adds.append(Addition(input_op, left_muls[-1], **add_properties))
        delays[-1] <<= left_adds[-1]
        mul = ConstantMultiplication(b[0], left_adds[-1], **mult_properties)
    
        if right_adds:
            add = Addition(mul, right_adds[-1], **add_properties)
        else:
            add = Addition(mul, right_muls[-1], **add_properties)
    
        output = Output()
        output <<= add
        return SFG([input_op], [output], name=Name(name))
    
    
    
    def radix_2_dif_fft(points: int) -> SFG:
        """Generates a radix-2 decimation-in-frequency FFT structure.
    
        Parameters
        ----------
        points : int
            Number of points for the FFT, needs to be a positive power of 2.
    
        Returns
        -------
        SFG
            Signal Flow Graph
        """
        if points < 0:
            raise ValueError("Points must be positive number.")
        if points & (points - 1) != 0:
            raise ValueError("Points must be a power of two.")
    
        inputs = []
        for i in range(points):
            inputs.append(Input(name=f"Input: {i}"))
    
        ports = inputs
        number_of_stages = int(np.log2(points))
    
        twiddles = _generate_twiddles(points, number_of_stages)
    
        for stage in range(number_of_stages):
            ports = _construct_dif_fft_stage(
                ports, number_of_stages, stage, twiddles[stage]
            )
    
        ports = _get_bit_reversed_ports(ports)
        outputs = []
        for i, port in enumerate(ports):
            outputs.append(Output(port, name=f"Output: {i}"))
    
        return SFG(inputs=inputs, outputs=outputs)
    
    
    
    def ldlt_matrix_inverse(N: int) -> SFG:
        """Generates an SFG for the LDLT matrix inverse algorithm.
    
        Parameters
        ----------
        N : int
            Dimension of the square input matrix.
    
        Returns
        -------
        SFG
            Signal Flow Graph
        """
        inputs = []
        A = [[None for _ in range(N)] for _ in range(N)]
        for i in range(N):
            for j in range(i, N):
                in_op = Input()
                A[i][j] = in_op
                inputs.append(in_op)
    
        D = [None for _ in range(N)]
        for i in range(N):
            D[i] = A[i][i]
    
        D_inv = [None for _ in range(N)]
    
        R = [[None for _ in range(N)] for _ in range(N)]
        M = [[None for _ in range(N)] for _ in range(N)]
    
        # R*di*R^T factorization
        for i in range(N):
            for k in range(i):
                D[i] = MADS(False, False, D[i], M[k][i], R[k][i])
    
            D_inv[i] = Reciprocal(D[i])
    
            for j in range(i + 1, N):
                R[i][j] = A[i][j]
    
                for k in range(i):
                    R[i][j] = MADS(False, False, R[i][j], M[k][i], R[k][j])
    
                # if is_complex:
                #     M[i][j] = ComplexConjugate(R[i][j])
                # else:
                M[i][j] = R[i][j]
    
                R[i][j] = MADS(True, True, DontCare(), R[i][j], D_inv[i])
    
        # back substitution
        A_inv = [[None for _ in range(N)] for _ in range(N)]
        for i in reversed(range(N)):
            A_inv[i][i] = D_inv[i]
            for j in reversed(range(i + 1)):
                for k in reversed(range(j + 1, N)):
                    if k == N - 1 and i != j:
                        A_inv[j][i] = MADS(False, True, DontCare(), R[j][k], A_inv[i][k])
                    else:
                        if A_inv[i][k]:
                            A_inv[j][i] = MADS(
                                False, False, A_inv[j][i], R[j][k], A_inv[i][k]
                            )
                        else:
                            A_inv[j][i] = MADS(
                                False, False, A_inv[j][i], R[j][k], A_inv[k][i]
                            )
    
        outputs = []
        for i in range(N):
            for j in range(i, N):
                outputs.append(Output(A_inv[i][j]))
    
        return SFG(inputs, outputs)
    
    
    
    def _construct_dif_fft_stage(
        ports_from_previous_stage: list["OutputPort"],
        number_of_stages: int,
        stage: int,
        twiddles: list[np.complex128],
    ):
        ports = ports_from_previous_stage.copy()
        number_of_butterflies = len(ports) // 2
        number_of_groups = 2**stage
        group_size = number_of_butterflies // number_of_groups
    
        for group_index in range(number_of_groups):
            for bf_index in range(group_size):
                input1_index = group_index * 2 * group_size + bf_index
                input2_index = input1_index + group_size
    
                input1 = ports[input1_index]
                input2 = ports[input2_index]
    
                butterfly = Butterfly(input1, input2)
                output1, output2 = butterfly.outputs
    
                twiddle_factor = twiddles[bf_index]
                if twiddle_factor != 1:
    
                    twiddle_mul = ConstantMultiplication(twiddles[bf_index], output2)
    
                    output2 = twiddle_mul.output(0)
    
                ports[input1_index] = output1
                ports[input2_index] = output2
    
        return ports
    
    
    def _get_bit_reversed_number(number: int, number_of_bits: int) -> int:
        reversed_number = 0
        for i in range(number_of_bits):
            # mask out the current bit
            shift_num = number
            current_bit = (shift_num >> i) & 1
            # compute the position of the current bit in the reversed string
            reversed_pos = number_of_bits - 1 - i
            # place the current bit in that position
            reversed_number |= current_bit << reversed_pos
        return reversed_number
    
    
    def _get_bit_reversed_ports(ports: list["OutputPort"]) -> list["OutputPort"]:
        num_of_ports = len(ports)
        bits = int(np.log2(num_of_ports))
        return [ports[_get_bit_reversed_number(i, bits)] for i in range(num_of_ports)]
    
    
    def _generate_twiddles(points: int, number_of_stages: int) -> list[np.complex128]:
        twiddles = []
        for stage in range(1, number_of_stages + 1):
            stage_twiddles = []
            for k in range(points // 2 ** (stage)):
                a = 2 ** (stage - 1)
                twiddle = np.exp(-1j * 2 * np.pi * a * k / points)
                stage_twiddles.append(twiddle)
            twiddles.append(stage_twiddles)
        return twiddles