From 1135959b81651bfcdbd1cdc7a475e8dc9557204d Mon Sep 17 00:00:00 2001 From: Simon Bjurek <simbj106@student.liu.se> Date: Tue, 11 Feb 2025 11:30:18 +0000 Subject: [PATCH] Added direct form IIR tests and updated file for 100% coverage --- b_asic/sfg_generators.py | 37 +-- pyproject.toml | 3 +- test/test_core_schedulers.py | 6 +- test/test_sfg_generators.py | 457 ++++++++++++++++++++++++++++------- 4 files changed, 387 insertions(+), 116 deletions(-) diff --git a/b_asic/sfg_generators.py b/b_asic/sfg_generators.py index b71a35ed..737501f1 100644 --- a/b_asic/sfg_generators.py +++ b/b_asic/sfg_generators.py @@ -269,8 +269,14 @@ def direct_form_1_iir( add_properties: Optional[Union[Dict[str, int], Dict[str, Dict[str, int]]]] = None, ) -> SFG: """Generates a direct-form IIR filter of type I with coefficients a and b.""" + if len(a) < 2 or len(b) < 2: + raise ValueError( + "Size of coefficient lists a and b needs to contain at least 2 element." + ) if len(a) != len(b): - raise ValueError("size of coefficient lists a and b are not the same") + raise ValueError("Size of coefficient lists a and b are not the same.") + if a[0] != 1: + raise ValueError("The value of a[0] must be 1.") if name is None: name = "Direct-form I IIR filter" if mult_properties is None: @@ -324,8 +330,14 @@ def direct_form_2_iir( add_properties: Optional[Union[Dict[str, int], Dict[str, Dict[str, int]]]] = None, ) -> SFG: """Generates a direct-form IIR filter of type II with coefficients a and b.""" + if len(a) < 2 or len(b) < 2: + raise ValueError( + "Size of coefficient lists a and b needs to contain at least 2 element." + ) if len(a) != len(b): - raise ValueError("size of coefficient lists a and b are not the same") + raise ValueError("Size of coefficient lists a and b are not the same.") + if a[0] != 1: + raise ValueError("The value of a[0] must be 1.") if name is None: name = "Direct-form II IIR filter" if mult_properties is None: @@ -371,7 +383,10 @@ def direct_form_2_iir( left_adds.append(Addition(input_op, left_muls[-1], **add_properties)) delays[-1] <<= left_adds[-1] mul = ConstantMultiplication(b[0], left_adds[-1], **mult_properties) - add = Addition(mul, right_adds[-1], **add_properties) + if right_adds: + add = Addition(mul, right_adds[-1], **add_properties) + else: + add = Addition(mul, right_muls[-1], **add_properties) output = Output() output <<= add return SFG([input_op], [output], name=Name(name)) @@ -441,10 +456,7 @@ def _construct_dif_fft_stage( twiddle_factor = twiddles[bf_index] if twiddle_factor != 1: - name = _get_formatted_complex_number(twiddle_factor, 2) - twiddle_mul = ConstantMultiplication( - twiddles[bf_index], output2, name=name - ) + twiddle_mul = ConstantMultiplication(twiddles[bf_index], output2) output2 = twiddle_mul.output(0) ports[input1_index] = output1 @@ -453,17 +465,6 @@ def _construct_dif_fft_stage( return ports -def _get_formatted_complex_number(number: np.complex128, digits: int) -> str: - real_str = str(np.round(number.real, digits)) - imag_str = str(np.round(number.imag, digits)) - if number.imag == 0: - return real_str - elif number.imag > 0: - return f"{real_str} + j{imag_str}" - else: - return f"{real_str} - j{str(-np.round(number.imag, digits))}" - - def _get_bit_reversed_number(number: int, number_of_bits: int) -> int: reversed_number = 0 for i in range(number_of_bits): diff --git a/pyproject.toml b/pyproject.toml index 4250e3be..58dc2c51 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -40,6 +40,7 @@ test = [ "pytest-timeout", "pytest-xvfb", "pytest-xdist", + "scipy", ] doc = [ "sphinx", @@ -99,4 +100,4 @@ precision = 2 lint.ignore = ["F403"] [tool.typos] -default.extend-identifiers = { addd0 = "addd0", inout = "inout", ArChItEctUrE = "ArChItEctUrE" } +default.extend-identifiers = { ba = "ba", addd0 = "addd0", inout = "inout", ArChItEctUrE = "ArChItEctUrE" } diff --git a/test/test_core_schedulers.py b/test/test_core_schedulers.py index 147d230d..f934f157 100644 --- a/test/test_core_schedulers.py +++ b/test/test_core_schedulers.py @@ -18,7 +18,7 @@ class TestASAPScheduler: Schedule(sfg_empty, scheduler=ASAPScheduler()) def test_direct_form_1_iir(self): - sfg = direct_form_1_iir([1, 2, 3], [4, 5, 6]) + sfg = direct_form_1_iir([1, 2, 3], [1, 2, 3]) sfg.set_latency_of_type(ConstantMultiplication.type_name(), 2) sfg.set_execution_time_of_type(ConstantMultiplication.type_name(), 1) @@ -148,7 +148,7 @@ class TestALAPScheduler: Schedule(sfg_empty, scheduler=ALAPScheduler()) def test_direct_form_1_iir(self): - sfg = direct_form_1_iir([1, 2, 3], [4, 5, 6]) + sfg = direct_form_1_iir([1, 2, 3], [1, 2, 3]) sfg.set_latency_of_type(ConstantMultiplication.type_name(), 2) sfg.set_execution_time_of_type(ConstantMultiplication.type_name(), 1) @@ -278,7 +278,7 @@ class TestEarliestDeadlineScheduler: Schedule(sfg_empty, scheduler=EarliestDeadlineScheduler()) def test_direct_form_1_iir(self): - sfg = direct_form_1_iir([1, 2, 3], [4, 5, 6]) + sfg = direct_form_1_iir([1, 2, 3], [1, 2, 3]) sfg.set_latency_of_type(ConstantMultiplication.type_name(), 2) sfg.set_execution_time_of_type(ConstantMultiplication.type_name(), 1) diff --git a/test/test_sfg_generators.py b/test/test_sfg_generators.py index c199fb63..7064658c 100644 --- a/test/test_sfg_generators.py +++ b/test/test_sfg_generators.py @@ -1,5 +1,6 @@ import numpy as np import pytest +from scipy import signal from b_asic.core_operations import ( Addition, @@ -8,12 +9,14 @@ from b_asic.core_operations import ( SymmetricTwoportAdaptor, ) from b_asic.sfg_generators import ( + direct_form_1_iir, + direct_form_2_iir, direct_form_fir, radix_2_dif_fft, transposed_direct_form_fir, wdf_allpass, ) -from b_asic.signal_generator import Constant, Impulse +from b_asic.signal_generator import Constant, Impulse, ZeroPad from b_asic.simulation import Simulation from b_asic.special_operations import Delay @@ -238,133 +241,399 @@ def test_sfg_generator_errors(): gen([[1, 2], [1, 3]]) -def test_radix_2_dif_fft_4_points_constant_input(): - sfg = radix_2_dif_fft(points=4) +class TestDirectFormIIRType1: + def test_correct_number_of_operations_and_name(self): + N = 17 - assert len(sfg.inputs) == 4 - assert len(sfg.outputs) == 4 + b = [i + 1 for i in range(N + 1)] + a = [i + 1 for i in range(N + 1)] - bfs = sfg.find_by_type_name(Butterfly.type_name()) - assert len(bfs) == 4 + sfg = direct_form_1_iir(b, a, name="test iir direct form 1") - muls = sfg.find_by_type_name(ConstantMultiplication.type_name()) - assert len(muls) == 1 + amount_of_muls = len(sfg.find_by_type_name(ConstantMultiplication.type_name())) + assert amount_of_muls == 2 * N + 1 - # simulate when the input signal is a constant 1 - input_samples = [Impulse() for _ in range(4)] - sim = Simulation(sfg, input_samples) - sim.run_for(1) + amount_of_adds = len(sfg.find_by_type_name(Addition.type_name())) + assert amount_of_adds == 2 * N - # ensure that the result is an impulse at time 0 with weight 4 - res = sim.results - for i in range(4): - exp_res = 4 if i == 0 else 0 - assert np.allclose(res[str(i)], exp_res) + amount_of_delays = len(sfg.find_by_type_name(Delay.type_name())) + assert amount_of_delays == 2 * N + amount_of_ops = len(sfg.operations) + assert amount_of_ops == 6 * N + 3 -def test_radix_2_dif_fft_8_points_impulse_input(): - sfg = radix_2_dif_fft(points=8) + assert sfg.name == "test iir direct form 1" - assert len(sfg.inputs) == 8 - assert len(sfg.outputs) == 8 + def test_b_single_coeff(self): + with pytest.raises( + ValueError, + match="Size of coefficient lists a and b needs to contain at least 2 element.", + ): + direct_form_1_iir([1], [2, 3]) - bfs = sfg.find_by_type_name(Butterfly.type_name()) - assert len(bfs) == 12 + def test_a_single_coeff(self): + with pytest.raises( + ValueError, + match="Size of coefficient lists a and b needs to contain at least 2 element.", + ): + direct_form_1_iir([1, 2], [3]) - muls = sfg.find_by_type_name(ConstantMultiplication.type_name()) - assert len(muls) == 5 + def test_coeffs_not_same_size(self): + with pytest.raises( + ValueError, match="Size of coefficient lists a and b are not the same." + ): + direct_form_1_iir([1, 2, 3], [1, 2]) - # simulate when the input signal is an impulse at time 0 - input_samples = [Impulse(), 0, 0, 0, 0, 0, 0, 0] - sim = Simulation(sfg, input_samples) - sim.run_for(1) + with pytest.raises( + ValueError, match="Size of coefficient lists a and b are not the same." + ): + direct_form_1_iir([i for i in range(10)], [i for i in range(11)]) - # ensure that the result is a constant 1 - res = sim.results - for i in range(8): - assert np.allclose(res[str(i)], 1) + with pytest.raises( + ValueError, match="Size of coefficient lists a and b are not the same." + ): + direct_form_1_iir([i for i in range(10)], [i for i in range(11)]) + def test_a0_not_1(self): + with pytest.raises(ValueError, match=r"The value of a\[0] must be 1\."): + direct_form_1_iir(b=[1, 2, 3], a=[1.1, 2, 3]) -def test_radix_2_dif_fft_8_points_sinus_input(): - POINTS = 8 - sfg = radix_2_dif_fft(points=POINTS) + def test_first_order_filter(self): + N = 1 + Wc = 0.5 - assert len(sfg.inputs) == POINTS - assert len(sfg.outputs) == POINTS + b, a = signal.butter(N, Wc, btype="lowpass", output="ba") - n = np.linspace(0, 2 * np.pi, POINTS) - waveform = np.sin(n) - input_samples = [Constant(waveform[i]) for i in range(POINTS)] + input_signal = np.random.randn(100) + reference_filter_output = signal.lfilter(b, a, input_signal) - sim = Simulation(sfg, input_samples) - sim.run_for(1) + sfg = direct_form_1_iir(b, a, name="test iir direct form 1") - exp_res = abs(np.fft.fft(waveform)) + sim = Simulation(sfg, [ZeroPad(input_signal)]) + sim.run_for(100) - res = sim.results - for i in range(POINTS): - a = abs(res[str(i)]) - b = exp_res[i] - assert np.isclose(a, b) + assert np.allclose(sim.results['0'], reference_filter_output) + def test_random_input_compare_with_scipy_butterworth_filter(self): + N = 10 + Wc = 0.3 -def test_radix_2_dif_fft_16_points_sinus_input(): - POINTS = 16 - sfg = radix_2_dif_fft(points=POINTS) + b, a = signal.butter(N, Wc, btype="lowpass", output="ba") - assert len(sfg.inputs) == POINTS - assert len(sfg.outputs) == POINTS + input_signal = np.random.randn(100) + reference_filter_output = signal.lfilter(b, a, input_signal) - bfs = sfg.find_by_type_name(Butterfly.type_name()) - assert len(bfs) == 8 * 4 + sfg = direct_form_1_iir(b, a, name="test iir direct form 1") - muls = sfg.find_by_type_name(ConstantMultiplication.type_name()) - assert len(muls) == 17 + sim = Simulation(sfg, [ZeroPad(input_signal)]) + sim.run_for(100) - n = np.linspace(0, 2 * np.pi, POINTS) - waveform = np.sin(n) - input_samples = [Constant(waveform[i]) for i in range(POINTS)] + assert np.allclose(sim.results['0'], reference_filter_output) - sim = Simulation(sfg, input_samples) - sim.run_for(1) + def test_random_input_compare_with_scipy_elliptic_filter(self): + N = 2 + Wc = 0.3 - exp_res = np.fft.fft(waveform) - res = sim.results - for i in range(POINTS): - a = res[str(i)] - b = exp_res[i] - assert np.isclose(a, b) + b, a = signal.ellip(N, 0.1, 60, Wc, btype='low', analog=False) + b, a = signal.butter(N, Wc, btype="lowpass", output="ba") + input_signal = np.random.randn(100) + reference_filter_output = signal.lfilter(b, a, input_signal) -def test_radix_2_dif_fft_256_points_sinus_input(): - POINTS = 256 - sfg = radix_2_dif_fft(points=POINTS) + sfg = direct_form_1_iir(b, a, name="test iir direct form 1") - assert len(sfg.inputs) == POINTS - assert len(sfg.outputs) == POINTS + sim = Simulation(sfg, [ZeroPad(input_signal)]) + sim.run_for(100) - n = np.linspace(0, 2 * np.pi, POINTS) - waveform = np.sin(n) - input_samples = [Constant(waveform[i]) for i in range(POINTS)] + assert np.allclose(sim.results['0'], reference_filter_output) - sim = Simulation(sfg, input_samples) - sim.run_for(1) + def test_add_and_mult_properties(self): + N = 17 - exp_res = np.fft.fft(waveform) - res = sim.results - for i in range(POINTS): - a = res[str(i)] - b = exp_res[i] - assert np.isclose(a, b) + b = [i + 1 for i in range(N + 1)] + a = [i + 1 for i in range(N + 1)] + sfg = direct_form_1_iir( + b, + a, + mult_properties={"latency": 5, "execution_time": 2}, + add_properties={"latency": 3, "execution_time": 1}, + ) + + adds = sfg.find_by_type_name(Addition.type_name()) + for add in adds: + assert add.latency == 3 + assert add.execution_time == 1 + + muls = sfg.find_by_type_name(ConstantMultiplication.type_name()) + for mul in muls: + assert mul.latency == 5 + assert mul.execution_time == 2 + + +class TestDirectFormIIRType2: + def test_correct_number_of_operations_and_name(self): + N = 17 + + b = [i + 1 for i in range(N + 1)] + a = [i + 1 for i in range(N + 1)] + + sfg = direct_form_2_iir(b, a, name="test iir direct form 2") + + amount_of_muls = len(sfg.find_by_type_name(ConstantMultiplication.type_name())) + assert amount_of_muls == 2 * N + 1 + + amount_of_adds = len(sfg.find_by_type_name(Addition.type_name())) + assert amount_of_adds == 2 * N + + amount_of_delays = len(sfg.find_by_type_name(Delay.type_name())) + assert amount_of_delays == N + + amount_of_ops = len(sfg.operations) + assert amount_of_ops == 5 * N + 3 + + assert sfg.name == "test iir direct form 2" + + def test_b_single_coeff(self): + with pytest.raises( + ValueError, + match="Size of coefficient lists a and b needs to contain at least 2 element.", + ): + direct_form_2_iir([1], [2, 3]) + + def test_a_single_coeff(self): + with pytest.raises( + ValueError, + match="Size of coefficient lists a and b needs to contain at least 2 element.", + ): + direct_form_2_iir([1, 2], [3]) + + def test_a0_not_1(self): + with pytest.raises(ValueError, match=r"The value of a\[0] must be 1\."): + direct_form_2_iir(b=[1, 2, 3], a=[1.1, 2, 3]) + + def test_coeffs_not_same_size(self): + with pytest.raises( + ValueError, match="Size of coefficient lists a and b are not the same." + ): + direct_form_2_iir([1, 2, 3], [1, 2]) + + def test_first_order_filter(self): + N = 1 + Wc = 0.5 + + b, a = signal.butter(N, Wc, btype="lowpass", output="ba") + + input_signal = np.random.randn(100) + reference_filter_output = signal.lfilter(b, a, input_signal) + + sfg = direct_form_2_iir(b, a, name="test iir direct form 1") + + sim = Simulation(sfg, [ZeroPad(input_signal)]) + sim.run_for(100) + + assert np.allclose(sim.results['0'], reference_filter_output) + + def test_random_input_compare_with_scipy_butterworth_filter(self): + N = 10 + Wc = 0.3 + + b, a = signal.butter(N, Wc, btype="lowpass", output="ba") + + input_signal = np.random.randn(100) + reference_filter_output = signal.lfilter(b, a, input_signal) + + sfg = direct_form_2_iir(b, a, name="test iir direct form 1") + + sim = Simulation(sfg, [ZeroPad(input_signal)]) + sim.run_for(100) + + assert np.allclose(sim.results['0'], reference_filter_output) + + def test_random_input_compare_with_scipy_elliptic_filter(self): + N = 2 + Wc = 0.3 + + b, a = signal.ellip(N, 0.1, 60, Wc, btype='low', analog=False) + b, a = signal.butter(N, Wc, btype="lowpass", output="ba") + + input_signal = np.random.randn(100) + reference_filter_output = signal.lfilter(b, a, input_signal) + + sfg = direct_form_2_iir(b, a, name="test iir direct form 1") + + sim = Simulation(sfg, [ZeroPad(input_signal)]) + sim.run_for(100) + + assert np.allclose(sim.results['0'], reference_filter_output) + + def test_add_and_mult_properties(self): + N = 17 + + b = [i + 1 for i in range(N + 1)] + a = [i + 1 for i in range(N + 1)] + + sfg = direct_form_2_iir( + b, + a, + mult_properties={"latency": 5, "execution_time": 2}, + add_properties={"latency": 3, "execution_time": 1}, + ) + + adds = sfg.find_by_type_name(Addition.type_name()) + for add in adds: + assert add.latency == 3 + assert add.execution_time == 1 + + muls = sfg.find_by_type_name(ConstantMultiplication.type_name()) + for mul in muls: + assert mul.latency == 5 + assert mul.execution_time == 2 + + +class TestRadix2FFT: + def test_4_points_constant_input(self): + sfg = radix_2_dif_fft(points=4) + + assert len(sfg.inputs) == 4 + assert len(sfg.outputs) == 4 + + bfs = sfg.find_by_type_name(Butterfly.type_name()) + assert len(bfs) == 4 + + muls = sfg.find_by_type_name(ConstantMultiplication.type_name()) + assert len(muls) == 1 + + # simulate when the input signal is a constant 1 + input_samples = [Impulse() for _ in range(4)] + sim = Simulation(sfg, input_samples) + sim.run_for(1) + + # ensure that the result is an impulse at time 0 with weight 4 + res = sim.results + for i in range(4): + exp_res = 4 if i == 0 else 0 + assert np.allclose(res[str(i)], exp_res) + + def test_8_points_impulse_input(self): + sfg = radix_2_dif_fft(points=8) + + assert len(sfg.inputs) == 8 + assert len(sfg.outputs) == 8 + + bfs = sfg.find_by_type_name(Butterfly.type_name()) + assert len(bfs) == 12 + + muls = sfg.find_by_type_name(ConstantMultiplication.type_name()) + assert len(muls) == 5 + + # simulate when the input signal is an impulse at time 0 + input_samples = [Impulse(), 0, 0, 0, 0, 0, 0, 0] + sim = Simulation(sfg, input_samples) + sim.run_for(1) + + # ensure that the result is a constant 1 + res = sim.results + for i in range(8): + assert np.allclose(res[str(i)], 1) + + def test_8_points_sinus_input(self): + POINTS = 8 + sfg = radix_2_dif_fft(points=POINTS) + + assert len(sfg.inputs) == POINTS + assert len(sfg.outputs) == POINTS + + n = np.linspace(0, 2 * np.pi, POINTS) + waveform = np.sin(n) + input_samples = [Constant(waveform[i]) for i in range(POINTS)] + + sim = Simulation(sfg, input_samples) + sim.run_for(1) + + exp_res = abs(np.fft.fft(waveform)) + + res = sim.results + for i in range(POINTS): + a = abs(res[str(i)]) + b = exp_res[i] + assert np.isclose(a, b) + + def test_16_points_sinus_input(self): + POINTS = 16 + sfg = radix_2_dif_fft(points=POINTS) + + assert len(sfg.inputs) == POINTS + assert len(sfg.outputs) == POINTS + + bfs = sfg.find_by_type_name(Butterfly.type_name()) + assert len(bfs) == 8 * 4 + + muls = sfg.find_by_type_name(ConstantMultiplication.type_name()) + assert len(muls) == 17 + + n = np.linspace(0, 2 * np.pi, POINTS) + waveform = np.sin(n) + input_samples = [Constant(waveform[i]) for i in range(POINTS)] + + sim = Simulation(sfg, input_samples) + sim.run_for(1) + + exp_res = np.fft.fft(waveform) + res = sim.results + for i in range(POINTS): + a = res[str(i)] + b = exp_res[i] + assert np.isclose(a, b) + + def test_256_points_sinus_input(self): + POINTS = 256 + sfg = radix_2_dif_fft(points=POINTS) + + assert len(sfg.inputs) == POINTS + assert len(sfg.outputs) == POINTS + + n = np.linspace(0, 2 * np.pi, POINTS) + waveform = np.sin(n) + input_samples = [Constant(waveform[i]) for i in range(POINTS)] + + sim = Simulation(sfg, input_samples) + sim.run_for(1) + + exp_res = np.fft.fft(waveform) + res = sim.results + for i in range(POINTS): + a = res[str(i)] + b = exp_res[i] + assert np.isclose(a, b) + + def test_512_points_multi_tone_input(self): + POINTS = 512 + sfg = radix_2_dif_fft(points=POINTS) + + assert len(sfg.inputs) == POINTS + assert len(sfg.outputs) == POINTS -def test_radix_2_dif_fft_negative_number_of_points(): - POINTS = -8 - with pytest.raises(ValueError, match="Points must be positive number."): - radix_2_dif_fft(points=POINTS) + n = np.linspace(0, 2 * np.pi, POINTS) + waveform = np.sin(n) + np.sin(0.5 * n) + np.sin(0.1 * n) + input_samples = [Constant(waveform[i]) for i in range(POINTS)] + sim = Simulation(sfg, input_samples) + sim.run_for(1) -def test_radix_2_dif_fft_number_of_points_not_power_of_2(): - POINTS = 5 - with pytest.raises(ValueError, match="Points must be a power of two."): - radix_2_dif_fft(points=POINTS) + exp_res = np.fft.fft(waveform) + res = sim.results + for i in range(POINTS): + a = res[str(i)] + b = exp_res[i] + assert np.isclose(a, b) + + def test_negative_number_of_points(self): + POINTS = -8 + with pytest.raises(ValueError, match="Points must be positive number."): + radix_2_dif_fft(points=POINTS) + + def test_number_of_points_not_power_of_2(self): + POINTS = 5 + with pytest.raises(ValueError, match="Points must be a power of two."): + radix_2_dif_fft(points=POINTS) -- GitLab