Skip to content
Snippets Groups Projects
wdf_operations.py 11.2 KiB
Newer Older
  • Learn to ignore specific revisions
  • Oscar Gustafsson's avatar
    Oscar Gustafsson committed
    """
    B-ASIC Wave Digital Filter Operations Module.
    
    Contains wave digital filter adaptors.
    """
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
    from typing import Dict, Iterable, Optional, Tuple
    
    from b_asic.graph_component import Name, TypeName
    from b_asic.operation import AbstractOperation
    from b_asic.port import SignalSourceProvider
    from b_asic.types import Num
    
    
    class SymmetricTwoportAdaptor(AbstractOperation):
        r"""
        Symmetric two-port adaptor for wave digital filters.

        Both outputs add the same scaled input difference to the opposite input:

        .. math::
            \begin{eqnarray}
            y_0 & = & x_1 + \text{value}\times\left(x_1 - x_0\right)\\
            y_1 & = & x_0 + \text{value}\times\left(x_1 - x_0\right)
            \end{eqnarray}

        Parameters
        ----------
        value : Num, default: 0
            Adaptor coefficient. Must satisfy ``-1 <= value <= 1``.
        src0, src1 : SignalSourceProvider, optional
            Passed, in this order, as the input sources of the operation.
        name : Name, optional
            Name of the operation.
        latency, latency_offsets, execution_time : optional
            Forwarded unchanged to ``AbstractOperation``.
        """

        is_linear = True
        is_swappable = True

        def __init__(
            self,
            value: Num = 0,
            src0: Optional[SignalSourceProvider] = None,
            src1: Optional[SignalSourceProvider] = None,
            name: Name = Name(""),
            latency: Optional[int] = None,
            latency_offsets: Optional[Dict[str, int]] = None,
            execution_time: Optional[int] = None,
        ):
            """Create a symmetric two-port adaptor with two inputs and two outputs."""
            super().__init__(
                name=Name(name),
                input_count=2,
                output_count=2,
                input_sources=[src0, src1],
                execution_time=execution_time,
                latency_offsets=latency_offsets,
                latency=latency,
            )
            # Assign through the property so the range check is applied.
            self.value = value

        @classmethod
        def type_name(cls) -> TypeName:
            """Return the type name identifying this operation, ``"sym2p"``."""
            identifier = "sym2p"
            return TypeName(identifier)

        def evaluate(self, a, b):
            """Return ``(y0, y1)`` for the inputs ``a`` (x0) and ``b`` (x1)."""
            # The scaled difference is shared by both outputs.
            scaled_difference = self.value * (b - a)
            y0 = b + scaled_difference
            y1 = a + scaled_difference
            return y0, y1

        @property
        def value(self) -> Num:
            """Return the adaptor coefficient stored in the ``"value"`` parameter."""
            return self.param("value")

        @value.setter
        def value(self, value: Num) -> None:
            """Store the adaptor coefficient; raise ValueError outside [-1, 1]."""
            if not -1 <= value <= 1:
                raise ValueError('value must be between -1 and 1 (inclusive)')
            self.set_param("value", value)

        def swap_io(self) -> None:
            """Reverse the input and output port order and negate the coefficient."""
            for ports in (self._input_ports, self._output_ports):
                ports.reverse()
                # Keep each port's stored index in sync with its new position.
                for position, port in enumerate(ports):
                    port._index = position
            negated = -self.value
            self.set_param("value", negated)
    
    
    class SeriesTwoportAdaptor(AbstractOperation):
        r"""
        Wave digital filter series twoport-adaptor operation.

        .. math::
            \begin{eqnarray}
            y_0 & = & x_0 - \text{value}\times\left(x_0 + x_1\right)\\
            y_1 & = & x_1 - (2-\text{value})\times\left(x_0 + x_1\right)\\
                & = & -2x_0 - x_1  + \text{value}\times\left(x_0 + x_1\right)
            \end{eqnarray}

        Port 1 is the dependent port.

        Parameters
        ----------
        value : Num, default: 0
            Adaptor coefficient. Must satisfy ``0 <= value <= 2``.
        src0, src1 : SignalSourceProvider, optional
            Passed, in this order, as the input sources of the operation.
        name : Name, optional
            Name of the operation.
        latency, latency_offsets, execution_time : optional
            Forwarded unchanged to ``AbstractOperation``.
        """

        is_linear = True
        is_swappable = True

        def __init__(
            self,
            value: Num = 0,
            src0: Optional[SignalSourceProvider] = None,
            src1: Optional[SignalSourceProvider] = None,
            name: Name = Name(""),
            latency: Optional[int] = None,
            latency_offsets: Optional[Dict[str, int]] = None,
            execution_time: Optional[int] = None,
        ):
            """Construct a SeriesTwoportAdaptor operation."""
            super().__init__(
                input_count=2,
                output_count=2,
                name=Name(name),
                input_sources=[src0, src1],
                latency=latency,
                latency_offsets=latency_offsets,
                execution_time=execution_time,
            )
            # Assign through the property so the range check is applied.
            self.value = value

        @classmethod
        def type_name(cls) -> TypeName:
            """Return the type name identifying this operation, ``"ser2p"``."""
            return TypeName("ser2p")

        def evaluate(self, a, b):
            """
            Return ``(y0, y1)`` for the inputs ``a`` (x0) and ``b`` (x1).

            The coefficient scales the *sum* of both inputs, as stated in the
            class equations; ``y1`` is then derived from ``y0``.
            """
            s = a + b
            # value * (x0 + x1): the coefficient multiplies the input sum.
            t = self.value * s
            y0 = a - t
            # -(s + y0) = -2*x0 - x1 + value * (x0 + x1)
            y1 = -(s + y0)
            return y0, y1

        @property
        def value(self) -> Num:
            """Get the constant value of this operation."""
            return self.param("value")

        @value.setter
        def value(self, value: Num) -> None:
            """
            Set the constant value of this operation.

            Raises
            ------
            ValueError
                If *value* is outside the closed interval [0, 2].
            """
            if 0 <= value <= 2:
                self.set_param("value", value)
            else:
                raise ValueError('value must be between 0 and 2 (inclusive)')

        def swap_io(self) -> None:
            """
            Swap inputs and outputs and, hence, which port is dependent.

            The coefficient is replaced by ``2 - value``.
            """
            self._input_ports.reverse()
            for i, p in enumerate(self._input_ports):
                p._index = i
            self._output_ports.reverse()
            for i, p in enumerate(self._output_ports):
                p._index = i
            self.set_param("value", 2 - self.value)
    
    
    class ParallelTwoportAdaptor(AbstractOperation):
        r"""
        Parallel two-port adaptor for wave digital filters.

        .. math::
            \begin{eqnarray}
            y_0 & = & - x_0 + \text{value}\times x_0 + (2 - \text{value}) \times x_1\\
                & = & 2x_1 - x_0 + \text{value}\times \left(x_0 - x_1\right)\\
            y_1 & = & - x_1 + \text{value}\times x_0 + (2 - \text{value}) \times x_1\\
                & = & x_1 + \text{value}\times\left(x_0 - x_1\right)
            \end{eqnarray}

        Port 1 is the dependent port.

        Parameters
        ----------
        value : Num, default: 0
            Adaptor coefficient. Must satisfy ``0 <= value <= 2``.
        src0, src1 : SignalSourceProvider, optional
            Passed, in this order, as the input sources of the operation.
        name : Name, optional
            Name of the operation.
        latency, latency_offsets, execution_time : optional
            Forwarded unchanged to ``AbstractOperation``.
        """

        is_linear = True
        is_swappable = True

        def __init__(
            self,
            value: Num = 0,
            src0: Optional[SignalSourceProvider] = None,
            src1: Optional[SignalSourceProvider] = None,
            name: Name = Name(""),
            latency: Optional[int] = None,
            latency_offsets: Optional[Dict[str, int]] = None,
            execution_time: Optional[int] = None,
        ):
            """Create a parallel two-port adaptor with two inputs and two outputs."""
            super().__init__(
                name=Name(name),
                input_count=2,
                output_count=2,
                input_sources=[src0, src1],
                execution_time=execution_time,
                latency_offsets=latency_offsets,
                latency=latency,
            )
            # Assign through the property so the range check is applied.
            self.value = value

        @classmethod
        def type_name(cls) -> TypeName:
            """Return the type name identifying this operation, ``"par2p"``."""
            identifier = "par2p"
            return TypeName(identifier)

        def evaluate(self, a, b):
            """Return ``(y0, y1)`` for the inputs ``a`` (x0) and ``b`` (x1)."""
            difference = b - a
            # y1 = x1 - value * (x1 - x0) = x1 + value * (x0 - x1)
            y1 = b - self.value * difference
            # y0 differs from y1 by exactly the input difference.
            y0 = y1 + difference
            return y0, y1

        @property
        def value(self) -> Num:
            """Return the adaptor coefficient stored in the ``"value"`` parameter."""
            return self.param("value")

        @value.setter
        def value(self, value: Num) -> None:
            """Store the adaptor coefficient; raise ValueError outside [0, 2]."""
            if not 0 <= value <= 2:
                raise ValueError('value must be between 0 and 2 (inclusive)')
            self.set_param("value", value)

        def swap_io(self) -> None:
            """Reverse the port order (changing the dependent port); value -> 2 - value."""
            for ports in (self._input_ports, self._output_ports):
                ports.reverse()
                # Keep each port's stored index in sync with its new position.
                for position, port in enumerate(ports):
                    port._index = position
            complement = 2 - self.value
            self.set_param("value", complement)
    
    
    class SeriesThreeportAdaptor(AbstractOperation):
        r"""
        Series three-port adaptor for wave digital filters.

        .. math::
            \begin{eqnarray}
            y_0 & = & x_0 - \text{value}_0\times\left(x_0 + x_1 + x_2\right)\\
            y_1 & = & x_1 - \text{value}_1\times\left(x_0 + x_1 + x_2\right)\\
            y_2 & = & x_2 - \left(2 - \text{value}_0 - \text{value}_1\right)\times\left(x_0
                    + x_1 + x_2\right)
            \end{eqnarray}

        Port 2 is the dependent port.

        Parameters
        ----------
        value : tuple of two Num, default: (0, 0)
            Coefficients for port 0 and port 1. Each must lie in [0, 2] and
            their sum must lie in [0, 2] as well.
        src0, src1, src2 : SignalSourceProvider, optional
            Passed, in this order, as the input sources of the operation.
        name : Name, optional
            Name of the operation.
        latency, latency_offsets, execution_time : optional
            Forwarded unchanged to ``AbstractOperation``.
        """

        is_linear = True
        # NOTE(review): no swap_io override is defined in this class; presumably
        # the base class provides the behavior -- confirm.
        is_swappable = True

        def __init__(
            self,
            value: Tuple[Num, Num] = (0, 0),
            src0: Optional[SignalSourceProvider] = None,
            src1: Optional[SignalSourceProvider] = None,
            src2: Optional[SignalSourceProvider] = None,
            name: Name = Name(""),
            latency: Optional[int] = None,
            latency_offsets: Optional[Dict[str, int]] = None,
            execution_time: Optional[int] = None,
        ):
            """Create a series three-port adaptor with three inputs and three outputs."""
            super().__init__(
                name=Name(name),
                input_count=3,
                output_count=3,
                input_sources=[src0, src1, src2],
                execution_time=execution_time,
                latency_offsets=latency_offsets,
                latency=latency,
            )
            # Assign through the property so the range checks are applied.
            self.value = value

        @classmethod
        def type_name(cls) -> TypeName:
            """Return the type name identifying this operation, ``"ser3p"``."""
            identifier = "ser3p"
            return TypeName(identifier)

        def evaluate(self, a, b, c):
            """Return ``(y0, y1, y2)`` for the inputs ``a``, ``b`` and ``c`` (x0, x1, x2)."""
            coefficient0, coefficient1 = self.value
            total = a + b + c
            y0 = a - coefficient0 * total
            y1 = b - coefficient1 * total
            # The dependent port output follows from the other two outputs and the sum.
            y2 = -(y0 + y1 + total)
            return y0, y1, y2

        @property
        def value(self) -> Tuple[Num, Num]:
            """Return the coefficient pair stored in the ``"value"`` parameter."""
            return self.param("value")

        @value.setter
        def value(self, value: Tuple[Num, Num]) -> None:
            """
            Store the coefficient pair.

            Raises
            ------
            ValueError
                If any coefficient, or the sum of the coefficients, is outside
                the closed interval [0, 2].
            """
            if any(not 0 <= v <= 2 for v in value):
                raise ValueError('each value must be between 0 and 2 (inclusive)')
            if not 0 <= sum(value) <= 2:
                raise ValueError('sum of values must be between 0 and 2 (inclusive)')
            self.set_param("value", value)
    
    
    class ReflectionFreeSeriesThreeportAdaptor(AbstractOperation):
        r"""
        Wave digital filter reflection free series threeport-adaptor operation.

        .. math::
            \begin{eqnarray}
            y_0 & = & x_0 - \text{value}\times\left(x_0 + x_1 + x_2\right)\\
            y_1 & = & -x_0 - x_2\\
            y_2 & = & x_2 - \left(1 - \text{value}\right)\times\left(x_0
                    + x_1 + x_2\right) \\
                & = & -x_0 - x_1 + \text{value}\times\left(x_0
                        + x_1 + x_2\right)
            \end{eqnarray}

        Port 1 is the reflection-free port and port 2 is the dependent port.

        Parameters
        ----------
        value : Num, default: 0
            Adaptor coefficient. Must satisfy ``0 <= value <= 1``.
        src0, src1, src2 : SignalSourceProvider, optional
            Passed, in this order, as the input sources of the operation.
        name : Name, optional
            Name of the operation.
        latency, latency_offsets, execution_time : optional
            Forwarded unchanged to ``AbstractOperation``.
        """

        is_linear = True
        is_swappable = True

        def __init__(
            self,
            value: Num = 0,
            src0: Optional[SignalSourceProvider] = None,
            src1: Optional[SignalSourceProvider] = None,
            src2: Optional[SignalSourceProvider] = None,
            name: Name = Name(""),
            latency: Optional[int] = None,
            latency_offsets: Optional[Dict[str, int]] = None,
            execution_time: Optional[int] = None,
        ):
            """Construct a ReflectionFreeSeriesThreeportAdaptor operation."""
            super().__init__(
                input_count=3,
                output_count=3,
                name=Name(name),
                input_sources=[src0, src1, src2],
                latency=latency,
                latency_offsets=latency_offsets,
                execution_time=execution_time,
            )
            # Assign through the property so the range check is applied.
            self.value = value

        @classmethod
        def type_name(cls) -> TypeName:
            """Return the type name identifying this operation, ``"rfs3p"``."""
            return TypeName("rfs3p")

        def evaluate(self, a, b, c):
            """Return ``(y0, y1, y2)`` for the inputs ``a``, ``b`` and ``c`` (x0, x1, x2)."""
            s = a + c
            # y1 depends only on x0 and x2 (no contribution from x1).
            y1 = -s
            y0 = a - self.value * (b + s)
            # -(y0 + x1) = -x0 - x1 + value * (x0 + x1 + x2)
            y2 = -(y0 + b)
            return y0, y1, y2

        @property
        def value(self) -> Num:
            """Get the constant value of this operation."""
            return self.param("value")

        @value.setter
        def value(self, value: Num) -> None:
            """
            Set the constant value of this operation.

            Raises
            ------
            ValueError
                If *value* is outside the closed interval [0, 1].
            """
            if 0 <= value <= 1:
                self.set_param("value", value)
            else:
                raise ValueError('value must be between 0 and 1 (inclusive)')

        def inputs_required_for_output(self, output_index: int) -> Iterable[int]:
            """
            Get the input indices of all inputs in this operation whose values are
            required in order to evaluate the output at the given output index.

            Outputs 0 and 2 need all three inputs; output 1 needs only inputs
            0 and 2, matching how ``evaluate`` computes it.

            Raises
            ------
            KeyError
                If *output_index* is not 0, 1 or 2.
            """
            required_inputs = {0: (0, 1, 2), 1: (0, 2), 2: (0, 1, 2)}
            # Return only the entry for the requested output, not the whole table.
            return required_inputs[output_index]