Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found
Select Git revision
  • 13-add-coupled-operations-to-sfg
  • 14-coupling-sfg-s
  • 17-operation-replacement-in-a-sfg
  • 2-operation-id-system
  • 22-deep-copy-of-sfg
  • 23-create-custom-operation
  • 31-print-schedule
  • 34-processor-and-memory-information-in-schedules
  • 65-implement-a-workspace-in-the-gui-2
  • 66-load-save-sfg-to-file
  • 82-explode-sfg
  • 84-unfolding-of-an-sfg
  • 87-resize-gui-window
  • MultiSimPlot
  • add-fft-generator
  • add-memory-constraint-for-list-schedulers
  • add-more-bfs
  • add-slack-time-sched
  • aptcache
  • codegen-commit-hash
  • commutative
  • constantpropagation
  • develop
  • fixes
  • fixschedule2
  • fpl-2023
  • master
  • operationspc2
  • petka86-find_by_type_name
  • ruff
  • scheduler-gui
  • scriptdebug
  • sinkdc
  • tmin
  • wdfadaptors
  • 1.0.0
  • cbasedsim
37 results

Target

Select target project
  • da/B-ASIC
  • lukja239/B-ASIC
  • robal695/B-ASIC
3 results
Select Git revision
  • 13-add-coupled-operations-to-sfg
  • 14-coupling-sfg-s
  • 17-operation-replacement-in-a-sfg
  • 2-operation-id-system
  • 22-deep-copy-of-sfg
  • 23-create-custom-operation
  • 31-print-schedule
  • 34-processor-and-memory-information-in-schedules
  • 65-implement-a-workspace-in-the-gui-2
  • 66-load-save-sfg-to-file
  • 82-explode-sfg
  • 84-unfolding-of-an-sfg
  • 87-resize-gui-window
  • MultiSimPlot
  • add-fft-generator
  • add-memory-constraint-for-list-schedulers
  • add-more-bfs
  • add-slack-time-sched
  • aptcache
  • codegen-commit-hash
  • commutative
  • constantpropagation
  • develop
  • fixes
  • fixschedule2
  • fpl-2023
  • master
  • operationspc2
  • petka86-find_by_type_name
  • ruff
  • scheduler-gui
  • scriptdebug
  • sinkdc
  • tmin
  • wdfadaptors
  • 1.0.0
  • cbasedsim
37 results
Show changes
#include <pybind11/pybind11.h> #include <pybind11/pybind11.h>
namespace py = pybind11; namespace py = pybind11;
namespace asic { namespace asic {
int add(int a, int b) { int add(int a, int b) {
return a + b; return a + b;
} }
int sub(int a, int b) { int sub(int a, int b) {
return a - b; return a - b;
} }
} // namespace asic } // namespace asic
PYBIND11_MODULE(_b_asic, m) { PYBIND11_MODULE(_b_asic, m) {
m.doc() = "Better ASIC Toolbox Extension Module."; m.doc() = "Better ASIC Toolbox Extension Module.";
m.def("add", &asic::add, "A function which adds two numbers.", py::arg("a"), py::arg("b")); m.def("add", &asic::add, "A function which adds two numbers.", py::arg("a"), py::arg("b"));
m.def("sub", &asic::sub, "A function which subtracts two numbers.", py::arg("a"), py::arg("b")); m.def("sub", &asic::sub, "A function which subtracts two numbers.", py::arg("a"), py::arg("b"));
} }
\ No newline at end of file
from test.fixtures.signal import signal, signals from test.fixtures.signal import signal, signals
from test.fixtures.operation_tree import * from test.fixtures.operation_tree import *
from test.fixtures.port import * from test.fixtures.port import *
from test.fixtures.signal_flow_graph import *
import pytest import pytest
from b_asic.core_operations import Addition, Constant
from b_asic.signal import Signal
import pytest import pytest
from b_asic import Addition, Constant, Signal, Butterfly
@pytest.fixture @pytest.fixture
def operation(): def operation():
return Constant(2) return Constant(2)
def create_operation(_type, dest_oper, index, **kwargs):
oper = _type(**kwargs)
oper_signal = Signal()
oper._output_ports[0].add_signal(oper_signal)
dest_oper._input_ports[index].add_signal(oper_signal)
return oper
@pytest.fixture @pytest.fixture
def operation_tree(): def operation_tree():
"""Return a addition operation connected with 2 constants. """Valid addition operation connected with 2 constants.
---C---+ 2---+
---A |
---C---+ v
add = 2 + 3 = 5
^
|
3---+
""" """
add_oper = Addition() return Addition(Constant(2), Constant(3))
create_operation(Constant, add_oper, 0, value=2)
create_operation(Constant, add_oper, 1, value=3)
return add_oper
@pytest.fixture @pytest.fixture
def large_operation_tree(): def large_operation_tree():
"""Return a constant operation connected with a large operation tree with 3 other constants and 3 additions. """Valid addition operation connected with a large operation tree with 2 other additions and 4 constants.
---C---+ 2---+
---A---+ |
---C---+ | v
+---A add---+
---C---+ | ^ |
---A---+ | |
---C---+ 3---+ v
add = (2 + 3) + (4 + 5) = 14
4---+ ^
| |
v |
add---+
^
|
5---+
""" """
add_oper = Addition() return Addition(Addition(Constant(2), Constant(3)), Addition(Constant(4), Constant(5)))
add_oper_2 = Addition()
const_oper = create_operation(Constant, add_oper, 0, value=2)
create_operation(Constant, add_oper, 1, value=3)
create_operation(Constant, add_oper_2, 0, value=4) @pytest.fixture
create_operation(Constant, add_oper_2, 1, value=5) def large_operation_tree_names():
"""Valid addition operation connected with a large operation tree with 2 other additions and 4 constants.
With names.
2---+
|
v
add---+
^ |
| |
3---+ v
add = (2 + 3) + (4 + 5) = 14
4---+ ^
| |
v |
add---+
^
|
5---+
"""
return Addition(Addition(Constant(2, name="constant2"), Constant(3, name="constant3")), Addition(Constant(4, name="constant4"), Constant(5, name="constant5")))
add_oper_3 = Addition() @pytest.fixture
add_oper_signal = Signal(add_oper.output(0), add_oper_3.output(0)) def butterfly_operation_tree():
add_oper._output_ports[0].add_signal(add_oper_signal) """Valid butterfly operations connected to eachother with 3 butterfly operations and 2 constants as inputs and 2 outputs.
add_oper_3._input_ports[0].add_signal(add_oper_signal) 2 ---+ +--- (2 + 4) ---+ +--- (6 + (-2)) ---+ +--- (4 + 8) ---> out1 = 12
| | | | | |
v ^ v ^ v ^
butterfly butterfly butterfly
^ v ^ v ^ v
| | | | | |
4 ---+ +--- (2 - 4) ---+ +--- (6 - (-2)) ---+ +--- (4 - 8) ---> out2 = -4
"""
return Butterfly(*(Butterfly(*(Butterfly(Constant(2), Constant(4), name="bfly3").outputs), name="bfly2").outputs), name="bfly1")
add_oper_2_signal = Signal(add_oper_2.output(0), add_oper_3.output(0)) @pytest.fixture
add_oper_2._output_ports[0].add_signal(add_oper_2_signal) def operation_graph_with_cycle():
add_oper_3._input_ports[1].add_signal(add_oper_2_signal) """Invalid addition operation connected with an operation graph containing a cycle.
return const_oper +-+
| |
v |
add+---+
^ |
| v
7 add = (? + 7) + 6 = ?
^
|
6
"""
add1 = Addition(None, Constant(7))
add1.input(0).connect(add1)
return Addition(add1, Constant(6))
import pytest import pytest
from b_asic.port import InputPort, OutputPort
from b_asic import InputPort, OutputPort
@pytest.fixture @pytest.fixture
def input_port(): def input_port():
return InputPort(0, None) return InputPort(None, 0)
@pytest.fixture @pytest.fixture
def output_port(): def output_port():
return OutputPort(0, None) return OutputPort(None, 0)
@pytest.fixture
def list_of_input_ports():
return [InputPort(None, i) for i in range(0, 3)]
@pytest.fixture
def list_of_output_ports():
return [OutputPort(None, i) for i in range(0, 3)]
import pytest import pytest
from b_asic import Signal from b_asic import Signal
@pytest.fixture @pytest.fixture
def signal(): def signal():
"""Return a signal with no connections.""" """Return a signal with no connections."""
...@@ -9,4 +11,4 @@ def signal(): ...@@ -9,4 +11,4 @@ def signal():
@pytest.fixture @pytest.fixture
def signals(): def signals():
"""Return 3 signals with no connections.""" """Return 3 signals with no connections."""
return [Signal() for _ in range(0,3)] return [Signal() for _ in range(0, 3)]
import pytest
from b_asic import SFG, Input, Output, Constant, Register, ConstantMultiplication, Addition, Butterfly
@pytest.fixture
def sfg_two_inputs_two_outputs():
"""Valid SFG with two inputs and two outputs.
. .
in1-------+ +--------->out1
. | | .
. v | .
. add1+--+ .
. ^ | .
. | v .
in2+------+ add2---->out2
| . ^ .
| . | .
+------------+ .
. .
out1 = in1 + in2
out2 = in1 + 2 * in2
"""
in1 = Input("IN1")
in2 = Input("IN2")
add1 = Addition(in1, in2, "ADD1")
add2 = Addition(add1, in2, "ADD2")
out1 = Output(add1, "OUT1")
out2 = Output(add2, "OUT2")
return SFG(inputs=[in1, in2], outputs=[out1, out2])
@pytest.fixture
def sfg_two_inputs_two_outputs_independent():
"""Valid SFG with two inputs and two outputs, where the first output only depends
on the first input and the second output only depends on the second input.
. .
in1-------------------->out1
. .
. .
. c1--+ .
. | .
. v .
in2------+ add1---->out2
. | ^ .
. | | .
. +------+ .
. .
out1 = in1
out2 = in2 + 3
"""
in1 = Input("IN1")
in2 = Input("IN2")
c1 = Constant(3, "C1")
add1 = Addition(in2, c1, "ADD1")
out1 = Output(in1, "OUT1")
out2 = Output(add1, "OUT2")
return SFG(inputs=[in1, in2], outputs=[out1, out2])
@pytest.fixture
def sfg_two_inputs_two_outputs_independent_with_cmul():
"""Valid SFG with two inputs and two outputs, where the first output only depends
on the first input and the second output only depends on the second input.
. .
in1--->cmul1--->cmul2--->out1
. .
. .
c1------+ .
|
v .
in2--->add1---->cmul3--->out2
"""
in1 = Input("IN1")
in2 = Input("IN2")
c1 = Constant(3, "C1")
add1 = Addition(in2, c1, "ADD1", 7)
cmul3 = ConstantMultiplication(2, add1, "CMUL3", 3)
cmul1 = ConstantMultiplication(5, in1, "CMUL1", 5)
cmul2 = ConstantMultiplication(4, cmul1, "CMUL2", 4)
out1 = Output(in1, "OUT1")
out2 = Output(add1, "OUT2")
return SFG(inputs=[in1, in2], outputs=[out1, out2])
@pytest.fixture
def sfg_nested():
"""Valid SFG with two inputs and one output.
out1 = in1 + (in1 + in1 * in2) * (in1 + in2 * (in1 + in1 * in2))
"""
mac_in1 = Input()
mac_in2 = Input()
mac_in3 = Input()
mac_out1 = Output(mac_in1 + mac_in2 * mac_in3)
MAC = SFG(inputs=[mac_in1, mac_in2, mac_in3], outputs=[mac_out1])
in1 = Input()
in2 = Input()
mac1 = MAC(in1, in1, in2)
mac2 = MAC(in1, in2, mac1)
mac3 = MAC(in1, mac1, mac2)
out1 = Output(mac3)
return SFG(inputs=[in1, in2], outputs=[out1])
@pytest.fixture
def sfg_delay():
"""Valid SFG with one input and one output.
out1 = in1'
"""
in1 = Input()
reg1 = Register(in1)
out1 = Output(reg1)
return SFG(inputs=[in1], outputs=[out1])
@pytest.fixture
def sfg_accumulator():
"""Valid SFG with two inputs and one output.
data_out = (data_in' + data_in) * (1 - reset)
"""
data_in = Input()
reset = Input()
reg = Register()
reg.input(0).connect((reg + data_in) * (1 - reset))
data_out = Output(reg)
return SFG(inputs=[data_in, reset], outputs=[data_out])
@pytest.fixture
def simple_filter():
"""A valid SFG that is used as a filter in the first lab for TSTE87.
+----<constmul1----+
| |
| |
in1>------add1>------reg>------+------out1>
"""
in1 = Input("IN1")
constmul1 = ConstantMultiplication(0.5, name="CMUL1")
add1 = Addition(in1, constmul1, "ADD1")
add1.input(1).signals[0].name = "S2"
reg = Register(add1, name="REG1")
constmul1.input(0).connect(reg, "S1")
out1 = Output(reg, "OUT1")
return SFG(inputs=[in1], outputs=[out1], name="simple_filter")
@pytest.fixture
def precedence_sfg_registers():
"""A sfg with registers and interesting layout for precednce list generation.
IN1>--->C0>--->ADD1>--->Q1>---+--->A0>--->ADD4>--->OUT1
^ | ^
| T1 |
| | |
ADD2<---<B1<---+--->A1>--->ADD3
^ | ^
| T2 |
| | |
+-----<B2<---+--->A2>-----+
"""
in1 = Input("IN1")
c0 = ConstantMultiplication(5, in1, "C0")
add1 = Addition(c0, None, "ADD1")
# Not sure what operation "Q" is supposed to be in the example
Q1 = ConstantMultiplication(1, add1, "Q1")
T1 = Register(Q1, 0, "T1")
T2 = Register(T1, 0, "T2")
b2 = ConstantMultiplication(2, T2, "B2")
b1 = ConstantMultiplication(3, T1, "B1")
add2 = Addition(b1, b2, "ADD2")
add1.input(1).connect(add2)
a1 = ConstantMultiplication(4, T1, "A1")
a2 = ConstantMultiplication(6, T2, "A2")
add3 = Addition(a1, a2, "ADD3")
a0 = ConstantMultiplication(7, Q1, "A0")
add4 = Addition(a0, add3, "ADD4")
out1 = Output(add4, "OUT1")
return SFG(inputs=[in1], outputs=[out1], name="SFG")
@pytest.fixture
def precedence_sfg_registers_and_constants():
in1 = Input("IN1")
c0 = ConstantMultiplication(5, in1, "C0")
add1 = Addition(c0, None, "ADD1")
# Not sure what operation "Q" is supposed to be in the example
Q1 = ConstantMultiplication(1, add1, "Q1")
T1 = Register(Q1, 0, "T1")
const1 = Constant(10, "CONST1") # Replace T2 register with a constant
b2 = ConstantMultiplication(2, const1, "B2")
b1 = ConstantMultiplication(3, T1, "B1")
add2 = Addition(b1, b2, "ADD2")
add1.input(1).connect(add2)
a1 = ConstantMultiplication(4, T1, "A1")
a2 = ConstantMultiplication(10, const1, "A2")
add3 = Addition(a1, a2, "ADD3")
a0 = ConstantMultiplication(7, Q1, "A0")
# Replace ADD4 with a butterfly to test multiple output ports
bfly1 = Butterfly(a0, add3, "BFLY1")
out1 = Output(bfly1.output(0), "OUT1")
out2 = Output(bfly1.output(1), "OUT2")
return SFG(inputs=[in1], outputs=[out1], name="SFG")
"""
B-ASIC test suite for the AbstractOperation class.
"""
from b_asic.core_operations import Addition, ConstantAddition, Subtraction, ConstantSubtraction, \
Multiplication, ConstantMultiplication, Division, ConstantDivision
import pytest
def test_addition_overload():
"""Tests addition overloading for both operation and number argument."""
add1 = Addition(None, None, "add1")
add2 = Addition(None, None, "add2")
add3 = add1 + add2
assert isinstance(add3, Addition)
assert add3.input(0).signals == add1.output(0).signals
assert add3.input(1).signals == add2.output(0).signals
add4 = add3 + 5
assert isinstance(add4, ConstantAddition)
assert add4.input(0).signals == add3.output(0).signals
def test_subtraction_overload():
"""Tests subtraction overloading for both operation and number argument."""
add1 = Addition(None, None, "add1")
add2 = Addition(None, None, "add2")
sub1 = add1 - add2
assert isinstance(sub1, Subtraction)
assert sub1.input(0).signals == add1.output(0).signals
assert sub1.input(1).signals == add2.output(0).signals
sub2 = sub1 - 5
assert isinstance(sub2, ConstantSubtraction)
assert sub2.input(0).signals == sub1.output(0).signals
def test_multiplication_overload():
"""Tests multiplication overloading for both operation and number argument."""
add1 = Addition(None, None, "add1")
add2 = Addition(None, None, "add2")
mul1 = add1 * add2
assert isinstance(mul1, Multiplication)
assert mul1.input(0).signals == add1.output(0).signals
assert mul1.input(1).signals == add2.output(0).signals
mul2 = mul1 * 5
assert isinstance(mul2, ConstantMultiplication)
assert mul2.input(0).signals == mul1.output(0).signals
def test_division_overload():
"""Tests division overloading for both operation and number argument."""
add1 = Addition(None, None, "add1")
add2 = Addition(None, None, "add2")
div1 = add1 / add2
assert isinstance(div1, Division)
assert div1.input(0).signals == add1.output(0).signals
assert div1.input(1).signals == add2.output(0).signals
div2 = div1 / 5
assert isinstance(div2, ConstantDivision)
assert div2.input(0).signals == div1.output(0).signals
...@@ -2,226 +2,175 @@ ...@@ -2,226 +2,175 @@
B-ASIC test suite for the core operations. B-ASIC test suite for the core operations.
""" """
from b_asic.core_operations import Constant, Addition, Subtraction, Multiplication, Division, SquareRoot, ComplexConjugate, Max, Min, Absolute, ConstantMultiplication, ConstantAddition, ConstantSubtraction, ConstantDivision from b_asic import \
Constant, Addition, Subtraction, Multiplication, ConstantMultiplication, Division, \
# Constant tests. SquareRoot, ComplexConjugate, Max, Min, Absolute, Butterfly
def test_constant():
constant_operation = Constant(3) class TestConstant:
assert constant_operation.evaluate() == 3 def test_constant_positive(self):
test_operation = Constant(3)
def test_constant_negative(): assert test_operation.evaluate_output(0, []) == 3
constant_operation = Constant(-3)
assert constant_operation.evaluate() == -3 def test_constant_negative(self):
test_operation = Constant(-3)
def test_constant_complex(): assert test_operation.evaluate_output(0, []) == -3
constant_operation = Constant(3+4j)
assert constant_operation.evaluate() == 3+4j def test_constant_complex(self):
test_operation = Constant(3+4j)
# Addition tests. assert test_operation.evaluate_output(0, []) == 3+4j
def test_addition():
test_operation = Addition()
constant_operation = Constant(3) class TestAddition:
constant_operation_2 = Constant(5) def test_addition_positive(self):
assert test_operation.evaluate(constant_operation.evaluate(), constant_operation_2.evaluate()) == 8 test_operation = Addition()
assert test_operation.evaluate_output(0, [3, 5]) == 8
def test_addition_negative():
test_operation = Addition() def test_addition_negative(self):
constant_operation = Constant(-3) test_operation = Addition()
constant_operation_2 = Constant(-5) assert test_operation.evaluate_output(0, [-3, -5]) == -8
assert test_operation.evaluate(constant_operation.evaluate(), constant_operation_2.evaluate()) == -8
def test_addition_complex(self):
def test_addition_complex(): test_operation = Addition()
test_operation = Addition() assert test_operation.evaluate_output(0, [3+5j, 4+6j]) == 7+11j
constant_operation = Constant((3+5j))
constant_operation_2 = Constant((4+6j))
assert test_operation.evaluate(constant_operation.evaluate(), constant_operation_2.evaluate()) == (7+11j) class TestSubtraction:
def test_subtraction_positive(self):
# Subtraction tests. test_operation = Subtraction()
def test_subtraction(): assert test_operation.evaluate_output(0, [5, 3]) == 2
test_operation = Subtraction()
constant_operation = Constant(5) def test_subtraction_negative(self):
constant_operation_2 = Constant(3) test_operation = Subtraction()
assert test_operation.evaluate(constant_operation.evaluate(), constant_operation_2.evaluate()) == 2 assert test_operation.evaluate_output(0, [-5, -3]) == -2
def test_subtraction_negative(): def test_subtraction_complex(self):
test_operation = Subtraction() test_operation = Subtraction()
constant_operation = Constant(-5) assert test_operation.evaluate_output(0, [3+5j, 4+6j]) == -1-1j
constant_operation_2 = Constant(-3)
assert test_operation.evaluate(constant_operation.evaluate(), constant_operation_2.evaluate()) == -2
class TestMultiplication:
def test_subtraction_complex(): def test_multiplication_positive(self):
test_operation = Subtraction() test_operation = Multiplication()
constant_operation = Constant((3+5j)) assert test_operation.evaluate_output(0, [5, 3]) == 15
constant_operation_2 = Constant((4+6j))
assert test_operation.evaluate(constant_operation.evaluate(), constant_operation_2.evaluate()) == (-1-1j) def test_multiplication_negative(self):
test_operation = Multiplication()
# Multiplication tests. assert test_operation.evaluate_output(0, [-5, -3]) == 15
def test_multiplication():
test_operation = Multiplication() def test_multiplication_complex(self):
constant_operation = Constant(5) test_operation = Multiplication()
constant_operation_2 = Constant(3) assert test_operation.evaluate_output(0, [3+5j, 4+6j]) == -18+38j
assert test_operation.evaluate(constant_operation.evaluate(), constant_operation_2.evaluate()) == 15
def test_multiplication_negative(): class TestDivision:
test_operation = Multiplication() def test_division_positive(self):
constant_operation = Constant(-5) test_operation = Division()
constant_operation_2 = Constant(-3) assert test_operation.evaluate_output(0, [30, 5]) == 6
assert test_operation.evaluate(constant_operation.evaluate(), constant_operation_2.evaluate()) == 15
def test_division_negative(self):
def test_multiplication_complex(): test_operation = Division()
test_operation = Multiplication() assert test_operation.evaluate_output(0, [-30, -5]) == 6
constant_operation = Constant((3+5j))
constant_operation_2 = Constant((4+6j)) def test_division_complex(self):
assert test_operation.evaluate(constant_operation.evaluate(), constant_operation_2.evaluate()) == (-18+38j) test_operation = Division()
assert test_operation.evaluate_output(0, [60+40j, 10+20j]) == 2.8-1.6j
# Division tests.
def test_division():
test_operation = Division() class TestSquareRoot:
constant_operation = Constant(30) def test_squareroot_positive(self):
constant_operation_2 = Constant(5) test_operation = SquareRoot()
assert test_operation.evaluate(constant_operation.evaluate(), constant_operation_2.evaluate()) == 6 assert test_operation.evaluate_output(0, [36]) == 6
def test_division_negative(): def test_squareroot_negative(self):
test_operation = Division() test_operation = SquareRoot()
constant_operation = Constant(-30) assert test_operation.evaluate_output(0, [-36]) == 6j
constant_operation_2 = Constant(-5)
assert test_operation.evaluate(constant_operation.evaluate(), constant_operation_2.evaluate()) == 6 def test_squareroot_complex(self):
test_operation = SquareRoot()
def test_division_complex(): assert test_operation.evaluate_output(0, [48+64j]) == 8+4j
test_operation = Division()
constant_operation = Constant((60+40j))
constant_operation_2 = Constant((10+20j)) class TestComplexConjugate:
assert test_operation.evaluate(constant_operation.evaluate(), constant_operation_2.evaluate()) == (2.8-1.6j) def test_complexconjugate_positive(self):
test_operation = ComplexConjugate()
# SquareRoot tests. assert test_operation.evaluate_output(0, [3+4j]) == 3-4j
def test_squareroot():
test_operation = SquareRoot() def test_test_complexconjugate_negative(self):
constant_operation = Constant(36) test_operation = ComplexConjugate()
assert test_operation.evaluate(constant_operation.evaluate()) == 6 assert test_operation.evaluate_output(0, [-3-4j]) == -3+4j
def test_squareroot_negative():
test_operation = SquareRoot() class TestMax:
constant_operation = Constant(-36) def test_max_positive(self):
assert test_operation.evaluate(constant_operation.evaluate()) == 6j test_operation = Max()
assert test_operation.evaluate_output(0, [30, 5]) == 30
def test_squareroot_complex():
test_operation = SquareRoot() def test_max_negative(self):
constant_operation = Constant((48+64j)) test_operation = Max()
assert test_operation.evaluate(constant_operation.evaluate()) == (8+4j) assert test_operation.evaluate_output(0, [-30, -5]) == -5
# ComplexConjugate tests.
def test_complexconjugate(): class TestMin:
test_operation = ComplexConjugate() def test_min_positive(self):
constant_operation = Constant(3+4j) test_operation = Min()
assert test_operation.evaluate(constant_operation.evaluate()) == (3-4j) assert test_operation.evaluate_output(0, [30, 5]) == 5
def test_test_complexconjugate_negative(): def test_min_negative(self):
test_operation = ComplexConjugate() test_operation = Min()
constant_operation = Constant(-3-4j) assert test_operation.evaluate_output(0, [-30, -5]) == -30
assert test_operation.evaluate(constant_operation.evaluate()) == (-3+4j)
# Max tests. class TestAbsolute:
def test_max(): def test_absolute_positive(self):
test_operation = Max() test_operation = Absolute()
constant_operation = Constant(30) assert test_operation.evaluate_output(0, [30]) == 30
constant_operation_2 = Constant(5)
assert test_operation.evaluate(constant_operation.evaluate(), constant_operation_2.evaluate()) == 30 def test_absolute_negative(self):
test_operation = Absolute()
def test_max_negative(): assert test_operation.evaluate_output(0, [-5]) == 5
test_operation = Max()
constant_operation = Constant(-30) def test_absolute_complex(self):
constant_operation_2 = Constant(-5) test_operation = Absolute()
assert test_operation.evaluate(constant_operation.evaluate(), constant_operation_2.evaluate()) == -5 assert test_operation.evaluate_output(0, [3+4j]) == 5.0
# Min tests.
def test_min(): class TestConstantMultiplication:
test_operation = Min() def test_constantmultiplication_positive(self):
constant_operation = Constant(30) test_operation = ConstantMultiplication(5)
constant_operation_2 = Constant(5) assert test_operation.evaluate_output(0, [20]) == 100
assert test_operation.evaluate(constant_operation.evaluate(), constant_operation_2.evaluate()) == 5
def test_constantmultiplication_negative(self):
def test_min_negative(): test_operation = ConstantMultiplication(5)
test_operation = Min() assert test_operation.evaluate_output(0, [-5]) == -25
constant_operation = Constant(-30)
constant_operation_2 = Constant(-5) def test_constantmultiplication_complex(self):
assert test_operation.evaluate(constant_operation.evaluate(), constant_operation_2.evaluate()) == -30 test_operation = ConstantMultiplication(3+2j)
assert test_operation.evaluate_output(0, [3+4j]) == 1+18j
# Absolute tests.
def test_absolute():
test_operation = Absolute() class TestButterfly:
constant_operation = Constant(30) def test_butterfly_positive(self):
assert test_operation.evaluate(constant_operation.evaluate()) == 30 test_operation = Butterfly()
assert test_operation.evaluate_output(0, [2, 3]) == 5
def test_absolute_negative(): assert test_operation.evaluate_output(1, [2, 3]) == -1
test_operation = Absolute()
constant_operation = Constant(-5) def test_butterfly_negative(self):
assert test_operation.evaluate(constant_operation.evaluate()) == 5 test_operation = Butterfly()
assert test_operation.evaluate_output(0, [-2, -3]) == -5
def test_absolute_complex(): assert test_operation.evaluate_output(1, [-2, -3]) == 1
test_operation = Absolute()
constant_operation = Constant((3+4j)) def test_buttefly_complex(self):
assert test_operation.evaluate(constant_operation.evaluate()) == 5.0 test_operation = Butterfly()
assert test_operation.evaluate_output(0, [2+1j, 3-2j]) == 5-1j
# ConstantMultiplication tests. assert test_operation.evaluate_output(1, [2+1j, 3-2j]) == -1+3j
def test_constantmultiplication():
test_operation = ConstantMultiplication(5)
constant_operation = Constant(20) class TestDepends:
assert test_operation.evaluate(constant_operation.evaluate()) == 100 def test_depends_addition(self):
add1 = Addition()
def test_constantmultiplication_negative(): assert set(add1.inputs_required_for_output(0)) == {0, 1}
test_operation = ConstantMultiplication(5)
constant_operation = Constant(-5) def test_depends_butterfly(self):
assert test_operation.evaluate(constant_operation.evaluate()) == -25 bfly1 = Butterfly()
assert set(bfly1.inputs_required_for_output(0)) == {0, 1}
def test_constantmultiplication_complex(): assert set(bfly1.inputs_required_for_output(1)) == {0, 1}
test_operation = ConstantMultiplication(3+2j)
constant_operation = Constant((3+4j))
assert test_operation.evaluate(constant_operation.evaluate()) == (1+18j)
# ConstantAddition tests.
def test_constantaddition():
test_operation = ConstantAddition(5)
constant_operation = Constant(20)
assert test_operation.evaluate(constant_operation.evaluate()) == 25
def test_constantaddition_negative():
test_operation = ConstantAddition(4)
constant_operation = Constant(-5)
assert test_operation.evaluate(constant_operation.evaluate()) == -1
def test_constantaddition_complex():
test_operation = ConstantAddition(3+2j)
constant_operation = Constant((3+4j))
assert test_operation.evaluate(constant_operation.evaluate()) == (6+6j)
# ConstantSubtraction tests.
def test_constantsubtraction():
test_operation = ConstantSubtraction(5)
constant_operation = Constant(20)
assert test_operation.evaluate(constant_operation.evaluate()) == 15
def test_constantsubtraction_negative():
test_operation = ConstantSubtraction(4)
constant_operation = Constant(-5)
assert test_operation.evaluate(constant_operation.evaluate()) == -9
def test_constantsubtraction_complex():
test_operation = ConstantSubtraction(4+6j)
constant_operation = Constant((3+4j))
assert test_operation.evaluate(constant_operation.evaluate()) == (-1-2j)
# ConstantDivision tests.
def test_constantdivision():
test_operation = ConstantDivision(5)
constant_operation = Constant(20)
assert test_operation.evaluate(constant_operation.evaluate()) == 4
def test_constantdivision_negative():
test_operation = ConstantDivision(4)
constant_operation = Constant(-20)
assert test_operation.evaluate(constant_operation.evaluate()) == -5
def test_constantdivision_complex():
test_operation = ConstantDivision(2+2j)
constant_operation = Constant((10+10j))
assert test_operation.evaluate(constant_operation.evaluate()) == (5+0j)
...@@ -2,9 +2,10 @@ ...@@ -2,9 +2,10 @@
B-ASIC test suite for graph id generator. B-ASIC test suite for graph id generator.
""" """
from b_asic.graph_id import GraphIDGenerator, GraphID
import pytest import pytest
from b_asic import GraphIDGenerator, GraphID
@pytest.fixture @pytest.fixture
def graph_id_generator(): def graph_id_generator():
return GraphIDGenerator() return GraphIDGenerator()
...@@ -12,17 +13,17 @@ def graph_id_generator(): ...@@ -12,17 +13,17 @@ def graph_id_generator():
class TestGetNextId: class TestGetNextId:
def test_empty_string_generator(self, graph_id_generator): def test_empty_string_generator(self, graph_id_generator):
"""Test the graph id generator for an empty string type.""" """Test the graph id generator for an empty string type."""
assert graph_id_generator.get_next_id("") == "1" assert graph_id_generator.next_id("") == "1"
assert graph_id_generator.get_next_id("") == "2" assert graph_id_generator.next_id("") == "2"
def test_normal_string_generator(self, graph_id_generator): def test_normal_string_generator(self, graph_id_generator):
""""Test the graph id generator for a normal string type.""" """"Test the graph id generator for a normal string type."""
assert graph_id_generator.get_next_id("add") == "add1" assert graph_id_generator.next_id("add") == "add1"
assert graph_id_generator.get_next_id("add") == "add2" assert graph_id_generator.next_id("add") == "add2"
def test_different_strings_generator(self, graph_id_generator): def test_different_strings_generator(self, graph_id_generator):
"""Test the graph id generator for different strings.""" """Test the graph id generator for different strings."""
assert graph_id_generator.get_next_id("sub") == "sub1" assert graph_id_generator.next_id("sub") == "sub1"
assert graph_id_generator.get_next_id("mul") == "mul1" assert graph_id_generator.next_id("mul") == "mul1"
assert graph_id_generator.get_next_id("sub") == "sub2" assert graph_id_generator.next_id("sub") == "sub2"
assert graph_id_generator.get_next_id("mul") == "mul2" assert graph_id_generator.next_id("mul") == "mul2"
...@@ -4,92 +4,64 @@ B-ASIC test suite for Inputport ...@@ -4,92 +4,64 @@ B-ASIC test suite for Inputport
import pytest import pytest
from b_asic import InputPort, OutputPort from b_asic import InputPort, OutputPort, Signal
from b_asic import Signal
@pytest.fixture @pytest.fixture
def inp_port(): def output_port2():
return InputPort(0, None) return OutputPort(None, 1)
@pytest.fixture
def out_port():
return OutputPort(0, None)
@pytest.fixture
def out_port2():
return OutputPort(1, None)
@pytest.fixture @pytest.fixture
def dangling_sig(): def dangling_sig():
return Signal() return Signal()
@pytest.fixture @pytest.fixture
def s_w_source(): def s_w_source(output_port):
out_port = OutputPort(0, None) return Signal(source=output_port)
return Signal(source=out_port)
@pytest.fixture @pytest.fixture
def sig_with_dest(): def sig_with_dest(inp_port):
inp_port = InputPort(0, None) return Signal(destination=inp_port)
return Signal(destination=out_port)
@pytest.fixture @pytest.fixture
def connected_sig(): def connected_sig(inp_port, output_port):
out_port = OutputPort(0, None) return Signal(source=output_port, destination=inp_port)
inp_port = InputPort(0, None)
return Signal(source=out_port, destination=inp_port)
def test_connect_then_disconnect(inp_port, out_port): def test_connect_then_disconnect(input_port, output_port):
"""Test connect unused port to port.""" """Test connect unused port to port."""
s1 = inp_port.connect(out_port) s1 = input_port.connect(output_port)
assert inp_port.connected_ports == [out_port] assert input_port.connected_source == output_port
assert out_port.connected_ports == [inp_port] assert input_port.signals == [s1]
assert inp_port.signals == [s1] assert output_port.signals == [s1]
assert out_port.signals == [s1] assert s1.source is output_port
assert s1.source is out_port assert s1.destination is input_port
assert s1.destination is inp_port
input_port.remove_signal(s1)
inp_port.remove_signal(s1)
assert input_port.connected_source is None
assert inp_port.connected_ports == [] assert input_port.signals == []
assert out_port.connected_ports == [] assert output_port.signals == [s1]
assert inp_port.signals == [] assert s1.source is output_port
assert out_port.signals == [s1]
assert s1.source is out_port
assert s1.destination is None assert s1.destination is None
def test_connect_used_port_to_new_port(inp_port, out_port, out_port2): def test_connect_used_port_to_new_port(input_port, output_port, output_port2):
"""Does connecting multiple ports to an inputport throw error?""" """Multiple connections to an input port should throw an error."""
inp_port.connect(out_port) input_port.connect(output_port)
with pytest.raises(AssertionError): with pytest.raises(Exception):
inp_port.connect(out_port2) input_port.connect(output_port2)
def test_add_signal_then_disconnect(inp_port, s_w_source): def test_add_signal_then_disconnect(input_port, s_w_source):
"""Can signal be connected then disconnected properly?""" """Can signal be connected then disconnected properly?"""
inp_port.add_signal(s_w_source) input_port.add_signal(s_w_source)
assert inp_port.connected_ports == [s_w_source.source] assert input_port.connected_source == s_w_source.source
assert s_w_source.source.connected_ports == [inp_port] assert input_port.signals == [s_w_source]
assert inp_port.signals == [s_w_source]
assert s_w_source.source.signals == [s_w_source] assert s_w_source.source.signals == [s_w_source]
assert s_w_source.destination is inp_port assert s_w_source.destination is input_port
inp_port.remove_signal(s_w_source) input_port.remove_signal(s_w_source)
assert inp_port.connected_ports == [] assert input_port.connected_source is None
assert s_w_source.source.connected_ports == [] assert input_port.signals == []
assert inp_port.signals == []
assert s_w_source.source.signals == [s_w_source] assert s_w_source.source.signals == [s_w_source]
assert s_w_source.destination is None assert s_w_source.destination is None
def test_connect_then_disconnect(inp_port, out_port):
"""Can port be connected and then disconnected properly?"""
inp_port.connect(out_port)
inp_port.disconnect(out_port)
print("outport signals:", out_port.signals, "count:", out_port.signal_count())
assert inp_port.signal_count() == 1
assert len(inp_port.connected_ports) == 0
assert out_port.signal_count() == 0
from b_asic.core_operations import Constant, Addition """
from b_asic.signal import Signal B-ASIC test suite for the AbstractOperation class.
from b_asic.port import InputPort, OutputPort """
import pytest import pytest
from b_asic import Addition, Subtraction, Multiplication, ConstantMultiplication, Division, Constant, Butterfly, \
MAD, SquareRoot
class TestOperationOverloading:
def test_addition_overload(self):
"""Tests addition overloading for both operation and number argument."""
add1 = Addition(None, None, "add1")
add2 = Addition(None, None, "add2")
add3 = add1 + add2
assert isinstance(add3, Addition)
assert add3.input(0).signals == add1.output(0).signals
assert add3.input(1).signals == add2.output(0).signals
add4 = add3 + 5
assert isinstance(add4, Addition)
assert add4.input(0).signals == add3.output(0).signals
assert add4.input(1).signals[0].source.operation.value == 5
add5 = 5 + add4
assert isinstance(add5, Addition)
assert add5.input(0).signals[0].source.operation.value == 5
assert add5.input(1).signals == add4.output(0).signals
def test_subtraction_overload(self):
"""Tests subtraction overloading for both operation and number argument."""
add1 = Addition(None, None, "add1")
add2 = Addition(None, None, "add2")
sub1 = add1 - add2
assert isinstance(sub1, Subtraction)
assert sub1.input(0).signals == add1.output(0).signals
assert sub1.input(1).signals == add2.output(0).signals
sub2 = sub1 - 5
assert isinstance(sub2, Subtraction)
assert sub2.input(0).signals == sub1.output(0).signals
assert sub2.input(1).signals[0].source.operation.value == 5
sub3 = 5 - sub2
assert isinstance(sub3, Subtraction)
assert sub3.input(0).signals[0].source.operation.value == 5
assert sub3.input(1).signals == sub2.output(0).signals
def test_multiplication_overload(self):
"""Tests multiplication overloading for both operation and number argument."""
add1 = Addition(None, None, "add1")
add2 = Addition(None, None, "add2")
mul1 = add1 * add2
assert isinstance(mul1, Multiplication)
assert mul1.input(0).signals == add1.output(0).signals
assert mul1.input(1).signals == add2.output(0).signals
mul2 = mul1 * 5
assert isinstance(mul2, ConstantMultiplication)
assert mul2.input(0).signals == mul1.output(0).signals
assert mul2.value == 5
mul3 = 5 * mul2
assert isinstance(mul3, ConstantMultiplication)
assert mul3.input(0).signals == mul2.output(0).signals
assert mul3.value == 5
def test_division_overload(self):
"""Tests division overloading for both operation and number argument."""
add1 = Addition(None, None, "add1")
add2 = Addition(None, None, "add2")
div1 = add1 / add2
assert isinstance(div1, Division)
assert div1.input(0).signals == add1.output(0).signals
assert div1.input(1).signals == add2.output(0).signals
div2 = div1 / 5
assert isinstance(div2, Division)
assert div2.input(0).signals == div1.output(0).signals
assert div2.input(1).signals[0].source.operation.value == 5
div3 = 5 / div2
assert isinstance(div3, Division)
assert div3.input(0).signals[0].source.operation.value == 5
assert div3.input(1).signals == div2.output(0).signals
class TestTraverse: class TestTraverse:
def test_traverse_single_tree(self, operation): def test_traverse_single_tree(self, operation):
"""Traverse a tree consisting of one operation.""" """Traverse a tree consisting of one operation."""
...@@ -12,20 +98,89 @@ class TestTraverse: ...@@ -12,20 +98,89 @@ class TestTraverse:
def test_traverse_tree(self, operation_tree): def test_traverse_tree(self, operation_tree):
"""Traverse a basic addition tree with two constants.""" """Traverse a basic addition tree with two constants."""
assert len(list(operation_tree.traverse())) == 3 assert len(list(operation_tree.traverse())) == 5
def test_traverse_large_tree(self, large_operation_tree): def test_traverse_large_tree(self, large_operation_tree):
"""Traverse a larger tree.""" """Traverse a larger tree."""
assert len(list(large_operation_tree.traverse())) == 7 assert len(list(large_operation_tree.traverse())) == 13
def test_traverse_type(self, large_operation_tree): def test_traverse_type(self, large_operation_tree):
traverse = list(large_operation_tree.traverse()) result = list(large_operation_tree.traverse())
assert len(list(filter(lambda type_: isinstance(type_, Addition), traverse))) == 3 assert len(list(filter(lambda type_: isinstance(type_, Addition), result))) == 3
assert len(list(filter(lambda type_: isinstance(type_, Constant), traverse))) == 4 assert len(list(filter(lambda type_: isinstance(type_, Constant), result))) == 4
def test_traverse_loop(self, operation_tree): def test_traverse_loop(self, operation_graph_with_cycle):
add_oper_signal = Signal() assert len(list(operation_graph_with_cycle.traverse())) == 8
operation_tree._output_ports[0].add_signal(add_oper_signal)
operation_tree._input_ports[0].remove_signal(add_oper_signal)
operation_tree._input_ports[0].add_signal(add_oper_signal) class TestToSfg:
assert len(list(operation_tree.traverse())) == 2 def test_convert_mad_to_sfg(self):
mad1 = MAD()
mad1_sfg = mad1.to_sfg()
assert mad1.evaluate(1, 1, 1) == mad1_sfg.evaluate(1, 1, 1)
assert len(mad1_sfg.operations) == 6
def test_butterfly_to_sfg(self):
but1 = Butterfly()
but1_sfg = but1.to_sfg()
assert but1.evaluate(1, 1)[0] == but1_sfg.evaluate(1, 1)[0]
assert but1.evaluate(1, 1)[1] == but1_sfg.evaluate(1, 1)[1]
assert len(but1_sfg.operations) == 8
def test_add_to_sfg(self):
add1 = Addition()
add1_sfg = add1.to_sfg()
assert len(add1_sfg.operations) == 4
def test_sqrt_to_sfg(self):
sqrt1 = SquareRoot()
sqrt1_sfg = sqrt1.to_sfg()
assert len(sqrt1_sfg.operations) == 3
class TestLatency:
def test_latency_constructor(self):
bfly = Butterfly(latency=5)
assert bfly.latency == 5
assert list(bfly.latency_offsets) == [[0, 0], [5, 5]]
def test_latency_offsets_constructor(self):
bfly = Butterfly(latency_offsets={'in0': 2, 'in1': 3, 'out0': 5, 'out1': 10})
assert bfly.latency == 8
assert list(bfly.latency_offsets) == [[2, 3], [5, 10]]
def test_latency_and_latency_offsets_constructor(self):
bfly = Butterfly(latency=5, latency_offsets={'in1': 2, 'out0': 9})
assert bfly.latency == 9
assert list(bfly.latency_offsets) == [[0, 2], [9, 5]]
def test_set_latency(self):
bfly = Butterfly()
bfly.set_latency(9)
assert bfly.latency == 9
assert list(bfly.latency_offsets) == [[0, 0], [9, 9]]
def test_set_latency_offsets(self):
bfly = Butterfly()
bfly.set_latency_offsets({'in0': 3, 'out1': 5})
assert list(bfly.latency_offsets) == [[3, None], [None, 5]]
class TestCopyOperation:
def test_copy_buttefly_latency_offsets(self):
bfly = Butterfly(latency_offsets={'in0': 4, 'in1': 2, 'out0': 10, 'out1': 9})
bfly_copy = bfly.copy_component()
assert list(bfly_copy.latency_offsets) == [[4, 2], [10, 9]]
""" """
B-ASIC test suite for OutputPort. B-ASIC test suite for OutputPort.
""" """
from b_asic import OutputPort, InputPort, Signal
import pytest import pytest
@pytest.fixture from b_asic import OutputPort, InputPort, Signal
def output_port():
return OutputPort(0, None)
@pytest.fixture
def input_port():
return InputPort(0, None)
@pytest.fixture
def list_of_input_ports():
return [InputPort(_, None) for _ in range(0,3)]
class TestConnect: class TestConnect:
def test_multiple_ports(self, output_port, list_of_input_ports): def test_multiple_ports(self, output_port, list_of_input_ports):
"""Can multiple ports connect to an output port?""" """Multiple connections to an output port should be possible."""
for port in list_of_input_ports: for port in list_of_input_ports:
output_port.connect(port) port.connect(output_port)
assert output_port.signal_count() == len(list_of_input_ports) assert output_port.signal_count == len(list_of_input_ports)
def test_same_port(self, output_port, list_of_input_ports): def test_same_port(self, output_port, input_port):
"""Check error handing.""" """Check error handing."""
output_port.connect(list_of_input_ports[0]) input_port.connect(output_port)
with pytest.raises(AssertionError): with pytest.raises(Exception):
output_port.connect(list_of_input_ports[0]) input_port.connect(output_port)
assert output_port.signal_count() == 2 assert output_port.signal_count == 1
class TestAddSignal: class TestAddSignal:
def test_dangling(self, output_port): def test_dangling(self, output_port):
s = Signal() s = Signal()
output_port.add_signal(s) output_port.add_signal(s)
assert output_port.signal_count() == 1 assert output_port.signal_count == 1
assert output_port.signals == [s]
def test_with_destination(self, output_port, input_port):
s = Signal(destination=input_port)
output_port.add_signal(s)
assert output_port.connected_ports == [s.destination] class TestClear:
def test_others_clear(self, output_port, list_of_input_ports):
for port in list_of_input_ports:
port.connect(output_port)
class TestDisconnect:
def test_multiple_ports(self, output_port, list_of_input_ports):
"""Can multiple ports disconnect from OutputPort?"""
for port in list_of_input_ports: for port in list_of_input_ports:
output_port.connect(port) port.clear()
assert output_port.signal_count == 3
assert all(s.dangling() for s in output_port.signals)
def test_self_clear(self, output_port, list_of_input_ports):
for port in list_of_input_ports: for port in list_of_input_ports:
output_port.disconnect(port) port.connect(output_port)
assert output_port.signal_count() == 3 output_port.clear()
assert output_port.connected_ports == []
assert output_port.signal_count == 0
assert output_port.signals == []
class TestRemoveSignal: class TestRemoveSignal:
def test_one_signal(self, output_port, input_port): def test_one_signal(self, output_port, input_port):
s = output_port.connect(input_port) s = input_port.connect(output_port)
output_port.remove_signal(s) output_port.remove_signal(s)
assert output_port.signal_count() == 0 assert output_port.signal_count == 0
assert output_port.signals == [] assert output_port.signals == []
assert output_port.connected_ports == []
def test_multiple_signals(self, output_port, list_of_input_ports): def test_multiple_signals(self, output_port, list_of_input_ports):
"""Can multiple signals disconnect from OutputPort?"""
sigs = [] sigs = []
for port in list_of_input_ports: for port in list_of_input_ports:
sigs.append(output_port.connect(port)) sigs.append(port.connect(output_port))
for sig in sigs: for s in sigs:
output_port.remove_signal(sig) output_port.remove_signal(s)
assert output_port.signal_count() == 0 assert output_port.signal_count == 0
assert output_port.signals == [] assert output_port.signals == []
"""
B-ASIC test suite for the schema module and Schema class.
"""
from b_asic import Schema, Addition, ConstantMultiplication
class TestInit:
def test_simple_filter_normal_latency(self, simple_filter):
simple_filter.set_latency_of_type(Addition.type_name(), 5)
simple_filter.set_latency_of_type(ConstantMultiplication.type_name(), 4)
schema = Schema(simple_filter)
assert schema._start_times == {"add1": 4, "cmul1": 0}
def test_complicated_single_outputs_normal_latency(self, precedence_sfg_registers):
precedence_sfg_registers.set_latency_of_type(Addition.type_name(), 4)
precedence_sfg_registers.set_latency_of_type(ConstantMultiplication.type_name(), 3)
schema = Schema(precedence_sfg_registers, scheduling_alg="ASAP")
for op in schema._sfg.get_operations_topological_order():
print(op.latency_offsets)
start_times_names = dict()
for op_id, start_time in schema._start_times.items():
op_name = precedence_sfg_registers.find_by_id(op_id).name
start_times_names[op_name] = start_time
assert start_times_names == {"C0": 0, "B1": 0, "B2": 0, "ADD2": 3, "ADD1": 7, "Q1": 11,
"A0": 14, "A1": 0, "A2": 0, "ADD3": 3, "ADD4": 17}
def test_complicated_single_outputs_complex_latencies(self, precedence_sfg_registers):
precedence_sfg_registers.set_latency_offsets_of_type(ConstantMultiplication.type_name(), {'in0': 3, 'out0': 5})
precedence_sfg_registers.find_by_name("B1")[0].set_latency_offsets({'in0': 4, 'out0': 7})
precedence_sfg_registers.find_by_name("B2")[0].set_latency_offsets({'in0': 1, 'out0': 4})
precedence_sfg_registers.find_by_name("ADD2")[0].set_latency_offsets({'in0': 4, 'in1': 2, 'out0': 4})
precedence_sfg_registers.find_by_name("ADD1")[0].set_latency_offsets({'in0': 1, 'in1': 2, 'out0': 4})
precedence_sfg_registers.find_by_name("Q1")[0].set_latency_offsets({'in0': 3, 'out0': 6})
precedence_sfg_registers.find_by_name("A0")[0].set_latency_offsets({'in0': 0, 'out0': 2})
precedence_sfg_registers.find_by_name("A1")[0].set_latency_offsets({'in0': 0, 'out0': 5})
precedence_sfg_registers.find_by_name("A2")[0].set_latency_offsets({'in0': 2, 'out0': 3})
precedence_sfg_registers.find_by_name("ADD3")[0].set_latency_offsets({'in0': 2, 'in1': 1, 'out0': 4})
precedence_sfg_registers.find_by_name("ADD4")[0].set_latency_offsets({'in0': 6, 'in1': 7, 'out0': 9})
schema = Schema(precedence_sfg_registers, scheduling_alg="ASAP")
start_times_names = dict()
for op_id, start_time in schema._start_times.items():
op_name = precedence_sfg_registers.find_by_id(op_id).name
start_times_names[op_name] = start_time
assert start_times_names == {'C0': 0, 'B1': 0, 'B2': 0, 'ADD2': 3, 'ADD1': 5, 'Q1': 6, 'A0': 12,
'A1': 0, 'A2': 0, 'ADD3': 3, 'ADD4': 8}
def test_independent_sfg(self, sfg_two_inputs_two_outputs_independent_with_cmul):
schema = Schema(sfg_two_inputs_two_outputs_independent_with_cmul, scheduling_alg="ASAP")
start_times_names = dict()
for op_id, start_time in schema._start_times.items():
op_name = sfg_two_inputs_two_outputs_independent_with_cmul.find_by_id(op_id).name
start_times_names[op_name] = start_time
assert start_times_names == {'CMUL1': 0, 'CMUL2': 5, "ADD1": 0, "CMUL3": 7}
import pytest
import io
import sys
from b_asic import SFG, Signal, Input, Output, Constant, ConstantMultiplication, Addition, Multiplication, Register, \
Butterfly, Subtraction, SquareRoot
class TestInit:
def test_direct_input_to_output_sfg_construction(self):
in1 = Input("IN1")
out1 = Output(None, "OUT1")
out1.input(0).connect(in1, "S1")
sfg = SFG(inputs=[in1], outputs=[out1]) # in1 ---s1---> out1
assert len(list(sfg.components)) == 3
assert len(list(sfg.operations)) == 2
assert sfg.input_count == 1
assert sfg.output_count == 1
def test_same_signal_input_and_output_sfg_construction(self):
add1 = Addition(None, None, "ADD1")
add2 = Addition(None, None, "ADD2")
s1 = add2.input(0).connect(add1, "S1")
# in1 ---s1---> out1
sfg = SFG(input_signals=[s1], output_signals=[s1])
assert len(list(sfg.components)) == 3
assert len(list(sfg.operations)) == 2
assert sfg.input_count == 1
assert sfg.output_count == 1
def test_outputs_construction(self, operation_tree):
sfg = SFG(outputs=[Output(operation_tree)])
assert len(list(sfg.components)) == 7
assert len(list(sfg.operations)) == 4
assert sfg.input_count == 0
assert sfg.output_count == 1
def test_signals_construction(self, operation_tree):
sfg = SFG(output_signals=[Signal(source=operation_tree.output(0))])
assert len(list(sfg.components)) == 7
assert len(list(sfg.operations)) == 4
assert sfg.input_count == 0
assert sfg.output_count == 1
class TestPrintSfg:
def test_one_addition(self):
inp1 = Input("INP1")
inp2 = Input("INP2")
add1 = Addition(inp1, inp2, "ADD1")
out1 = Output(add1, "OUT1")
sfg = SFG(inputs=[inp1, inp2], outputs=[out1], name="SFG1")
assert sfg.__str__() == \
"id: no_id, \tname: SFG1, \tinputs: {0: '-'}, \toutputs: {0: '-'}\n" + \
"Internal Operations:\n" + \
"----------------------------------------------------------------------------------------------------\n" + \
str(sfg.find_by_name("INP1")[0]) + "\n" + \
str(sfg.find_by_name("INP2")[0]) + "\n" + \
str(sfg.find_by_name("ADD1")[0]) + "\n" + \
str(sfg.find_by_name("OUT1")[0]) + "\n" + \
"----------------------------------------------------------------------------------------------------\n"
def test_add_mul(self):
inp1 = Input("INP1")
inp2 = Input("INP2")
inp3 = Input("INP3")
add1 = Addition(inp1, inp2, "ADD1")
mul1 = Multiplication(add1, inp3, "MUL1")
out1 = Output(mul1, "OUT1")
sfg = SFG(inputs=[inp1, inp2, inp3], outputs=[out1], name="mac_sfg")
assert sfg.__str__() == \
"id: no_id, \tname: mac_sfg, \tinputs: {0: '-'}, \toutputs: {0: '-'}\n" + \
"Internal Operations:\n" + \
"----------------------------------------------------------------------------------------------------\n" + \
str(sfg.find_by_name("INP1")[0]) + "\n" + \
str(sfg.find_by_name("INP2")[0]) + "\n" + \
str(sfg.find_by_name("ADD1")[0]) + "\n" + \
str(sfg.find_by_name("INP3")[0]) + "\n" + \
str(sfg.find_by_name("MUL1")[0]) + "\n" + \
str(sfg.find_by_name("OUT1")[0]) + "\n" + \
"----------------------------------------------------------------------------------------------------\n"
def test_constant(self):
inp1 = Input("INP1")
const1 = Constant(3, "CONST")
add1 = Addition(const1, inp1, "ADD1")
out1 = Output(add1, "OUT1")
sfg = SFG(inputs=[inp1], outputs=[out1], name="sfg")
assert sfg.__str__() == \
"id: no_id, \tname: sfg, \tinputs: {0: '-'}, \toutputs: {0: '-'}\n" + \
"Internal Operations:\n" + \
"----------------------------------------------------------------------------------------------------\n" + \
str(sfg.find_by_name("CONST")[0]) + "\n" + \
str(sfg.find_by_name("INP1")[0]) + "\n" + \
str(sfg.find_by_name("ADD1")[0]) + "\n" + \
str(sfg.find_by_name("OUT1")[0]) + "\n" + \
"----------------------------------------------------------------------------------------------------\n"
def test_simple_filter(self, simple_filter):
assert simple_filter.__str__() == \
"id: no_id, \tname: simple_filter, \tinputs: {0: '-'}, \toutputs: {0: '-'}\n" + \
"Internal Operations:\n" + \
"----------------------------------------------------------------------------------------------------\n" + \
str(simple_filter.find_by_name("IN1")[0]) + "\n" + \
str(simple_filter.find_by_name("ADD1")[0]) + "\n" + \
str(simple_filter.find_by_name("REG1")[0]) + "\n" + \
str(simple_filter.find_by_name("CMUL1")[0]) + "\n" + \
str(simple_filter.find_by_name("OUT1")[0]) + "\n" + \
"----------------------------------------------------------------------------------------------------\n"
class TestDeepCopy:
def test_deep_copy_no_duplicates(self):
inp1 = Input("INP1")
inp2 = Input("INP2")
inp3 = Input("INP3")
add1 = Addition(inp1, inp2, "ADD1")
mul1 = Multiplication(add1, inp3, "MUL1")
out1 = Output(mul1, "OUT1")
mac_sfg = SFG(inputs=[inp1, inp2], outputs=[out1], name="mac_sfg")
mac_sfg_new = mac_sfg()
assert mac_sfg.name == "mac_sfg"
assert mac_sfg_new.name == ""
for g_id, component in mac_sfg._components_by_id.items():
component_copy = mac_sfg_new.find_by_id(g_id)
assert component.name == component_copy.name
def test_deep_copy(self):
inp1 = Input("INP1")
inp2 = Input("INP2")
inp3 = Input("INP3")
add1 = Addition(None, None, "ADD1")
add2 = Addition(None, None, "ADD2")
mul1 = Multiplication(None, None, "MUL1")
out1 = Output(None, "OUT1")
add1.input(0).connect(inp1, "S1")
add1.input(1).connect(inp2, "S2")
add2.input(0).connect(add1, "S4")
add2.input(1).connect(inp3, "S3")
mul1.input(0).connect(add1, "S5")
mul1.input(1).connect(add2, "S6")
out1.input(0).connect(mul1, "S7")
mac_sfg = SFG(inputs=[inp1, inp2], outputs=[out1],
id_number_offset=100, name="mac_sfg")
mac_sfg_new = mac_sfg(name="mac_sfg2")
assert mac_sfg.name == "mac_sfg"
assert mac_sfg_new.name == "mac_sfg2"
assert mac_sfg.id_number_offset == 100
assert mac_sfg_new.id_number_offset == 100
for g_id, component in mac_sfg._components_by_id.items():
component_copy = mac_sfg_new.find_by_id(g_id)
assert component.name == component_copy.name
def test_deep_copy_with_new_sources(self):
inp1 = Input("INP1")
inp2 = Input("INP2")
inp3 = Input("INP3")
add1 = Addition(inp1, inp2, "ADD1")
mul1 = Multiplication(add1, inp3, "MUL1")
out1 = Output(mul1, "OUT1")
mac_sfg = SFG(inputs=[inp1, inp2], outputs=[out1], name="mac_sfg")
a = Addition(Constant(3), Constant(5))
b = Constant(2)
mac_sfg_new = mac_sfg(a, b)
assert mac_sfg_new.input(0).signals[0].source.operation is a
assert mac_sfg_new.input(1).signals[0].source.operation is b
class TestEvaluateOutput:
def test_evaluate_output(self, operation_tree):
sfg = SFG(outputs=[Output(operation_tree)])
assert sfg.evaluate_output(0, []) == 5
def test_evaluate_output_large(self, large_operation_tree):
sfg = SFG(outputs=[Output(large_operation_tree)])
assert sfg.evaluate_output(0, []) == 14
def test_evaluate_output_cycle(self, operation_graph_with_cycle):
sfg = SFG(outputs=[Output(operation_graph_with_cycle)])
with pytest.raises(Exception):
sfg.evaluate_output(0, [])
class TestComponents:
def test_advanced_components(self):
inp1 = Input("INP1")
inp2 = Input("INP2")
inp3 = Input("INP3")
add1 = Addition(None, None, "ADD1")
add2 = Addition(None, None, "ADD2")
mul1 = Multiplication(None, None, "MUL1")
out1 = Output(None, "OUT1")
add1.input(0).connect(inp1, "S1")
add1.input(1).connect(inp2, "S2")
add2.input(0).connect(add1, "S4")
add2.input(1).connect(inp3, "S3")
mul1.input(0).connect(add1, "S5")
mul1.input(1).connect(add2, "S6")
out1.input(0).connect(mul1, "S7")
mac_sfg = SFG(inputs=[inp1, inp2], outputs=[out1], name="mac_sfg")
assert set([comp.name for comp in mac_sfg.components]) == {
"INP1", "INP2", "INP3", "ADD1", "ADD2", "MUL1", "OUT1", "S1", "S2", "S3", "S4", "S5", "S6", "S7"}
class TestReplaceComponents:
def test_replace_addition_by_id(self, operation_tree):
sfg = SFG(outputs=[Output(operation_tree)])
component_id = "add1"
sfg = sfg.replace_component(
Multiplication(name="Multi"), _id=component_id)
assert component_id not in sfg._components_by_id.keys()
assert "Multi" in sfg._components_by_name.keys()
def test_replace_addition_large_tree(self, large_operation_tree):
sfg = SFG(outputs=[Output(large_operation_tree)])
component_id = "add3"
sfg = sfg.replace_component(
Multiplication(name="Multi"), _id=component_id)
assert "Multi" in sfg._components_by_name.keys()
assert component_id not in sfg._components_by_id.keys()
def test_replace_no_input_component(self, operation_tree):
sfg = SFG(outputs=[Output(operation_tree)])
component_id = "c1"
_const = sfg.find_by_id(component_id)
sfg = sfg.replace_component(Constant(1), _id=component_id)
assert _const is not sfg.find_by_id(component_id)
def test_no_match_on_replace(self, large_operation_tree):
sfg = SFG(outputs=[Output(large_operation_tree)])
component_id = "addd1"
try:
sfg = sfg.replace_component(
Multiplication(name="Multi"), _id=component_id)
except AssertionError:
assert True
else:
assert False
def test_not_equal_input(self, large_operation_tree):
sfg = SFG(outputs=[Output(large_operation_tree)])
component_id = "c1"
try:
sfg = sfg.replace_component(
Multiplication(name="Multi"), _id=component_id)
except AssertionError:
assert True
else:
assert False
class TestInsertComponent:
def test_insert_component_in_sfg(self, large_operation_tree_names):
sfg = SFG(outputs=[Output(large_operation_tree_names)])
sqrt = SquareRoot()
_sfg = sfg.insert_operation(sqrt, sfg.find_by_name("constant4")[0].graph_id)
assert _sfg.evaluate() != sfg.evaluate()
assert any([isinstance(comp, SquareRoot) for comp in _sfg.operations])
assert not any([isinstance(comp, SquareRoot) for comp in sfg.operations])
assert not isinstance(sfg.find_by_name("constant4")[0].output(0).signals[0].destination.operation, SquareRoot)
assert isinstance(_sfg.find_by_name("constant4")[0].output(0).signals[0].destination.operation, SquareRoot)
assert sfg.find_by_name("constant4")[0].output(0).signals[0].destination.operation is sfg.find_by_id("add3")
assert _sfg.find_by_name("constant4")[0].output(
0).signals[0].destination.operation is not _sfg.find_by_id("add3")
assert _sfg.find_by_id("sqrt1").output(0).signals[0].destination.operation is _sfg.find_by_id("add3")
def test_insert_invalid_component_in_sfg(self, large_operation_tree):
sfg = SFG(outputs=[Output(large_operation_tree)])
# Should raise an exception for not matching input count to output count.
add4 = Addition()
with pytest.raises(Exception):
sfg.insert_operation(add4, "c1")
def test_insert_at_output(self, large_operation_tree):
sfg = SFG(outputs=[Output(large_operation_tree)])
# Should raise an exception for trying to insert an operation after an output.
sqrt = SquareRoot()
with pytest.raises(Exception):
_sfg = sfg.insert_operation(sqrt, "out1")
def test_insert_multiple_output_ports(self, butterfly_operation_tree):
sfg = SFG(outputs=list(map(Output, butterfly_operation_tree.outputs)))
_sfg = sfg.insert_operation(Butterfly(name="n_bfly"), "bfly3")
assert sfg.evaluate() != _sfg.evaluate()
assert len(sfg.find_by_name("n_bfly")) == 0
assert len(_sfg.find_by_name("n_bfly")) == 1
# Correctly connected old output -> new input
assert _sfg.find_by_name("bfly3")[0].output(
0).signals[0].destination.operation is _sfg.find_by_name("n_bfly")[0]
assert _sfg.find_by_name("bfly3")[0].output(
1).signals[0].destination.operation is _sfg.find_by_name("n_bfly")[0]
# Correctly connected new input -> old output
assert _sfg.find_by_name("n_bfly")[0].input(0).signals[0].source.operation is _sfg.find_by_name("bfly3")[0]
assert _sfg.find_by_name("n_bfly")[0].input(1).signals[0].source.operation is _sfg.find_by_name("bfly3")[0]
# Correctly connected new output -> next input
assert _sfg.find_by_name("n_bfly")[0].output(
0).signals[0].destination.operation is _sfg.find_by_name("bfly2")[0]
assert _sfg.find_by_name("n_bfly")[0].output(
1).signals[0].destination.operation is _sfg.find_by_name("bfly2")[0]
# Correctly connected next input -> new output
assert _sfg.find_by_name("bfly2")[0].input(0).signals[0].source.operation is _sfg.find_by_name("n_bfly")[0]
assert _sfg.find_by_name("bfly2")[0].input(1).signals[0].source.operation is _sfg.find_by_name("n_bfly")[0]
class TestFindComponentsWithTypeName:
def test_mac_components(self):
inp1 = Input("INP1")
inp2 = Input("INP2")
inp3 = Input("INP3")
add1 = Addition(None, None, "ADD1")
add2 = Addition(None, None, "ADD2")
mul1 = Multiplication(None, None, "MUL1")
out1 = Output(None, "OUT1")
add1.input(0).connect(inp1, "S1")
add1.input(1).connect(inp2, "S2")
add2.input(0).connect(add1, "S4")
add2.input(1).connect(inp3, "S3")
mul1.input(0).connect(add1, "S5")
mul1.input(1).connect(add2, "S6")
out1.input(0).connect(mul1, "S7")
mac_sfg = SFG(inputs=[inp1, inp2], outputs=[out1], name="mac_sfg")
assert {comp.name for comp in mac_sfg.get_components_with_type_name(
inp1.type_name())} == {"INP1", "INP2", "INP3"}
assert {comp.name for comp in mac_sfg.get_components_with_type_name(
add1.type_name())} == {"ADD1", "ADD2"}
assert {comp.name for comp in mac_sfg.get_components_with_type_name(
mul1.type_name())} == {"MUL1"}
assert {comp.name for comp in mac_sfg.get_components_with_type_name(
out1.type_name())} == {"OUT1"}
assert {comp.name for comp in mac_sfg.get_components_with_type_name(
Signal.type_name())} == {"S1", "S2", "S3", "S4", "S5", "S6", "S7"}
class TestGetPrecedenceList:
def test_inputs_registers(self, precedence_sfg_registers):
precedence_list = precedence_sfg_registers.get_precedence_list()
assert len(precedence_list) == 7
assert set([port.operation.key(port.index, port.operation.name)
for port in precedence_list[0]]) == {"IN1", "T1", "T2"}
assert set([port.operation.key(port.index, port.operation.name)
for port in precedence_list[1]]) == {"C0", "B1", "B2", "A1", "A2"}
assert set([port.operation.key(port.index, port.operation.name)
for port in precedence_list[2]]) == {"ADD2", "ADD3"}
assert set([port.operation.key(port.index, port.operation.name)
for port in precedence_list[3]]) == {"ADD1"}
assert set([port.operation.key(port.index, port.operation.name)
for port in precedence_list[4]]) == {"Q1"}
assert set([port.operation.key(port.index, port.operation.name)
for port in precedence_list[5]]) == {"A0"}
assert set([port.operation.key(port.index, port.operation.name)
for port in precedence_list[6]]) == {"ADD4"}
def test_inputs_constants_registers_multiple_outputs(self, precedence_sfg_registers_and_constants):
precedence_list = precedence_sfg_registers_and_constants.get_precedence_list()
assert len(precedence_list) == 7
assert set([port.operation.key(port.index, port.operation.name)
for port in precedence_list[0]]) == {"IN1", "T1", "CONST1"}
assert set([port.operation.key(port.index, port.operation.name)
for port in precedence_list[1]]) == {"C0", "B1", "B2", "A1", "A2"}
assert set([port.operation.key(port.index, port.operation.name)
for port in precedence_list[2]]) == {"ADD2", "ADD3"}
assert set([port.operation.key(port.index, port.operation.name)
for port in precedence_list[3]]) == {"ADD1"}
assert set([port.operation.key(port.index, port.operation.name)
for port in precedence_list[4]]) == {"Q1"}
assert set([port.operation.key(port.index, port.operation.name)
for port in precedence_list[5]]) == {"A0"}
assert set([port.operation.key(port.index, port.operation.name)
for port in precedence_list[6]]) == {"BFLY1.0", "BFLY1.1"}
def test_precedence_multiple_outputs_same_precedence(self, sfg_two_inputs_two_outputs):
sfg_two_inputs_two_outputs.name = "NESTED_SFG"
in1 = Input("IN1")
sfg_two_inputs_two_outputs.input(0).connect(in1, "S1")
in2 = Input("IN2")
cmul1 = ConstantMultiplication(10, None, "CMUL1")
cmul1.input(0).connect(in2, "S2")
sfg_two_inputs_two_outputs.input(1).connect(cmul1, "S3")
out1 = Output(sfg_two_inputs_two_outputs.output(0), "OUT1")
out2 = Output(sfg_two_inputs_two_outputs.output(1), "OUT2")
sfg = SFG(inputs=[in1, in2], outputs=[out1, out2])
precedence_list = sfg.get_precedence_list()
assert len(precedence_list) == 3
assert set([port.operation.key(port.index, port.operation.name)
for port in precedence_list[0]]) == {"IN1", "IN2"}
assert set([port.operation.key(port.index, port.operation.name)
for port in precedence_list[1]]) == {"CMUL1"}
assert set([port.operation.key(port.index, port.operation.name)
for port in precedence_list[2]]) == {"NESTED_SFG.0", "NESTED_SFG.1"}
def test_precedence_sfg_multiple_outputs_different_precedences(self, sfg_two_inputs_two_outputs_independent):
sfg_two_inputs_two_outputs_independent.name = "NESTED_SFG"
in1 = Input("IN1")
in2 = Input("IN2")
sfg_two_inputs_two_outputs_independent.input(0).connect(in1, "S1")
cmul1 = ConstantMultiplication(10, None, "CMUL1")
cmul1.input(0).connect(in2, "S2")
sfg_two_inputs_two_outputs_independent.input(1).connect(cmul1, "S3")
out1 = Output(sfg_two_inputs_two_outputs_independent.output(0), "OUT1")
out2 = Output(sfg_two_inputs_two_outputs_independent.output(1), "OUT2")
sfg = SFG(inputs=[in1, in2], outputs=[out1, out2])
precedence_list = sfg.get_precedence_list()
assert len(precedence_list) == 3
assert set([port.operation.key(port.index, port.operation.name)
for port in precedence_list[0]]) == {"IN1", "IN2"}
assert set([port.operation.key(port.index, port.operation.name)
for port in precedence_list[1]]) == {"CMUL1"}
assert set([port.operation.key(port.index, port.operation.name)
for port in precedence_list[2]]) == {"NESTED_SFG.0", "NESTED_SFG.1"}
class TestPrintPrecedence:
def test_registers(self, precedence_sfg_registers):
sfg = precedence_sfg_registers
captured_output = io.StringIO()
sys.stdout = captured_output
sfg.print_precedence_graph()
sys.stdout = sys.__stdout__
captured_output = captured_output.getvalue()
assert captured_output == \
"-" * 120 + "\n" + \
"1.1 \t" + str(sfg.find_by_name("IN1")[0]) + "\n" + \
"1.2 \t" + str(sfg.find_by_name("T1")[0]) + "\n" + \
"1.3 \t" + str(sfg.find_by_name("T2")[0]) + "\n" + \
"-" * 120 + "\n" + \
"2.1 \t" + str(sfg.find_by_name("C0")[0]) + "\n" + \
"2.2 \t" + str(sfg.find_by_name("A1")[0]) + "\n" + \
"2.3 \t" + str(sfg.find_by_name("B1")[0]) + "\n" + \
"2.4 \t" + str(sfg.find_by_name("A2")[0]) + "\n" + \
"2.5 \t" + str(sfg.find_by_name("B2")[0]) + "\n" + \
"-" * 120 + "\n" + \
"3.1 \t" + str(sfg.find_by_name("ADD3")[0]) + "\n" + \
"3.2 \t" + str(sfg.find_by_name("ADD2")[0]) + "\n" + \
"-" * 120 + "\n" + \
"4.1 \t" + str(sfg.find_by_name("ADD1")[0]) + "\n" + \
"-" * 120 + "\n" + \
"5.1 \t" + str(sfg.find_by_name("Q1")[0]) + "\n" + \
"-" * 120 + "\n" + \
"6.1 \t" + str(sfg.find_by_name("A0")[0]) + "\n" + \
"-" * 120 + "\n" + \
"7.1 \t" + str(sfg.find_by_name("ADD4")[0]) + "\n" + \
"-" * 120 + "\n"
class TestDepends:
def test_depends_sfg(self, sfg_two_inputs_two_outputs):
assert set(sfg_two_inputs_two_outputs.inputs_required_for_output(0)) == {
0, 1}
assert set(sfg_two_inputs_two_outputs.inputs_required_for_output(1)) == {
0, 1}
def test_depends_sfg_independent(self, sfg_two_inputs_two_outputs_independent):
assert set(
sfg_two_inputs_two_outputs_independent.inputs_required_for_output(0)) == {0}
assert set(
sfg_two_inputs_two_outputs_independent.inputs_required_for_output(1)) == {1}
class TestConnectExternalSignalsToComponentsSoloComp:
def test_connect_external_signals_to_components_mac(self):
""" Replace a MAC with inner components in an SFG """
inp1 = Input("INP1")
inp2 = Input("INP2")
inp3 = Input("INP3")
add1 = Addition(None, None, "ADD1")
add2 = Addition(None, None, "ADD2")
mul1 = Multiplication(None, None, "MUL1")
out1 = Output(None, "OUT1")
add1.input(0).connect(inp1, "S1")
add1.input(1).connect(inp2, "S2")
add2.input(0).connect(add1, "S3")
add2.input(1).connect(inp3, "S4")
mul1.input(0).connect(add1, "S5")
mul1.input(1).connect(add2, "S6")
out1.input(0).connect(mul1, "S7")
mac_sfg = SFG(inputs=[inp1, inp2], outputs=[out1])
inp4 = Input("INP4")
inp5 = Input("INP5")
out2 = Output(None, "OUT2")
mac_sfg.input(0).connect(inp4, "S8")
mac_sfg.input(1).connect(inp5, "S9")
out2.input(0).connect(mac_sfg.outputs[0], "S10")
test_sfg = SFG(inputs=[inp4, inp5], outputs=[out2])
assert test_sfg.evaluate(1, 2) == 9
mac_sfg.connect_external_signals_to_components()
assert test_sfg.evaluate(1, 2) == 9
assert not test_sfg.connect_external_signals_to_components()
def test_connect_external_signals_to_components_operation_tree(self, operation_tree):
""" Replaces an SFG with only a operation_tree component with its inner components """
sfg1 = SFG(outputs=[Output(operation_tree)])
out1 = Output(None, "OUT1")
out1.input(0).connect(sfg1.outputs[0], "S1")
test_sfg = SFG(outputs=[out1])
assert test_sfg.evaluate_output(0, []) == 5
sfg1.connect_external_signals_to_components()
assert test_sfg.evaluate_output(0, []) == 5
assert not test_sfg.connect_external_signals_to_components()
def test_connect_external_signals_to_components_large_operation_tree(self, large_operation_tree):
""" Replaces an SFG with only a large_operation_tree component with its inner components """
sfg1 = SFG(outputs=[Output(large_operation_tree)])
out1 = Output(None, "OUT1")
out1.input(0).connect(sfg1.outputs[0], "S1")
test_sfg = SFG(outputs=[out1])
assert test_sfg.evaluate_output(0, []) == 14
sfg1.connect_external_signals_to_components()
assert test_sfg.evaluate_output(0, []) == 14
assert not test_sfg.connect_external_signals_to_components()
class TestConnectExternalSignalsToComponentsMultipleComp:
def test_connect_external_signals_to_components_operation_tree(self, operation_tree):
""" Replaces a operation_tree in an SFG with other components """
sfg1 = SFG(outputs=[Output(operation_tree)])
inp1 = Input("INP1")
inp2 = Input("INP2")
out1 = Output(None, "OUT1")
add1 = Addition(None, None, "ADD1")
add2 = Addition(None, None, "ADD2")
add1.input(0).connect(inp1, "S1")
add1.input(1).connect(inp2, "S2")
add2.input(0).connect(add1, "S3")
add2.input(1).connect(sfg1.outputs[0], "S4")
out1.input(0).connect(add2, "S5")
test_sfg = SFG(inputs=[inp1, inp2], outputs=[out1])
assert test_sfg.evaluate(1, 2) == 8
sfg1.connect_external_signals_to_components()
assert test_sfg.evaluate(1, 2) == 8
assert not test_sfg.connect_external_signals_to_components()
def test_connect_external_signals_to_components_large_operation_tree(self, large_operation_tree):
""" Replaces a large_operation_tree in an SFG with other components """
sfg1 = SFG(outputs=[Output(large_operation_tree)])
inp1 = Input("INP1")
inp2 = Input("INP2")
out1 = Output(None, "OUT1")
add1 = Addition(None, None, "ADD1")
add2 = Addition(None, None, "ADD2")
add1.input(0).connect(inp1, "S1")
add1.input(1).connect(inp2, "S2")
add2.input(0).connect(add1, "S3")
add2.input(1).connect(sfg1.outputs[0], "S4")
out1.input(0).connect(add2, "S5")
test_sfg = SFG(inputs=[inp1, inp2], outputs=[out1])
assert test_sfg.evaluate(1, 2) == 17
sfg1.connect_external_signals_to_components()
assert test_sfg.evaluate(1, 2) == 17
assert not test_sfg.connect_external_signals_to_components()
def create_sfg(self, op_tree):
""" Create a simple SFG with either operation_tree or large_operation_tree """
sfg1 = SFG(outputs=[Output(op_tree)])
inp1 = Input("INP1")
inp2 = Input("INP2")
out1 = Output(None, "OUT1")
add1 = Addition(None, None, "ADD1")
add2 = Addition(None, None, "ADD2")
add1.input(0).connect(inp1, "S1")
add1.input(1).connect(inp2, "S2")
add2.input(0).connect(add1, "S3")
add2.input(1).connect(sfg1.outputs[0], "S4")
out1.input(0).connect(add2, "S5")
return SFG(inputs=[inp1, inp2], outputs=[out1])
def test_connect_external_signals_to_components_many_op(self, large_operation_tree):
""" Replaces an sfg component in a larger SFG with several component operations """
inp1 = Input("INP1")
inp2 = Input("INP2")
inp3 = Input("INP3")
inp4 = Input("INP4")
out1 = Output(None, "OUT1")
add1 = Addition(None, None, "ADD1")
sub1 = Subtraction(None, None, "SUB1")
add1.input(0).connect(inp1, "S1")
add1.input(1).connect(inp2, "S2")
sfg1 = self.create_sfg(large_operation_tree)
sfg1.input(0).connect(add1, "S3")
sfg1.input(1).connect(inp3, "S4")
sub1.input(0).connect(sfg1.outputs[0], "S5")
sub1.input(1).connect(inp4, "S6")
out1.input(0).connect(sub1, "S7")
test_sfg = SFG(inputs=[inp1, inp2, inp3, inp4], outputs=[out1])
assert test_sfg.evaluate(1, 2, 3, 4) == 16
sfg1.connect_external_signals_to_components()
assert test_sfg.evaluate(1, 2, 3, 4) == 16
assert not test_sfg.connect_external_signals_to_components()
class TestTopologicalOrderOperations:
def test_feedback_sfg(self, simple_filter):
topological_order = simple_filter.get_operations_topological_order()
assert [comp.name for comp in topological_order] == ["IN1", "ADD1", "REG1", "CMUL1", "OUT1"]
def test_multiple_independent_inputs(self, sfg_two_inputs_two_outputs_independent):
topological_order = sfg_two_inputs_two_outputs_independent.get_operations_topological_order()
assert [comp.name for comp in topological_order] == ["IN1", "OUT1", "IN2", "C1", "ADD1", "OUT2"]
def test_complex_graph(self, precedence_sfg_registers):
topological_order = precedence_sfg_registers.get_operations_topological_order()
assert [comp.name for comp in topological_order] == \
['IN1', 'C0', 'ADD1', 'Q1', 'A0', 'T1', 'B1', 'A1', 'T2', 'B2', 'ADD2', 'A2', 'ADD3', 'ADD4', 'OUT1']
class TestRemove:
def test_remove_single_input_outputs(self, simple_filter):
new_sfg = simple_filter.remove_operation("cmul1")
assert set(op.name for op in simple_filter.find_by_name("REG1")[0].subsequent_operations) == {"CMUL1", "OUT1"}
assert set(op.name for op in new_sfg.find_by_name("REG1")[0].subsequent_operations) == {"ADD1", "OUT1"}
assert set(op.name for op in simple_filter.find_by_name("ADD1")[0].preceding_operations) == {"CMUL1", "IN1"}
assert set(op.name for op in new_sfg.find_by_name("ADD1")[0].preceding_operations) == {"REG1", "IN1"}
assert "S1" in set([sig.name for sig in simple_filter.find_by_name("REG1")[0].output(0).signals])
assert "S2" in set([sig.name for sig in new_sfg.find_by_name("REG1")[0].output(0).signals])
def test_remove_multiple_inputs_outputs(self, butterfly_operation_tree):
out1 = Output(butterfly_operation_tree.output(0), "OUT1")
out2 = Output(butterfly_operation_tree.output(1), "OUT2")
sfg = SFG(outputs=[out1, out2])
new_sfg = sfg.remove_operation(sfg.find_by_name("bfly2")[0].graph_id)
assert sfg.find_by_name("bfly3")[0].output(0).signal_count == 1
assert new_sfg.find_by_name("bfly3")[0].output(0).signal_count == 1
sfg_dest_0 = sfg.find_by_name("bfly3")[0].output(0).signals[0].destination
new_sfg_dest_0 = new_sfg.find_by_name("bfly3")[0].output(0).signals[0].destination
assert sfg_dest_0.index == 0
assert new_sfg_dest_0.index == 0
assert sfg_dest_0.operation.name == "bfly2"
assert new_sfg_dest_0.operation.name == "bfly1"
assert sfg.find_by_name("bfly3")[0].output(1).signal_count == 1
assert new_sfg.find_by_name("bfly3")[0].output(1).signal_count == 1
sfg_dest_1 = sfg.find_by_name("bfly3")[0].output(1).signals[0].destination
new_sfg_dest_1 = new_sfg.find_by_name("bfly3")[0].output(1).signals[0].destination
assert sfg_dest_1.index == 1
assert new_sfg_dest_1.index == 1
assert sfg_dest_1.operation.name == "bfly2"
assert new_sfg_dest_1.operation.name == "bfly1"
assert sfg.find_by_name("bfly1")[0].input(0).signal_count == 1
assert new_sfg.find_by_name("bfly1")[0].input(0).signal_count == 1
sfg_source_0 = sfg.find_by_name("bfly1")[0].input(0).signals[0].source
new_sfg_source_0 = new_sfg.find_by_name("bfly1")[0].input(0).signals[0].source
assert sfg_source_0.index == 0
assert new_sfg_source_0.index == 0
assert sfg_source_0.operation.name == "bfly2"
assert new_sfg_source_0.operation.name == "bfly3"
sfg_source_1 = sfg.find_by_name("bfly1")[0].input(1).signals[0].source
new_sfg_source_1 = new_sfg.find_by_name("bfly1")[0].input(1).signals[0].source
assert sfg_source_1.index == 1
assert new_sfg_source_1.index == 1
assert sfg_source_1.operation.name == "bfly2"
assert new_sfg_source_1.operation.name == "bfly3"
assert "bfly2" not in set(op.name for op in new_sfg.operations)
def remove_different_number_inputs_outputs(self, simple_filter):
with pytest.raises(ValueError):
simple_filter.remove_operation("add1")
class TestGetComponentsOfType:
def test_get_no_operations_of_type(self, sfg_two_inputs_two_outputs):
assert [op.name for op in sfg_two_inputs_two_outputs.get_components_with_type_name(Multiplication.type_name())] \
== []
def test_get_multple_operations_of_type(self, sfg_two_inputs_two_outputs):
assert [op.name for op in sfg_two_inputs_two_outputs.get_components_with_type_name(Addition.type_name())] \
== ["ADD1", "ADD2"]
assert [op.name for op in sfg_two_inputs_two_outputs.get_components_with_type_name(Input.type_name())] \
== ["IN1", "IN2"]
assert [op.name for op in sfg_two_inputs_two_outputs.get_components_with_type_name(Output.type_name())] \
== ["OUT1", "OUT2"]
...@@ -2,14 +2,14 @@ ...@@ -2,14 +2,14 @@
B-ASIC test suit for the signal module which consists of the Signal class. B-ASIC test suit for the signal module which consists of the Signal class.
""" """
from b_asic.port import InputPort, OutputPort
from b_asic.signal import Signal
import pytest import pytest
from b_asic import InputPort, OutputPort, Signal
def test_signal_creation_and_disconnction_and_connection_changing(): def test_signal_creation_and_disconnction_and_connection_changing():
in_port = InputPort(0, None) in_port = InputPort(None, 0)
out_port = OutputPort(1, None) out_port = OutputPort(None, 1)
s = Signal(out_port, in_port) s = Signal(out_port, in_port)
assert in_port.signals == [s] assert in_port.signals == [s]
...@@ -17,7 +17,7 @@ def test_signal_creation_and_disconnction_and_connection_changing(): ...@@ -17,7 +17,7 @@ def test_signal_creation_and_disconnction_and_connection_changing():
assert s.source is out_port assert s.source is out_port
assert s.destination is in_port assert s.destination is in_port
in_port1 = InputPort(0, None) in_port1 = InputPort(None, 0)
s.set_destination(in_port1) s.set_destination(in_port1)
assert in_port.signals == [] assert in_port.signals == []
...@@ -40,7 +40,7 @@ def test_signal_creation_and_disconnction_and_connection_changing(): ...@@ -40,7 +40,7 @@ def test_signal_creation_and_disconnction_and_connection_changing():
assert s.source is None assert s.source is None
assert s.destination is None assert s.destination is None
out_port1 = OutputPort(0, None) out_port1 = OutputPort(None, 0)
s.set_source(out_port1) s.set_source(out_port1)
assert out_port1.signals == [s] assert out_port1.signals == [s]
...@@ -60,3 +60,29 @@ def test_signal_creation_and_disconnction_and_connection_changing(): ...@@ -60,3 +60,29 @@ def test_signal_creation_and_disconnction_and_connection_changing():
assert in_port.signals == [s] assert in_port.signals == [s]
assert s.source is out_port assert s.source is out_port
assert s.destination is in_port assert s.destination is in_port
class Bits:
def test_pos_int(self, signal):
signal.bits = 10
assert signal.bits == 10
def test_bits_zero(self, signal):
signal.bits = 0
assert signal.bits == 0
def test_bits_neg_int(self, signal):
with pytest.raises(Exception):
signal.bits = -10
def test_bits_complex(self, signal):
with pytest.raises(Exception):
signal.bits = (2+4j)
def test_bits_float(self, signal):
with pytest.raises(Exception):
signal.bits = 3.2
def test_bits_pos_then_none(self, signal):
signal.bits = 10
signal.bits = None
assert signal.bits is None
\ No newline at end of file
import pytest
import numpy as np
from b_asic import SFG, Output, Simulation
class TestRunFor:
def test_with_lambdas_as_input(self, sfg_two_inputs_two_outputs):
simulation = Simulation(sfg_two_inputs_two_outputs, [lambda n: n + 3, lambda n: 1 + n * 2], save_results = True)
output = simulation.run_for(101)
assert output[0] == 304
assert output[1] == 505
assert simulation.results[100]["0"] == 304
assert simulation.results[100]["1"] == 505
assert simulation.results[0]["in1"] == 3
assert simulation.results[0]["in2"] == 1
assert simulation.results[0]["add1"] == 4
assert simulation.results[0]["add2"] == 5
assert simulation.results[0]["0"] == 4
assert simulation.results[0]["1"] == 5
assert simulation.results[1]["in1"] == 4
assert simulation.results[1]["in2"] == 3
assert simulation.results[1]["add1"] == 7
assert simulation.results[1]["add2"] == 10
assert simulation.results[1]["0"] == 7
assert simulation.results[1]["1"] == 10
assert simulation.results[2]["in1"] == 5
assert simulation.results[2]["in2"] == 5
assert simulation.results[2]["add1"] == 10
assert simulation.results[2]["add2"] == 15
assert simulation.results[2]["0"] == 10
assert simulation.results[2]["1"] == 15
assert simulation.results[3]["in1"] == 6
assert simulation.results[3]["in2"] == 7
assert simulation.results[3]["add1"] == 13
assert simulation.results[3]["add2"] == 20
assert simulation.results[3]["0"] == 13
assert simulation.results[3]["1"] == 20
def test_with_numpy_arrays_as_input(self, sfg_two_inputs_two_outputs):
input0 = np.array([5, 9, 25, -5, 7])
input1 = np.array([7, 3, 3, 54, 2])
simulation = Simulation(sfg_two_inputs_two_outputs, [input0, input1])
simulation.save_results = True
output = simulation.run_for(5)
assert output[0] == 9
assert output[1] == 11
assert simulation.results[0]["in1"] == 5
assert simulation.results[0]["in2"] == 7
assert simulation.results[0]["add1"] == 12
assert simulation.results[0]["add2"] == 19
assert simulation.results[0]["0"] == 12
assert simulation.results[0]["1"] == 19
assert simulation.results[1]["in1"] == 9
assert simulation.results[1]["in2"] == 3
assert simulation.results[1]["add1"] == 12
assert simulation.results[1]["add2"] == 15
assert simulation.results[1]["0"] == 12
assert simulation.results[1]["1"] == 15
assert simulation.results[2]["in1"] == 25
assert simulation.results[2]["in2"] == 3
assert simulation.results[2]["add1"] == 28
assert simulation.results[2]["add2"] == 31
assert simulation.results[2]["0"] == 28
assert simulation.results[2]["1"] == 31
assert simulation.results[3]["in1"] == -5
assert simulation.results[3]["in2"] == 54
assert simulation.results[3]["add1"] == 49
assert simulation.results[3]["add2"] == 103
assert simulation.results[3]["0"] == 49
assert simulation.results[3]["1"] == 103
assert simulation.results[4]["0"] == 9
assert simulation.results[4]["1"] == 11
def test_with_numpy_array_overflow(self, sfg_two_inputs_two_outputs):
input0 = np.array([5, 9, 25, -5, 7])
input1 = np.array([7, 3, 3, 54, 2])
simulation = Simulation(sfg_two_inputs_two_outputs, [input0, input1])
simulation.run_for(5)
with pytest.raises(IndexError):
simulation.run_for(1)
def test_delay(self, sfg_delay):
simulation = Simulation(sfg_delay, save_results = True)
simulation.set_input(0, [5, -2, 25, -6, 7, 0])
simulation.run_for(6)
assert simulation.results[0]["0"] == 0
assert simulation.results[1]["0"] == 5
assert simulation.results[2]["0"] == -2
assert simulation.results[3]["0"] == 25
assert simulation.results[4]["0"] == -6
assert simulation.results[5]["0"] == 7
class TestRun:
def test_nested(self, sfg_nested):
input0 = np.array([5, 9])
input1 = np.array([7, 3])
simulation = Simulation(sfg_nested, [input0, input1])
output0 = simulation.run()
output1 = simulation.run()
assert output0[0] == 11405
assert output1[0] == 4221
def test_accumulator(self, sfg_accumulator):
data_in = np.array([5, -2, 25, -6, 7, 0])
reset = np.array([0, 0, 0, 1, 0, 0])
simulation = Simulation(sfg_accumulator, [data_in, reset])
output0 = simulation.run()
output1 = simulation.run()
output2 = simulation.run()
output3 = simulation.run()
output4 = simulation.run()
output5 = simulation.run()
assert output0[0] == 0
assert output1[0] == 5
assert output2[0] == 3
assert output3[0] == 28
assert output4[0] == 0
assert output5[0] == 7
def test_simple_filter(self, simple_filter):
input0 = np.array([1, 2, 3, 4, 5])
simulation = Simulation(simple_filter, [input0], save_results=True)
output0 = [simulation.run()[0] for _ in range(len(input0))]
assert output0 == [0, 1.0, 2.5, 4.25, 6.125]