From f1c01389dcc63e36a5c548dbe1f574a0e47e9409 Mon Sep 17 00:00:00 2001 From: Oscar Gustafsson <oscar.gustafsson@gmail.com> Date: Wed, 1 Feb 2023 13:00:23 +0100 Subject: [PATCH] Add WDF allpass generator --- b_asic/core_operations.py | 2 +- b_asic/operation.py | 12 ++++++-- b_asic/sfg_generator.py | 60 +++++++++++++++++++++++++++++++++++++ b_asic/signal_flow_graph.py | 12 ++++++-- 4 files changed, 81 insertions(+), 5 deletions(-) create mode 100644 b_asic/sfg_generator.py diff --git a/b_asic/core_operations.py b/b_asic/core_operations.py index 5b74b83e..55952427 100644 --- a/b_asic/core_operations.py +++ b/b_asic/core_operations.py @@ -707,7 +707,7 @@ class SymmetricTwoportAdaptor(AbstractOperation): latency_offsets: Optional[Dict[str, int]] = None, execution_time: Optional[int] = None, ): - """Construct a Butterfly operation.""" + """Construct a SymmetricTwoportAdaptor operation.""" super().__init__( input_count=2, output_count=2, diff --git a/b_asic/operation.py b/b_asic/operation.py index f43d7d52..8c8d25b1 100644 --- a/b_asic/operation.py +++ b/b_asic/operation.py @@ -468,7 +468,11 @@ class AbstractOperation(Operation, AbstractGraphComponent): ) for i, src in enumerate(input_sources): if src is not None: - self._input_ports[i].connect(src.source) + if isinstance(src, Signal): + # Already existing signal + src.set_destination(self._input_ports[i]) + else: + self._input_ports[i].connect(src.source) # Set specific latency_offsets if latency_offsets is not None: @@ -588,7 +592,11 @@ class AbstractOperation(Operation, AbstractGraphComponent): f"{self.__class__.__name__} cannot be used as a destination" f" because it has {diff} than 1 input" ) - return self.input(0).connect(src) + if isinstance(src, Signal): + src.set_destination(self.input(0)) + return self + else: + return self.input(0).connect(src) def __str__(self) -> str: """Get a string representation of this operation.""" diff --git a/b_asic/sfg_generator.py b/b_asic/sfg_generator.py new file mode 100644 index 00000000..423614c5 --- /dev/null +++ b/b_asic/sfg_generator.py @@ -0,0 +1,60 @@ +""" +B-ASIC signal flow graph generators. + +This module contains a number of functions generating SFGs for specific functions. +""" +from typing import Optional, Union + +import numpy as np + +from b_asic.core_operations import SymmetricTwoportAdaptor +from b_asic.port import InputPort, OutputPort +from b_asic.signal import Signal +from b_asic.signal_flow_graph import SFG +from b_asic.special_operations import Delay, Input, Output + + +def wdf_allpass( + coefficients: np.ndarray, + input_op: Optional[Union[Input, Signal, InputPort]] = None, + output: Optional[Union[Output, Signal, OutputPort]] = None, + name: Optional[str] = None, +): + np.asarray(coefficients) + coefficients = np.squeeze(coefficients) + if coefficients.ndim != 1: + raise TypeError("coefficients must be a 1D-array") + if input_op is None: + input_op = Input("x") + if output is None: + output = Output(name="y") + if name is None: + name = "WDF allpass section" + # First-order section + coeff = coefficients[0] + adaptor0 = SymmetricTwoportAdaptor(coeff) + Signal(input_op, adaptor0.input(0), name="Input") + signal_out = Signal(adaptor0.output(0), name="First-order to next") + delay = Delay(adaptor0.output(1)) + Signal(delay, adaptor0.input(1), name="First-order delay") + # prev_adaptor = adaptor0 + # Second-order sections + sos_count = (len(coefficients) - 1) // 2 + for n in range(sos_count): + adaptor1 = SymmetricTwoportAdaptor(coefficients[2 * n + 1], signal_out) + # Signal(prev_adaptor., adaptor1.input(0), name="Previous-stage to next") + delay1 = Delay(adaptor1.output(1)) + delay2 = Delay() + adaptor2 = SymmetricTwoportAdaptor( + coefficients[2 * n + 2], delay1, delay2 + ) + Signal( + adaptor2.output(1), + adaptor1.input(1), + name="Adaptor 2 to adaptor 1", + ) + Signal(adaptor2.output(0), delay2, name="Adaptor 2 to delay") + signal_out = Signal(adaptor1.output(0), name="Adaptor 1 to next") + + output << signal_out + return SFG([input_op], [output], name=name) diff --git a/b_asic/signal_flow_graph.py b/b_asic/signal_flow_graph.py index aae96f37..4584d761 100644 --- a/b_asic/signal_flow_graph.py +++ b/b_asic/signal_flow_graph.py @@ -1141,7 +1141,8 @@ class SFG(AbstractOperation): ): if original_signal.source is None: raise ValueError( - "Dangling signal without source in SFG" + "Dangling signal ({original_signal}) without" + " source in SFG" ) new_signal = cast( @@ -1158,10 +1159,17 @@ class SFG(AbstractOperation): original_destination = cast( InputPort, original_signal.destination ) + if original_destination is None: + raise ValueError( + f"Signal ({original_signal}) without" + " destination in SFG" + ) + original_connected_op = original_destination.operation if original_connected_op is None: raise ValueError( - "Signal without destination in SFG" + "Signal with empty destination port" + f" ({original_destination}) in SFG" ) # Check if connected operation has been added. if ( -- GitLab