Skip to content
Snippets Groups Projects
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
wdf_operations.py 11.18 KiB
"""
B-ASIC Core Operations Module.

Contains wave digital filter adaptors.
"""

from typing import Dict, Iterable, Optional, Tuple

from b_asic.graph_component import Name, TypeName
from b_asic.operation import AbstractOperation
from b_asic.port import SignalSourceProvider
from b_asic.types import Num


class SymmetricTwoportAdaptor(AbstractOperation):
    r"""
    Wave digital filter symmetric twoport-adaptor operation.

    .. math::
        \begin{eqnarray}
        y_0 & = & x_1 + \text{value}\times\left(x_1 - x_0\right)\\
        y_1 & = & x_0 + \text{value}\times\left(x_1 - x_0\right)
        \end{eqnarray}
    """

    is_linear = True
    is_swappable = True

    def __init__(
        self,
        value: Num = 0,
        src0: Optional[SignalSourceProvider] = None,
        src1: Optional[SignalSourceProvider] = None,
        name: Name = Name(""),
        latency: Optional[int] = None,
        latency_offsets: Optional[Dict[str, int]] = None,
        execution_time: Optional[int] = None,
    ):
        """Construct a SymmetricTwoportAdaptor operation."""
        super().__init__(
            input_count=2,
            output_count=2,
            name=Name(name),
            input_sources=[src0, src1],
            latency=latency,
            latency_offsets=latency_offsets,
            execution_time=execution_time,
        )
        self.value = value

    @classmethod
    def type_name(cls) -> TypeName:
        return TypeName("sym2p")

    def evaluate(self, a, b):
        tmp = self.value * (b - a)
        return b + tmp, a + tmp

    @property
    def value(self) -> Num:
        """Get the constant value of this operation."""
        return self.param("value")

    @value.setter
    def value(self, value: Num) -> None:
        """Set the constant value of this operation."""
        if -1 <= value <= 1:
            self.set_param("value", value)
        else:
            raise ValueError('value must be between -1 and 1 (inclusive)')

    def swap_io(self) -> None:
        # Swap inputs and outputs and change sign of coefficient
        self._input_ports.reverse()
        for i, p in enumerate(self._input_ports):
            p._index = i
        self._output_ports.reverse()
        for i, p in enumerate(self._output_ports):
            p._index = i
        self.set_param("value", -self.value)


class SeriesTwoportAdaptor(AbstractOperation):
    r"""
    Wave digital filter series twoport-adaptor operation.

    .. math::
        \begin{eqnarray}
        y_0 & = & x_0 - \text{value}\times\left(x_0 + x_1\right)\\
        y_1 & = & x_1 - (2-\text{value})\times\left(x_0 + x_1\right)\\
            & = & -2x_0 - x_1  + \text{value}\times\left(x_0 + x_1\right)
        \end{eqnarray}

    Port 1 is the dependent port.
    """

    is_linear = True
    is_swappable = True

    def __init__(
        self,
        value: Num = 0,
        src0: Optional[SignalSourceProvider] = None,
        src1: Optional[SignalSourceProvider] = None,
        name: Name = Name(""),
        latency: Optional[int] = None,
        latency_offsets: Optional[Dict[str, int]] = None,
        execution_time: Optional[int] = None,
    ):
        """Construct a SeriesTwoportAdaptor operation."""
        super().__init__(
            input_count=2,
            output_count=2,
            name=Name(name),
            input_sources=[src0, src1],
            latency=latency,
            latency_offsets=latency_offsets,
            execution_time=execution_time,
        )
        self.value = value

    @classmethod
    def type_name(cls) -> TypeName:
        return TypeName("ser2p")

    def evaluate(self, a, b):
        s = a + b
        val = self.value
        t = val * a
        y0 = a - t
        y1 = -(s + y0)
        return y0, y1

    @property
    def value(self) -> Num:
        """Get the constant value of this operation."""
        return self.param("value")

    @value.setter
    def value(self, value: Num) -> None:
        """Set the constant value of this operation."""
        if 0 <= value <= 2:
            self.set_param("value", value)
        else:
            raise ValueError('value must be between 0 and 2 (inclusive)')

    def swap_io(self) -> None:
        # Swap inputs and outputs and, hence, which port is dependent
        self._input_ports.reverse()
        for i, p in enumerate(self._input_ports):
            p._index = i
        self._output_ports.reverse()
        for i, p in enumerate(self._output_ports):
            p._index = i
        self.set_param("value", 2 - self.value)


class ParallelTwoportAdaptor(AbstractOperation):
    r"""
    Wave digital filter parallel twoport-adaptor operation.

    .. math::
        \begin{eqnarray}
        y_0 & = & - x_0 + \text{value}\times x_0 + (2 - \text{value}) \times x_1\\
            & = & 2x_1 - x_0 + \text{value}\times \left(x_0 - x_1\right)
        y_1 & = & - x_1 + \text{value}\times x_0 + (2 - \text{value}) \times x_1\\
            & = & x_1 + \text{value}\times\left(x_0 - x_1\right)
        \end{eqnarray}

    Port 1 is the dependent port.
    """

    is_linear = True
    is_swappable = True

    def __init__(
        self,
        value: Num = 0,
        src0: Optional[SignalSourceProvider] = None,
        src1: Optional[SignalSourceProvider] = None,
        name: Name = Name(""),
        latency: Optional[int] = None,
        latency_offsets: Optional[Dict[str, int]] = None,
        execution_time: Optional[int] = None,
    ):
        """Construct a ParallelTwoportAdaptor operation."""
        super().__init__(
            input_count=2,
            output_count=2,
            name=Name(name),
            input_sources=[src0, src1],
            latency=latency,
            latency_offsets=latency_offsets,
            execution_time=execution_time,
        )
        self.value = value

    @classmethod
    def type_name(cls) -> TypeName:
        return TypeName("par2p")

    def evaluate(self, a, b):
        s = b - a
        val = self.value
        t = val * s
        y1 = b - t
        y0 = y1 + s
        return y0, y1

    @property
    def value(self) -> Num:
        """Get the constant value of this operation."""
        return self.param("value")

    @value.setter
    def value(self, value: Num) -> None:
        """Set the constant value of this operation."""
        if 0 <= value <= 2:
            self.set_param("value", value)
        else:
            raise ValueError('value must be between 0 and 2 (inclusive)')

    def swap_io(self) -> None:
        # Swap inputs and outputs and, hence, which port is dependent
        self._input_ports.reverse()
        for i, p in enumerate(self._input_ports):
            p._index = i
        self._output_ports.reverse()
        for i, p in enumerate(self._output_ports):
            p._index = i
        self.set_param("value", 2 - self.value)


class SeriesThreeportAdaptor(AbstractOperation):
    r"""
    Wave digital filter series threeport-adaptor operation.

    .. math::
        \begin{eqnarray}
        y_0 & = & x_0 - \text{value}_0\times\left(x_0 + x_1 + x_2\right)\\
        y_1 & = & x_1 - \text{value}_1\times\left(x_0 + x_1 + x_2\right)\\
        y_2 & = & x_2 - \left(2 - \text{value}_0 - \text{value}_1\right)\times\left(x_0
                + x_1 + x_2\right)
        \end{eqnarray}

    Port 2 is the dependent port.

    """

    is_linear = True
    is_swappable = True

    def __init__(
        self,
        value: Tuple[Num, Num] = (0, 0),
        src0: Optional[SignalSourceProvider] = None,
        src1: Optional[SignalSourceProvider] = None,
        src2: Optional[SignalSourceProvider] = None,
        name: Name = Name(""),
        latency: Optional[int] = None,
        latency_offsets: Optional[Dict[str, int]] = None,
        execution_time: Optional[int] = None,
    ):
        """Construct a SeriesThreeportAdaptor operation."""
        super().__init__(
            input_count=3,
            output_count=3,
            name=Name(name),
            input_sources=[src0, src1, src2],
            latency=latency,
            latency_offsets=latency_offsets,
            execution_time=execution_time,
        )
        self.value = value

    @classmethod
    def type_name(cls) -> TypeName:
        return TypeName("ser3p")

    def evaluate(self, a, b, c):
        s = a + b + c
        val0, val1 = self.value
        y0 = a - val0 * s
        y1 = b - val1 * s
        y2 = -(y0 + y1 + s)
        return y0, y1, y2

    @property
    def value(self) -> Tuple[Num, Num]:
        """Get the constant value of this operation."""
        return self.param("value")

    @value.setter
    def value(self, value: Tuple[Num, Num]) -> None:
        """Set the constant value of this operation."""
        if not all(0 <= v <= 2 for v in value):
            raise ValueError('each value must be between 0 and 2 (inclusive)')
        if 0 <= sum(value) <= 2:
            self.set_param("value", value)
        else:
            raise ValueError('sum of values must be between 0 and 2 (inclusive)')


class ReflectionFreeSeriesThreeportAdaptor(AbstractOperation):
    r"""
    Wave digital filter reflection free series threeport-adaptor operation.

    .. math::
        \begin{eqnarray}
        y_0 & = & x_0 - \text{value}\times\left(x_0 + x_1 + x_2\right)\\
        y_1 & = & -x_0 - x_2\\
        y_2 & = & x_2 - \left(1 - \text{value}\right)\times\left(x_0
                + x_1 + x_2\right) \\
            & = & -x_0 - x_1 + \text{value}\times\left(x_0
                    + x_1 + x_2\right)
        \end{eqnarray}

    Port 1 is the reflection-free port and port 2 is the dependent port.
    """

    is_linear = True
    is_swappable = True

    def __init__(
        self,
        value: Num = 0,
        src0: Optional[SignalSourceProvider] = None,
        src1: Optional[SignalSourceProvider] = None,
        src2: Optional[SignalSourceProvider] = None,
        name: Name = Name(""),
        latency: Optional[int] = None,
        latency_offsets: Optional[Dict[str, int]] = None,
        execution_time: Optional[int] = None,
    ):
        """Construct a ReflectionFreeSeriesThreeportAdaptor operation."""
        super().__init__(
            input_count=3,
            output_count=3,
            name=Name(name),
            input_sources=[src0, src1, src2],
            latency=latency,
            latency_offsets=latency_offsets,
            execution_time=execution_time,
        )
        self.value = value

    @classmethod
    def type_name(cls) -> TypeName:
        return TypeName("rfs3p")

    def evaluate(self, a, b, c):
        s = a + c
        y1 = -s
        y0 = a - self.value * (b + s)
        y2 = -(y0 + b)
        return y0, y1, y2

    @property
    def value(self) -> Num:
        """Get the constant value of this operation."""
        return self.param("value")

    @value.setter
    def value(self, value: Num) -> None:
        """Set the constant value of this operation."""
        if 0 <= value <= 1:
            self.set_param("value", value)
        else:
            raise ValueError('value must be between 0 and 1 (inclusive)')

    def inputs_required_for_output(self, output_index: int) -> Iterable[int]:
        """
        Get the input indices of all inputs in this operation whose values are
        required in order to evaluate the output at the given output index.
        """
        return {0: (0, 1, 2), 1: (0, 2), 2: (0, 1, 2)}