From 99f377419983466bf107009c13e33a09020250b9 Mon Sep 17 00:00:00 2001 From: Oscar Gustafsson <oscar.gustafsson@gmail.com> Date: Wed, 1 Feb 2023 13:00:23 +0100 Subject: [PATCH] Add WDF allpass generator --- b_asic/core_operations.py | 2 +- b_asic/operation.py | 6 +- b_asic/port.py | 3 + b_asic/sfg_generator.py | 111 ++++++++++++++++++++++++++++++ b_asic/signal_flow_graph.py | 12 +++- docs_sphinx/api/sfg_generator.rst | 6 ++ test/test_sfg_generator.py | 28 ++++++++ 7 files changed, 164 insertions(+), 4 deletions(-) create mode 100644 b_asic/sfg_generator.py create mode 100644 docs_sphinx/api/sfg_generator.rst create mode 100644 test/test_sfg_generator.py diff --git a/b_asic/core_operations.py b/b_asic/core_operations.py index 5b74b83e..55952427 100644 --- a/b_asic/core_operations.py +++ b/b_asic/core_operations.py @@ -707,7 +707,7 @@ class SymmetricTwoportAdaptor(AbstractOperation): latency_offsets: Optional[Dict[str, int]] = None, execution_time: Optional[int] = None, ): - """Construct a Butterfly operation.""" + """Construct a SymmetricTwoportAdaptor operation.""" super().__init__( input_count=2, output_count=2, diff --git a/b_asic/operation.py b/b_asic/operation.py index f43d7d52..0fe66840 100644 --- a/b_asic/operation.py +++ b/b_asic/operation.py @@ -468,7 +468,11 @@ class AbstractOperation(Operation, AbstractGraphComponent): ) for i, src in enumerate(input_sources): if src is not None: - self._input_ports[i].connect(src.source) + if isinstance(src, Signal): + # Already existing signal + src.set_destination(self._input_ports[i]) + else: + self._input_ports[i].connect(src.source) # Set specific latency_offsets if latency_offsets is not None: diff --git a/b_asic/port.py b/b_asic/port.py index 9b60a6fc..d823a1bb 100644 --- a/b_asic/port.py +++ b/b_asic/port.py @@ -209,6 +209,9 @@ class InputPort(AbstractPort): """ if self._source_signal is not None: raise ValueError("Cannot connect already connected input port.") + if isinstance(src, Signal): + src.set_destination(self) + return src # self._source_signal is set by the signal constructor. return Signal(source=src.source, destination=self, name=Name(name)) diff --git a/b_asic/sfg_generator.py b/b_asic/sfg_generator.py new file mode 100644 index 00000000..faa6e3b1 --- /dev/null +++ b/b_asic/sfg_generator.py @@ -0,0 +1,111 @@ +""" +B-ASIC signal flow graph generators. + +This module contains a number of functions generating SFGs for specific functions. +""" +from typing import Dict, Optional, Sequence, Union + +import numpy as np + +from b_asic.core_operations import Name, SymmetricTwoportAdaptor +from b_asic.port import InputPort, OutputPort +from b_asic.signal import Signal +from b_asic.signal_flow_graph import SFG +from b_asic.special_operations import Delay, Input, Output + + +def wdf_allpass( + coefficients: Sequence[float], + input_op: Optional[Union[Input, Signal, InputPort]] = None, + output: Optional[Union[Output, Signal, OutputPort]] = None, + name: Optional[str] = None, + latency: Optional[int] = None, + latency_offsets: Optional[Dict[str, int]] = None, + execution_time: Optional[int] = None, +) -> SFG: + """ + Generate a signal flow graph of a WDF allpass section based on symmetric two-port adaptors. + + Parameters + ---------- + coefficients : 1D-array + Coefficients to use for the allpass section + + input_op : Input, optional + The Input to connect the SFG to. If not provided, one will be generated. + + output : Output, optional + The Output to connect the SFG to. If not provided, one will be generated. + + name : Name, optional + The name of the SFG. If None, "WDF allpass section". + + latency : int, optional + Latency of the symmetric two-port adaptors. + + latency_offsets : optional + Latency offsets of the symmetric two-port adaptors. + + execution_time : int, optional + Execution time of the symmetric two-port adaptors. + + Returns + ------- + Signal flow graph + """ + np.asarray(coefficients) + coefficients = np.squeeze(coefficients) + if coefficients.ndim != 1: + raise TypeError("coefficients must be a 1D-array") + if input_op is None: + input_op = Input() + if output is None: + output = Output() + if name is None: + name = "WDF allpass section" + order = len(coefficients) + odd_order = order % 2 + if odd_order: + # First-order section + coeff = coefficients[0] + adaptor0 = SymmetricTwoportAdaptor( + coeff, + input_op, + latency=latency, + latency_offsets=latency_offsets, + execution_time=execution_time, + ) + signal_out = Signal(adaptor0.output(0)) + delay = Delay(adaptor0.output(1)) + Signal(delay, adaptor0.input(1)) + else: + signal_out = Signal(input_op) + + # Second-order sections + sos_count = (order - 1) // 2 if odd_order else order // 2 + offset1, offset2 = (1, 2) if odd_order else (0, 1) + for n in range(sos_count): + adaptor1 = SymmetricTwoportAdaptor( + coefficients[2 * n + offset1], + signal_out, + latency=latency, + latency_offsets=latency_offsets, + execution_time=execution_time, + ) + # Signal(prev_adaptor., adaptor1.input(0), name="Previous-stage to next") + delay1 = Delay(adaptor1.output(1)) + delay2 = Delay() + adaptor2 = SymmetricTwoportAdaptor( + coefficients[2 * n + offset2], + delay1, + delay2, + latency=latency, + latency_offsets=latency_offsets, + execution_time=execution_time, + ) + Signal(adaptor2.output(1), adaptor1.input(1)) + Signal(adaptor2.output(0), delay2) + signal_out = Signal(adaptor1.output(0)) + + output << signal_out + return SFG([input_op], [output], name=Name(name)) diff --git a/b_asic/signal_flow_graph.py b/b_asic/signal_flow_graph.py index aae96f37..4584d761 100644 --- a/b_asic/signal_flow_graph.py +++ b/b_asic/signal_flow_graph.py @@ -1141,7 +1141,8 @@ class SFG(AbstractOperation): ): if original_signal.source is None: raise ValueError( - "Dangling signal without source in SFG" + "Dangling signal ({original_signal}) without" + " source in SFG" ) new_signal = cast( @@ -1158,10 +1159,17 @@ class SFG(AbstractOperation): original_destination = cast( InputPort, original_signal.destination ) + if original_destination is None: + raise ValueError( + f"Signal ({original_signal}) without" + " destination in SFG" + ) + original_connected_op = original_destination.operation if original_connected_op is None: raise ValueError( - "Signal without destination in SFG" + "Signal with empty destination port" + f" ({original_destination}) in SFG" ) # Check if connected operation has been added. if ( diff --git a/docs_sphinx/api/sfg_generator.rst b/docs_sphinx/api/sfg_generator.rst new file mode 100644 index 00000000..1af58fe4 --- /dev/null +++ b/docs_sphinx/api/sfg_generator.rst @@ -0,0 +1,6 @@ +************************ +``b_asic.sfg_generator`` +************************ + +.. automodule:: b_asic.signal_flow_graph + :members: diff --git a/test/test_sfg_generator.py b/test/test_sfg_generator.py new file mode 100644 index 00000000..523d1c0f --- /dev/null +++ b/test/test_sfg_generator.py @@ -0,0 +1,28 @@ +from b_asic.core_operations import SymmetricTwoportAdaptor +from b_asic.sfg_generator import wdf_allpass + + +def test_wdf_allpass(): + sfg = wdf_allpass([0.3, 0.5, 0.7]) + assert ( + len( + [ + comp + for comp in sfg.components + if isinstance(comp, SymmetricTwoportAdaptor) + ] + ) + == 3 + ) + + sfg = wdf_allpass([0.3, 0.5, 0.7, 0.9]) + assert ( + len( + [ + comp + for comp in sfg.components + if isinstance(comp, SymmetricTwoportAdaptor) + ] + ) + == 4 + ) -- GitLab