Skip to content
Snippets Groups Projects
Commit 1e53a893 authored by Oscar Gustafsson's avatar Oscar Gustafsson :bicyclist:
Browse files

Add more WDF adaptors

parent 1e432b0e
No related branches found
No related tags found
No related merge requests found
Pipeline #102717 passed
This commit is part of merge request !428. Comments created here will be created in the context of that merge request.
...@@ -12,3 +12,4 @@ from b_asic.signal import * ...@@ -12,3 +12,4 @@ from b_asic.signal import *
from b_asic.signal_flow_graph import * from b_asic.signal_flow_graph import *
from b_asic.simulation import * from b_asic.simulation import *
from b_asic.special_operations import * from b_asic.special_operations import *
from b_asic.wdf_operations import *
...@@ -913,73 +913,6 @@ class MAD(AbstractOperation): ...@@ -913,73 +913,6 @@ class MAD(AbstractOperation):
p._index = i p._index = i
class SymmetricTwoportAdaptor(AbstractOperation):
r"""
Wave digital filter symmetric twoport-adaptor operation.
.. math::
\begin{eqnarray}
y_0 & = & x_1 + \text{value}\times\left(x_1 - x_0\right)\\
y_1 & = & x_0 + \text{value}\times\left(x_1 - x_0\right)
\end{eqnarray}
"""
is_linear = True
is_swappable = True
def __init__(
self,
value: Num = 0,
src0: Optional[SignalSourceProvider] = None,
src1: Optional[SignalSourceProvider] = None,
name: Name = Name(""),
latency: Optional[int] = None,
latency_offsets: Optional[Dict[str, int]] = None,
execution_time: Optional[int] = None,
):
"""Construct a SymmetricTwoportAdaptor operation."""
super().__init__(
input_count=2,
output_count=2,
name=Name(name),
input_sources=[src0, src1],
latency=latency,
latency_offsets=latency_offsets,
execution_time=execution_time,
)
self.value = value
@classmethod
def type_name(cls) -> TypeName:
return TypeName("sym2p")
def evaluate(self, a, b):
tmp = self.value * (b - a)
return b + tmp, a + tmp
@property
def value(self) -> Num:
"""Get the constant value of this operation."""
return self.param("value")
@value.setter
def value(self, value: Num) -> None:
"""Set the constant value of this operation."""
if -1 <= value <= 1:
self.set_param("value", value)
else:
raise ValueError('value must be between -1 and 1 (inclusive)')
def swap_io(self) -> None:
# Swap inputs and outputs and change sign of coefficient
self._input_ports.reverse()
for i, p in enumerate(self._input_ports):
p._index = i
self._output_ports.reverse()
for i, p in enumerate(self._output_ports):
p._index = i
self.set_param("value", -self.value)
class Reciprocal(AbstractOperation): class Reciprocal(AbstractOperation):
r""" r"""
Reciprocal operation. Reciprocal operation.
......
...@@ -7,15 +7,11 @@ from typing import Dict, Optional, Sequence, Union ...@@ -7,15 +7,11 @@ from typing import Dict, Optional, Sequence, Union
import numpy as np import numpy as np
from b_asic.core_operations import ( from b_asic.core_operations import Addition, ConstantMultiplication, Name
Addition,
ConstantMultiplication,
Name,
SymmetricTwoportAdaptor,
)
from b_asic.signal import Signal from b_asic.signal import Signal
from b_asic.signal_flow_graph import SFG from b_asic.signal_flow_graph import SFG
from b_asic.special_operations import Delay, Input, Output from b_asic.special_operations import Delay, Input, Output
from b_asic.wdf_operations import SymmetricTwoportAdaptor
def wdf_allpass( def wdf_allpass(
......
"""
B-ASIC Core Operations Module.
Contains wave digital filter adaptors.
"""
from typing import Dict, Optional, Tuple
from b_asic.graph_component import Name, TypeName
from b_asic.operation import AbstractOperation
from b_asic.port import SignalSourceProvider
from b_asic.types import Num
class SymmetricTwoportAdaptor(AbstractOperation):
r"""
Wave digital filter symmetric twoport-adaptor operation.
.. math::
\begin{eqnarray}
y_0 & = & x_1 + \text{value}\times\left(x_1 - x_0\right)\\
y_1 & = & x_0 + \text{value}\times\left(x_1 - x_0\right)
\end{eqnarray}
"""
is_linear = True
is_swappable = True
def __init__(
self,
value: Num = 0,
src0: Optional[SignalSourceProvider] = None,
src1: Optional[SignalSourceProvider] = None,
name: Name = Name(""),
latency: Optional[int] = None,
latency_offsets: Optional[Dict[str, int]] = None,
execution_time: Optional[int] = None,
):
"""Construct a SymmetricTwoportAdaptor operation."""
super().__init__(
input_count=2,
output_count=2,
name=Name(name),
input_sources=[src0, src1],
latency=latency,
latency_offsets=latency_offsets,
execution_time=execution_time,
)
self.value = value
@classmethod
def type_name(cls) -> TypeName:
return TypeName("sym2p")
def evaluate(self, a, b):
tmp = self.value * (b - a)
return b + tmp, a + tmp
@property
def value(self) -> Num:
"""Get the constant value of this operation."""
return self.param("value")
@value.setter
def value(self, value: Num) -> None:
"""Set the constant value of this operation."""
if -1 <= value <= 1:
self.set_param("value", value)
else:
raise ValueError('value must be between -1 and 1 (inclusive)')
def swap_io(self) -> None:
# Swap inputs and outputs and change sign of coefficient
self._input_ports.reverse()
for i, p in enumerate(self._input_ports):
p._index = i
self._output_ports.reverse()
for i, p in enumerate(self._output_ports):
p._index = i
self.set_param("value", -self.value)
class SeriesTwoportAdaptor(AbstractOperation):
r"""
Wave digital filter series twoport-adaptor operation.
.. math::
\begin{eqnarray}
y_0 & = & x_0 - \text{value}\times\left(x_0 + x_1\right)\\
y_1 & = & x_1 - (2-\text{value})\times\left(x_0 + x_1\right)
\end{eqnarray}
"""
is_linear = True
is_swappable = True
def __init__(
self,
value: Num = 0,
src0: Optional[SignalSourceProvider] = None,
src1: Optional[SignalSourceProvider] = None,
name: Name = Name(""),
latency: Optional[int] = None,
latency_offsets: Optional[Dict[str, int]] = None,
execution_time: Optional[int] = None,
):
"""Construct a SeriesTwoportAdaptor operation."""
super().__init__(
input_count=2,
output_count=2,
name=Name(name),
input_sources=[src0, src1],
latency=latency,
latency_offsets=latency_offsets,
execution_time=execution_time,
)
self.value = value
@classmethod
def type_name(cls) -> TypeName:
return TypeName("ser2p")
def evaluate(self, a, b):
s = a + b
val = self.value
return a - val * s, b - (2 - val) * s
@property
def value(self) -> Num:
"""Get the constant value of this operation."""
return self.param("value")
@value.setter
def value(self, value: Num) -> None:
"""Set the constant value of this operation."""
if 0 <= value <= 2:
self.set_param("value", value)
else:
raise ValueError('value must be between 0 and 2 (inclusive)')
def swap_io(self) -> None:
# Swap inputs and outputs and, hence, which port is dependent
self._input_ports.reverse()
for i, p in enumerate(self._input_ports):
p._index = i
self._output_ports.reverse()
for i, p in enumerate(self._output_ports):
p._index = i
self.set_param("value", 2 - self.value)
class SeriesThreeportAdaptor(AbstractOperation):
r"""
Wave digital filter series threeport-adaptor operation.
.. math::
\begin{eqnarray}
y_0 & = & x_0 - \text{value}_0\times\left(x_0 + x_1 + x_2\right)\\
y_1 & = & x_1 - \text{value}_1\times\left(x_0 + x_1 + x_2\right)\\
y_2 & = & x_2 - \left(2 - \text{value}_0 - \text{value}_1\right)\times\left(x_0
+ x_1 + x_2\right)
\end{eqnarray}
"""
is_linear = True
is_swappable = True
def __init__(
self,
value: Tuple[Num, Num] = (0, 0),
src0: Optional[SignalSourceProvider] = None,
src1: Optional[SignalSourceProvider] = None,
src2: Optional[SignalSourceProvider] = None,
name: Name = Name(""),
latency: Optional[int] = None,
latency_offsets: Optional[Dict[str, int]] = None,
execution_time: Optional[int] = None,
):
"""Construct a SeriesThreeportAdaptor operation."""
super().__init__(
input_count=3,
output_count=3,
name=Name(name),
input_sources=[src0, src1, src2],
latency=latency,
latency_offsets=latency_offsets,
execution_time=execution_time,
)
self.value = value
@classmethod
def type_name(cls) -> TypeName:
return TypeName("ser3p")
def evaluate(self, a, b, c):
s = a + b + c
val0, val1 = self.value
return a - val0 * s, b - val1 * s, c - (2 - val0 - val1) * s
@property
def value(self) -> Num:
"""Get the constant value of this operation."""
return self.param("value")
@value.setter
def value(self, value: Num) -> None:
"""Set the constant value of this operation."""
if 0 <= sum(value) <= 2:
self.set_param("value", value)
else:
raise ValueError('sum of value must be between 0 and 2 (inclusive)')
*************************
``b_asic.wdf_operations``
*************************
.. inheritance-diagram:: b_asic.wdf_operations
:parts: 1
:top-classes: b_asic.graph_component.GraphComponent, b_asic.port.SignalSourceProvider
.. automodule:: b_asic.wdf_operations
:members:
:undoc-members:
:show-inheritance:
...@@ -7,10 +7,10 @@ LWDF first-order allpass section ...@@ -7,10 +7,10 @@ LWDF first-order allpass section
This has different latency offsets for the different inputs/outputs. This has different latency offsets for the different inputs/outputs.
""" """
from b_asic.core_operations import SymmetricTwoportAdaptor
from b_asic.schedule import Schedule from b_asic.schedule import Schedule
from b_asic.signal_flow_graph import SFG from b_asic.signal_flow_graph import SFG
from b_asic.special_operations import Delay, Input, Output from b_asic.special_operations import Delay, Input, Output
from b_asic.wdf_operations import SymmetricTwoportAdaptor
in0 = Input() in0 = Input()
......
...@@ -8,12 +8,13 @@ Small bireciprocal lattice wave digital filter. ...@@ -8,12 +8,13 @@ Small bireciprocal lattice wave digital filter.
import numpy as np import numpy as np
from mplsignal.freq_plots import freqz_fir from mplsignal.freq_plots import freqz_fir
from b_asic.core_operations import Addition, SymmetricTwoportAdaptor from b_asic.core_operations import Addition
from b_asic.schedule import Schedule from b_asic.schedule import Schedule
from b_asic.signal_flow_graph import SFG from b_asic.signal_flow_graph import SFG
from b_asic.signal_generator import Impulse from b_asic.signal_generator import Impulse
from b_asic.simulation import Simulation from b_asic.simulation import Simulation
from b_asic.special_operations import Delay, Input, Output from b_asic.special_operations import Delay, Input, Output
from b_asic.wdf_operations import SymmetricTwoportAdaptor
in0 = Input("x") in0 = Input("x")
D0 = Delay(in0) D0 = Delay(in0)
......
...@@ -19,7 +19,6 @@ from b_asic import ( ...@@ -19,7 +19,6 @@ from b_asic import (
Shift, Shift,
SquareRoot, SquareRoot,
Subtraction, Subtraction,
SymmetricTwoportAdaptor,
) )
...@@ -346,39 +345,6 @@ class TestButterfly: ...@@ -346,39 +345,6 @@ class TestButterfly:
assert test_operation.evaluate_output(1, [2 + 1j, 3 - 2j]) == -1 + 3j assert test_operation.evaluate_output(1, [2 + 1j, 3 - 2j]) == -1 + 3j
class TestSymmetricTwoportAdaptor:
"""Tests for SymmetricTwoportAdaptor class."""
def test_symmetrictwoportadaptor_positive(self):
test_operation = SymmetricTwoportAdaptor(0.5)
assert test_operation.evaluate_output(0, [2, 3]) == 3.5
assert test_operation.evaluate_output(1, [2, 3]) == 2.5
assert test_operation.value == 0.5
def test_symmetrictwoportadaptor_negative(self):
test_operation = SymmetricTwoportAdaptor(0.5)
assert test_operation.evaluate_output(0, [-2, -3]) == -3.5
assert test_operation.evaluate_output(1, [-2, -3]) == -2.5
def test_symmetrictwoportadaptor_complex(self):
test_operation = SymmetricTwoportAdaptor(0.5)
assert test_operation.evaluate_output(0, [2 + 1j, 3 - 2j]) == 3.5 - 3.5j
assert test_operation.evaluate_output(1, [2 + 1j, 3 - 2j]) == 2.5 - 0.5j
def test_symmetrictwoportadaptor_swap_io(self):
test_operation = SymmetricTwoportAdaptor(0.5)
assert test_operation.value == 0.5
test_operation.swap_io()
assert test_operation.value == -0.5
def test_symmetrictwoportadaptor_error(self):
with pytest.raises(ValueError, match="value must be between -1 and 1"):
_ = SymmetricTwoportAdaptor(-2)
test_operation = SymmetricTwoportAdaptor(0)
with pytest.raises(ValueError, match="value must be between -1 and 1"):
test_operation.value = 2
class TestReciprocal: class TestReciprocal:
"""Tests for Absolute class.""" """Tests for Absolute class."""
......
...@@ -18,14 +18,14 @@ from b_asic.core_operations import ( ...@@ -18,14 +18,14 @@ from b_asic.core_operations import (
Multiplication, Multiplication,
SquareRoot, SquareRoot,
Subtraction, Subtraction,
SymmetricTwoportAdaptor,
) )
from b_asic.operation import ResultKey from b_asic.operation import ResultKey
from b_asic.save_load_structure import python_to_sfg, sfg_to_python from b_asic.save_load_structure import python_to_sfg, sfg_to_python
from b_asic.sfg_generators import wdf_allpass
from b_asic.signal_flow_graph import SFG, GraphID from b_asic.signal_flow_graph import SFG, GraphID
from b_asic.simulation import Simulation from b_asic.simulation import Simulation
from b_asic.special_operations import Delay from b_asic.special_operations import Delay
from b_asic.sfg_generators import wdf_allpass from b_asic.wdf_operations import SymmetricTwoportAdaptor
class TestInit: class TestInit:
...@@ -816,7 +816,7 @@ class TestConnectExternalSignalsToComponentsSoloComp: ...@@ -816,7 +816,7 @@ class TestConnectExternalSignalsToComponentsSoloComp:
assert not test_sfg.connect_external_signals_to_components() assert not test_sfg.connect_external_signals_to_components()
def test_connect_external_signals_to_components_multiple_operations_after_input( def test_connect_external_signals_to_components_multiple_operations_after_input(
self self,
): ):
""" """
Replaces an SFG with a symmetric two-port adaptor to test when the input Replaces an SFG with a symmetric two-port adaptor to test when the input
...@@ -830,6 +830,7 @@ class TestConnectExternalSignalsToComponentsSoloComp: ...@@ -830,6 +830,7 @@ class TestConnectExternalSignalsToComponentsSoloComp:
assert test_sfg.evaluate(1) == -0.5 assert test_sfg.evaluate(1) == -0.5
assert not test_sfg.connect_external_signals_to_components() assert not test_sfg.connect_external_signals_to_components()
class TestConnectExternalSignalsToComponentsMultipleComp: class TestConnectExternalSignalsToComponentsMultipleComp:
def test_connect_external_signals_to_components_operation_tree( def test_connect_external_signals_to_components_operation_tree(
self, operation_tree self, operation_tree
...@@ -1480,9 +1481,7 @@ class TestUnfold: ...@@ -1480,9 +1481,7 @@ class TestUnfold:
): ):
self.do_tests(sfg_two_inputs_two_outputs_independent) self.do_tests(sfg_two_inputs_two_outputs_independent)
def test_threetapiir( def test_threetapiir(self, sfg_direct_form_iir_lp_filter: SFG):
self, sfg_direct_form_iir_lp_filter: SFG
):
self.do_tests(sfg_direct_form_iir_lp_filter) self.do_tests(sfg_direct_form_iir_lp_filter)
def do_tests(self, sfg: SFG): def do_tests(self, sfg: SFG):
...@@ -1635,6 +1634,7 @@ class TestInsertComponentAfter: ...@@ -1635,6 +1634,7 @@ class TestInsertComponentAfter:
with pytest.raises(ValueError, match="Unknown component:"): with pytest.raises(ValueError, match="Unknown component:"):
sfg.insert_operation_after('foo', SquareRoot()) sfg.insert_operation_after('foo', SquareRoot())
class TestInsertComponentBefore: class TestInsertComponentBefore:
def test_insert_component_before_in_sfg(self, butterfly_operation_tree): def test_insert_component_before_in_sfg(self, butterfly_operation_tree):
sfg = SFG(outputs=list(map(Output, butterfly_operation_tree.outputs))) sfg = SFG(outputs=list(map(Output, butterfly_operation_tree.outputs)))
...@@ -1653,22 +1653,22 @@ class TestInsertComponentBefore: ...@@ -1653,22 +1653,22 @@ class TestInsertComponentBefore:
SquareRoot, SquareRoot,
) )
assert isinstance( assert isinstance(
_sfg.find_by_name("bfly1")[0] _sfg.find_by_name("bfly1")[0].input(0).signals[0].source.operation,
.input(0)
.signals[0]
.source.operation,
SquareRoot, SquareRoot,
) )
assert sfg.find_by_name("bfly1")[0].input(0).signals[ assert (
0 sfg.find_by_name("bfly1")[0].input(0).signals[0].source.operation
].source.operation is sfg.find_by_name("bfly2")[0] is sfg.find_by_name("bfly2")[0]
assert _sfg.find_by_name("bfly1")[0].input(0).signals[ )
0 assert (
].destination.operation is not _sfg.find_by_name("bfly2")[0] _sfg.find_by_name("bfly1")[0].input(0).signals[0].destination.operation
assert _sfg.find_by_id("sqrt0").input(0).signals[ is not _sfg.find_by_name("bfly2")[0]
0 )
].source.operation is _sfg.find_by_name("bfly2")[0] assert (
_sfg.find_by_id("sqrt0").input(0).signals[0].source.operation
is _sfg.find_by_name("bfly2")[0]
)
def test_insert_component_before_mimo_operation_error( def test_insert_component_before_mimo_operation_error(
self, large_operation_tree_names self, large_operation_tree_names
......
import numpy as np import numpy as np
import pytest import pytest
from b_asic.core_operations import ( from b_asic.core_operations import Addition, ConstantMultiplication
Addition,
ConstantMultiplication,
SymmetricTwoportAdaptor,
)
from b_asic.sfg_generators import ( from b_asic.sfg_generators import (
direct_form_fir, direct_form_fir,
transposed_direct_form_fir, transposed_direct_form_fir,
...@@ -14,6 +10,7 @@ from b_asic.sfg_generators import ( ...@@ -14,6 +10,7 @@ from b_asic.sfg_generators import (
from b_asic.signal_generator import Impulse from b_asic.signal_generator import Impulse
from b_asic.simulation import Simulation from b_asic.simulation import Simulation
from b_asic.special_operations import Delay from b_asic.special_operations import Delay
from b_asic.wdf_operations import SymmetricTwoportAdaptor
def test_wdf_allpass(): def test_wdf_allpass():
......
"""B-ASIC test suite for the core operations."""
import pytest
from b_asic.wdf_operations import (
SeriesThreeportAdaptor,
SeriesTwoportAdaptor,
SymmetricTwoportAdaptor,
)
class TestSymmetricTwoportAdaptor:
"""Tests for SymmetricTwoportAdaptor class."""
def test_symmetrictwoportadaptor_positive(self):
test_operation = SymmetricTwoportAdaptor(0.5)
assert test_operation.evaluate_output(0, [2, 3]) == 3.5
assert test_operation.evaluate_output(1, [2, 3]) == 2.5
assert test_operation.value == 0.5
def test_symmetrictwoportadaptor_negative(self):
test_operation = SymmetricTwoportAdaptor(0.5)
assert test_operation.evaluate_output(0, [-2, -3]) == -3.5
assert test_operation.evaluate_output(1, [-2, -3]) == -2.5
def test_symmetrictwoportadaptor_complex(self):
test_operation = SymmetricTwoportAdaptor(0.5)
assert test_operation.evaluate_output(0, [2 + 1j, 3 - 2j]) == 3.5 - 3.5j
assert test_operation.evaluate_output(1, [2 + 1j, 3 - 2j]) == 2.5 - 0.5j
def test_symmetrictwoportadaptor_swap_io(self):
test_operation = SymmetricTwoportAdaptor(0.5)
assert test_operation.value == 0.5
test_operation.swap_io()
assert test_operation.value == -0.5
def test_symmetrictwoportadaptor_error(self):
with pytest.raises(ValueError, match="value must be between -1 and 1"):
_ = SymmetricTwoportAdaptor(-2)
test_operation = SymmetricTwoportAdaptor(0)
with pytest.raises(ValueError, match="value must be between -1 and 1"):
test_operation.value = 2
class TestSeriesTwoportAdaptor:
"""Tests for SeriesTwoportAdaptor class."""
def test_seriestwoportadaptor_positive(self):
test_operation = SeriesTwoportAdaptor(0.5)
assert test_operation.evaluate_output(0, [2, 3]) == -0.5
assert test_operation.evaluate_output(1, [2, 3]) == -4.5
assert test_operation.value == 0.5
def test_seriestwoportadaptor_negative(self):
test_operation = SeriesTwoportAdaptor(0.5)
assert test_operation.evaluate_output(0, [-2, -3]) == 0.5
assert test_operation.evaluate_output(1, [-2, -3]) == 4.5
def test_seriestwoportadaptor_complex(self):
test_operation = SeriesTwoportAdaptor(0.5)
assert test_operation.evaluate_output(0, [2 + 1j, 3 - 2j]) == -0.5 + 1.5j
assert test_operation.evaluate_output(1, [2 + 1j, 3 - 2j]) == -4.5 - 0.5j
def test_seriestwoportadaptor_swap_io(self):
test_operation = SeriesTwoportAdaptor(0.5)
assert test_operation.value == 0.5
test_operation.swap_io()
assert test_operation.value == 1.5
def test_seriestwoportadaptor_error(self):
with pytest.raises(ValueError, match="value must be between 0 and 2"):
_ = SeriesTwoportAdaptor(-1)
test_operation = SeriesTwoportAdaptor(0)
with pytest.raises(ValueError, match="value must be between 0 and 2"):
test_operation.value = 3
class TestSeriesThreeportAdaptor:
"""Tests for SeriesThreeportAdaptor class."""
def test_seriesthreeportadaptor_positive(self):
test_operation = SeriesThreeportAdaptor((0.5, 1.25))
assert test_operation.evaluate_output(0, [2, 3, 4]) == -2.5
assert test_operation.evaluate_output(1, [2, 3, 4]) == -8.25
assert test_operation.evaluate_output(2, [2, 3, 4]) == 1.75
assert test_operation.value == (0.5, 1.25)
def test_seriesthreeportadaptor_negative(self):
test_operation = SeriesThreeportAdaptor((0.5, 1.25))
assert test_operation.evaluate_output(0, [-2, -3, -4]) == 2.5
assert test_operation.evaluate_output(1, [-2, -3, -4]) == 8.25
assert test_operation.evaluate_output(2, [-2, -3, -4]) == -1.75
def test_seriesthreeportadaptor_complex(self):
test_operation = SeriesThreeportAdaptor((0.5, 1.25))
assert test_operation.evaluate_output(0, [2 + 1j, 3 - 2j, 4 + 3j]) == -2.5 + 0j
assert (
test_operation.evaluate_output(1, [2 + 1j, 3 - 2j, 4 + 3j]) == -8.25 - 4.5j
)
assert (
test_operation.evaluate_output(2, [2 + 1j, 3 - 2j, 4 + 3j]) == 1.75 + 2.5j
)
def test_seriestwoportadaptor_error(self):
with pytest.raises(ValueError, match="sum of value must be between 0 and 2"):
_ = SeriesThreeportAdaptor((0, 3))
test_operation = SeriesThreeportAdaptor((0, 0.5))
with pytest.raises(ValueError, match="sum of value must be between 0 and 2"):
test_operation.value = (-0.5, 4)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment