diff --git a/b_asic/simulation.py b/b_asic/simulation.py index bd7d53a6cf820d37788f801260e2006d67274216..de8a526993a0bad4f709936693b40ac741553d65 100644 --- a/b_asic/simulation.py +++ b/b_asic/simulation.py @@ -5,7 +5,7 @@ TODO: More info. from collections import defaultdict from numbers import Number -from typing import List, Dict, DefaultDict, Callable, Sequence, Mapping +from typing import List, Dict, DefaultDict, Callable, Sequence, Mapping, Union, Optional from b_asic.signal_flow_graph import SFG @@ -23,28 +23,34 @@ class Simulation: _current_input_values: Sequence[Number] _latest_output_values: Sequence[Number] - def __init__(self, sfg: SFG, input_functions: Sequence[Callable[[int], Number]]): - if len(input_functions) != sfg.input_count: - raise ValueError(f"Wrong number of inputs supplied to simulation (expected {len(sfg.input_count)}, got {len(input_functions)})") + def __init__(self, sfg: SFG, input_providers: Optional[Sequence[Union[Sequence[Number], Callable[[int], Number]]]] = None): self._sfg = sfg self._results = defaultdict(dict) self._registers = {} self._iteration = 0 - self._input_functions = list(input_functions) - self._current_input_values = [0 for _ in range(self._sfg.input_count)] + self._input_functions = [] + self._current_input_values = [] self._latest_output_values = [0 for _ in range(self._sfg.output_count)] + if input_providers is not None: + self.set_inputs(input_providers) - def input_functions(self, input_functions: Sequence[Callable[[int], Number]]) -> None: + def set_inputs(self, input_providers: Sequence[Union[Sequence[Number], Callable[[int], Number]]]) -> None: """Set the input functions used to get values for the inputs to the internal SFG.""" - if len(input_functions) != len(self._input_functions): - raise ValueError(f"Wrong number of inputs supplied to simulation (expected {len(self._input_functions)}, got {len(input_functions)})") - self._input_functions = input_functions + if len(input_providers) != self._sfg.input_count: + raise ValueError(f"Wrong number of inputs supplied to simulation (expected {self._sfg.input_count}, got {len(input_providers)})") + self._input_functions = [None for _ in range(self._sfg.input_count)] + for index, input_provider in enumerate(input_providers): + self.set_input(index, input_provider) - def input_function(self, index: int, input_function: Callable[[int], Number]) -> None: + def set_input(self, index: int, input_provider: Union[Sequence[Number], Callable[[int], Number]]) -> None: """Set the input function used to get values for the specific input at the given index to the internal SFG.""" if index < 0 or index >= len(self._input_functions): raise IndexError(f"Input index out of range (expected 0-{self._input_functions - 1}, got {index})") - self._input_functions[index] = input_function + + if callable(input_provider): + self._input_functions[index] = input_provider + else: + self._input_functions[index] = lambda n: input_provider[n] def run(self) -> Sequence[Number]: """Run one iteration of the simulation and return the resulting output values.""" diff --git a/test/fixtures/signal_flow_graph.py b/test/fixtures/signal_flow_graph.py index 06cfed56f8dc95333b53a10ff012fc4baa3cf7f9..af41262a4a240c2c69671de089b70dd9f512ece2 100644 --- a/test/fixtures/signal_flow_graph.py +++ b/test/fixtures/signal_flow_graph.py @@ -1,11 +1,11 @@ import pytest -from b_asic import SFG, Input, Output, Addition +from b_asic import SFG, Input, Output, Constant, Register @pytest.fixture def sfg_two_inputs_two_outputs(): - """Valid SFG containing two inputs and two outputs. + """Valid SFG with two inputs and two outputs. . . in1>------+ +---------------out1> . | | . @@ -19,12 +19,42 @@ def sfg_two_inputs_two_outputs(): out1 = in1 + in2 out2 = in1 + 2 * in2 """ - in1=Input() - in2=Input() - add1=Addition(in1, in2) - add2=Addition(add1, in2) - out1=Output(add1) - out2=Output(add2) + in1 = Input() + in2 = Input() + add1 = in1 + in2 + add2 = add1 + in2 + out1 = Output(add1) + out2 = Output(add2) return SFG(inputs = [in1, in2], outputs = [out1, out2]) -# TODO: Testa nestad sfg \ No newline at end of file +@pytest.fixture +def sfg_nested(): + """Valid SFG with two inputs and one output. + out1 = in1 + (in1 + in1 * in2) * (in1 + in2 * (in1 + in1 * in2)) + """ + mac_in1 = Input() + mac_in2 = Input() + mac_in3 = Input() + mac_out1 = Output(mac_in1 + mac_in2 * mac_in3) + MAC = SFG(inputs = [mac_in1, mac_in2, mac_in3], outputs=[mac_out1]) + + in1 = Input() + in2 = Input() + mac1 = MAC(in1, in1, in2) + mac2 = MAC(in1, in2, mac1) + mac3 = MAC(in1, mac1, mac2) + out1 = Output(mac3) + return SFG(inputs = [in1, in2], outputs=[out1]) + +@pytest.fixture +def sfg_accumulator(): + """Valid SFG with two inputs and one output. + data_out = (data_in' + data_in) * (1 - reset) + """ + data_in = Input() + reset = Input() + reset_inverted = Constant(1) - reset + reg = Register() + reg.input(0).connect((reg + data_in) * reset_inverted) + data_out = Output(reg) + return SFG(inputs = [data_in, reset], outputs = [data_out]) \ No newline at end of file diff --git a/test/test_simulation.py b/test/test_simulation.py index 4c0533103201d847a4043a616cad1fb70f896955..7adccda4b7162edac00de9d9f7d876305bd35715 100644 --- a/test/test_simulation.py +++ b/test/test_simulation.py @@ -1,13 +1,21 @@ +import pytest +import numpy as np + from b_asic import SFG, Output, Simulation class TestSimulation: - def test_simulate(self, sfg_two_inputs_two_outputs): + def test_simulate_with_lambdas_as_input(self, sfg_two_inputs_two_outputs): simulation = Simulation(sfg_two_inputs_two_outputs, [lambda n: n + 3, lambda n: 1 + n * 2]) + output = simulation.run_for(101) + assert output[0] == 304 assert output[1] == 505 + assert simulation.results[100]["0"] == 304 + assert simulation.results[100]["1"] == 505 + assert simulation.results[0]["in1"] == 3 assert simulation.results[0]["in2"] == 1 assert simulation.results[0]["add1"] == 4 @@ -35,3 +43,79 @@ class TestSimulation: assert simulation.results[3]["add2"] == 20 assert simulation.results[3]["0"] == 13 assert simulation.results[3]["1"] == 20 + + def test_simulate_with_numpy_arrays_as_input(self, sfg_two_inputs_two_outputs): + input0 = np.array([5, 9, 25, -5, 7]) + input1 = np.array([7, 3, 3, 54, 2]) + simulation = Simulation(sfg_two_inputs_two_outputs, [input0, input1]) + + output = simulation.run_for(5) + + assert output[0] == 9 + assert output[1] == 11 + + assert simulation.results[4]["0"] == 9 + assert simulation.results[4]["1"] == 11 + + assert simulation.results[0]["in1"] == 5 + assert simulation.results[0]["in2"] == 7 + assert simulation.results[0]["add1"] == 12 + assert simulation.results[0]["add2"] == 19 + assert simulation.results[0]["0"] == 12 + assert simulation.results[0]["1"] == 19 + + assert simulation.results[1]["in1"] == 9 + assert simulation.results[1]["in2"] == 3 + assert simulation.results[1]["add1"] == 12 + assert simulation.results[1]["add2"] == 15 + assert simulation.results[1]["0"] == 12 + assert simulation.results[1]["1"] == 15 + + assert simulation.results[2]["in1"] == 25 + assert simulation.results[2]["in2"] == 3 + assert simulation.results[2]["add1"] == 28 + assert simulation.results[2]["add2"] == 31 + assert simulation.results[2]["0"] == 28 + assert simulation.results[2]["1"] == 31 + + assert simulation.results[3]["in1"] == -5 + assert simulation.results[3]["in2"] == 54 + assert simulation.results[3]["add1"] == 49 + assert simulation.results[3]["add2"] == 103 + assert simulation.results[3]["0"] == 49 + assert simulation.results[3]["1"] == 103 + + def test_simulate_with_numpy_array_overflow(self, sfg_two_inputs_two_outputs): + input0 = np.array([5, 9, 25, -5, 7]) + input1 = np.array([7, 3, 3, 54, 2]) + simulation = Simulation(sfg_two_inputs_two_outputs, [input0, input1]) + simulation.run_for(5) + with pytest.raises(IndexError): + simulation.run_for(1) + + def test_simulate_nested(self, sfg_nested): + input0 = np.array([5, 9]) + input1 = np.array([7, 3]) + simulation = Simulation(sfg_nested, [input0, input1]) + + output1 = simulation.run() + output2 = simulation.run() + + assert output1[0] == 11405 + assert output2[0] == 8109 + + def test_simulate_with_register(self, sfg_accumulator): + data_in = np.array([5, -2, 25, -6, 7, 0]) + reset = np.array([0, 0, 0, 1, 0, 0]) + simulation = Simulation(sfg_accumulator, [data_in, reset]) + + output = simulation.run_for(6) + + assert output[0] == 7 + + assert simulation.results[0]["0"] == 0 + assert simulation.results[1]["0"] == 5 + assert simulation.results[2]["0"] == 3 + assert simulation.results[3]["0"] == 28 + assert simulation.results[4]["0"] == 0 + assert simulation.results[5]["0"] == 7 \ No newline at end of file