Newer
Older
"""
B-ASIC signal flow graph generators.
This module contains a number of functions generating SFGs for specific functions.
"""
from typing import TYPE_CHECKING, Dict, Optional, Sequence, Union
import numpy as np
from b_asic.core_operations import (
SymmetricTwoportAdaptor,
)
from b_asic.signal import Signal
from b_asic.signal_flow_graph import SFG
from b_asic.special_operations import Delay, Input, Output
if TYPE_CHECKING:
from b_asic.port import OutputPort
def wdf_allpass(
coefficients: Sequence[float],
name: Optional[str] = None,
latency: Optional[int] = None,
latency_offsets: Optional[Dict[str, int]] = None,
execution_time: Optional[int] = None,
) -> SFG:
"""
Generate a signal flow graph of a WDF allpass section based on symmetric two-port\
adaptors.
Simplifies the SFG in case an adaptor operation is 0.
Parameters
----------
coefficients : 1D-array
Coefficients to use for the allpass section.
name : Name, optional
The name of the SFG. If None, "WDF allpass section".
latency : int, optional
Latency of the symmetric two-port adaptors.
latency_offsets : optional
Latency offsets of the symmetric two-port adaptors.
execution_time : int, optional
Execution time of the symmetric two-port adaptors.
Returns
-------
Signal flow graph
"""
np_coefficients = np.atleast_1d(np.squeeze(np.asarray(coefficients)))
order = len(np_coefficients)
if not order:
raise ValueError("Coefficients cannot be empty")
if np_coefficients.ndim != 1:
raise TypeError("coefficients must be a 1D-array")
if name is None:
name = "WDF allpass section"
input_op = Input()
output = Output()
if np_coefficients[0]:
# First-order section
adaptor0 = SymmetricTwoportAdaptor(
np_coefficients[0],
input_op,
latency=latency,
latency_offsets=latency_offsets,
execution_time=execution_time,
)
signal_out = Signal(adaptor0.output(0))
delay = Delay(adaptor0.output(1))
Signal(delay, adaptor0.input(1))
else:
signal_out = Delay(input_op)
else:
signal_out = Signal(input_op)
# Second-order sections
sos_count = (order - 1) // 2 if odd_order else order // 2
offset1, offset2 = (1, 2) if odd_order else (0, 1)
for n in range(sos_count):
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
if np_coefficients[2 * n + offset1]:
adaptor1 = SymmetricTwoportAdaptor(
np_coefficients[2 * n + offset1],
signal_out,
latency=latency,
latency_offsets=latency_offsets,
execution_time=execution_time,
)
# Signal(prev_adaptor., adaptor1.input(0), name="Previous-stage to next")
delay1 = Delay(adaptor1.output(1))
else:
delay1 = Delay(signal_out)
if np_coefficients[2 * n + offset2]:
delay2 = Delay()
adaptor2 = SymmetricTwoportAdaptor(
np_coefficients[2 * n + offset2],
delay1,
delay2,
latency=latency,
latency_offsets=latency_offsets,
execution_time=execution_time,
)
Signal(adaptor2.output(0), adaptor1.input(1))
Signal(adaptor2.output(1), delay2)
signal_out = Signal(adaptor1.output(0))
else:
delay2 = Delay(delay1)
if np_coefficients[2 * n + offset1]:
Signal(delay2, adaptor1.input(1))
signal_out = Signal(adaptor1.output(0))
else:
signal_out = Signal(delay2)
output <<= signal_out
return SFG([input_op], [output], name=Name(name))
def direct_form_fir(
coefficients: Sequence[complex],
name: Optional[str] = None,
mult_properties: Optional[Union[Dict[str, int], Dict[str, Dict[str, int]]]] = None,
add_properties: Optional[Union[Dict[str, int], Dict[str, Dict[str, int]]]] = None,
Generate a signal flow graph of a direct form FIR filter.
The *coefficients* parameter is a sequence of impulse response values::
coefficients = [h0, h1, h2, ..., hN]
Leading to the transfer function:
.. math:: \sum_{i=0}^N h_iz^{-i}
Parameters
----------
coefficients : 1D-array
Coefficients to use for the FIR filter section.
The name of the SFG. If None, "Direct-form FIR filter".
mult_properties : dictionary, optional
Properties passed to :class:`~b_asic.core_operations.ConstantMultiplication`.
add_properties : dictionary, optional
Properties passed to :class:`~b_asic.core_operations.Addition`.
Returns
-------
np_coefficients = np.atleast_1d(np.squeeze(np.asarray(coefficients)))
taps = len(np_coefficients)
if not taps:
raise ValueError("Coefficients cannot be empty")
if np_coefficients.ndim != 1:
raise TypeError("coefficients must be a 1D-array")
if name is None:
name = "Direct-form FIR filter"
if mult_properties is None:
mult_properties = {}
if add_properties is None:
add_properties = {}
input_op = Input()
output = Output()
for i, coefficient in enumerate(np_coefficients):
tmp_mul = ConstantMultiplication(coefficient, prev_delay, **mult_properties)
prev_add = (
tmp_mul
if prev_add is None
else Addition(tmp_mul, prev_add, **add_properties)
)
if i < taps - 1:
prev_delay = Delay(prev_delay)
output <<= prev_add
return SFG([input_op], [output], name=Name(name))
def transposed_direct_form_fir(
coefficients: Sequence[complex],
name: Optional[str] = None,
mult_properties: Optional[Union[Dict[str, int], Dict[str, Dict[str, int]]]] = None,
add_properties: Optional[Union[Dict[str, int], Dict[str, Dict[str, int]]]] = None,
Generate a signal flow graph of a transposed direct form FIR filter.
The *coefficients* parameter is a sequence of impulse response values::
coefficients = [h0, h1, h2, ..., hN]
Leading to the transfer function:
.. math:: \sum_{i=0}^N h_iz^{-i}
Parameters
----------
coefficients : 1D-array
Coefficients to use for the FIR filter section.
The name of the SFG. If None, "Transposed direct-form FIR filter".
mult_properties : dictionary, optional
Properties passed to :class:`~b_asic.core_operations.ConstantMultiplication`.
add_properties : dictionary, optional
Properties passed to :class:`~b_asic.core_operations.Addition`.
Returns
-------
np_coefficients = np.atleast_1d(np.squeeze(np.asarray(coefficients)))
taps = len(np_coefficients)
if not taps:
raise ValueError("Coefficients cannot be empty")
if np_coefficients.ndim != 1:
raise TypeError("coefficients must be a 1D-array")
if name is None:
name = "Transposed direct-form FIR filter"
if mult_properties is None:
mult_properties = {}
if add_properties is None:
add_properties = {}
input_op = Input()
output = Output()
for i, coefficient in enumerate(reversed(np_coefficients)):
tmp_mul = ConstantMultiplication(coefficient, input_op, **mult_properties)
tmp_add = (
tmp_mul
if prev_delay is None
else Addition(tmp_mul, prev_delay, **add_properties)
)
if i < taps - 1:
prev_delay = Delay(tmp_add)
output <<= tmp_add
return SFG([input_op], [output], name=Name(name))
def direct_form_1_iir(
b: Sequence[complex],
a: Sequence[complex],
name: Optional[str] = None,
mult_properties: Optional[Union[Dict[str, int], Dict[str, Dict[str, int]]]] = None,
add_properties: Optional[Union[Dict[str, int], Dict[str, Dict[str, int]]]] = None,
) -> SFG:
"""Generates a direct-form IIR filter of type I with coefficients a and b."""
if len(a) < 2 or len(b) < 2:
raise ValueError(
"Size of coefficient lists a and b needs to contain at least 2 element."
)
raise ValueError("Size of coefficient lists a and b are not the same.")
if a[0] != 1:
raise ValueError("The value of a[0] must be 1.")
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
if name is None:
name = "Direct-form I IIR filter"
if mult_properties is None:
mult_properties = {}
if add_properties is None:
add_properties = {}
# construct the feed-forward part
input_op = Input()
muls = [ConstantMultiplication(b[0], input_op, **mult_properties)]
delays = []
prev_delay = input_op
for i, coeff in enumerate(b[1:]):
prev_delay = Delay(prev_delay)
delays.append(prev_delay)
if i < len(b) - 1:
muls.append(ConstantMultiplication(coeff, prev_delay, **mult_properties))
op_a = muls[-1]
for i in range(len(muls) - 1):
op_a = Addition(op_a, muls[-i - 2], **add_properties)
# construct the feedback part
tmp_add = Addition(op_a, None, **add_properties)
muls = []
output = Output()
output <<= tmp_add
delays = []
prev_delay = tmp_add
for i, coeff in enumerate(a[1:]):
prev_delay = Delay(prev_delay)
delays.append(prev_delay)
if i < len(a) - 1:
muls.append(ConstantMultiplication(-coeff, prev_delay, **mult_properties))
op_a = muls[-1]
for i in range(len(muls) - 1):
op_a = Addition(op_a, muls[-i - 2], **add_properties)
tmp_add.input(1).connect(op_a)
return SFG([input_op], [output], name=Name(name))
def direct_form_2_iir(
b: Sequence[complex],
a: Sequence[complex],
name: Optional[str] = None,
mult_properties: Optional[Union[Dict[str, int], Dict[str, Dict[str, int]]]] = None,
add_properties: Optional[Union[Dict[str, int], Dict[str, Dict[str, int]]]] = None,
) -> SFG:
"""Generates a direct-form IIR filter of type II with coefficients a and b."""
if len(a) < 2 or len(b) < 2:
raise ValueError(
"Size of coefficient lists a and b needs to contain at least 2 element."
)
raise ValueError("Size of coefficient lists a and b are not the same.")
if a[0] != 1:
raise ValueError("The value of a[0] must be 1.")
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
if name is None:
name = "Direct-form II IIR filter"
if mult_properties is None:
mult_properties = {}
if add_properties is None:
add_properties = {}
# construct the repeated part of the SFG
left_adds = []
right_adds = []
left_muls = []
right_muls = []
delays = [Delay()]
op_a_left = None
op_a_right = None
for i in range(len(a) - 1):
a_coeff = a[-i - 1]
b_coeff = b[-i - 1]
if len(left_muls) != 0: # not first iteration
new_delay = Delay()
delays[-1] <<= new_delay
delays.append(new_delay)
left_muls.append(
ConstantMultiplication(-a_coeff, delays[-1], **mult_properties)
)
right_muls.append(
ConstantMultiplication(b_coeff, delays[-1], **mult_properties)
)
if len(left_muls) > 1: # not first iteration
left_adds.append(Addition(op_a_left, left_muls[-1], **add_properties))
right_adds.append(Addition(op_a_right, right_muls[-1], **add_properties))
op_a_left = left_adds[-1]
op_a_right = right_adds[-1]
else:
op_a_left = left_muls[-1]
op_a_right = right_muls[-1]
# finalize the SFG
input_op = Input()
if left_adds:
left_adds.append(Addition(input_op, left_adds[-1], **add_properties))
else:
left_adds.append(Addition(input_op, left_muls[-1], **add_properties))
delays[-1] <<= left_adds[-1]
mul = ConstantMultiplication(b[0], left_adds[-1], **mult_properties)
if right_adds:
add = Addition(mul, right_adds[-1], **add_properties)
else:
add = Addition(mul, right_muls[-1], **add_properties)
output = Output()
output <<= add
return SFG([input_op], [output], name=Name(name))
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
def radix_2_dif_fft(points: int) -> SFG:
"""Generates a radix-2 decimation-in-frequency FFT structure.
Parameters
----------
points : int
Number of points for the FFT, needs to be a positive power of 2.
Returns
-------
SFG
Signal Flow Graph
"""
if points < 0:
raise ValueError("Points must be positive number.")
if points & (points - 1) != 0:
raise ValueError("Points must be a power of two.")
inputs = []
for i in range(points):
inputs.append(Input(name=f"Input: {i}"))
ports = inputs
number_of_stages = int(np.log2(points))
twiddles = _generate_twiddles(points, number_of_stages)
for stage in range(number_of_stages):
ports = _construct_dif_fft_stage(
ports, number_of_stages, stage, twiddles[stage]
)
ports = _get_bit_reversed_ports(ports)
outputs = []
for i, port in enumerate(ports):
outputs.append(Output(port, name=f"Output: {i}"))
return SFG(inputs=inputs, outputs=outputs)
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
def ldlt_matrix_inverse(N: int) -> SFG:
"""Generates an SFG for the LDLT matrix inverse algorithm.
Parameters
----------
N : int
Dimension of the square input matrix.
Returns
-------
SFG
Signal Flow Graph
"""
inputs = []
A = [[None for _ in range(N)] for _ in range(N)]
for i in range(N):
for j in range(i, N):
in_op = Input()
A[i][j] = in_op
inputs.append(in_op)
D = [None for _ in range(N)]
for i in range(N):
D[i] = A[i][i]
D_inv = [None for _ in range(N)]
R = [[None for _ in range(N)] for _ in range(N)]
M = [[None for _ in range(N)] for _ in range(N)]
# R*di*R^T factorization
for i in range(N):
for k in range(i):
D[i] = MADS(False, False, D[i], M[k][i], R[k][i])
D_inv[i] = Reciprocal(D[i])
for j in range(i + 1, N):
R[i][j] = A[i][j]
for k in range(i):
R[i][j] = MADS(False, False, R[i][j], M[k][i], R[k][j])
# if is_complex:
# M[i][j] = ComplexConjugate(R[i][j])
# else:
M[i][j] = R[i][j]
R[i][j] = MADS(True, True, DontCare(), R[i][j], D_inv[i])
# back substitution
A_inv = [[None for _ in range(N)] for _ in range(N)]
for i in reversed(range(N)):
A_inv[i][i] = D_inv[i]
for j in reversed(range(i + 1)):
for k in reversed(range(j + 1, N)):
if k == N - 1 and i != j:
A_inv[j][i] = MADS(False, True, DontCare(), R[j][k], A_inv[i][k])
else:
if A_inv[i][k]:
A_inv[j][i] = MADS(
False, False, A_inv[j][i], R[j][k], A_inv[i][k]
)
else:
A_inv[j][i] = MADS(
False, False, A_inv[j][i], R[j][k], A_inv[k][i]
)
outputs = []
for i in range(N):
for j in range(i, N):
outputs.append(Output(A_inv[i][j]))
return SFG(inputs, outputs)
def _construct_dif_fft_stage(
ports_from_previous_stage: list["OutputPort"],
number_of_stages: int,
stage: int,
twiddles: list[np.complex128],
):
ports = ports_from_previous_stage.copy()
number_of_butterflies = len(ports) // 2
number_of_groups = 2**stage
group_size = number_of_butterflies // number_of_groups
for group_index in range(number_of_groups):
for bf_index in range(group_size):
input1_index = group_index * 2 * group_size + bf_index
input2_index = input1_index + group_size
input1 = ports[input1_index]
input2 = ports[input2_index]
butterfly = Butterfly(input1, input2)
output1, output2 = butterfly.outputs
twiddle_factor = twiddles[bf_index]
if twiddle_factor != 1:
twiddle_mul = ConstantMultiplication(twiddles[bf_index], output2)
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
output2 = twiddle_mul.output(0)
ports[input1_index] = output1
ports[input2_index] = output2
return ports
def _get_bit_reversed_number(number: int, number_of_bits: int) -> int:
reversed_number = 0
for i in range(number_of_bits):
# mask out the current bit
shift_num = number
current_bit = (shift_num >> i) & 1
# compute the position of the current bit in the reversed string
reversed_pos = number_of_bits - 1 - i
# place the current bit in that position
reversed_number |= current_bit << reversed_pos
return reversed_number
def _get_bit_reversed_ports(ports: list["OutputPort"]) -> list["OutputPort"]:
num_of_ports = len(ports)
bits = int(np.log2(num_of_ports))
return [ports[_get_bit_reversed_number(i, bits)] for i in range(num_of_ports)]
def _generate_twiddles(points: int, number_of_stages: int) -> list[np.complex128]:
twiddles = []
for stage in range(1, number_of_stages + 1):
stage_twiddles = []
for k in range(points // 2 ** (stage)):
a = 2 ** (stage - 1)
twiddle = np.exp(-1j * 2 * np.pi * a * k / points)
stage_twiddles.append(twiddle)
twiddles.append(stage_twiddles)
return twiddles