From 183b967d7b0ff7ed88ba324efd81a419f070976d Mon Sep 17 00:00:00 2001 From: Simon Bjurek <simbj106@student.liu.se> Date: Wed, 5 Feb 2025 17:46:54 +0100 Subject: [PATCH 1/3] radix 2 dif fft work in progress, adding twiddle factors remaining --- b_asic/sfg_generators.py | 85 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 85 insertions(+) diff --git a/b_asic/sfg_generators.py b/b_asic/sfg_generators.py index 2b693af3..b9ca6a4f 100644 --- a/b_asic/sfg_generators.py +++ b/b_asic/sfg_generators.py @@ -10,6 +10,7 @@ import numpy as np from b_asic.core_operations import ( Addition, + Butterfly, ConstantMultiplication, Name, SymmetricTwoportAdaptor, @@ -371,3 +372,87 @@ def direct_form_2_iir( output = Output() output <<= add return SFG([input_op], [output], name=Name(name)) + + +def radix_2_dif_fft( + points: int, + mult_properties: Optional[Union[Dict[str, int], Dict[str, Dict[str, int]]]] = None, + add_properties: Optional[Union[Dict[str, int], Dict[str, Dict[str, int]]]] = None, +) -> SFG: + if points < 0: + raise ValueError("Points must be positive number.") + if points & (points - 1) != 0: + raise ValueError("Points must be a power of two.") + + inputs = [] + for i in range(points): + inputs.append(Input(name=f"Input: {i}")) + + ports = inputs + number_of_stages = int(np.log2(points)) + + twiddles = _generate_twiddles(points, number_of_stages) + print(twiddles) + + for stage in range(number_of_stages): + ports = _construct_dif_fft_stage(ports, number_of_stages, stage) + + ports = _get_bit_reversed_ports(ports) + outputs = [] + for i, port in enumerate(ports): + outputs.append(Output(port, name=f"Output: {i}")) + + return SFG(inputs=inputs, outputs=outputs) + + +def _construct_dif_fft_stage(ports_from_previous_stage, number_of_stages, stage): + ports = ports_from_previous_stage.copy() + number_of_butterflies = len(ports) // 2 + number_of_groups = 2 ** (stage) + group_size = number_of_butterflies // number_of_groups + + for group_index in range(number_of_groups): + for bf_index in range(group_size): + input1_index = group_index * 2 * group_size + bf_index + input2_index = input1_index + group_size + + input1 = ports[input1_index] + input2 = ports[input2_index] + + butterfly = Butterfly(input1, input2, name=f"bf: {stage*4+bf_index}") + output1, output2 = butterfly.outputs + + ports[input1_index] = output1 + ports[input2_index] = output2 + + return ports + + +def _get_bit_reversed_number(number, number_of_bits) -> int: + reversed_number = 0 + for i in range(number_of_bits): + # mask out the current bit + current_bit = (number >> i) & 1 + # compute the position of the current bit in the reversed string + reversed_pos = number_of_bits - 1 - i + # place the current bit in that position + reversed_number |= current_bit << reversed_pos + return reversed_number + + +def _get_bit_reversed_ports(ports): + num_of_ports = len(ports) + bits = int(np.log2(num_of_ports)) + return [ports[_get_bit_reversed_number(i, bits)] for i in range(num_of_ports)] + + +def _generate_twiddles(points, number_of_stages): + twiddles = [] + for stage in range(1, number_of_stages + 1): + stage_twiddles = [] + for k in range(points // 2 ** (stage)): + twiddle = np.exp(-1j * 2 * np.pi * stage * k / points) + print("STAGE:", stage, "k:", k, "TW:", twiddle) + stage_twiddles.append(twiddle) + twiddles.append(stage_twiddles) + return twiddles -- GitLab From 30719d0b792e9d75d6a930e9c9abdadbf3f9a484 Mon Sep 17 00:00:00 2001 From: Simon Bjurek <simbj106@student.liu.se> Date: Fri, 7 Feb 2025 09:30:31 +0100 Subject: [PATCH 2/3] Finalized FFT by adding twiddle factors and added automatic tests. --- .gitignore | 2 + b_asic/sfg_generators.py | 59 ++++++++++---- test/test_sfg_generators.py | 158 +++++++++++++++++++++++++++++++++++- 3 files changed, 201 insertions(+), 18 deletions(-) diff --git a/.gitignore b/.gitignore index 4530b5b1..52db8cb9 100644 --- a/.gitignore +++ b/.gitignore @@ -118,3 +118,5 @@ docs_sphinx/_build/ docs_sphinx/examples result_images/ .coverage +Digraph.gv +Digraph.gv.pdf diff --git a/b_asic/sfg_generators.py b/b_asic/sfg_generators.py index b9ca6a4f..e431ef97 100644 --- a/b_asic/sfg_generators.py +++ b/b_asic/sfg_generators.py @@ -4,7 +4,7 @@ B-ASIC signal flow graph generators. This module contains a number of functions generating SFGs for specific functions. """ -from typing import Dict, Optional, Sequence, Union +from typing import TYPE_CHECKING, Dict, Optional, Sequence, Union import numpy as np @@ -19,6 +19,9 @@ from b_asic.signal import Signal from b_asic.signal_flow_graph import SFG from b_asic.special_operations import Delay, Input, Output +if TYPE_CHECKING: + from b_asic.port import OutputPort + def wdf_allpass( coefficients: Sequence[float], @@ -374,11 +377,7 @@ def direct_form_2_iir( return SFG([input_op], [output], name=Name(name)) -def radix_2_dif_fft( - points: int, - mult_properties: Optional[Union[Dict[str, int], Dict[str, Dict[str, int]]]] = None, - add_properties: Optional[Union[Dict[str, int], Dict[str, Dict[str, int]]]] = None, -) -> SFG: +def radix_2_dif_fft(points: int) -> SFG: if points < 0: raise ValueError("Points must be positive number.") if points & (points - 1) != 0: @@ -392,10 +391,11 @@ def radix_2_dif_fft( number_of_stages = int(np.log2(points)) twiddles = _generate_twiddles(points, number_of_stages) - print(twiddles) for stage in range(number_of_stages): - ports = _construct_dif_fft_stage(ports, number_of_stages, stage) + ports = _construct_dif_fft_stage( + ports, number_of_stages, stage, twiddles[stage] + ) ports = _get_bit_reversed_ports(ports) outputs = [] @@ -405,10 +405,15 @@ def radix_2_dif_fft( return SFG(inputs=inputs, outputs=outputs) -def _construct_dif_fft_stage(ports_from_previous_stage, number_of_stages, stage): +def _construct_dif_fft_stage( + ports_from_previous_stage: list["OutputPort"], + number_of_stages: int, + stage: int, + twiddles: list[np.complex128], +): ports = ports_from_previous_stage.copy() number_of_butterflies = len(ports) // 2 - number_of_groups = 2 ** (stage) + number_of_groups = 2**stage group_size = number_of_butterflies // number_of_groups for group_index in range(number_of_groups): @@ -419,20 +424,40 @@ def _construct_dif_fft_stage(ports_from_previous_stage, number_of_stages, stage) input1 = ports[input1_index] input2 = ports[input2_index] - butterfly = Butterfly(input1, input2, name=f"bf: {stage*4+bf_index}") + butterfly = Butterfly(input1, input2) output1, output2 = butterfly.outputs + twiddle_factor = twiddles[bf_index] + if twiddle_factor != 1: + name = _get_formatted_complex_number(twiddle_factor, 2) + twiddle_mul = ConstantMultiplication( + twiddles[bf_index], output2, name=name + ) + output2 = twiddle_mul.output(0) + ports[input1_index] = output1 ports[input2_index] = output2 return ports -def _get_bit_reversed_number(number, number_of_bits) -> int: +def _get_formatted_complex_number(number: np.complex128, digits: int) -> str: + real_str = str(np.round(number.real, digits)) + imag_str = str(np.round(number.imag, digits)) + if number.imag == 0: + return real_str + elif number.imag > 0: + return f"{real_str} + j{imag_str}" + else: + return f"{real_str} - j{str(-np.round(number.imag, digits))}" + + +def _get_bit_reversed_number(number: int, number_of_bits: int) -> int: reversed_number = 0 for i in range(number_of_bits): # mask out the current bit - current_bit = (number >> i) & 1 + shift_num = number + current_bit = (shift_num >> i) & 1 # compute the position of the current bit in the reversed string reversed_pos = number_of_bits - 1 - i # place the current bit in that position @@ -440,19 +465,19 @@ def _get_bit_reversed_number(number, number_of_bits) -> int: return reversed_number -def _get_bit_reversed_ports(ports): +def _get_bit_reversed_ports(ports: list["OutputPort"]) -> list["OutputPort"]: num_of_ports = len(ports) bits = int(np.log2(num_of_ports)) return [ports[_get_bit_reversed_number(i, bits)] for i in range(num_of_ports)] -def _generate_twiddles(points, number_of_stages): +def _generate_twiddles(points: int, number_of_stages: int) -> list[np.complex128]: twiddles = [] for stage in range(1, number_of_stages + 1): stage_twiddles = [] for k in range(points // 2 ** (stage)): - twiddle = np.exp(-1j * 2 * np.pi * stage * k / points) - print("STAGE:", stage, "k:", k, "TW:", twiddle) + a = 2 ** (stage - 1) + twiddle = np.exp(-1j * 2 * np.pi * a * k / points) stage_twiddles.append(twiddle) twiddles.append(stage_twiddles) return twiddles diff --git a/test/test_sfg_generators.py b/test/test_sfg_generators.py index 31293462..6f2eaf2e 100644 --- a/test/test_sfg_generators.py +++ b/test/test_sfg_generators.py @@ -3,15 +3,17 @@ import pytest from b_asic.core_operations import ( Addition, + Butterfly, ConstantMultiplication, SymmetricTwoportAdaptor, ) from b_asic.sfg_generators import ( direct_form_fir, + radix_2_dif_fft, transposed_direct_form_fir, wdf_allpass, ) -from b_asic.signal_generator import Impulse +from b_asic.signal_generator import Constant, Impulse from b_asic.simulation import Simulation from b_asic.special_operations import Delay @@ -234,3 +236,157 @@ def test_sfg_generator_errors(): gen([]) with pytest.raises(TypeError, match="coefficients must be a 1D-array"): gen([[1, 2], [1, 3]]) + + +def test_radix_2_dif_fft_4_points_constant_input(): + sfg = radix_2_dif_fft(points=4) + + assert len(sfg.inputs) == 4 + assert len(sfg.outputs) == 4 + + bfs = sfg.find_by_type_name(Butterfly.type_name()) + assert len(bfs) == 4 + + muls = sfg.find_by_type_name(ConstantMultiplication.type_name()) + assert len(muls) == 1 + + # simulate when the input signal is a constant 1 + input_samples = [Impulse() for _ in range(4)] + sim = Simulation(sfg, input_samples) + sim.run_for(1) + + # ensure that the result is an impulse at time 0 with weight 4 + res = sim.results + for i in range(4): + exp_res = 4 if i == 0 else 0 + assert np.allclose(res[str(i)], exp_res) + + +def test_radix_2_dif_fft_8_points_impulse_input(): + sfg = radix_2_dif_fft(points=8) + + assert len(sfg.inputs) == 8 + assert len(sfg.outputs) == 8 + + bfs = sfg.find_by_type_name(Butterfly.type_name()) + assert len(bfs) == 12 + + muls = sfg.find_by_type_name(ConstantMultiplication.type_name()) + assert len(muls) == 5 + + # simulate when the input signal is an impulse at time 0 + input_samples = [Impulse(), 0, 0, 0, 0, 0, 0, 0] + sim = Simulation(sfg, input_samples) + sim.run_for(1) + + # ensure that the result is a constant 1 + res = sim.results + for i in range(8): + assert np.allclose(res[str(i)], 1) + + +def test_radix_2_dif_fft_8_points_sinus_input(): + POINTS = 8 + sfg = radix_2_dif_fft(points=POINTS) + + assert len(sfg.inputs) == POINTS + assert len(sfg.outputs) == POINTS + + n = np.linspace(0, 2 * np.pi, POINTS) + waveform = np.sin(n) + input_samples = [Constant(waveform[i]) for i in range(POINTS)] + + sim = Simulation(sfg, input_samples) + sim.run_for(1) + + exp_res = abs(np.fft.fft(waveform)) + + res = sim.results + for i in range(POINTS): + a = abs(res[str(i)]) + b = exp_res[i] + assert np.isclose(a, b) + + +def test_radix_2_dif_fft_16_points_sinus_input(): + POINTS = 16 + sfg = radix_2_dif_fft(points=POINTS) + + assert len(sfg.inputs) == POINTS + assert len(sfg.outputs) == POINTS + + bfs = sfg.find_by_type_name(Butterfly.type_name()) + assert len(bfs) == 8 * 4 + + muls = sfg.find_by_type_name(ConstantMultiplication.type_name()) + assert len(muls) == 17 + + n = np.linspace(0, 2 * np.pi, POINTS) + waveform = np.sin(n) + input_samples = [Constant(waveform[i]) for i in range(POINTS)] + + sim = Simulation(sfg, input_samples) + sim.run_for(1) + + exp_res = np.fft.fft(waveform) + res = sim.results + for i in range(POINTS): + a = res[str(i)] + b = exp_res[i] + assert np.isclose(a, b) + + +def test_radix_2_dif_fft_256_points_sinus_input(): + POINTS = 256 + sfg = radix_2_dif_fft(points=POINTS) + + assert len(sfg.inputs) == POINTS + assert len(sfg.outputs) == POINTS + + n = np.linspace(0, 2 * np.pi, POINTS) + waveform = np.sin(n) + input_samples = [Constant(waveform[i]) for i in range(POINTS)] + + sim = Simulation(sfg, input_samples) + sim.run_for(1) + + exp_res = np.fft.fft(waveform) + res = sim.results + for i in range(POINTS): + a = res[str(i)] + b = exp_res[i] + assert np.isclose(a, b) + + +def test_radix_2_dif_fft_512_points_sinus_input_half_frequency(): + POINTS = 512 + sfg = radix_2_dif_fft(points=POINTS) + + assert len(sfg.inputs) == POINTS + assert len(sfg.outputs) == POINTS + + n = np.linspace(0, 2 * np.pi, POINTS) + waveform = np.sin(0.5 * n) + input_samples = [Constant(waveform[i]) for i in range(POINTS)] + + sim = Simulation(sfg, input_samples) + sim.run_for(1) + + exp_res = np.fft.fft(waveform) + res = sim.results + for i in range(POINTS): + a = res[str(i)] + b = exp_res[i] + assert np.isclose(a, b) + + +def test_radix_2_dif_fft_negative_number_of_points(): + POINTS = -8 + with pytest.raises(ValueError, match="Points must be positive number."): + radix_2_dif_fft(points=POINTS) + + +def test_radix_2_dif_fft_number_of_points_not_power_of_2(): + POINTS = 5 + with pytest.raises(ValueError, match="Points must be a power of two."): + radix_2_dif_fft(points=POINTS) -- GitLab From f0a7458ad9cf9fcdfca7f84a804f1fc2c75f4131 Mon Sep 17 00:00:00 2001 From: Simon Bjurek <simbj106@student.liu.se> Date: Fri, 7 Feb 2025 09:40:54 +0100 Subject: [PATCH 3/3] Removed test that took too long for pipeline and added docstring for the fft function. --- b_asic/sfg_generators.py | 12 ++++++++++++ test/test_sfg_generators.py | 22 ---------------------- 2 files changed, 12 insertions(+), 22 deletions(-) diff --git a/b_asic/sfg_generators.py b/b_asic/sfg_generators.py index e431ef97..b71a35ed 100644 --- a/b_asic/sfg_generators.py +++ b/b_asic/sfg_generators.py @@ -378,6 +378,18 @@ def direct_form_2_iir( def radix_2_dif_fft(points: int) -> SFG: + """Generates a radix-2 decimation-in-frequency FFT structure. + + Parameters + ---------- + points : int + Number of points for the FFT, needs to be a positive power of 2. + + Returns + ------- + SFG + Signal Flow Graph + """ if points < 0: raise ValueError("Points must be positive number.") if points & (points - 1) != 0: diff --git a/test/test_sfg_generators.py b/test/test_sfg_generators.py index 6f2eaf2e..c199fb63 100644 --- a/test/test_sfg_generators.py +++ b/test/test_sfg_generators.py @@ -358,28 +358,6 @@ def test_radix_2_dif_fft_256_points_sinus_input(): assert np.isclose(a, b) -def test_radix_2_dif_fft_512_points_sinus_input_half_frequency(): - POINTS = 512 - sfg = radix_2_dif_fft(points=POINTS) - - assert len(sfg.inputs) == POINTS - assert len(sfg.outputs) == POINTS - - n = np.linspace(0, 2 * np.pi, POINTS) - waveform = np.sin(0.5 * n) - input_samples = [Constant(waveform[i]) for i in range(POINTS)] - - sim = Simulation(sfg, input_samples) - sim.run_for(1) - - exp_res = np.fft.fft(waveform) - res = sim.results - for i in range(POINTS): - a = res[str(i)] - b = exp_res[i] - assert np.isclose(a, b) - - def test_radix_2_dif_fft_negative_number_of_points(): POINTS = -8 with pytest.raises(ValueError, match="Points must be positive number."): -- GitLab