Skip to content
Snippets Groups Projects
Commit b84b3a2b authored by Oscar Gustafsson's avatar Oscar Gustafsson :bicyclist:
Browse files

Add WDF allpass generator

parent 2a90ea3c
No related branches found
No related tags found
No related merge requests found
Pipeline #88562 passed
...@@ -707,7 +707,7 @@ class SymmetricTwoportAdaptor(AbstractOperation): ...@@ -707,7 +707,7 @@ class SymmetricTwoportAdaptor(AbstractOperation):
latency_offsets: Optional[Dict[str, int]] = None, latency_offsets: Optional[Dict[str, int]] = None,
execution_time: Optional[int] = None, execution_time: Optional[int] = None,
): ):
"""Construct a Butterfly operation.""" """Construct a SymmetricTwoportAdaptor operation."""
super().__init__( super().__init__(
input_count=2, input_count=2,
output_count=2, output_count=2,
......
...@@ -468,7 +468,11 @@ class AbstractOperation(Operation, AbstractGraphComponent): ...@@ -468,7 +468,11 @@ class AbstractOperation(Operation, AbstractGraphComponent):
) )
for i, src in enumerate(input_sources): for i, src in enumerate(input_sources):
if src is not None: if src is not None:
self._input_ports[i].connect(src.source) if isinstance(src, Signal):
# Already existing signal
src.set_destination(self._input_ports[i])
else:
self._input_ports[i].connect(src.source)
# Set specific latency_offsets # Set specific latency_offsets
if latency_offsets is not None: if latency_offsets is not None:
......
...@@ -209,6 +209,9 @@ class InputPort(AbstractPort): ...@@ -209,6 +209,9 @@ class InputPort(AbstractPort):
""" """
if self._source_signal is not None: if self._source_signal is not None:
raise ValueError("Cannot connect already connected input port.") raise ValueError("Cannot connect already connected input port.")
if isinstance(src, Signal):
src.set_destination(self)
return src
# self._source_signal is set by the signal constructor. # self._source_signal is set by the signal constructor.
return Signal(source=src.source, destination=self, name=Name(name)) return Signal(source=src.source, destination=self, name=Name(name))
......
"""
B-ASIC signal flow graph generators.
This module contains a number of functions generating SFGs for specific functions.
"""
from typing import Optional, Union
import numpy as np
from b_asic.core_operations import SymmetricTwoportAdaptor
from b_asic.port import InputPort, OutputPort
from b_asic.signal import Signal
from b_asic.signal_flow_graph import SFG
from b_asic.special_operations import Delay, Input, Output
def wdf_allpass(
coefficients: np.ndarray,
input_op: Optional[Union[Input, Signal, InputPort]] = None,
output: Optional[Union[Output, Signal, OutputPort]] = None,
name: Optional[str] = None,
):
np.asarray(coefficients)
coefficients = np.squeeze(coefficients)
if coefficients.ndim != 1:
raise TypeError("coefficients must be a 1D-array")
if input_op is None:
input_op = Input("x")
if output is None:
output = Output(name="y")
if name is None:
name = "WDF allpass section"
# First-order section
coeff = coefficients[0]
adaptor0 = SymmetricTwoportAdaptor(coeff)
Signal(input_op, adaptor0.input(0), name="Input")
signal_out = Signal(adaptor0.output(0), name="First-order to next")
delay = Delay(adaptor0.output(1))
Signal(delay, adaptor0.input(1), name="First-order delay")
# prev_adaptor = adaptor0
# Second-order sections
sos_count = (len(coefficients) - 1) // 2
for n in range(sos_count):
adaptor1 = SymmetricTwoportAdaptor(coefficients[2 * n + 1], signal_out)
# Signal(prev_adaptor., adaptor1.input(0), name="Previous-stage to next")
delay1 = Delay(adaptor1.output(1))
delay2 = Delay()
adaptor2 = SymmetricTwoportAdaptor(
coefficients[2 * n + 2], delay1, delay2
)
Signal(
adaptor2.output(1),
adaptor1.input(1),
name="Adaptor 2 to adaptor 1",
)
Signal(adaptor2.output(0), delay2, name="Adaptor 2 to delay")
signal_out = Signal(adaptor1.output(0), name="Adaptor 1 to next")
output << signal_out
return SFG([input_op], [output], name=name)
...@@ -1141,7 +1141,8 @@ class SFG(AbstractOperation): ...@@ -1141,7 +1141,8 @@ class SFG(AbstractOperation):
): ):
if original_signal.source is None: if original_signal.source is None:
raise ValueError( raise ValueError(
"Dangling signal without source in SFG" "Dangling signal ({original_signal}) without"
" source in SFG"
) )
new_signal = cast( new_signal = cast(
...@@ -1158,10 +1159,17 @@ class SFG(AbstractOperation): ...@@ -1158,10 +1159,17 @@ class SFG(AbstractOperation):
original_destination = cast( original_destination = cast(
InputPort, original_signal.destination InputPort, original_signal.destination
) )
if original_destination is None:
raise ValueError(
f"Signal ({original_signal}) without"
" destination in SFG"
)
original_connected_op = original_destination.operation original_connected_op = original_destination.operation
if original_connected_op is None: if original_connected_op is None:
raise ValueError( raise ValueError(
"Signal without destination in SFG" "Signal with empty destination port"
f" ({original_destination}) in SFG"
) )
# Check if connected operation has been added. # Check if connected operation has been added.
if ( if (
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment