From d4d6f12aeca6448888da5e82f11c55801e6efb89 Mon Sep 17 00:00:00 2001 From: Oscar Gustafsson <oscar.gustafsson@gmail.com> Date: Wed, 15 Feb 2023 06:41:13 +0100 Subject: [PATCH] Add FIR generators --- b_asic/sfg_generator.py | 111 ------------ b_asic/sfg_generators.py | 278 +++++++++++++++++++++++++++++ docs_sphinx/api/index.rst | 2 +- docs_sphinx/api/sfg_generator.rst | 6 - docs_sphinx/api/sfg_generators.rst | 6 + test/test_sfg_generator.py | 28 --- test/test_sfg_generators.py | 79 ++++++++ 7 files changed, 364 insertions(+), 146 deletions(-) delete mode 100644 b_asic/sfg_generator.py create mode 100644 b_asic/sfg_generators.py delete mode 100644 docs_sphinx/api/sfg_generator.rst create mode 100644 docs_sphinx/api/sfg_generators.rst delete mode 100644 test/test_sfg_generator.py create mode 100644 test/test_sfg_generators.py diff --git a/b_asic/sfg_generator.py b/b_asic/sfg_generator.py deleted file mode 100644 index c9440460..00000000 --- a/b_asic/sfg_generator.py +++ /dev/null @@ -1,111 +0,0 @@ -""" -B-ASIC signal flow graph generators. - -This module contains a number of functions generating SFGs for specific functions. -""" -from typing import Dict, Optional, Sequence, Union - -import numpy as np - -from b_asic.core_operations import Name, SymmetricTwoportAdaptor -from b_asic.port import InputPort, OutputPort -from b_asic.signal import Signal -from b_asic.signal_flow_graph import SFG -from b_asic.special_operations import Delay, Input, Output - - -def wdf_allpass( - coefficients: Sequence[float], - input_op: Optional[Union[Input, Signal, InputPort]] = None, - output: Optional[Union[Output, Signal, OutputPort]] = None, - name: Optional[str] = None, - latency: Optional[int] = None, - latency_offsets: Optional[Dict[str, int]] = None, - execution_time: Optional[int] = None, -) -> SFG: - """ - Generate a signal flow graph of a WDF allpass section based on symmetric two-port - adaptors. - - Parameters - ---------- - coefficients : 1D-array - Coefficients to use for the allpass section - - input_op : Input, optional - The Input to connect the SFG to. If not provided, one will be generated. - - output : Output, optional - The Output to connect the SFG to. If not provided, one will be generated. - - name : Name, optional - The name of the SFG. If None, "WDF allpass section". - - latency : int, optional - Latency of the symmetric two-port adaptors. - - latency_offsets : optional - Latency offsets of the symmetric two-port adaptors. - - execution_time : int, optional - Execution time of the symmetric two-port adaptors. - - Returns - ------- - Signal flow graph - """ - np_coefficients = np.squeeze(np.asarray(coefficients)) - if np_coefficients.ndim != 1: - raise TypeError("coefficients must be a 1D-array") - if input_op is None: - input_op = Input() - if output is None: - output = Output() - if name is None: - name = "WDF allpass section" - order = len(np_coefficients) - odd_order = order % 2 - if odd_order: - # First-order section - coeff = np_coefficients[0] - adaptor0 = SymmetricTwoportAdaptor( - coeff, - input_op, - latency=latency, - latency_offsets=latency_offsets, - execution_time=execution_time, - ) - signal_out = Signal(adaptor0.output(0)) - delay = Delay(adaptor0.output(1)) - Signal(delay, adaptor0.input(1)) - else: - signal_out = Signal(input_op) - - # Second-order sections - sos_count = (order - 1) // 2 if odd_order else order // 2 - offset1, offset2 = (1, 2) if odd_order else (0, 1) - for n in range(sos_count): - adaptor1 = SymmetricTwoportAdaptor( - np_coefficients[2 * n + offset1], - signal_out, - latency=latency, - latency_offsets=latency_offsets, - execution_time=execution_time, - ) - # Signal(prev_adaptor., adaptor1.input(0), name="Previous-stage to next") - delay1 = Delay(adaptor1.output(1)) - delay2 = Delay() - adaptor2 = SymmetricTwoportAdaptor( - np_coefficients[2 * n + offset2], - delay1, - delay2, - latency=latency, - latency_offsets=latency_offsets, - execution_time=execution_time, - ) - Signal(adaptor2.output(1), adaptor1.input(1)) - Signal(adaptor2.output(0), delay2) - signal_out = Signal(adaptor1.output(0)) - - output << signal_out - return SFG([input_op], [output], name=Name(name)) diff --git a/b_asic/sfg_generators.py b/b_asic/sfg_generators.py new file mode 100644 index 00000000..48edf226 --- /dev/null +++ b/b_asic/sfg_generators.py @@ -0,0 +1,278 @@ +""" +B-ASIC signal flow graph generators. + +This module contains a number of functions generating SFGs for specific functions. +""" +from typing import Dict, Optional, Sequence, Union + +import numpy as np + +from b_asic.core_operations import ( + Addition, + ConstantMultiplication, + Name, + SymmetricTwoportAdaptor, +) +from b_asic.port import InputPort, OutputPort +from b_asic.signal import Signal +from b_asic.signal_flow_graph import SFG +from b_asic.special_operations import Delay, Input, Output + + +def wdf_allpass( + coefficients: Sequence[float], + input_op: Optional[Union[Input, Signal, InputPort]] = None, + output: Optional[Union[Output, Signal, OutputPort]] = None, + name: Optional[str] = None, + latency: Optional[int] = None, + latency_offsets: Optional[Dict[str, int]] = None, + execution_time: Optional[int] = None, +) -> SFG: + """ + Generate a signal flow graph of a WDF allpass section based on symmetric two-port + adaptors. + + Parameters + ---------- + coefficients : 1D-array + Coefficients to use for the allpass section + + input_op : Input, optional + The Input to connect the SFG to. If not provided, one will be generated. + + output : Output, optional + The Output to connect the SFG to. If not provided, one will be generated. + + name : Name, optional + The name of the SFG. If None, "WDF allpass section". + + latency : int, optional + Latency of the symmetric two-port adaptors. + + latency_offsets : optional + Latency offsets of the symmetric two-port adaptors. + + execution_time : int, optional + Execution time of the symmetric two-port adaptors. + + Returns + ------- + Signal flow graph + """ + np_coefficients = np.squeeze(np.asarray(coefficients)) + if np_coefficients.ndim != 1: + raise TypeError("coefficients must be a 1D-array") + if input_op is None: + input_op = Input() + if output is None: + output = Output() + if name is None: + name = "WDF allpass section" + order = len(np_coefficients) + odd_order = order % 2 + if odd_order: + # First-order section + coeff = np_coefficients[0] + adaptor0 = SymmetricTwoportAdaptor( + coeff, + input_op, + latency=latency, + latency_offsets=latency_offsets, + execution_time=execution_time, + ) + signal_out = Signal(adaptor0.output(0)) + delay = Delay(adaptor0.output(1)) + Signal(delay, adaptor0.input(1)) + else: + signal_out = Signal(input_op) + + # Second-order sections + sos_count = (order - 1) // 2 if odd_order else order // 2 + offset1, offset2 = (1, 2) if odd_order else (0, 1) + for n in range(sos_count): + adaptor1 = SymmetricTwoportAdaptor( + np_coefficients[2 * n + offset1], + signal_out, + latency=latency, + latency_offsets=latency_offsets, + execution_time=execution_time, + ) + # Signal(prev_adaptor., adaptor1.input(0), name="Previous-stage to next") + delay1 = Delay(adaptor1.output(1)) + delay2 = Delay() + adaptor2 = SymmetricTwoportAdaptor( + np_coefficients[2 * n + offset2], + delay1, + delay2, + latency=latency, + latency_offsets=latency_offsets, + execution_time=execution_time, + ) + Signal(adaptor2.output(1), adaptor1.input(1)) + Signal(adaptor2.output(0), delay2) + signal_out = Signal(adaptor1.output(0)) + + output << signal_out + return SFG([input_op], [output], name=Name(name)) + + +def direct_form_fir( + coefficients: Sequence[complex], + input_op: Optional[Union[Input, Signal, InputPort]] = None, + output: Optional[Union[Output, Signal, OutputPort]] = None, + name: Optional[str] = None, + mult_properties: Optional[ + Union[Dict[str, int], Dict[str, Dict[str, int]]] + ] = None, + add_properties: Optional[ + Union[Dict[str, int], Dict[str, Dict[str, int]]] + ] = None, +): + r""" + Generate a signal flow graph of a direct form FIR filter. The *coefficients* parameter is a + sequence of impulse response values:: + + coefficients = [h0, h1, h2, ..., hN] + + Leading to the transfer function: + + .. math:: \sum_{i=0}^N h_iz^{-i} + + Parameters + ---------- + coefficients : 1D-array + Coefficients to use for the FIR filter section + + input_op : Input, optional + The Input to connect the SFG to. If not provided, one will be generated. + + output : Output, optional + The Output to connect the SFG to. If not provided, one will be generated. + + name : Name, optional + The name of the SFG. If None, "WDF allpass section". + + mult_properties : dictionary, optional + Properties passed to :class:`~b_asic.core_operations.ConstantMultiplication`. + + add_properties : dictionary, optional + Properties passed to :class:`~b_asic.core_operations.Addition`. + + Returns + ------- + Signal flow graph + + See also + -------- + transposed_direct_form_fir + """ + np_coefficients = np.squeeze(np.asarray(coefficients)) + if np_coefficients.ndim != 1: + raise TypeError("coefficients must be a 1D-array") + if input_op is None: + input_op = Input() + if output is None: + output = Output() + if name is None: + name = "Direct-form FIR filter" + if mult_properties is None: + mult_properties = {} + if add_properties is None: + add_properties = {} + + taps = len(np_coefficients) + prev_delay = input_op + prev_add = None + for i, coeff in enumerate(np_coefficients): + tmp_mul = ConstantMultiplication(coeff, prev_delay, **mult_properties) + if prev_add is None: + prev_add = tmp_mul + else: + prev_add = Addition(tmp_mul, prev_add, **add_properties) + if i < taps - 1: + prev_delay = Delay(prev_delay) + + output << prev_add + + return SFG([input_op], [output], name=Name(name)) + + +def transposed_direct_form_fir( + coefficients: Sequence[complex], + input_op: Optional[Union[Input, Signal, InputPort]] = None, + output: Optional[Union[Output, Signal, OutputPort]] = None, + name: Optional[str] = None, + mult_properties: Optional[ + Union[Dict[str, int], Dict[str, Dict[str, int]]] + ] = None, + add_properties: Optional[ + Union[Dict[str, int], Dict[str, Dict[str, int]]] + ] = None, +): + r""" + Generate a signal flow graph of a transposed direct form FIR filter. The *coefficients* parameter is a + sequence of impulse response values:: + + coefficients = [h0, h1, h2, ..., hN] + + Leading to the transfer function: + + .. math:: \sum_{i=0}^N h_iz^{-i} + + Parameters + ---------- + coefficients : 1D-array + Coefficients to use for the FIR filter section + + input_op : Input, optional + The Input to connect the SFG to. If not provided, one will be generated. + + output : Output, optional + The Output to connect the SFG to. If not provided, one will be generated. + + name : Name, optional + The name of the SFG. If None, "WDF allpass section". + + mult_properties : dictionary, optional + Properties passed to :class:`~b_asic.core_operations.ConstantMultiplication`. + + add_properties : dictionary, optional + Properties passed to :class:`~b_asic.core_operations.Addition`. + + Returns + ------- + Signal flow graph + + See also + -------- + direct_form_fir + """ + np_coefficients = np.squeeze(np.asarray(coefficients)) + if np_coefficients.ndim != 1: + raise TypeError("coefficients must be a 1D-array") + if input_op is None: + input_op = Input() + if output is None: + output = Output() + if name is None: + name = "Transposed direct-form FIR filter" + if mult_properties is None: + mult_properties = {} + if add_properties is None: + add_properties = {} + + taps = len(np_coefficients) + prev_delay = None + prev_add = None + for i, coeff in enumerate(reversed(np_coefficients)): + tmp_mul = ConstantMultiplication(coeff, input_op, **mult_properties) + if prev_delay is None: + tmp_add = tmp_mul + else: + tmp_add = Addition(tmp_mul, prev_delay, **add_properties) + if i < taps - 1: + prev_delay = Delay(tmp_add) + + output << tmp_add + + return SFG([input_op], [output], name=Name(name)) diff --git a/docs_sphinx/api/index.rst b/docs_sphinx/api/index.rst index a36b3efc..8ad0965a 100644 --- a/docs_sphinx/api/index.rst +++ b/docs_sphinx/api/index.rst @@ -11,7 +11,7 @@ API port.rst process.rst schedule.rst - sfg_generator.rst + sfg_generators.rst signal.rst signal_flow_graph.rst signal_generator.rst diff --git a/docs_sphinx/api/sfg_generator.rst b/docs_sphinx/api/sfg_generator.rst deleted file mode 100644 index 44a2d049..00000000 --- a/docs_sphinx/api/sfg_generator.rst +++ /dev/null @@ -1,6 +0,0 @@ -************************ -``b_asic.sfg_generator`` -************************ - -.. automodule:: b_asic.sfg_generator - :members: diff --git a/docs_sphinx/api/sfg_generators.rst b/docs_sphinx/api/sfg_generators.rst new file mode 100644 index 00000000..f162a619 --- /dev/null +++ b/docs_sphinx/api/sfg_generators.rst @@ -0,0 +1,6 @@ +************************* +``b_asic.sfg_generators`` +************************* + +.. automodule:: b_asic.sfg_generators + :members: diff --git a/test/test_sfg_generator.py b/test/test_sfg_generator.py deleted file mode 100644 index 523d1c0f..00000000 --- a/test/test_sfg_generator.py +++ /dev/null @@ -1,28 +0,0 @@ -from b_asic.core_operations import SymmetricTwoportAdaptor -from b_asic.sfg_generator import wdf_allpass - - -def test_wdf_allpass(): - sfg = wdf_allpass([0.3, 0.5, 0.7]) - assert ( - len( - [ - comp - for comp in sfg.components - if isinstance(comp, SymmetricTwoportAdaptor) - ] - ) - == 3 - ) - - sfg = wdf_allpass([0.3, 0.5, 0.7, 0.9]) - assert ( - len( - [ - comp - for comp in sfg.components - if isinstance(comp, SymmetricTwoportAdaptor) - ] - ) - == 4 - ) diff --git a/test/test_sfg_generators.py b/test/test_sfg_generators.py new file mode 100644 index 00000000..699ba682 --- /dev/null +++ b/test/test_sfg_generators.py @@ -0,0 +1,79 @@ +from b_asic.core_operations import ( + Addition, + ConstantMultiplication, + SymmetricTwoportAdaptor, +) +from b_asic.sfg_generators import ( + direct_form_fir, + transposed_direct_form_fir, + wdf_allpass, +) +from b_asic.special_operations import Delay + + +def test_wdf_allpass(): + sfg = wdf_allpass([0.3, 0.5, 0.7]) + assert ( + len( + [ + comp + for comp in sfg.components + if isinstance(comp, SymmetricTwoportAdaptor) + ] + ) + == 3 + ) + + sfg = wdf_allpass([0.3, 0.5, 0.7, 0.9]) + assert ( + len( + [ + comp + for comp in sfg.components + if isinstance(comp, SymmetricTwoportAdaptor) + ] + ) + == 4 + ) + + +def test_direct_form_fir(): + sfg = direct_form_fir([0.3, 0.5, 0.7]) + assert ( + len( + [ + comp + for comp in sfg.components + if isinstance(comp, ConstantMultiplication) + ] + ) + == 3 + ) + assert ( + len([comp for comp in sfg.components if isinstance(comp, Addition)]) + == 2 + ) + assert ( + len([comp for comp in sfg.components if isinstance(comp, Delay)]) == 2 + ) + + +def test_transposed_direct_form_fir(): + sfg = transposed_direct_form_fir([0.3, 0.5, 0.7]) + assert ( + len( + [ + comp + for comp in sfg.components + if isinstance(comp, ConstantMultiplication) + ] + ) + == 3 + ) + assert ( + len([comp for comp in sfg.components if isinstance(comp, Addition)]) + == 2 + ) + assert ( + len([comp for comp in sfg.components if isinstance(comp, Delay)]) == 2 + ) -- GitLab