diff --git a/b_asic/scheduler.py b/b_asic/scheduler.py index 306cdccd5a5d492434459cce925cc987c5bee96b..41ed7044831049f4edb59296de443d54363e06d7 100644 --- a/b_asic/scheduler.py +++ b/b_asic/scheduler.py @@ -1020,6 +1020,8 @@ class RecursiveListScheduler(ListScheduler): destination_port = output_port.signals[0].destination destination_op = destination_port.operation if destination_op.graph_id not in self._remaining_ops: + if isinstance(destination_op, Delay): + continue # spotted a recursive operation -> check if ok op_available_time = ( self._current_time + op.latency_offsets[f"out{output_port.index}"] diff --git a/b_asic/sfg_generators.py b/b_asic/sfg_generators.py index faf79d2b6175f70646e81b4289cb9356bd4db939..7c85ad56188be85f527cecf5c25ebc2042c00275 100644 --- a/b_asic/sfg_generators.py +++ b/b_asic/sfg_generators.py @@ -164,7 +164,7 @@ def direct_form_fir( See Also -------- - transposed_direct_form_fir + transposed_direct_form_fir, symmetric_fir """ np_coefficients = np.atleast_1d(np.squeeze(np.asarray(coefficients))) taps = len(np_coefficients) @@ -232,7 +232,7 @@ def transposed_direct_form_fir( See Also -------- - direct_form_fir + direct_form_fir, symmetric_fir """ np_coefficients = np.atleast_1d(np.squeeze(np.asarray(coefficients))) taps = len(np_coefficients) @@ -265,6 +265,85 @@ def transposed_direct_form_fir( return SFG([input_op], [output], name=Name(name)) +def symmetric_fir( + coefficients: Sequence[complex], + name: str | None = None, + mult_properties: dict[str, int] | dict[str, dict[str, int]] | None = None, + add_properties: dict[str, int] | dict[str, dict[str, int]] | None = None, +) -> SFG: + r"""Generate a signal flow graph of a symmetric FIR filter. + + The *coefficients* parameter is a sequence of impulse response values of even length:: + + coefficients = [h0, h1, h2, ..., hN] + + Leading to the transfer function: + + .. math:: \sum_{i=0}^N h_iz^{-i} + + Parameters + ---------- + coefficients : 1D-array + Coefficients to use for the FIR filter section. + name : Name, optional + The name of the SFG. If None, "Transposed direct-form FIR filter". + mult_properties : dictionary, optional + Properties passed to :class:`~b_asic.core_operations.ConstantMultiplication`. + add_properties : dictionary, optional + Properties passed to :class:`~b_asic.core_operations.Addition`. + + Returns + ------- + Signal flow graph + + See Also + -------- + direct_form_fir, transposed_direct_form_fir + """ + np_coefficients = np.atleast_1d(np.squeeze(np.asarray(coefficients))) + taps = len(np_coefficients) + if not taps: + raise ValueError("Coefficients cannot be empty") + if taps > 1 and taps % 2 != 0: + raise ValueError("Coefficients must be of even length") + if np_coefficients.ndim != 1: + raise TypeError("coefficients must be a 1D-array") + if name is None: + name = "Symmetric FIR filter" + if mult_properties is None: + mult_properties = {} + if add_properties is None: + add_properties = {} + input_op = Input() + output = Output() + + delays = [input_op] + for _ in range(taps - 1): + delays.append(Delay(delays[-1])) + + add_layer_1 = [ + Addition(delays[i], delays[-i - 1], **add_properties) for i in range(taps // 2) + ] + + if taps == 1: + muls = [ConstantMultiplication(coefficients[0], input_op, **mult_properties)] + else: + muls = [ + ConstantMultiplication(coefficients[i], add_layer_1[i], **mult_properties) + for i in range(taps // 2) + ] + + previous_op = muls[0] + add_layer_2 = [] + for i in range(taps // 2 - 1): + add_layer_2.append(Addition(previous_op, muls[i + 1], **add_properties)) + previous_op = add_layer_2[-1] + + output <<= add_layer_2[-1] if add_layer_2 else muls[0] + + return SFG([input_op], [output], name=Name(name)) + + def direct_form_1_iir( b: Sequence[complex], a: Sequence[complex], diff --git a/test/unit/test_sfg_generators.py b/test/unit/test_sfg_generators.py index 569528b88c0875209d6208aa3b3fb14d3851e1d9..f164ee50866565da9b20d4c8e2e58ab5921641bb 100644 --- a/test/unit/test_sfg_generators.py +++ b/test/unit/test_sfg_generators.py @@ -16,6 +16,7 @@ from b_asic.sfg_generators import ( direct_form_fir, ldlt_matrix_inverse, radix_2_dif_fft, + symmetric_fir, transposed_direct_form_fir, wdf_allpass, ) @@ -235,8 +236,103 @@ def test_transposed_direct_form_fir(): assert len([comp for comp in sfg.components if isinstance(comp, Delay)]) == 0 +def test_symmetric_fir(): + impulse_response = [0.3, 0.5, 0.5, 0.3] + sfg = symmetric_fir(impulse_response) + assert ( + len( + [ + comp + for comp in sfg.components + if isinstance(comp, ConstantMultiplication) + ] + ) + == 2 + ) + assert len([comp for comp in sfg.components if isinstance(comp, Addition)]) == 3 + assert len([comp for comp in sfg.components if isinstance(comp, Delay)]) == 3 + + sim = Simulation(sfg, [Impulse()]) + sim.run_for(5) + impulse_response.append(0.0) + assert np.allclose(sim.results["0"], impulse_response) + + impulse_response = [0.1, 0.2, 0.3, 0.4, 0.4, 0.3, 0.2, 0.1] + sfg = symmetric_fir( + impulse_response, + mult_properties={"latency": 2, "execution_time": 1}, + add_properties={"latency": 1, "execution_time": 1}, + ) + assert sfg.critical_path_time() == 6 + + sim = Simulation(sfg, [Impulse()]) + sim.run_for(9) + impulse_response.append(0.0) + assert np.allclose(sim.results["0"], impulse_response) + + impulse_response = [0.3] + sfg = symmetric_fir(impulse_response) + assert ( + len( + [ + comp + for comp in sfg.components + if isinstance(comp, ConstantMultiplication) + ] + ) + == 1 + ) + assert len([comp for comp in sfg.components if isinstance(comp, Addition)]) == 0 + assert len([comp for comp in sfg.components if isinstance(comp, Delay)]) == 0 + + impulse_response = [0.1 + i for i in range(8)] + impulse_response += reversed(impulse_response) + sfg = symmetric_fir(impulse_response) + assert ( + len( + [ + comp + for comp in sfg.components + if isinstance(comp, ConstantMultiplication) + ] + ) + == 8 + ) + assert len([comp for comp in sfg.components if isinstance(comp, Addition)]) == 15 + assert len([comp for comp in sfg.components if isinstance(comp, Delay)]) == 15 + + sim = Simulation(sfg, [Impulse()]) + sim.run_for(17) + impulse_response.append(0.0) + assert np.allclose(sim.results["0"], impulse_response) + + impulse_response = [0.1 + i for i in range(50)] + impulse_response += reversed(impulse_response) + sfg = symmetric_fir(impulse_response) + assert ( + len( + [ + comp + for comp in sfg.components + if isinstance(comp, ConstantMultiplication) + ] + ) + == 50 + ) + assert len([comp for comp in sfg.components if isinstance(comp, Addition)]) == 99 + assert len([comp for comp in sfg.components if isinstance(comp, Delay)]) == 99 + + sim = Simulation(sfg, [Impulse()]) + sim.run_for(101) + impulse_response.append(0.0) + assert np.allclose(sim.results["0"], impulse_response) + + with pytest.raises(ValueError, match="Coefficients must be of even length"): + symmetric_fir([0.1, 0.2, 0.1]) + + def test_sfg_generator_errors(): - sfg_gens = [wdf_allpass, transposed_direct_form_fir, direct_form_fir] + sfg_gens = [wdf_allpass, transposed_direct_form_fir, direct_form_fir, symmetric_fir] for gen in sfg_gens: with pytest.raises(ValueError, match="Coefficients cannot be empty"): gen([])