From b84b3a2b5e8574d7b2c12726924d20f9a1bb77b6 Mon Sep 17 00:00:00 2001
From: Oscar Gustafsson <oscar.gustafsson@gmail.com>
Date: Wed, 1 Feb 2023 13:00:23 +0100
Subject: [PATCH] Add WDF allpass generator

---
 b_asic/core_operations.py   |  2 +-
 b_asic/operation.py         |  6 +++-
 b_asic/port.py              |  3 ++
 b_asic/sfg_generator.py     | 60 +++++++++++++++++++++++++++++++++++++
 b_asic/signal_flow_graph.py | 12 ++++++--
 5 files changed, 79 insertions(+), 4 deletions(-)
 create mode 100644 b_asic/sfg_generator.py

diff --git a/b_asic/core_operations.py b/b_asic/core_operations.py
index 5b74b83e..55952427 100644
--- a/b_asic/core_operations.py
+++ b/b_asic/core_operations.py
@@ -707,7 +707,7 @@ class SymmetricTwoportAdaptor(AbstractOperation):
         latency_offsets: Optional[Dict[str, int]] = None,
         execution_time: Optional[int] = None,
     ):
-        """Construct a Butterfly operation."""
+        """Construct a SymmetricTwoportAdaptor operation."""
         super().__init__(
             input_count=2,
             output_count=2,
diff --git a/b_asic/operation.py b/b_asic/operation.py
index f43d7d52..0fe66840 100644
--- a/b_asic/operation.py
+++ b/b_asic/operation.py
@@ -468,7 +468,11 @@ class AbstractOperation(Operation, AbstractGraphComponent):
                 )
             for i, src in enumerate(input_sources):
                 if src is not None:
-                    self._input_ports[i].connect(src.source)
+                    if isinstance(src, Signal):
+                        # Already existing signal
+                        src.set_destination(self._input_ports[i])
+                    else:
+                        self._input_ports[i].connect(src.source)
 
         # Set specific latency_offsets
         if latency_offsets is not None:
diff --git a/b_asic/port.py b/b_asic/port.py
index 9b60a6fc..d823a1bb 100644
--- a/b_asic/port.py
+++ b/b_asic/port.py
@@ -209,6 +209,9 @@ class InputPort(AbstractPort):
         """
         if self._source_signal is not None:
             raise ValueError("Cannot connect already connected input port.")
+        if isinstance(src, Signal):
+            src.set_destination(self)
+            return src
         # self._source_signal is set by the signal constructor.
         return Signal(source=src.source, destination=self, name=Name(name))
 
diff --git a/b_asic/sfg_generator.py b/b_asic/sfg_generator.py
new file mode 100644
index 00000000..423614c5
--- /dev/null
+++ b/b_asic/sfg_generator.py
@@ -0,0 +1,60 @@
+"""
+B-ASIC signal flow graph generators.
+
+This module contains a number of functions generating SFGs for specific functions.
+"""
+from typing import Optional, Union
+
+import numpy as np
+
+from b_asic.core_operations import SymmetricTwoportAdaptor
+from b_asic.port import InputPort, OutputPort
+from b_asic.signal import Signal
+from b_asic.signal_flow_graph import SFG
+from b_asic.special_operations import Delay, Input, Output
+
+
+def wdf_allpass(
+    coefficients: np.ndarray,
+    input_op: Optional[Union[Input, Signal, InputPort]] = None,
+    output: Optional[Union[Output, Signal, OutputPort]] = None,
+    name: Optional[str] = None,
+):
+    np.asarray(coefficients)
+    coefficients = np.squeeze(coefficients)
+    if coefficients.ndim != 1:
+        raise TypeError("coefficients must be a 1D-array")
+    if input_op is None:
+        input_op = Input("x")
+    if output is None:
+        output = Output(name="y")
+    if name is None:
+        name = "WDF allpass section"
+    # First-order section
+    coeff = coefficients[0]
+    adaptor0 = SymmetricTwoportAdaptor(coeff)
+    Signal(input_op, adaptor0.input(0), name="Input")
+    signal_out = Signal(adaptor0.output(0), name="First-order to next")
+    delay = Delay(adaptor0.output(1))
+    Signal(delay, adaptor0.input(1), name="First-order delay")
+    # prev_adaptor = adaptor0
+    # Second-order sections
+    sos_count = (len(coefficients) - 1) // 2
+    for n in range(sos_count):
+        adaptor1 = SymmetricTwoportAdaptor(coefficients[2 * n + 1], signal_out)
+        # Signal(prev_adaptor., adaptor1.input(0), name="Previous-stage to next")
+        delay1 = Delay(adaptor1.output(1))
+        delay2 = Delay()
+        adaptor2 = SymmetricTwoportAdaptor(
+            coefficients[2 * n + 2], delay1, delay2
+        )
+        Signal(
+            adaptor2.output(1),
+            adaptor1.input(1),
+            name="Adaptor 2 to adaptor 1",
+        )
+        Signal(adaptor2.output(0), delay2, name="Adaptor 2 to delay")
+        signal_out = Signal(adaptor1.output(0), name="Adaptor 1 to next")
+
+    output << signal_out
+    return SFG([input_op], [output], name=name)
diff --git a/b_asic/signal_flow_graph.py b/b_asic/signal_flow_graph.py
index aae96f37..4584d761 100644
--- a/b_asic/signal_flow_graph.py
+++ b/b_asic/signal_flow_graph.py
@@ -1141,7 +1141,8 @@ class SFG(AbstractOperation):
                     ):
                         if original_signal.source is None:
                             raise ValueError(
-                                "Dangling signal without source in SFG"
+                                "Dangling signal ({original_signal}) without"
+                                " source in SFG"
                             )
 
                         new_signal = cast(
@@ -1158,10 +1159,17 @@ class SFG(AbstractOperation):
                         original_destination = cast(
                             InputPort, original_signal.destination
                         )
+                        if original_destination is None:
+                            raise ValueError(
+                                f"Signal ({original_signal}) without"
+                                " destination in SFG"
+                            )
+
                         original_connected_op = original_destination.operation
                         if original_connected_op is None:
                             raise ValueError(
-                                "Signal without destination in SFG"
+                                "Signal with empty destination port"
+                                f" ({original_destination}) in SFG"
                             )
                         # Check if connected operation has been added.
                         if (
-- 
GitLab