From 34335d9fb8bb11f4775796da007fcfce9b2388cd Mon Sep 17 00:00:00 2001
From: Simon Bjurek <simbj106@student.liu.se>
Date: Tue, 25 Mar 2025 08:28:46 +0000
Subject: [PATCH] Added symmetric fir generator

---
 b_asic/scheduler.py              |  2 +
 b_asic/sfg_generators.py         | 83 ++++++++++++++++++++++++++-
 test/unit/test_sfg_generators.py | 98 +++++++++++++++++++++++++++++++-
 3 files changed, 180 insertions(+), 3 deletions(-)

diff --git a/b_asic/scheduler.py b/b_asic/scheduler.py
index 306cdccd..41ed7044 100644
--- a/b_asic/scheduler.py
+++ b/b_asic/scheduler.py
@@ -1020,6 +1020,8 @@ class RecursiveListScheduler(ListScheduler):
             destination_port = output_port.signals[0].destination
             destination_op = destination_port.operation
             if destination_op.graph_id not in self._remaining_ops:
+                if isinstance(destination_op, Delay):
+                    continue
                 # spotted a recursive operation -> check if ok
                 op_available_time = (
                     self._current_time + op.latency_offsets[f"out{output_port.index}"]
diff --git a/b_asic/sfg_generators.py b/b_asic/sfg_generators.py
index faf79d2b..7c85ad56 100644
--- a/b_asic/sfg_generators.py
+++ b/b_asic/sfg_generators.py
@@ -164,7 +164,7 @@ def direct_form_fir(
 
     See Also
     --------
-    transposed_direct_form_fir
+    transposed_direct_form_fir, symmetric_fir
     """
     np_coefficients = np.atleast_1d(np.squeeze(np.asarray(coefficients)))
     taps = len(np_coefficients)
@@ -232,7 +232,7 @@ def transposed_direct_form_fir(
 
     See Also
     --------
-    direct_form_fir
+    direct_form_fir, symmetric_fir
     """
     np_coefficients = np.atleast_1d(np.squeeze(np.asarray(coefficients)))
     taps = len(np_coefficients)
@@ -265,6 +265,85 @@ def transposed_direct_form_fir(
     return SFG([input_op], [output], name=Name(name))
 
 
+def symmetric_fir(
+    coefficients: Sequence[complex],
+    name: str | None = None,
+    mult_properties: dict[str, int] | dict[str, dict[str, int]] | None = None,
+    add_properties: dict[str, int] | dict[str, dict[str, int]] | None = None,
+) -> SFG:
+    r"""Generate a signal flow graph of a symmetric FIR filter.
+
+    The *coefficients* parameter is a sequence of impulse response values of even length::
+
+        coefficients = [h0, h1, h2, ..., hN]
+
+    Leading to the transfer function:
+
+    .. math:: \sum_{i=0}^N h_iz^{-i}
+
+    Parameters
+    ----------
+    coefficients : 1D-array
+        Coefficients to use for the FIR filter section.
+    name : Name, optional
+        The name of the SFG. If None, "Transposed direct-form FIR filter".
+    mult_properties : dictionary, optional
+        Properties passed to :class:`~b_asic.core_operations.ConstantMultiplication`.
+    add_properties : dictionary, optional
+        Properties passed to :class:`~b_asic.core_operations.Addition`.
+
+    Returns
+    -------
+    Signal flow graph
+
+    See Also
+    --------
+    direct_form_fir, transposed_direct_form_fir
+    """
+    np_coefficients = np.atleast_1d(np.squeeze(np.asarray(coefficients)))
+    taps = len(np_coefficients)
+    if not taps:
+        raise ValueError("Coefficients cannot be empty")
+    if taps > 1 and taps % 2 != 0:
+        raise ValueError("Coefficients must be of even length")
+    if np_coefficients.ndim != 1:
+        raise TypeError("coefficients must be a 1D-array")
+    if name is None:
+        name = "Symmetric FIR filter"
+    if mult_properties is None:
+        mult_properties = {}
+    if add_properties is None:
+        add_properties = {}
+    input_op = Input()
+    output = Output()
+
+    delays = [input_op]
+    for _ in range(taps - 1):
+        delays.append(Delay(delays[-1]))
+
+    add_layer_1 = [
+        Addition(delays[i], delays[-i - 1], **add_properties) for i in range(taps // 2)
+    ]
+
+    if taps == 1:
+        muls = [ConstantMultiplication(coefficients[0], input_op, **mult_properties)]
+    else:
+        muls = [
+            ConstantMultiplication(coefficients[i], add_layer_1[i], **mult_properties)
+            for i in range(taps // 2)
+        ]
+
+    previous_op = muls[0]
+    add_layer_2 = []
+    for i in range(taps // 2 - 1):
+        add_layer_2.append(Addition(previous_op, muls[i + 1], **add_properties))
+        previous_op = add_layer_2[-1]
+
+    output <<= add_layer_2[-1] if add_layer_2 else muls[0]
+
+    return SFG([input_op], [output], name=Name(name))
+
+
 def direct_form_1_iir(
     b: Sequence[complex],
     a: Sequence[complex],
diff --git a/test/unit/test_sfg_generators.py b/test/unit/test_sfg_generators.py
index 569528b8..f164ee50 100644
--- a/test/unit/test_sfg_generators.py
+++ b/test/unit/test_sfg_generators.py
@@ -16,6 +16,7 @@ from b_asic.sfg_generators import (
     direct_form_fir,
     ldlt_matrix_inverse,
     radix_2_dif_fft,
+    symmetric_fir,
     transposed_direct_form_fir,
     wdf_allpass,
 )
@@ -235,8 +236,103 @@ def test_transposed_direct_form_fir():
     assert len([comp for comp in sfg.components if isinstance(comp, Delay)]) == 0
 
 
+def test_symmetric_fir():
+    impulse_response = [0.3, 0.5, 0.5, 0.3]
+    sfg = symmetric_fir(impulse_response)
+    assert (
+        len(
+            [
+                comp
+                for comp in sfg.components
+                if isinstance(comp, ConstantMultiplication)
+            ]
+        )
+        == 2
+    )
+    assert len([comp for comp in sfg.components if isinstance(comp, Addition)]) == 3
+    assert len([comp for comp in sfg.components if isinstance(comp, Delay)]) == 3
+
+    sim = Simulation(sfg, [Impulse()])
+    sim.run_for(5)
+    impulse_response.append(0.0)
+    assert np.allclose(sim.results["0"], impulse_response)
+
+    impulse_response = [0.1, 0.2, 0.3, 0.4, 0.4, 0.3, 0.2, 0.1]
+    sfg = symmetric_fir(
+        impulse_response,
+        mult_properties={"latency": 2, "execution_time": 1},
+        add_properties={"latency": 1, "execution_time": 1},
+    )
+    assert sfg.critical_path_time() == 6
+
+    sim = Simulation(sfg, [Impulse()])
+    sim.run_for(9)
+    impulse_response.append(0.0)
+    assert np.allclose(sim.results["0"], impulse_response)
+
+    impulse_response = [0.3]
+    sfg = symmetric_fir(impulse_response)
+    assert (
+        len(
+            [
+                comp
+                for comp in sfg.components
+                if isinstance(comp, ConstantMultiplication)
+            ]
+        )
+        == 1
+    )
+    assert len([comp for comp in sfg.components if isinstance(comp, Addition)]) == 0
+    assert len([comp for comp in sfg.components if isinstance(comp, Delay)]) == 0
+
+    impulse_response = [0.1 + i for i in range(8)]
+    impulse_response += reversed(impulse_response)
+    sfg = symmetric_fir(impulse_response)
+    assert (
+        len(
+            [
+                comp
+                for comp in sfg.components
+                if isinstance(comp, ConstantMultiplication)
+            ]
+        )
+        == 8
+    )
+    assert len([comp for comp in sfg.components if isinstance(comp, Addition)]) == 15
+    assert len([comp for comp in sfg.components if isinstance(comp, Delay)]) == 15
+
+    sim = Simulation(sfg, [Impulse()])
+    sim.run_for(17)
+    impulse_response.append(0.0)
+    assert np.allclose(sim.results["0"], impulse_response)
+
+    impulse_response = [0.1 + i for i in range(50)]
+    impulse_response += reversed(impulse_response)
+    sfg = symmetric_fir(impulse_response)
+    assert (
+        len(
+            [
+                comp
+                for comp in sfg.components
+                if isinstance(comp, ConstantMultiplication)
+            ]
+        )
+        == 50
+    )
+    assert len([comp for comp in sfg.components if isinstance(comp, Addition)]) == 99
+    assert len([comp for comp in sfg.components if isinstance(comp, Delay)]) == 99
+
+    sim = Simulation(sfg, [Impulse()])
+    sim.run_for(101)
+    impulse_response.append(0.0)
+    assert np.allclose(sim.results["0"], impulse_response)
+
+    with pytest.raises(ValueError, match="Coefficients must be of even length"):
+        symmetric_fir([0.1, 0.2, 0.1])
+
+
 def test_sfg_generator_errors():
-    sfg_gens = [wdf_allpass, transposed_direct_form_fir, direct_form_fir]
+    sfg_gens = [wdf_allpass, transposed_direct_form_fir, direct_form_fir, symmetric_fir]
     for gen in sfg_gens:
         with pytest.raises(ValueError, match="Coefficients cannot be empty"):
             gen([])
-- 
GitLab