From 1e53a893724e20c707fd13494fb2bd4ec15b5f9b Mon Sep 17 00:00:00 2001 From: Oscar Gustafsson <oscar.gustafsson@gmail.com> Date: Fri, 25 Aug 2023 12:17:17 +0200 Subject: [PATCH] Add more WDF adaptors --- b_asic/__init__.py | 1 + b_asic/core_operations.py | 67 ---------- b_asic/sfg_generators.py | 8 +- b_asic/wdf_operations.py | 207 +++++++++++++++++++++++++++++ docs_sphinx/api/wdf_operations.rst | 12 ++ examples/lwdfallpass.py | 2 +- examples/thirdorderblwdf.py | 3 +- test/test_core_operations.py | 34 ----- test/test_sfg.py | 38 +++--- test/test_sfg_generators.py | 7 +- test/test_wdf_operations.py | 108 +++++++++++++++ 11 files changed, 354 insertions(+), 133 deletions(-) create mode 100644 b_asic/wdf_operations.py create mode 100644 docs_sphinx/api/wdf_operations.rst create mode 100644 test/test_wdf_operations.py diff --git a/b_asic/__init__.py b/b_asic/__init__.py index f1fded40..fbf1fe4f 100644 --- a/b_asic/__init__.py +++ b/b_asic/__init__.py @@ -12,3 +12,4 @@ from b_asic.signal import * from b_asic.signal_flow_graph import * from b_asic.simulation import * from b_asic.special_operations import * +from b_asic.wdf_operations import * diff --git a/b_asic/core_operations.py b/b_asic/core_operations.py index 17e762e7..deaf0d22 100644 --- a/b_asic/core_operations.py +++ b/b_asic/core_operations.py @@ -913,73 +913,6 @@ class MAD(AbstractOperation): p._index = i -class SymmetricTwoportAdaptor(AbstractOperation): - r""" - Wave digital filter symmetric twoport-adaptor operation. - - .. math:: - \begin{eqnarray} - y_0 & = & x_1 + \text{value}\times\left(x_1 - x_0\right)\\ - y_1 & = & x_0 + \text{value}\times\left(x_1 - x_0\right) - \end{eqnarray} - """ - is_linear = True - is_swappable = True - - def __init__( - self, - value: Num = 0, - src0: Optional[SignalSourceProvider] = None, - src1: Optional[SignalSourceProvider] = None, - name: Name = Name(""), - latency: Optional[int] = None, - latency_offsets: Optional[Dict[str, int]] = None, - execution_time: Optional[int] = None, - ): - """Construct a SymmetricTwoportAdaptor operation.""" - super().__init__( - input_count=2, - output_count=2, - name=Name(name), - input_sources=[src0, src1], - latency=latency, - latency_offsets=latency_offsets, - execution_time=execution_time, - ) - self.value = value - - @classmethod - def type_name(cls) -> TypeName: - return TypeName("sym2p") - - def evaluate(self, a, b): - tmp = self.value * (b - a) - return b + tmp, a + tmp - - @property - def value(self) -> Num: - """Get the constant value of this operation.""" - return self.param("value") - - @value.setter - def value(self, value: Num) -> None: - """Set the constant value of this operation.""" - if -1 <= value <= 1: - self.set_param("value", value) - else: - raise ValueError('value must be between -1 and 1 (inclusive)') - - def swap_io(self) -> None: - # Swap inputs and outputs and change sign of coefficient - self._input_ports.reverse() - for i, p in enumerate(self._input_ports): - p._index = i - self._output_ports.reverse() - for i, p in enumerate(self._output_ports): - p._index = i - self.set_param("value", -self.value) - - class Reciprocal(AbstractOperation): r""" Reciprocal operation. diff --git a/b_asic/sfg_generators.py b/b_asic/sfg_generators.py index 8682db1b..1f1bf2c7 100644 --- a/b_asic/sfg_generators.py +++ b/b_asic/sfg_generators.py @@ -7,15 +7,11 @@ from typing import Dict, Optional, Sequence, Union import numpy as np -from b_asic.core_operations import ( - Addition, - ConstantMultiplication, - Name, - SymmetricTwoportAdaptor, -) +from b_asic.core_operations import Addition, ConstantMultiplication, Name from b_asic.signal import Signal from b_asic.signal_flow_graph import SFG from b_asic.special_operations import Delay, Input, Output +from b_asic.wdf_operations import SymmetricTwoportAdaptor def wdf_allpass( diff --git a/b_asic/wdf_operations.py b/b_asic/wdf_operations.py new file mode 100644 index 00000000..84f08ec0 --- /dev/null +++ b/b_asic/wdf_operations.py @@ -0,0 +1,207 @@ +""" +B-ASIC Core Operations Module. + +Contains wave digital filter adaptors. +""" +from typing import Dict, Optional, Tuple + +from b_asic.graph_component import Name, TypeName +from b_asic.operation import AbstractOperation +from b_asic.port import SignalSourceProvider +from b_asic.types import Num + + +class SymmetricTwoportAdaptor(AbstractOperation): + r""" + Wave digital filter symmetric twoport-adaptor operation. + + .. math:: + \begin{eqnarray} + y_0 & = & x_1 + \text{value}\times\left(x_1 - x_0\right)\\ + y_1 & = & x_0 + \text{value}\times\left(x_1 - x_0\right) + \end{eqnarray} + """ + is_linear = True + is_swappable = True + + def __init__( + self, + value: Num = 0, + src0: Optional[SignalSourceProvider] = None, + src1: Optional[SignalSourceProvider] = None, + name: Name = Name(""), + latency: Optional[int] = None, + latency_offsets: Optional[Dict[str, int]] = None, + execution_time: Optional[int] = None, + ): + """Construct a SymmetricTwoportAdaptor operation.""" + super().__init__( + input_count=2, + output_count=2, + name=Name(name), + input_sources=[src0, src1], + latency=latency, + latency_offsets=latency_offsets, + execution_time=execution_time, + ) + self.value = value + + @classmethod + def type_name(cls) -> TypeName: + return TypeName("sym2p") + + def evaluate(self, a, b): + tmp = self.value * (b - a) + return b + tmp, a + tmp + + @property + def value(self) -> Num: + """Get the constant value of this operation.""" + return self.param("value") + + @value.setter + def value(self, value: Num) -> None: + """Set the constant value of this operation.""" + if -1 <= value <= 1: + self.set_param("value", value) + else: + raise ValueError('value must be between -1 and 1 (inclusive)') + + def swap_io(self) -> None: + # Swap inputs and outputs and change sign of coefficient + self._input_ports.reverse() + for i, p in enumerate(self._input_ports): + p._index = i + self._output_ports.reverse() + for i, p in enumerate(self._output_ports): + p._index = i + self.set_param("value", -self.value) + + +class SeriesTwoportAdaptor(AbstractOperation): + r""" + Wave digital filter series twoport-adaptor operation. + + .. math:: + \begin{eqnarray} + y_0 & = & x_0 - \text{value}\times\left(x_0 + x_1\right)\\ + y_1 & = & x_1 - (2-\text{value})\times\left(x_0 + x_1\right) + \end{eqnarray} + """ + is_linear = True + is_swappable = True + + def __init__( + self, + value: Num = 0, + src0: Optional[SignalSourceProvider] = None, + src1: Optional[SignalSourceProvider] = None, + name: Name = Name(""), + latency: Optional[int] = None, + latency_offsets: Optional[Dict[str, int]] = None, + execution_time: Optional[int] = None, + ): + """Construct a SeriesTwoportAdaptor operation.""" + super().__init__( + input_count=2, + output_count=2, + name=Name(name), + input_sources=[src0, src1], + latency=latency, + latency_offsets=latency_offsets, + execution_time=execution_time, + ) + self.value = value + + @classmethod + def type_name(cls) -> TypeName: + return TypeName("ser2p") + + def evaluate(self, a, b): + s = a + b + val = self.value + return a - val * s, b - (2 - val) * s + + @property + def value(self) -> Num: + """Get the constant value of this operation.""" + return self.param("value") + + @value.setter + def value(self, value: Num) -> None: + """Set the constant value of this operation.""" + if 0 <= value <= 2: + self.set_param("value", value) + else: + raise ValueError('value must be between 0 and 2 (inclusive)') + + def swap_io(self) -> None: + # Swap inputs and outputs and, hence, which port is dependent + self._input_ports.reverse() + for i, p in enumerate(self._input_ports): + p._index = i + self._output_ports.reverse() + for i, p in enumerate(self._output_ports): + p._index = i + self.set_param("value", 2 - self.value) + + +class SeriesThreeportAdaptor(AbstractOperation): + r""" + Wave digital filter series threeport-adaptor operation. + + .. math:: + \begin{eqnarray} + y_0 & = & x_0 - \text{value}_0\times\left(x_0 + x_1 + x_2\right)\\ + y_1 & = & x_1 - \text{value}_1\times\left(x_0 + x_1 + x_2\right)\\ + y_2 & = & x_2 - \left(2 - \text{value}_0 - \text{value}_1\right)\times\left(x_0 + + x_1 + x_2\right) + \end{eqnarray} + """ + is_linear = True + is_swappable = True + + def __init__( + self, + value: Tuple[Num, Num] = (0, 0), + src0: Optional[SignalSourceProvider] = None, + src1: Optional[SignalSourceProvider] = None, + src2: Optional[SignalSourceProvider] = None, + name: Name = Name(""), + latency: Optional[int] = None, + latency_offsets: Optional[Dict[str, int]] = None, + execution_time: Optional[int] = None, + ): + """Construct a SeriesThreeportAdaptor operation.""" + super().__init__( + input_count=3, + output_count=3, + name=Name(name), + input_sources=[src0, src1, src2], + latency=latency, + latency_offsets=latency_offsets, + execution_time=execution_time, + ) + self.value = value + + @classmethod + def type_name(cls) -> TypeName: + return TypeName("ser3p") + + def evaluate(self, a, b, c): + s = a + b + c + val0, val1 = self.value + return a - val0 * s, b - val1 * s, c - (2 - val0 - val1) * s + + @property + def value(self) -> Num: + """Get the constant value of this operation.""" + return self.param("value") + + @value.setter + def value(self, value: Num) -> None: + """Set the constant value of this operation.""" + if 0 <= sum(value) <= 2: + self.set_param("value", value) + else: + raise ValueError('sum of value must be between 0 and 2 (inclusive)') diff --git a/docs_sphinx/api/wdf_operations.rst b/docs_sphinx/api/wdf_operations.rst new file mode 100644 index 00000000..10ecdee5 --- /dev/null +++ b/docs_sphinx/api/wdf_operations.rst @@ -0,0 +1,12 @@ +************************* +``b_asic.wdf_operations`` +************************* + +.. inheritance-diagram:: b_asic.wdf_operations + :parts: 1 + :top-classes: b_asic.graph_component.GraphComponent, b_asic.port.SignalSourceProvider + +.. automodule:: b_asic.wdf_operations + :members: + :undoc-members: + :show-inheritance: diff --git a/examples/lwdfallpass.py b/examples/lwdfallpass.py index 281856fe..68b9b082 100644 --- a/examples/lwdfallpass.py +++ b/examples/lwdfallpass.py @@ -7,10 +7,10 @@ LWDF first-order allpass section This has different latency offsets for the different inputs/outputs. """ -from b_asic.core_operations import SymmetricTwoportAdaptor from b_asic.schedule import Schedule from b_asic.signal_flow_graph import SFG from b_asic.special_operations import Delay, Input, Output +from b_asic.wdf_operations import SymmetricTwoportAdaptor in0 = Input() diff --git a/examples/thirdorderblwdf.py b/examples/thirdorderblwdf.py index fc289e24..02b45486 100644 --- a/examples/thirdorderblwdf.py +++ b/examples/thirdorderblwdf.py @@ -8,12 +8,13 @@ Small bireciprocal lattice wave digital filter. import numpy as np from mplsignal.freq_plots import freqz_fir -from b_asic.core_operations import Addition, SymmetricTwoportAdaptor +from b_asic.core_operations import Addition from b_asic.schedule import Schedule from b_asic.signal_flow_graph import SFG from b_asic.signal_generator import Impulse from b_asic.simulation import Simulation from b_asic.special_operations import Delay, Input, Output +from b_asic.wdf_operations import SymmetricTwoportAdaptor in0 = Input("x") D0 = Delay(in0) diff --git a/test/test_core_operations.py b/test/test_core_operations.py index 40f15f2f..f167a20b 100644 --- a/test/test_core_operations.py +++ b/test/test_core_operations.py @@ -19,7 +19,6 @@ from b_asic import ( Shift, SquareRoot, Subtraction, - SymmetricTwoportAdaptor, ) @@ -346,39 +345,6 @@ class TestButterfly: assert test_operation.evaluate_output(1, [2 + 1j, 3 - 2j]) == -1 + 3j -class TestSymmetricTwoportAdaptor: - """Tests for SymmetricTwoportAdaptor class.""" - - def test_symmetrictwoportadaptor_positive(self): - test_operation = SymmetricTwoportAdaptor(0.5) - assert test_operation.evaluate_output(0, [2, 3]) == 3.5 - assert test_operation.evaluate_output(1, [2, 3]) == 2.5 - assert test_operation.value == 0.5 - - def test_symmetrictwoportadaptor_negative(self): - test_operation = SymmetricTwoportAdaptor(0.5) - assert test_operation.evaluate_output(0, [-2, -3]) == -3.5 - assert test_operation.evaluate_output(1, [-2, -3]) == -2.5 - - def test_symmetrictwoportadaptor_complex(self): - test_operation = SymmetricTwoportAdaptor(0.5) - assert test_operation.evaluate_output(0, [2 + 1j, 3 - 2j]) == 3.5 - 3.5j - assert test_operation.evaluate_output(1, [2 + 1j, 3 - 2j]) == 2.5 - 0.5j - - def test_symmetrictwoportadaptor_swap_io(self): - test_operation = SymmetricTwoportAdaptor(0.5) - assert test_operation.value == 0.5 - test_operation.swap_io() - assert test_operation.value == -0.5 - - def test_symmetrictwoportadaptor_error(self): - with pytest.raises(ValueError, match="value must be between -1 and 1"): - _ = SymmetricTwoportAdaptor(-2) - test_operation = SymmetricTwoportAdaptor(0) - with pytest.raises(ValueError, match="value must be between -1 and 1"): - test_operation.value = 2 - - class TestReciprocal: """Tests for Absolute class.""" diff --git a/test/test_sfg.py b/test/test_sfg.py index da60341d..cfb57799 100644 --- a/test/test_sfg.py +++ b/test/test_sfg.py @@ -18,14 +18,14 @@ from b_asic.core_operations import ( Multiplication, SquareRoot, Subtraction, - SymmetricTwoportAdaptor, ) from b_asic.operation import ResultKey from b_asic.save_load_structure import python_to_sfg, sfg_to_python +from b_asic.sfg_generators import wdf_allpass from b_asic.signal_flow_graph import SFG, GraphID from b_asic.simulation import Simulation from b_asic.special_operations import Delay -from b_asic.sfg_generators import wdf_allpass +from b_asic.wdf_operations import SymmetricTwoportAdaptor class TestInit: @@ -816,7 +816,7 @@ class TestConnectExternalSignalsToComponentsSoloComp: assert not test_sfg.connect_external_signals_to_components() def test_connect_external_signals_to_components_multiple_operations_after_input( - self + self, ): """ Replaces an SFG with a symmetric two-port adaptor to test when the input @@ -830,6 +830,7 @@ class TestConnectExternalSignalsToComponentsSoloComp: assert test_sfg.evaluate(1) == -0.5 assert not test_sfg.connect_external_signals_to_components() + class TestConnectExternalSignalsToComponentsMultipleComp: def test_connect_external_signals_to_components_operation_tree( self, operation_tree @@ -1480,9 +1481,7 @@ class TestUnfold: ): self.do_tests(sfg_two_inputs_two_outputs_independent) - def test_threetapiir( - self, sfg_direct_form_iir_lp_filter: SFG - ): + def test_threetapiir(self, sfg_direct_form_iir_lp_filter: SFG): self.do_tests(sfg_direct_form_iir_lp_filter) def do_tests(self, sfg: SFG): @@ -1635,6 +1634,7 @@ class TestInsertComponentAfter: with pytest.raises(ValueError, match="Unknown component:"): sfg.insert_operation_after('foo', SquareRoot()) + class TestInsertComponentBefore: def test_insert_component_before_in_sfg(self, butterfly_operation_tree): sfg = SFG(outputs=list(map(Output, butterfly_operation_tree.outputs))) @@ -1653,22 +1653,22 @@ class TestInsertComponentBefore: SquareRoot, ) assert isinstance( - _sfg.find_by_name("bfly1")[0] - .input(0) - .signals[0] - .source.operation, + _sfg.find_by_name("bfly1")[0].input(0).signals[0].source.operation, SquareRoot, ) - assert sfg.find_by_name("bfly1")[0].input(0).signals[ - 0 - ].source.operation is sfg.find_by_name("bfly2")[0] - assert _sfg.find_by_name("bfly1")[0].input(0).signals[ - 0 - ].destination.operation is not _sfg.find_by_name("bfly2")[0] - assert _sfg.find_by_id("sqrt0").input(0).signals[ - 0 - ].source.operation is _sfg.find_by_name("bfly2")[0] + assert ( + sfg.find_by_name("bfly1")[0].input(0).signals[0].source.operation + is sfg.find_by_name("bfly2")[0] + ) + assert ( + _sfg.find_by_name("bfly1")[0].input(0).signals[0].destination.operation + is not _sfg.find_by_name("bfly2")[0] + ) + assert ( + _sfg.find_by_id("sqrt0").input(0).signals[0].source.operation + is _sfg.find_by_name("bfly2")[0] + ) def test_insert_component_before_mimo_operation_error( self, large_operation_tree_names diff --git a/test/test_sfg_generators.py b/test/test_sfg_generators.py index 31293462..b7771315 100644 --- a/test/test_sfg_generators.py +++ b/test/test_sfg_generators.py @@ -1,11 +1,7 @@ import numpy as np import pytest -from b_asic.core_operations import ( - Addition, - ConstantMultiplication, - SymmetricTwoportAdaptor, -) +from b_asic.core_operations import Addition, ConstantMultiplication from b_asic.sfg_generators import ( direct_form_fir, transposed_direct_form_fir, @@ -14,6 +10,7 @@ from b_asic.sfg_generators import ( from b_asic.signal_generator import Impulse from b_asic.simulation import Simulation from b_asic.special_operations import Delay +from b_asic.wdf_operations import SymmetricTwoportAdaptor def test_wdf_allpass(): diff --git a/test/test_wdf_operations.py b/test/test_wdf_operations.py new file mode 100644 index 00000000..ae883713 --- /dev/null +++ b/test/test_wdf_operations.py @@ -0,0 +1,108 @@ +"""B-ASIC test suite for the core operations.""" +import pytest + +from b_asic.wdf_operations import ( + SeriesThreeportAdaptor, + SeriesTwoportAdaptor, + SymmetricTwoportAdaptor, +) + + +class TestSymmetricTwoportAdaptor: + """Tests for SymmetricTwoportAdaptor class.""" + + def test_symmetrictwoportadaptor_positive(self): + test_operation = SymmetricTwoportAdaptor(0.5) + assert test_operation.evaluate_output(0, [2, 3]) == 3.5 + assert test_operation.evaluate_output(1, [2, 3]) == 2.5 + assert test_operation.value == 0.5 + + def test_symmetrictwoportadaptor_negative(self): + test_operation = SymmetricTwoportAdaptor(0.5) + assert test_operation.evaluate_output(0, [-2, -3]) == -3.5 + assert test_operation.evaluate_output(1, [-2, -3]) == -2.5 + + def test_symmetrictwoportadaptor_complex(self): + test_operation = SymmetricTwoportAdaptor(0.5) + assert test_operation.evaluate_output(0, [2 + 1j, 3 - 2j]) == 3.5 - 3.5j + assert test_operation.evaluate_output(1, [2 + 1j, 3 - 2j]) == 2.5 - 0.5j + + def test_symmetrictwoportadaptor_swap_io(self): + test_operation = SymmetricTwoportAdaptor(0.5) + assert test_operation.value == 0.5 + test_operation.swap_io() + assert test_operation.value == -0.5 + + def test_symmetrictwoportadaptor_error(self): + with pytest.raises(ValueError, match="value must be between -1 and 1"): + _ = SymmetricTwoportAdaptor(-2) + test_operation = SymmetricTwoportAdaptor(0) + with pytest.raises(ValueError, match="value must be between -1 and 1"): + test_operation.value = 2 + + +class TestSeriesTwoportAdaptor: + """Tests for SeriesTwoportAdaptor class.""" + + def test_seriestwoportadaptor_positive(self): + test_operation = SeriesTwoportAdaptor(0.5) + assert test_operation.evaluate_output(0, [2, 3]) == -0.5 + assert test_operation.evaluate_output(1, [2, 3]) == -4.5 + assert test_operation.value == 0.5 + + def test_seriestwoportadaptor_negative(self): + test_operation = SeriesTwoportAdaptor(0.5) + assert test_operation.evaluate_output(0, [-2, -3]) == 0.5 + assert test_operation.evaluate_output(1, [-2, -3]) == 4.5 + + def test_seriestwoportadaptor_complex(self): + test_operation = SeriesTwoportAdaptor(0.5) + assert test_operation.evaluate_output(0, [2 + 1j, 3 - 2j]) == -0.5 + 1.5j + assert test_operation.evaluate_output(1, [2 + 1j, 3 - 2j]) == -4.5 - 0.5j + + def test_seriestwoportadaptor_swap_io(self): + test_operation = SeriesTwoportAdaptor(0.5) + assert test_operation.value == 0.5 + test_operation.swap_io() + assert test_operation.value == 1.5 + + def test_seriestwoportadaptor_error(self): + with pytest.raises(ValueError, match="value must be between 0 and 2"): + _ = SeriesTwoportAdaptor(-1) + test_operation = SeriesTwoportAdaptor(0) + with pytest.raises(ValueError, match="value must be between 0 and 2"): + test_operation.value = 3 + + +class TestSeriesThreeportAdaptor: + """Tests for SeriesThreeportAdaptor class.""" + + def test_seriesthreeportadaptor_positive(self): + test_operation = SeriesThreeportAdaptor((0.5, 1.25)) + assert test_operation.evaluate_output(0, [2, 3, 4]) == -2.5 + assert test_operation.evaluate_output(1, [2, 3, 4]) == -8.25 + assert test_operation.evaluate_output(2, [2, 3, 4]) == 1.75 + assert test_operation.value == (0.5, 1.25) + + def test_seriesthreeportadaptor_negative(self): + test_operation = SeriesThreeportAdaptor((0.5, 1.25)) + assert test_operation.evaluate_output(0, [-2, -3, -4]) == 2.5 + assert test_operation.evaluate_output(1, [-2, -3, -4]) == 8.25 + assert test_operation.evaluate_output(2, [-2, -3, -4]) == -1.75 + + def test_seriesthreeportadaptor_complex(self): + test_operation = SeriesThreeportAdaptor((0.5, 1.25)) + assert test_operation.evaluate_output(0, [2 + 1j, 3 - 2j, 4 + 3j]) == -2.5 + 0j + assert ( + test_operation.evaluate_output(1, [2 + 1j, 3 - 2j, 4 + 3j]) == -8.25 - 4.5j + ) + assert ( + test_operation.evaluate_output(2, [2 + 1j, 3 - 2j, 4 + 3j]) == 1.75 + 2.5j + ) + + def test_seriestwoportadaptor_error(self): + with pytest.raises(ValueError, match="sum of value must be between 0 and 2"): + _ = SeriesThreeportAdaptor((0, 3)) + test_operation = SeriesThreeportAdaptor((0, 0.5)) + with pytest.raises(ValueError, match="sum of value must be between 0 and 2"): + test_operation.value = (-0.5, 4) -- GitLab