Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • da/B-ASIC
  • lukja239/B-ASIC
  • robal695/B-ASIC
3 results
Show changes
Commits on Source (2)
......@@ -116,6 +116,7 @@ TODO.txt
b_asic/_version.py
docs_sphinx/_build/
docs_sphinx/examples
docs_sphinx/sg_execution_times.rst
result_images/
.coverage
Digraph.gv
......
......@@ -77,6 +77,37 @@ class Constant(AbstractOperation):
def __str__(self) -> str:
return f"{self.value}"
def get_plot_coordinates(
self,
) -> tuple[tuple[tuple[float, float], ...], tuple[tuple[float, float], ...]]:
# Doc-string inherited
return (
(
(-0.5, 0),
(-0.5, 1),
(-0.25, 1),
(0, 0.5),
(-0.25, 0),
(-0.5, 0),
),
(
(-0.5, 0),
(-0.5, 1),
(-0.25, 1),
(0, 0.5),
(-0.25, 0),
(-0.5, 0),
),
)
def get_input_coordinates(self) -> tuple[tuple[float, float], ...]:
# doc-string inherited
return tuple()
def get_output_coordinates(self) -> tuple[tuple[float, float], ...]:
# doc-string inherited
return ((0, 0.5),)
class Addition(AbstractOperation):
"""
......@@ -1099,6 +1130,108 @@ class MAD(AbstractOperation):
p._index = i
class MADS(AbstractOperation):
__slots__ = (
"_is_add",
"_override_zero_on_src0",
"_src0",
"_src1",
"_src2",
"_name",
"_latency",
"_latency_offsets",
"_execution_time",
)
_is_add: Optional[bool]
_override_zero_on_src0: Optional[bool]
_src0: Optional[SignalSourceProvider]
_src1: Optional[SignalSourceProvider]
_src2: Optional[SignalSourceProvider]
_name: Name
_latency: Optional[int]
_latency_offsets: Optional[Dict[str, int]]
_execution_time: Optional[int]
is_swappable = True
def __init__(
self,
is_add: Optional[bool] = True,
override_zero_on_src0: Optional[bool] = False,
src0: Optional[SignalSourceProvider] = None,
src1: Optional[SignalSourceProvider] = None,
src2: Optional[SignalSourceProvider] = None,
name: Name = Name(""),
latency: Optional[int] = None,
latency_offsets: Optional[Dict[str, int]] = None,
execution_time: Optional[int] = None,
):
"""Construct a MADS operation."""
super().__init__(
input_count=3,
output_count=1,
name=Name(name),
input_sources=[src0, src1, src2],
latency=latency,
latency_offsets=latency_offsets,
execution_time=execution_time,
)
self.set_param("is_add", is_add)
self.set_param("override_zero_on_src0", override_zero_on_src0)
@classmethod
def type_name(cls) -> TypeName:
return TypeName("mads")
def evaluate(self, a, b, c):
if self.is_add:
if self.override_zero_on_src0:
return b * c
else:
return a + b * c
else:
if self.override_zero_on_src0:
return -b * c
else:
return a - b * c
@property
def is_add(self) -> bool:
"""Get if operation is an addition."""
return self.param("is_add")
@is_add.setter
def is_add(self, is_add: bool) -> None:
"""Set if operation is an addition."""
self.set_param("is_add", is_add)
@property
def override_zero_on_src0(self) -> bool:
"""Get if operation is overriding a zero on port src0."""
return self.param("override_zero_on_src0")
@override_zero_on_src0.setter
def override_zero_on_src0(self, override_zero_on_src0: bool) -> None:
"""Set if operation is overriding a zero on port src0."""
self.set_param("override_zero_on_src0", override_zero_on_src0)
@property
def is_linear(self) -> bool:
return (
self.input(1).connected_source.operation.is_constant
or self.input(2).connected_source.operation.is_constant
)
def swap_io(self) -> None:
self._input_ports = [
self._input_ports[0],
self._input_ports[2],
self._input_ports[1],
]
for i, p in enumerate(self._input_ports):
p._index = i
class SymmetricTwoportAdaptor(AbstractOperation):
r"""
Wave digital filter symmetric twoport-adaptor operation.
......@@ -1519,6 +1652,83 @@ class Shift(AbstractOperation):
self.set_param("value", value)
class DontCare(AbstractOperation):
r"""
Dont-care operation
Used for ignoring the input to another operation and thus avoiding dangling input nodes.
Parameters
----------
name : Name, optional
Operation name.
"""
__slots__ = "_name"
_name: Name
is_linear = True
def __init__(self, name: Name = ""):
"""Construct a DontCare operation."""
super().__init__(
input_count=0,
output_count=1,
name=name,
latency_offsets={"out0": 0},
execution_time=0,
)
@classmethod
def type_name(cls) -> TypeName:
return TypeName("dontcare")
def evaluate(self):
return 0
@property
def latency(self) -> int:
return self.latency_offsets["out0"]
def __repr__(self) -> str:
return "DontCare()"
def __str__(self) -> str:
return "dontcare"
def get_plot_coordinates(
self,
) -> tuple[tuple[tuple[float, float], ...], tuple[tuple[float, float], ...]]:
# Doc-string inherited
return (
(
(-0.5, 0),
(-0.5, 1),
(-0.25, 1),
(0, 0.5),
(-0.25, 0),
(-0.5, 0),
),
(
(-0.5, 0),
(-0.5, 1),
(-0.25, 1),
(0, 0.5),
(-0.25, 0),
(-0.5, 0),
),
)
def get_input_coordinates(self) -> tuple[tuple[float, float], ...]:
# doc-string inherited
return tuple()
def get_output_coordinates(self) -> tuple[tuple[float, float], ...]:
# doc-string inherited
return ((0, 0.5),)
class Sink(AbstractOperation):
r"""
Sink operation.
......@@ -1544,6 +1754,7 @@ class Sink(AbstractOperation):
output_count=0,
name=name,
latency_offsets={"in0": 0},
execution_time=0,
)
@classmethod
......@@ -1562,3 +1773,20 @@ class Sink(AbstractOperation):
def __str__(self) -> str:
return "sink"
def get_plot_coordinates(
self,
) -> tuple[tuple[tuple[float, float], ...], tuple[tuple[float, float], ...]]:
# Doc-string inherited
return (
((0, 0), (0, 1), (0.25, 1), (0.5, 0.5), (0.25, 0), (0, 0)),
((0, 0), (0, 1), (0.25, 1), (0.5, 0.5), (0.25, 0), (0, 0)),
)
def get_input_coordinates(self) -> tuple[tuple[float, float], ...]:
# doc-string inherited
return ((0, 0.5),)
def get_output_coordinates(self) -> tuple[tuple[float, float], ...]:
# doc-string inherited
return tuple()
......@@ -119,6 +119,84 @@ class EarliestDeadlineScheduler(ListScheduler):
@staticmethod
def _get_sorted_operations(schedule: "Schedule") -> list["GraphID"]:
schedule_copy = copy.deepcopy(schedule)
schedule_copy = copy.copy(schedule)
ALAPScheduler().apply_scheduling(schedule_copy)
deadlines = {}
for op_id, start_time in schedule_copy.start_times.items():
deadlines[op_id] = start_time + schedule.sfg.find_by_id(op_id).latency
return sorted(deadlines, key=deadlines.get)
class LeastSlackTimeScheduler(ListScheduler):
"""Scheduler that implements the least slack time first algorithm."""
@staticmethod
def _get_sorted_operations(schedule: "Schedule") -> list["GraphID"]:
schedule_copy = copy.copy(schedule)
ALAPScheduler().apply_scheduling(schedule_copy)
return sorted(schedule_copy.start_times, key=schedule_copy.start_times.get)
class MaxFanOutScheduler(ListScheduler):
"""Scheduler that implements the maximum fan-out algorithm."""
@staticmethod
def _get_sorted_operations(schedule: "Schedule") -> list["GraphID"]:
schedule_copy = copy.copy(schedule)
ALAPScheduler().apply_scheduling(schedule_copy)
fan_outs = {}
for op_id, start_time in schedule_copy.start_times.items():
fan_outs[op_id] = len(schedule.sfg.find_by_id(op_id).output_signals)
return sorted(fan_outs, key=fan_outs.get, reverse=True)
class HybridScheduler(ListScheduler):
"""Scheduler that implements a hybrid algorithm. Will receive a new name once finalized."""
@staticmethod
def _get_sorted_operations(schedule: "Schedule") -> list["GraphID"]:
# sort least-slack and then resort ties according to max-fan-out
schedule_copy = copy.copy(schedule)
ALAPScheduler().apply_scheduling(schedule_copy)
sorted_items = sorted(
schedule_copy.start_times.items(), key=lambda item: item[1]
)
fan_out_sorted_items = []
last_value = sorted_items[0][0]
current_group = []
for key, value in sorted_items:
if value != last_value:
# the group is completed, sort it internally
sorted_group = sorted(
current_group,
key=lambda pair: len(
schedule.sfg.find_by_id(pair[0]).output_signals
),
reverse=True,
)
fan_out_sorted_items += sorted_group
current_group = []
current_group.append((key, value))
last_value = value
sorted_group = sorted(
current_group,
key=lambda pair: len(schedule.sfg.find_by_id(pair[0]).output_signals),
reverse=True,
)
fan_out_sorted_items += sorted_group
sorted_op_list = [pair[0] for pair in fan_out_sorted_items]
return sorted_op_list
......@@ -59,7 +59,7 @@ class AboutWindow(QDialog):
" href=\"https://gitlab.liu.se/da/B-ASIC/-/blob/master/LICENSE\">"
"MIT-license</a>"
" and any extension to the program should follow that same"
f" license.\n\n*Version: {__version__}*\n\nCopyright 2020-2023,"
f" license.\n\n*Version: {__version__}*\n\nCopyright 2020-2025,"
" Oscar Gustafsson et al."
)
label1.setTextFormat(Qt.MarkdownText)
......
from abc import ABC, abstractmethod
from typing import TYPE_CHECKING, Optional, cast
from b_asic.core_operations import DontCare
from b_asic.port import OutputPort
from b_asic.special_operations import Delay, Input, Output
from b_asic.types import TypeName
......@@ -45,6 +46,15 @@ class Scheduler(ABC):
class ListScheduler(Scheduler, ABC):
def __init__(self, max_resources: Optional[dict[TypeName, int]] = None) -> None:
if max_resources:
if not isinstance(max_resources, dict):
raise ValueError("max_resources must be a dictionary.")
for key, value in max_resources.items():
if not isinstance(key, str):
raise ValueError("max_resources key must be a valid type_name.")
if not isinstance(value, int):
raise ValueError("max_resources value must be an integer.")
if max_resources:
self._max_resources = max_resources
else:
......@@ -114,11 +124,16 @@ class ListScheduler(Scheduler, ABC):
self._handle_outputs(schedule)
schedule.remove_delays()
# move all inputs and outputs ALAP now that operations have moved
# move all inputs ALAP now that operations have moved
for input_op in schedule.sfg.find_by_type_name(Input.type_name()):
input_op = cast(Input, input_op)
schedule.move_operation_alap(input_op.graph_id)
# move all dont cares ALAP
for dc_op in schedule.sfg.find_by_type_name(DontCare.type_name()):
dc_op = cast(DontCare, dc_op)
schedule.move_operation_alap(dc_op.graph_id)
@staticmethod
def _candidate_is_schedulable(
start_times: dict["GraphID"],
......
......@@ -9,10 +9,13 @@ from typing import TYPE_CHECKING, Dict, Optional, Sequence, Union
import numpy as np
from b_asic.core_operations import (
MADS,
Addition,
Butterfly,
ConstantMultiplication,
DontCare,
Name,
Reciprocal,
SymmetricTwoportAdaptor,
)
from b_asic.signal import Signal
......@@ -432,6 +435,82 @@ def radix_2_dif_fft(points: int) -> SFG:
return SFG(inputs=inputs, outputs=outputs)
def ldlt_matrix_inverse(N: int) -> SFG:
"""Generates an SFG for the LDLT matrix inverse algorithm.
Parameters
----------
N : int
Dimension of the square input matrix.
Returns
-------
SFG
Signal Flow Graph
"""
inputs = []
A = [[None for _ in range(N)] for _ in range(N)]
for i in range(N):
for j in range(i, N):
in_op = Input()
A[i][j] = in_op
inputs.append(in_op)
D = [None for _ in range(N)]
for i in range(N):
D[i] = A[i][i]
D_inv = [None for _ in range(N)]
R = [[None for _ in range(N)] for _ in range(N)]
M = [[None for _ in range(N)] for _ in range(N)]
# R*di*R^T factorization
for i in range(N):
for k in range(i):
D[i] = MADS(False, False, D[i], M[k][i], R[k][i])
D_inv[i] = Reciprocal(D[i])
for j in range(i + 1, N):
R[i][j] = A[i][j]
for k in range(i):
R[i][j] = MADS(False, False, R[i][j], M[k][i], R[k][j])
# if is_complex:
# M[i][j] = ComplexConjugate(R[i][j])
# else:
M[i][j] = R[i][j]
R[i][j] = MADS(True, True, DontCare(), R[i][j], D_inv[i])
# back substitution
A_inv = [[None for _ in range(N)] for _ in range(N)]
for i in reversed(range(N)):
A_inv[i][i] = D_inv[i]
for j in reversed(range(i + 1)):
for k in reversed(range(j + 1, N)):
if k == N - 1 and i != j:
A_inv[j][i] = MADS(False, True, DontCare(), R[j][k], A_inv[i][k])
else:
if A_inv[i][k]:
A_inv[j][i] = MADS(
False, False, A_inv[j][i], R[j][k], A_inv[i][k]
)
else:
A_inv[j][i] = MADS(
False, False, A_inv[j][i], R[j][k], A_inv[k][i]
)
outputs = []
for i in range(N):
for j in range(i, N):
outputs.append(Output(A_inv[i][j]))
return SFG(inputs, outputs)
def _construct_dif_fft_stage(
ports_from_previous_stage: list["OutputPort"],
number_of_stages: int,
......
......@@ -9,7 +9,7 @@
import shutil
project = 'B-ASIC'
copyright = '2020-2023, Oscar Gustafsson et al'
copyright = '2020-2025, Oscar Gustafsson et al'
author = 'Oscar Gustafsson et al'
html_logo = "../logos/logo_tiny.png"
......
"""
=========================================
LDLT Matrix Inversion Algorithm
=========================================
"""
from b_asic.architecture import Architecture, Memory, ProcessingElement
from b_asic.core_operations import MADS, DontCare, Reciprocal
from b_asic.core_schedulers import (
ALAPScheduler,
ASAPScheduler,
EarliestDeadlineScheduler,
HybridScheduler,
LeastSlackTimeScheduler,
MaxFanOutScheduler,
)
from b_asic.schedule import Schedule
from b_asic.sfg_generators import ldlt_matrix_inverse
from b_asic.special_operations import Input, Output
sfg = ldlt_matrix_inverse(N=3)
# %%
# The SFG is
sfg
# %%
# Set latencies and execution times.
sfg.set_latency_of_type(MADS.type_name(), 3)
sfg.set_latency_of_type(Reciprocal.type_name(), 2)
sfg.set_execution_time_of_type(MADS.type_name(), 1)
sfg.set_execution_time_of_type(Reciprocal.type_name(), 1)
# %%
# Create an ASAP schedule.
schedule = Schedule(sfg, scheduler=ASAPScheduler())
print("Scheduling time:", schedule.schedule_time)
schedule.show()
# %%
# Create an ALAP schedule.
schedule = Schedule(sfg, scheduler=ALAPScheduler())
print("Scheduling time:", schedule.schedule_time)
schedule.show()
# %%
# Create an EarliestDeadline schedule that satisfies the resource constraints.
resources = {MADS.type_name(): 1, Reciprocal.type_name(): 1}
schedule = Schedule(sfg, scheduler=EarliestDeadlineScheduler(resources))
print("Scheduling time:", schedule.schedule_time)
schedule.show()
# %%
# Create a LeastSlackTime schedule that satisfies the resource constraints.
schedule = Schedule(sfg, scheduler=LeastSlackTimeScheduler(resources))
print("Scheduling time:", schedule.schedule_time)
schedule.show()
# %%
# Create a MaxFanOutScheduler schedule that satisfies the resource constraints.
schedule = Schedule(sfg, scheduler=MaxFanOutScheduler(resources))
print("Scheduling time:", schedule.schedule_time)
schedule.show()
# %%
# Create a HybridScheduler schedule that satisfies the resource constraints.
schedule = Schedule(sfg, scheduler=HybridScheduler(resources))
print("Scheduling time:", schedule.schedule_time)
schedule.show()
# %%
operations = schedule.get_operations()
mads = operations.get_by_type_name(MADS.type_name())
mads.show(title="MADS executions")
reciprocals = operations.get_by_type_name(Reciprocal.type_name())
reciprocals.show(title="Reciprocal executions")
dont_cares = operations.get_by_type_name(DontCare.type_name())
dont_cares.show(title="Dont-care executions")
inputs = operations.get_by_type_name(Input.type_name())
inputs.show(title="Input executions")
outputs = operations.get_by_type_name(Output.type_name())
outputs.show(title="Output executions")
mads_pe = ProcessingElement(mads, entity_name="mad")
reciprocal_pe = ProcessingElement(reciprocals, entity_name="rec")
dont_care_pe = ProcessingElement(dont_cares, entity_name="dc")
pe_in = ProcessingElement(inputs, entity_name='input')
pe_out = ProcessingElement(outputs, entity_name='output')
mem_vars = schedule.get_memory_variables()
mem_vars.show(title="All memory variables")
direct, mem_vars = mem_vars.split_on_length()
mem_vars.show(title="Non-zero time memory variables")
mem_vars_set = mem_vars.split_on_ports(read_ports=1, write_ports=1, total_ports=2)
# %%
memories = []
for i, mem in enumerate(mem_vars_set):
memory = Memory(mem, memory_type="RAM", entity_name=f"memory{i}")
memories.append(memory)
mem.show(title=f"{memory.entity_name}")
memory.assign("left_edge")
memory.show_content(title=f"Assigned {memory.entity_name}")
direct.show(title="Direct interconnects")
# %%
arch = Architecture(
{mads_pe, reciprocal_pe, dont_care_pe, pe_in, pe_out},
memories,
direct_interconnects=direct,
)
# %%
arch
# schedule.edit()
......@@ -3,6 +3,9 @@
import pytest
from b_asic import (
MAD,
MADS,
SFG,
Absolute,
Addition,
AddSub,
......@@ -11,10 +14,13 @@ from b_asic import (
Constant,
ConstantMultiplication,
Division,
DontCare,
Input,
LeftShift,
Max,
Min,
Multiplication,
Output,
Reciprocal,
RightShift,
Shift,
......@@ -47,6 +53,14 @@ class TestConstant:
test_operation.value = 4
assert test_operation.value == 4
def test_constant_repr(self):
test_operation = Constant(3)
assert repr(test_operation) == "Constant(3)"
def test_constant_str(self):
test_operation = Constant(3)
assert str(test_operation) == "3"
class TestAddition:
"""Tests for Addition class."""
......@@ -83,16 +97,16 @@ class TestSubtraction:
class TestAddSub:
"""Tests for AddSub class."""
def test_addition_positive(self):
def test_addsub_positive(self):
test_operation = AddSub(is_add=True)
assert test_operation.evaluate_output(0, [3, 5]) == 8
def test_addition_negative(self):
def test_addsub_negative(self):
test_operation = AddSub(is_add=True)
assert test_operation.evaluate_output(0, [-3, -5]) == -8
assert test_operation.is_add
def test_addition_complex(self):
def test_addsub_complex(self):
test_operation = AddSub(is_add=True)
assert test_operation.evaluate_output(0, [3 + 5j, 4 + 6j]) == 7 + 11j
......@@ -116,6 +130,22 @@ class TestAddSub:
test_operation = AddSub(is_add=True)
assert test_operation.is_swappable
def test_addsub_is_add_getter(self):
test_operation = AddSub(is_add=False)
assert not test_operation.is_add
test_operation = AddSub(is_add=True)
assert test_operation.is_add
def test_addsub_is_add_setter(self):
test_operation = AddSub(is_add=False)
test_operation.is_add = True
assert test_operation.is_add
test_operation = AddSub(is_add=True)
test_operation.is_add = False
assert not test_operation.is_add
class TestMultiplication:
"""Tests for Multiplication class."""
......@@ -148,6 +178,13 @@ class TestDivision:
test_operation = Division()
assert test_operation.evaluate_output(0, [60 + 40j, 10 + 20j]) == 2.8 - 1.6j
def test_mads_is_linear(self):
test_operation = Division(Constant(3), Addition(Input(), Constant(3)))
assert not test_operation.is_linear
test_operation = Division(Addition(Input(), Constant(3)), Constant(3))
assert test_operation.is_linear
class TestSquareRoot:
"""Tests for SquareRoot class."""
......@@ -200,6 +237,13 @@ class TestMin:
test_operation = Min()
assert test_operation.evaluate_output(0, [-30, -5]) == -30
def test_min_complex(self):
test_operation = Min()
with pytest.raises(
ValueError, match="core_operations.Min does not support complex numbers."
):
test_operation.evaluate_output(0, [-1 - 1j, 2 + 2j])
class TestAbsolute:
"""Tests for Absolute class."""
......@@ -216,6 +260,13 @@ class TestAbsolute:
test_operation = Absolute()
assert test_operation.evaluate_output(0, [3 + 4j]) == 5.0
def test_max_complex(self):
test_operation = Max()
with pytest.raises(
ValueError, match="core_operations.Max does not support complex numbers."
):
test_operation.evaluate_output(0, [-1 - 1j, 2 + 2j])
class TestConstantMultiplication:
"""Tests for ConstantMultiplication class."""
......@@ -234,6 +285,136 @@ class TestConstantMultiplication:
assert test_operation.evaluate_output(0, [3 + 4j]) == 1 + 18j
class TestMAD:
def test_mad_positive(self):
test_operation = MAD()
assert test_operation.evaluate_output(0, [1, 2, 3]) == 5
def test_mad_negative(self):
test_operation = MAD()
assert test_operation.evaluate_output(0, [-3, -5, -8]) == 7
def test_mad_complex(self):
test_operation = MAD()
assert test_operation.evaluate_output(0, [3 + 6j, 2 + 6j, 1 + 1j]) == -29 + 31j
def test_mad_is_linear(self):
test_operation = MAD(
Constant(3), Addition(Input(), Constant(3)), Addition(Input(), Constant(3))
)
assert test_operation.is_linear
test_operation = MAD(
Addition(Input(), Constant(3)), Constant(3), Addition(Input(), Constant(3))
)
assert test_operation.is_linear
test_operation = MAD(
Addition(Input(), Constant(3)), Addition(Input(), Constant(3)), Constant(3)
)
assert not test_operation.is_linear
def test_mad_swap_io(self):
test_operation = MAD()
assert test_operation.evaluate_output(0, [1, 2, 3]) == 5
test_operation.swap_io()
assert test_operation.evaluate_output(0, [1, 2, 3]) == 5
class TestMADS:
def test_mads_positive(self):
test_operation = MADS(is_add=False)
assert test_operation.evaluate_output(0, [1, 2, 3]) == -5
def test_mads_negative(self):
test_operation = MADS(is_add=False)
assert test_operation.evaluate_output(0, [-3, -5, -8]) == -43
def test_mads_complex(self):
test_operation = MADS(is_add=False)
assert test_operation.evaluate_output(0, [3 + 6j, 2 + 6j, 1 + 1j]) == 7 - 2j
def test_mads_positive_add(self):
test_operation = MADS(is_add=True)
assert test_operation.evaluate_output(0, [1, 2, 3]) == 7
def test_mads_negative_add(self):
test_operation = MADS(is_add=True)
assert test_operation.evaluate_output(0, [-3, -5, -8]) == 37
def test_mads_complex_add(self):
test_operation = MADS(is_add=True)
assert test_operation.evaluate_output(0, [3 + 6j, 2 + 6j, 1 + 1j]) == -1 + 14j
def test_mads_zero_override(self):
test_operation = MADS(is_add=True, override_zero_on_src0=True)
assert test_operation.evaluate_output(0, [1, 1, 1]) == 1
def test_mads_sub_zero_override(self):
test_operation = MADS(is_add=False, override_zero_on_src0=True)
assert test_operation.evaluate_output(0, [1, 1, 1]) == -1
def test_mads_is_linear(self):
test_operation = MADS(
src0=Constant(3),
src1=Addition(Input(), Constant(3)),
src2=Addition(Input(), Constant(3)),
)
assert not test_operation.is_linear
test_operation = MADS(
src0=Addition(Input(), Constant(3)),
src1=Constant(3),
src2=Addition(Input(), Constant(3)),
)
assert test_operation.is_linear
test_operation = MADS(
src0=Addition(Input(), Constant(3)),
src1=Addition(Input(), Constant(3)),
src2=Constant(3),
)
assert test_operation.is_linear
def test_mads_swap_io(self):
test_operation = MADS(is_add=False)
assert test_operation.evaluate_output(0, [1, 2, 3]) == -5
test_operation.swap_io()
assert test_operation.evaluate_output(0, [1, 2, 3]) == -5
def test_mads_is_add_getter(self):
test_operation = MADS(is_add=False)
assert not test_operation.is_add
test_operation = MADS(is_add=True)
assert test_operation.is_add
def test_mads_is_add_setter(self):
test_operation = MADS(is_add=False)
test_operation.is_add = True
assert test_operation.is_add
test_operation = MADS(is_add=True)
test_operation.is_add = False
assert not test_operation.is_add
def test_mads_override_zero_on_src0_getter(self):
test_operation = MADS(override_zero_on_src0=False)
assert not test_operation.override_zero_on_src0
test_operation = MADS(override_zero_on_src0=True)
assert test_operation.override_zero_on_src0
def test_mads_override_zero_on_src0_setter(self):
test_operation = MADS(override_zero_on_src0=False)
test_operation.override_zero_on_src0 = True
assert test_operation.override_zero_on_src0
test_operation = MADS(override_zero_on_src0=True)
test_operation.override_zero_on_src0 = False
assert not test_operation.override_zero_on_src0
class TestRightShift:
"""Tests for RightShift class."""
......@@ -408,6 +589,33 @@ class TestDepends:
assert set(bfly1.inputs_required_for_output(1)) == {0, 1}
class TestDontCare:
def test_create_sfg_with_dontcare(self):
i1 = Input()
dc = DontCare()
a = Addition(i1, dc)
o = Output(a)
sfg = SFG([i1], [o])
assert sfg.output_count == 1
assert sfg.input_count == 1
assert sfg.evaluate_output(0, [0]) == 0
assert sfg.evaluate_output(0, [1]) == 1
def test_dontcare_latency_getter(self):
test_operation = DontCare()
assert test_operation.latency == 0
def test_dontcare_repr(self):
test_operation = DontCare()
assert repr(test_operation) == "DontCare()"
def test_dontcare_str(self):
test_operation = DontCare()
assert str(test_operation) == "dontcare"
class TestSink:
def test_create_sfg_with_sink(self):
bfly = Butterfly()
......@@ -420,3 +628,15 @@ class TestSink:
assert sfg1.input_count == 2
assert sfg.evaluate_output(1, [0, 1]) == sfg1.evaluate_output(0, [0, 1])
def test_sink_latency_getter(self):
test_operation = Sink()
assert test_operation.latency == 0
def test_sink_repr(self):
test_operation = Sink()
assert repr(test_operation) == "Sink()"
def test_sink_str(self):
test_operation = Sink()
assert str(test_operation) == "sink"
......@@ -3,15 +3,18 @@ import pytest
from scipy import signal
from b_asic.core_operations import (
MADS,
Addition,
Butterfly,
ConstantMultiplication,
Reciprocal,
SymmetricTwoportAdaptor,
)
from b_asic.sfg_generators import (
direct_form_1_iir,
direct_form_2_iir,
direct_form_fir,
ldlt_matrix_inverse,
radix_2_dif_fft,
transposed_direct_form_fir,
wdf_allpass,
......@@ -637,3 +640,176 @@ class TestRadix2FFT:
POINTS = 5
with pytest.raises(ValueError, match="Points must be a power of two."):
radix_2_dif_fft(points=POINTS)
class TestLdltMatrixInverse:
def test_1x1(self):
sfg = ldlt_matrix_inverse(N=1)
assert len(sfg.inputs) == 1
assert len(sfg.outputs) == 1
assert len(sfg.find_by_type_name(MADS.type_name())) == 0
assert len(sfg.find_by_type_name(Reciprocal.type_name())) == 1
A_input = [Constant(5)]
sim = Simulation(sfg, A_input)
sim.run_for(1)
res = sim.results
assert np.isclose(res["0"], 0.2)
def test_2x2_simple_spd(self):
sfg = ldlt_matrix_inverse(N=2)
assert len(sfg.inputs) == 3
assert len(sfg.outputs) == 3
assert len(sfg.find_by_type_name(MADS.type_name())) == 4
assert len(sfg.find_by_type_name(Reciprocal.type_name())) == 2
A = np.array([[1, 2], [2, 1]])
A_input = [Constant(1), Constant(2), Constant(1)]
A_inv = np.linalg.inv(A)
sim = Simulation(sfg, A_input)
sim.run_for(1)
res = sim.results
assert np.isclose(res["0"], A_inv[0, 0])
assert np.isclose(res["1"], A_inv[0, 1])
assert np.isclose(res["2"], A_inv[1, 1])
def test_3x3_simple_spd(self):
sfg = ldlt_matrix_inverse(N=3)
assert len(sfg.inputs) == 6
assert len(sfg.outputs) == 6
assert len(sfg.find_by_type_name(MADS.type_name())) == 15
assert len(sfg.find_by_type_name(Reciprocal.type_name())) == 3
A = np.array([[2, -1, 0], [-1, 3, -1], [0, -1, 2]])
A_input = [
Constant(2),
Constant(-1),
Constant(0),
Constant(3),
Constant(-1),
Constant(2),
]
A_inv = np.linalg.inv(A)
sim = Simulation(sfg, A_input)
sim.run_for(1)
res = sim.results
assert np.isclose(res["0"], A_inv[0, 0])
assert np.isclose(res["1"], A_inv[0, 1])
assert np.isclose(res["2"], A_inv[0, 2])
assert np.isclose(res["3"], A_inv[1, 1])
assert np.isclose(res["4"], A_inv[1, 2])
assert np.isclose(res["5"], A_inv[2, 2])
def test_5x5_random_spd(self):
N = 5
sfg = ldlt_matrix_inverse(N=N)
assert len(sfg.inputs) == 15
assert len(sfg.outputs) == 15
assert len(sfg.find_by_type_name(MADS.type_name())) == 70
assert len(sfg.find_by_type_name(Reciprocal.type_name())) == N
A = self._generate_random_spd_matrix(N)
upper_tridiag = A[np.triu_indices_from(A)]
A_input = [Constant(num) for num in upper_tridiag]
A_inv = np.linalg.inv(A)
sim = Simulation(sfg, A_input)
sim.run_for(1)
res = sim.results
row_indices, col_indices = np.triu_indices(N)
expected_values = A_inv[row_indices, col_indices]
actual_values = [res[str(i)] for i in range(len(expected_values))]
for i in range(len(expected_values)):
assert np.isclose(actual_values[i], expected_values[i])
def test_20x20_random_spd(self):
N = 20
sfg = ldlt_matrix_inverse(N=N)
A = self._generate_random_spd_matrix(N)
assert len(sfg.inputs) == len(A[np.triu_indices_from(A)])
assert len(sfg.outputs) == len(A[np.triu_indices_from(A)])
assert len(sfg.find_by_type_name(Reciprocal.type_name())) == N
upper_tridiag = A[np.triu_indices_from(A)]
A_input = [Constant(num) for num in upper_tridiag]
A_inv = np.linalg.inv(A)
sim = Simulation(sfg, A_input)
sim.run_for(1)
res = sim.results
row_indices, col_indices = np.triu_indices(N)
expected_values = A_inv[row_indices, col_indices]
actual_values = [res[str(i)] for i in range(len(expected_values))]
for i in range(len(expected_values)):
assert np.isclose(actual_values[i], expected_values[i])
# def test_2x2_random_complex_spd(self):
# N = 2
# sfg = ldlt_matrix_inverse(N=N, is_complex=True)
# # A = self._generate_random_complex_spd_matrix(N)
# A = np.array([[2, 1+1j],[1-1j, 3]])
# assert len(sfg.inputs) == len(A[np.triu_indices_from(A)])
# assert len(sfg.outputs) == len(A[np.triu_indices_from(A)])
# assert len(sfg.find_by_type_name(Reciprocal.type_name())) == N
# upper_tridiag = A[np.triu_indices_from(A)]
# A_input = [Constant(num) for num in upper_tridiag]
# A_inv = np.linalg.inv(A)
# sim = Simulation(sfg, A_input)
# sim.run_for(1)
# res = sim.results
# row_indices, col_indices = np.triu_indices(N)
# expected_values = A_inv[row_indices, col_indices]
# actual_values = [res[str(i)] for i in range(len(expected_values))]
# for i in range(len(expected_values)):
# assert np.isclose(actual_values[i], expected_values[i])
def _generate_random_spd_matrix(self, N: int) -> np.ndarray:
A = np.random.rand(N, N)
A = (A + A.T) / 2 # ensure symmetric
min_eig = np.min(np.linalg.eigvals(A))
A += (np.abs(min_eig) + 0.1) * np.eye(N) # ensure positive definiteness
return A
# def _generate_random_complex_spd_matrix(self, N: int) -> np.ndarray:
# A = np.random.randn(N, N) + 1j * np.random.randn(N, N)
# A = (A + A.conj().T) / 2 # ensure symmetric
# min_eig = np.min(np.linalg.eigvals(A))
# A += (np.abs(min_eig) + 0.1) * np.eye(N) # ensure positive definiteness
# return A