diff --git a/b_asic/core_operations.py b/b_asic/core_operations.py index 5b74b83e7c89f883ce913ca2f0846ddf7e9d8a6d..559524278ed440c8a7eb21ee28a8ab66d836bdb6 100644 --- a/b_asic/core_operations.py +++ b/b_asic/core_operations.py @@ -707,7 +707,7 @@ class SymmetricTwoportAdaptor(AbstractOperation): latency_offsets: Optional[Dict[str, int]] = None, execution_time: Optional[int] = None, ): - """Construct a Butterfly operation.""" + """Construct a SymmetricTwoportAdaptor operation.""" super().__init__( input_count=2, output_count=2, diff --git a/b_asic/operation.py b/b_asic/operation.py index f43d7d526b0a86ec54d73bd9b13b63ecb4155d5f..0fe66840bcde65e35bb2c24de3b7a9e3ac32b42e 100644 --- a/b_asic/operation.py +++ b/b_asic/operation.py @@ -468,7 +468,11 @@ class AbstractOperation(Operation, AbstractGraphComponent): ) for i, src in enumerate(input_sources): if src is not None: - self._input_ports[i].connect(src.source) + if isinstance(src, Signal): + # Already existing signal + src.set_destination(self._input_ports[i]) + else: + self._input_ports[i].connect(src.source) # Set specific latency_offsets if latency_offsets is not None: diff --git a/b_asic/port.py b/b_asic/port.py index 9b60a6fc4d308132c702e79a0e3cf99a2e630f63..d823a1bbc8f56da836e11c545ec32c079f096d73 100644 --- a/b_asic/port.py +++ b/b_asic/port.py @@ -209,6 +209,9 @@ class InputPort(AbstractPort): """ if self._source_signal is not None: raise ValueError("Cannot connect already connected input port.") + if isinstance(src, Signal): + src.set_destination(self) + return src # self._source_signal is set by the signal constructor. return Signal(source=src.source, destination=self, name=Name(name)) diff --git a/b_asic/sfg_generator.py b/b_asic/sfg_generator.py new file mode 100644 index 0000000000000000000000000000000000000000..6e3906913b731e0c9aaf063dc8c1d50ac51bd7c6 --- /dev/null +++ b/b_asic/sfg_generator.py @@ -0,0 +1,75 @@ +""" +B-ASIC signal flow graph generators. + +This module contains a number of functions generating SFGs for specific functions. +""" +from typing import Dict, Optional, Union + +import numpy as np + +from b_asic.core_operations import SymmetricTwoportAdaptor +from b_asic.port import InputPort, OutputPort +from b_asic.signal import Signal +from b_asic.signal_flow_graph import SFG +from b_asic.special_operations import Delay, Input, Output + + +def wdf_allpass( + coefficients: np.ndarray, + input_op: Optional[Union[Input, Signal, InputPort]] = None, + output: Optional[Union[Output, Signal, OutputPort]] = None, + name: Optional[str] = None, + latency: Optional[int] = None, + latency_offsets: Optional[Dict[str, int]] = None, + execution_time: Optional[int] = None, +): + np.asarray(coefficients) + coefficients = np.squeeze(coefficients) + if coefficients.ndim != 1: + raise TypeError("coefficients must be a 1D-array") + if input_op is None: + input_op = Input() + if output is None: + output = Output() + if name is None: + name = "WDF allpass section" + # First-order section + coeff = coefficients[0] + adaptor0 = SymmetricTwoportAdaptor( + coeff, + input_op, + latency=latency, + latency_offsets=latency_offsets, + execution_time=execution_time, + ) + signal_out = Signal(adaptor0.output(0)) + delay = Delay(adaptor0.output(1)) + Signal(delay, adaptor0.input(1)) + # prev_adaptor = adaptor0 + # Second-order sections + sos_count = (len(coefficients) - 1) // 2 + for n in range(sos_count): + adaptor1 = SymmetricTwoportAdaptor( + coefficients[2 * n + 1], + signal_out, + latency=latency, + latency_offsets=latency_offsets, + execution_time=execution_time, + ) + # Signal(prev_adaptor., adaptor1.input(0), name="Previous-stage to next") + delay1 = Delay(adaptor1.output(1)) + delay2 = Delay() + adaptor2 = SymmetricTwoportAdaptor( + coefficients[2 * n + 2], + delay1, + delay2, + latency=latency, + latency_offsets=latency_offsets, + execution_time=execution_time, + ) + Signal(adaptor2.output(1), adaptor1.input(1)) + Signal(adaptor2.output(0), delay2) + signal_out = Signal(adaptor1.output(0)) + + output << signal_out + return SFG([input_op], [output], name=name) diff --git a/b_asic/signal_flow_graph.py b/b_asic/signal_flow_graph.py index aae96f37f38b61697f27210caea93c3e135c3af0..4584d761fc0f45944dea02d86e09335e9fc63ce4 100644 --- a/b_asic/signal_flow_graph.py +++ b/b_asic/signal_flow_graph.py @@ -1141,7 +1141,8 @@ class SFG(AbstractOperation): ): if original_signal.source is None: raise ValueError( - "Dangling signal without source in SFG" + "Dangling signal ({original_signal}) without" + " source in SFG" ) new_signal = cast( @@ -1158,10 +1159,17 @@ class SFG(AbstractOperation): original_destination = cast( InputPort, original_signal.destination ) + if original_destination is None: + raise ValueError( + f"Signal ({original_signal}) without" + " destination in SFG" + ) + original_connected_op = original_destination.operation if original_connected_op is None: raise ValueError( - "Signal without destination in SFG" + "Signal with empty destination port" + f" ({original_destination}) in SFG" ) # Check if connected operation has been added. if (