diff --git a/b_asic/sfg_generator.py b/b_asic/sfg_generator.py index c9440460acb45325a3624121f72a978d82eef8b8..fa6556204f1a7a306adcb54f086393e76f3aec6d 100644 --- a/b_asic/sfg_generator.py +++ b/b_asic/sfg_generator.py @@ -7,7 +7,12 @@ from typing import Dict, Optional, Sequence, Union import numpy as np -from b_asic.core_operations import Name, SymmetricTwoportAdaptor +from b_asic.core_operations import ( + Addition, + ConstantMultiplication, + Name, + SymmetricTwoportAdaptor, +) from b_asic.port import InputPort, OutputPort from b_asic.signal import Signal from b_asic.signal_flow_graph import SFG @@ -109,3 +114,165 @@ def wdf_allpass( output << signal_out return SFG([input_op], [output], name=Name(name)) + + +def direct_form_fir( + coefficients: Sequence[complex], + input_op: Optional[Union[Input, Signal, InputPort]] = None, + output: Optional[Union[Output, Signal, OutputPort]] = None, + name: Optional[str] = None, + mult_properties: Optional[ + Union[Dict[str, int], Dict[str, Dict[str, int]]] + ] = None, + add_properties: Optional[ + Union[Dict[str, int], Dict[str, Dict[str, int]]] + ] = None, +): + r""" + Generate a signal flow graph of a direct form FIR filter. The *coefficients* parameter is a + sequence of impulse response values:: + + coefficients = [h0, h1, h2, ..., hN] + + Leading to the transfer function: + + .. math:: \sum_{i=0}^N h_iz^{-i} + + Parameters + ---------- + coefficients : 1D-array + Coefficients to use for the FIR filter section + + input_op : Input, optional + The Input to connect the SFG to. If not provided, one will be generated. + + output : Output, optional + The Output to connect the SFG to. If not provided, one will be generated. + + name : Name, optional + The name of the SFG. If None, "WDF allpass section". + + mult_properties : dictionary, optional + Properties passed to :class:`~b_asic.core_operations.ConstantMultiplication`. + + add_properties : dictionary, optional + Properties passed to :class:`~b_asic.core_operations.Addition`. + + Returns + ------- + Signal flow graph + + See also + -------- + transposed_direct_form_fir + """ + np_coefficients = np.squeeze(np.asarray(coefficients)) + if np_coefficients.ndim != 1: + raise TypeError("coefficients must be a 1D-array") + if input_op is None: + input_op = Input() + if output is None: + output = Output() + if name is None: + name = "Direct-form FIR filter" + if mult_properties is None: + mult_properties = {} + if add_properties is None: + add_properties = {} + + taps = len(np_coefficients) + prev_delay = input_op + prev_add = None + for i, coeff in enumerate(np_coefficients): + tmp_mul = ConstantMultiplication(coeff, prev_delay, **mult_properties) + if prev_add is None: + prev_add = tmp_mul + else: + prev_add = Addition(tmp_mul, prev_add, **add_properties) + if i < taps - 1: + prev_delay = Delay(prev_delay) + + output << prev_add + + return SFG([input_op], [output], name=Name(name)) + + +def transposed_direct_form_fir( + coefficients: Sequence[complex], + input_op: Optional[Union[Input, Signal, InputPort]] = None, + output: Optional[Union[Output, Signal, OutputPort]] = None, + name: Optional[str] = None, + mult_properties: Optional[ + Union[Dict[str, int], Dict[str, Dict[str, int]]] + ] = None, + add_properties: Optional[ + Union[Dict[str, int], Dict[str, Dict[str, int]]] + ] = None, +): + """ + Generate a signal flow graph of a transposed direct form FIR filter. The *coefficients* parameter is a + sequence of impulse response values:: + + coefficients = [h0, h1, h2, ..., hN] + + Leading to the transfer function: + + .. math:: \sum_{i=0}^N h_iz^{-i} + + Parameters + ---------- + coefficients : 1D-array + Coefficients to use for the FIR filter section + + input_op : Input, optional + The Input to connect the SFG to. If not provided, one will be generated. + + output : Output, optional + The Output to connect the SFG to. If not provided, one will be generated. + + name : Name, optional + The name of the SFG. If None, "WDF allpass section". + + mult_properties : dictionary, optional + Properties passed to :class:`~b_asic.core_operations.ConstantMultiplication`. + + add_properties : dictionary, optional + Properties passed to :class:`~b_asic.core_operations.Addition`. + + Returns + ------- + Signal flow graph + + See also + -------- + direct_form_fir + """ + np_coefficients = np.squeeze(np.asarray(coefficients)) + if np_coefficients.ndim != 1: + raise TypeError("coefficients must be a 1D-array") + if input_op is None: + input_op = Input() + if output is None: + output = Output() + if name is None: + name = "Transposed direct-form FIR filter" + if mult_properties is None: + mult_properties = {} + if add_properties is None: + add_properties = {} + + taps = len(np_coefficients) + prev_delay = None + prev_add = None + for i, coeff in enumerate(reversed(np_coefficients)): + tmp_mul = ConstantMultiplication(coeff, input_op, **mult_properties) + if prev_delay is None: + tmp_add = tmp_mul + else: + tmp_add = Addition(tmp_mul, prev_delay, **add_properties) + if i < taps - 1: + prev_delay = Delay(tmp_add) + + output << tmp_add + + return SFG([input_op], [output], name=Name(name)) diff --git a/test/test_sfg_generator.py b/test/test_sfg_generator.py index 523d1c0fcac9084e4adc7ac8c208aa73280cc42d..1024a48e056778cafbee4a8dba1978219d524b77 100644 --- a/test/test_sfg_generator.py +++ b/test/test_sfg_generator.py @@ -1,5 +1,14 @@ -from b_asic.core_operations import SymmetricTwoportAdaptor -from b_asic.sfg_generator import wdf_allpass +from b_asic.core_operations import ( + Addition, + ConstantMultiplication, + SymmetricTwoportAdaptor, +) +from b_asic.sfg_generator import ( + direct_form_fir, + transposed_direct_form_fir, + wdf_allpass, +) +from b_asic.special_operations import Delay def test_wdf_allpass(): @@ -26,3 +35,45 @@ def test_wdf_allpass(): ) == 4 ) + + +def test_direct_form_fir(): + sfg = direct_form_fir([0.3, 0.5, 0.7]) + assert ( + len( + [ + comp + for comp in sfg.components + if isinstance(comp, ConstantMultiplication) + ] + ) + == 3 + ) + assert ( + len([comp for comp in sfg.components if isinstance(comp, Addition)]) + == 2 + ) + assert ( + len([comp for comp in sfg.components if isinstance(comp, Delay)]) == 2 + ) + + +def test_transposed_direct_form_fir(): + sfg = transposed_direct_form_fir([0.3, 0.5, 0.7]) + assert ( + len( + [ + comp + for comp in sfg.components + if isinstance(comp, ConstantMultiplication) + ] + ) + == 3 + ) + assert ( + len([comp for comp in sfg.components if isinstance(comp, Addition)]) + == 2 + ) + assert ( + len([comp for comp in sfg.components if isinstance(comp, Delay)]) == 2 + )