Skip to content
Snippets Groups Projects
Commit c001533e authored by Simon Bjurek's avatar Simon Bjurek Committed by Oscar Gustafsson
Browse files

added io times for list schedulers and added tests for 100 percent test...

added io times for list schedulers and added tests for 100 percent test coverage of schedulers, also added another example
parent 82b6e78a
No related branches found
No related tags found
1 merge request!472Add IO times for list schedulers
Pipeline #156563 passed
......@@ -124,7 +124,8 @@ class EarliestDeadlineScheduler(ListScheduler):
deadlines = {}
for op_id, start_time in schedule_copy.start_times.items():
deadlines[op_id] = start_time + schedule.sfg.find_by_id(op_id).latency
if not op_id.startswith("in"):
deadlines[op_id] = start_time + schedule.sfg.find_by_id(op_id).latency
return sorted(deadlines, key=deadlines.get)
......@@ -137,7 +138,10 @@ class LeastSlackTimeScheduler(ListScheduler):
schedule_copy = copy.copy(schedule)
ALAPScheduler().apply_scheduling(schedule_copy)
return sorted(schedule_copy.start_times, key=schedule_copy.start_times.get)
sorted_ops = sorted(
schedule_copy.start_times, key=schedule_copy.start_times.get
)
return [op for op in sorted_ops if not op.startswith("in")]
class MaxFanOutScheduler(ListScheduler):
......@@ -152,7 +156,8 @@ class MaxFanOutScheduler(ListScheduler):
for op_id, start_time in schedule_copy.start_times.items():
fan_outs[op_id] = len(schedule.sfg.find_by_id(op_id).output_signals)
return sorted(fan_outs, key=fan_outs.get, reverse=True)
sorted_ops = sorted(fan_outs, key=fan_outs.get, reverse=True)
return [op for op in sorted_ops if not op.startswith("in")]
class HybridScheduler(ListScheduler):
......@@ -199,4 +204,4 @@ class HybridScheduler(ListScheduler):
sorted_op_list = [pair[0] for pair in fan_out_sorted_items]
return sorted_op_list
return [op for op in sorted_op_list if not op.startswith("in")]
......@@ -119,9 +119,9 @@ class Schedule:
self._remove_delays_no_laps()
max_end_time = self.get_max_end_time()
if schedule_time is None:
if not self._schedule_time:
self._schedule_time = max_end_time
elif schedule_time < max_end_time:
elif self._schedule_time < max_end_time:
raise ValueError(f"Too short schedule time. Minimum is {max_end_time}.")
def __str__(self) -> str:
......
......@@ -45,8 +45,15 @@ class Scheduler(ABC):
class ListScheduler(Scheduler, ABC):
def __init__(self, max_resources: Optional[dict[TypeName, int]] = None) -> None:
if max_resources:
def __init__(
self,
max_resources: Optional[dict[TypeName, int]] = None,
input_times: Optional[dict["GraphID", int]] = None,
output_delta_times: Optional[dict["GraphID", int]] = None,
cyclic: Optional[bool] = False,
) -> None:
super()
if max_resources is not None:
if not isinstance(max_resources, dict):
raise ValueError("max_resources must be a dictionary.")
for key, value in max_resources.items():
......@@ -54,12 +61,13 @@ class ListScheduler(Scheduler, ABC):
raise ValueError("max_resources key must be a valid type_name.")
if not isinstance(value, int):
raise ValueError("max_resources value must be an integer.")
if max_resources:
self._max_resources = max_resources
else:
self._max_resources = {}
self._input_times = input_times if input_times else {}
self._output_delta_times = output_delta_times if output_delta_times else {}
def apply_scheduling(self, schedule: "Schedule") -> None:
"""Applies the scheduling algorithm on the given Schedule.
......@@ -75,9 +83,14 @@ class ListScheduler(Scheduler, ABC):
remaining_resources = self._max_resources.copy()
sorted_operations = self._get_sorted_operations(schedule)
# place all inputs at time 0
# initial input placement
if self._input_times:
for input_id in self._input_times:
start_times[input_id] = self._input_times[input_id]
for input_op in sfg.find_by_type_name(Input.type_name()):
start_times[input_op.graph_id] = 0
if input_op.graph_id not in self._input_times:
start_times[input_op.graph_id] = 0
current_time = 0
while sorted_operations:
......@@ -119,15 +132,17 @@ class ListScheduler(Scheduler, ABC):
sorted_operations.remove(candidate.graph_id)
start_times[candidate.graph_id] = current_time
schedule.set_schedule_time(current_time)
self._handle_outputs(schedule)
if not schedule.cyclic:
max_start_time = max(schedule.start_times.values())
if current_time < max_start_time:
current_time = max_start_time
schedule.set_schedule_time(current_time)
schedule.remove_delays()
# move all inputs ALAP now that operations have moved
for input_op in schedule.sfg.find_by_type_name(Input.type_name()):
input_op = cast(Input, input_op)
schedule.move_operation_alap(input_op.graph_id)
self._handle_inputs(schedule)
# move all dont cares ALAP
for dc_op in schedule.sfg.find_by_type_name(DontCare.type_name()):
......@@ -168,3 +183,28 @@ class ListScheduler(Scheduler, ABC):
@abstractmethod
def _get_sorted_operations(schedule: "Schedule") -> list["GraphID"]:
raise NotImplementedError
def _handle_inputs(self, schedule: "Schedule") -> None:
for input_op in schedule.sfg.find_by_type_name(Input.type_name()):
input_op = cast(Input, input_op)
if input_op.graph_id not in self._input_times:
schedule.move_operation_alap(input_op.graph_id)
def _handle_outputs(
self, schedule: "Schedule", non_schedulable_ops: Optional[list["GraphID"]] = []
) -> None:
super()._handle_outputs(schedule, non_schedulable_ops)
schedule.set_schedule_time(schedule.get_max_end_time())
for output in schedule.sfg.find_by_type_name(Output.type_name()):
output = cast(Output, output)
if output.graph_id in self._output_delta_times:
delta_time = self._output_delta_times[output.graph_id]
if schedule.cyclic:
schedule.start_times[output.graph_id] = schedule.schedule_time
schedule.move_operation(output.graph_id, delta_time)
else:
schedule.start_times[output.graph_id] = (
schedule.schedule_time + delta_time
)
......@@ -415,7 +415,7 @@ def radix_2_dif_fft(points: int) -> SFG:
inputs = []
for i in range(points):
inputs.append(Input(name=f"Input: {i}"))
inputs.append(Input())
ports = inputs
number_of_stages = int(np.log2(points))
......@@ -430,7 +430,7 @@ def radix_2_dif_fft(points: int) -> SFG:
ports = _get_bit_reversed_ports(ports)
outputs = []
for i, port in enumerate(ports):
outputs.append(Output(port, name=f"Output: {i}"))
outputs.append(Output(port))
return SFG(inputs=inputs, outputs=outputs)
......
"""
=========================================
Auto Scheduling With Custom IO times
=========================================
"""
from b_asic.core_operations import Butterfly, ConstantMultiplication
from b_asic.core_schedulers import ASAPScheduler, HybridScheduler
from b_asic.schedule import Schedule
from b_asic.sfg_generators import radix_2_dif_fft
sfg = radix_2_dif_fft(points=8)
# %%
# The SFG is
sfg
# %%
# Set latencies and execution times.
sfg.set_latency_of_type(Butterfly.type_name(), 3)
sfg.set_latency_of_type(ConstantMultiplication.type_name(), 2)
sfg.set_execution_time_of_type(Butterfly.type_name(), 1)
sfg.set_execution_time_of_type(ConstantMultiplication.type_name(), 1)
# %%
# Generate an ASAP schedule for reference
schedule = Schedule(sfg, scheduler=ASAPScheduler())
schedule.show()
# %%
# Generate a non-cyclic Schedule from HybridScheduler with custom IO times.
resources = {Butterfly.type_name(): 1, ConstantMultiplication.type_name(): 1}
input_times = {
"in0": 0,
"in1": 1,
"in2": 2,
"in3": 3,
"in4": 4,
"in5": 5,
"in6": 6,
"in7": 7,
}
output_delta_times = {
"out0": -2,
"out1": -1,
"out2": 0,
"out3": 1,
"out4": 2,
"out5": 3,
"out6": 4,
"out7": 5,
}
schedule = Schedule(
sfg, scheduler=HybridScheduler(resources, input_times, output_delta_times)
)
schedule.show()
# %%
# Generate a new Schedule with cyclic scheduling enabled
output_delta_times = {
"out0": 0,
"out1": 1,
"out2": 2,
"out3": 3,
"out4": 4,
"out5": 5,
"out6": 6,
"out7": 7,
}
schedule = Schedule(
sfg,
scheduler=HybridScheduler(resources, input_times, output_delta_times),
cyclic=True,
)
schedule.show()
......@@ -64,8 +64,29 @@ print("Scheduling time:", schedule.schedule_time)
schedule.show()
# %%
# Create a HybridScheduler schedule that satisfies the resource constraints.
schedule = Schedule(sfg, scheduler=HybridScheduler(resources))
# Create a HybridScheduler schedule that satisfies the resource constraints with custom IO times.
# This is the schedule we will synthesize an architecture for.
input_times = {
"in0": 0,
"in1": 1,
"in2": 2,
"in3": 3,
"in4": 4,
"in5": 5,
}
output_delta_times = {
"out0": 0,
"out1": 1,
"out2": 2,
"out3": 3,
"out4": 4,
"out5": 5,
}
schedule = Schedule(
sfg,
scheduler=HybridScheduler(resources, input_times, output_delta_times),
cyclic=True,
)
print("Scheduling time:", schedule.schedule_time)
schedule.show()
......
import pytest
from b_asic.core_operations import Addition, Butterfly, ConstantMultiplication
from b_asic.core_operations import (
MADS,
Addition,
Butterfly,
ConstantMultiplication,
Reciprocal,
)
from b_asic.core_schedulers import (
ALAPScheduler,
ASAPScheduler,
EarliestDeadlineScheduler,
HybridScheduler,
LeastSlackTimeScheduler,
MaxFanOutScheduler,
)
from b_asic.schedule import Schedule
from b_asic.sfg_generators import direct_form_1_iir, radix_2_dif_fft
from b_asic.sfg_generators import (
direct_form_1_iir,
ldlt_matrix_inverse,
radix_2_dif_fft,
)
class TestASAPScheduler:
......@@ -484,3 +497,584 @@ class TestEarliestDeadlineScheduler:
"out3": 7,
}
assert schedule.schedule_time == 7
class TestLeastSlackTimeScheduler:
def test_empty_sfg(self, sfg_empty):
with pytest.raises(
ValueError, match="Empty signal flow graph cannot be scheduled."
):
Schedule(sfg_empty, scheduler=LeastSlackTimeScheduler())
def test_direct_form_1_iir(self):
sfg = direct_form_1_iir([1, 2, 3], [1, 2, 3])
sfg.set_latency_of_type(ConstantMultiplication.type_name(), 2)
sfg.set_execution_time_of_type(ConstantMultiplication.type_name(), 1)
sfg.set_latency_of_type(Addition.type_name(), 3)
sfg.set_execution_time_of_type(Addition.type_name(), 1)
resources = {Addition.type_name(): 1, ConstantMultiplication.type_name(): 1}
schedule = Schedule(
sfg, scheduler=LeastSlackTimeScheduler(max_resources=resources)
)
assert schedule.start_times == {
"cmul4": 0,
"cmul3": 1,
"in0": 2,
"cmul0": 2,
"add1": 3,
"cmul1": 3,
"cmul2": 4,
"add3": 6,
"add0": 7,
"add2": 10,
"out0": 13,
}
assert schedule.schedule_time == 13
def test_direct_form_2_iir_inf_resources_no_exec_time(
self, sfg_direct_form_iir_lp_filter
):
sfg_direct_form_iir_lp_filter.set_latency_of_type(Addition.type_name(), 5)
sfg_direct_form_iir_lp_filter.set_latency_of_type(
ConstantMultiplication.type_name(), 4
)
schedule = Schedule(
sfg_direct_form_iir_lp_filter, scheduler=LeastSlackTimeScheduler()
)
# should be the same as for ASAP due to infinite resources, except for input
assert schedule.start_times == {
"in0": 9,
"cmul1": 0,
"cmul4": 0,
"cmul2": 0,
"cmul3": 0,
"add3": 4,
"add1": 4,
"add0": 9,
"cmul0": 14,
"add2": 18,
"out0": 23,
}
assert schedule.schedule_time == 23
def test_direct_form_2_iir_1_add_1_mul_no_exec_time(
self, sfg_direct_form_iir_lp_filter
):
sfg_direct_form_iir_lp_filter.set_latency_of_type(Addition.type_name(), 5)
sfg_direct_form_iir_lp_filter.set_latency_of_type(
ConstantMultiplication.type_name(), 4
)
max_resources = {ConstantMultiplication.type_name(): 1, Addition.type_name(): 1}
schedule = Schedule(
sfg_direct_form_iir_lp_filter,
scheduler=LeastSlackTimeScheduler(max_resources),
)
assert schedule.start_times == {
"cmul4": 0,
"cmul3": 4,
"cmul1": 8,
"add1": 8,
"cmul2": 12,
"in0": 13,
"add0": 13,
"add3": 18,
"cmul0": 18,
"add2": 23,
"out0": 28,
}
assert schedule.schedule_time == 28
def test_direct_form_2_iir_1_add_1_mul_exec_time_1(
self, sfg_direct_form_iir_lp_filter
):
sfg_direct_form_iir_lp_filter.set_latency_of_type(
ConstantMultiplication.type_name(), 3
)
sfg_direct_form_iir_lp_filter.set_latency_of_type(Addition.type_name(), 2)
sfg_direct_form_iir_lp_filter.set_execution_time_of_type(
ConstantMultiplication.type_name(), 1
)
sfg_direct_form_iir_lp_filter.set_execution_time_of_type(
Addition.type_name(), 1
)
max_resources = {ConstantMultiplication.type_name(): 1, Addition.type_name(): 1}
schedule = Schedule(
sfg_direct_form_iir_lp_filter,
scheduler=LeastSlackTimeScheduler(max_resources),
)
assert schedule.start_times == {
"cmul4": 0,
"cmul3": 1,
"cmul1": 2,
"cmul2": 3,
"add1": 4,
"in0": 6,
"add0": 6,
"add3": 7,
"cmul0": 8,
"add2": 11,
"out0": 13,
}
assert schedule.schedule_time == 13
def test_direct_form_2_iir_2_add_3_mul_exec_time_1(
self, sfg_direct_form_iir_lp_filter
):
sfg_direct_form_iir_lp_filter.set_latency_of_type(
ConstantMultiplication.type_name(), 3
)
sfg_direct_form_iir_lp_filter.set_latency_of_type(Addition.type_name(), 2)
sfg_direct_form_iir_lp_filter.set_execution_time_of_type(
ConstantMultiplication.type_name(), 1
)
sfg_direct_form_iir_lp_filter.set_execution_time_of_type(
Addition.type_name(), 1
)
max_resources = {ConstantMultiplication.type_name(): 3, Addition.type_name(): 2}
schedule = Schedule(
sfg_direct_form_iir_lp_filter,
scheduler=LeastSlackTimeScheduler(max_resources),
)
assert schedule.start_times == {
"cmul1": 0,
"cmul4": 0,
"cmul3": 0,
"cmul2": 1,
"add1": 3,
"add3": 4,
"in0": 5,
"add0": 5,
"cmul0": 7,
"add2": 10,
"out0": 12,
}
assert schedule.schedule_time == 12
def test_radix_2_fft_8_points(self):
sfg = radix_2_dif_fft(points=8)
sfg.set_latency_of_type(ConstantMultiplication.type_name(), 2)
sfg.set_execution_time_of_type(ConstantMultiplication.type_name(), 1)
sfg.set_latency_of_type(Butterfly.type_name(), 1)
sfg.set_execution_time_of_type(Butterfly.type_name(), 1)
resources = {Butterfly.type_name(): 2, ConstantMultiplication.type_name(): 2}
schedule = Schedule(
sfg, scheduler=LeastSlackTimeScheduler(max_resources=resources)
)
assert schedule.start_times == {
"in1": 0,
"in3": 0,
"in5": 0,
"in7": 0,
"bfly6": 0,
"bfly8": 0,
"in2": 1,
"in6": 1,
"cmul2": 1,
"cmul3": 1,
"bfly11": 1,
"bfly7": 1,
"in0": 2,
"in4": 2,
"cmul0": 2,
"bfly0": 2,
"cmul4": 2,
"bfly5": 3,
"bfly1": 3,
"cmul1": 4,
"bfly2": 4,
"bfly9": 4,
"bfly10": 5,
"bfly3": 5,
"out0": 5,
"out4": 5,
"bfly4": 6,
"out1": 6,
"out2": 6,
"out5": 6,
"out6": 6,
"out7": 7,
"out3": 7,
}
assert schedule.schedule_time == 7
class TestMaxFanOutScheduler:
def test_empty_sfg(self, sfg_empty):
with pytest.raises(
ValueError, match="Empty signal flow graph cannot be scheduled."
):
Schedule(sfg_empty, scheduler=MaxFanOutScheduler())
def test_direct_form_1_iir(self):
sfg = direct_form_1_iir([1, 2, 3], [1, 2, 3])
sfg.set_latency_of_type(ConstantMultiplication.type_name(), 2)
sfg.set_execution_time_of_type(ConstantMultiplication.type_name(), 1)
sfg.set_latency_of_type(Addition.type_name(), 3)
sfg.set_execution_time_of_type(Addition.type_name(), 1)
resources = {Addition.type_name(): 1, ConstantMultiplication.type_name(): 1}
schedule = Schedule(sfg, scheduler=MaxFanOutScheduler(max_resources=resources))
assert schedule.start_times == {
"in0": 0,
"cmul0": 0,
"cmul1": 1,
"cmul2": 2,
"cmul4": 3,
"cmul3": 4,
"add3": 4,
"add1": 6,
"add0": 9,
"add2": 12,
"out0": 15,
}
assert schedule.schedule_time == 15
class TestHybridScheduler:
def test_empty_sfg(self, sfg_empty):
with pytest.raises(
ValueError, match="Empty signal flow graph cannot be scheduled."
):
Schedule(sfg_empty, scheduler=HybridScheduler())
def test_direct_form_1_iir(self):
sfg = direct_form_1_iir([1, 2, 3], [1, 2, 3])
sfg.set_latency_of_type(ConstantMultiplication.type_name(), 2)
sfg.set_execution_time_of_type(ConstantMultiplication.type_name(), 1)
sfg.set_latency_of_type(Addition.type_name(), 3)
sfg.set_execution_time_of_type(Addition.type_name(), 1)
resources = {Addition.type_name(): 1, ConstantMultiplication.type_name(): 1}
schedule = Schedule(sfg, scheduler=HybridScheduler(max_resources=resources))
assert schedule.start_times == {
"cmul4": 0,
"cmul3": 1,
"in0": 2,
"cmul0": 2,
"add1": 3,
"cmul1": 3,
"cmul2": 4,
"add3": 6,
"add0": 7,
"add2": 10,
"out0": 13,
}
assert schedule.schedule_time == 13
def test_radix_2_fft_8_points(self):
sfg = radix_2_dif_fft(points=8)
sfg.set_latency_of_type(ConstantMultiplication.type_name(), 2)
sfg.set_execution_time_of_type(ConstantMultiplication.type_name(), 1)
sfg.set_latency_of_type(Butterfly.type_name(), 1)
sfg.set_execution_time_of_type(Butterfly.type_name(), 1)
resources = {Butterfly.type_name(): 2, ConstantMultiplication.type_name(): 2}
schedule = Schedule(sfg, scheduler=HybridScheduler(max_resources=resources))
assert schedule.start_times == {
"in1": 0,
"in3": 0,
"in5": 0,
"in7": 0,
"bfly6": 0,
"bfly8": 0,
"in2": 1,
"in6": 1,
"cmul2": 1,
"cmul3": 1,
"bfly11": 1,
"bfly7": 1,
"in0": 2,
"in4": 2,
"cmul0": 2,
"bfly0": 2,
"cmul4": 2,
"bfly5": 3,
"bfly1": 3,
"cmul1": 4,
"bfly2": 4,
"bfly9": 4,
"bfly10": 5,
"bfly3": 5,
"out0": 5,
"out4": 5,
"bfly4": 6,
"out1": 6,
"out2": 6,
"out5": 6,
"out6": 6,
"out7": 7,
"out3": 7,
}
assert schedule.schedule_time == 7
def test_radix_2_fft_8_points_specified_IO_times_cyclic(self):
sfg = radix_2_dif_fft(points=8)
sfg.set_latency_of_type(Butterfly.type_name(), 3)
sfg.set_latency_of_type(ConstantMultiplication.type_name(), 2)
sfg.set_execution_time_of_type(Butterfly.type_name(), 1)
sfg.set_execution_time_of_type(ConstantMultiplication.type_name(), 1)
resources = {Butterfly.type_name(): 1, ConstantMultiplication.type_name(): 1}
input_times = {
"in0": 0,
"in1": 1,
"in2": 2,
"in3": 3,
"in4": 4,
"in5": 5,
"in6": 6,
"in7": 7,
}
output_times = {
"out0": -2,
"out1": -1,
"out2": 0,
"out3": 1,
"out4": 2,
"out5": 3,
"out6": 4,
"out7": 5,
}
schedule = Schedule(
sfg,
scheduler=HybridScheduler(resources, input_times, output_times),
cyclic=True,
)
assert schedule.start_times == {
"in0": 0,
"in1": 1,
"in2": 2,
"in3": 3,
"in4": 4,
"in5": 5,
"in6": 6,
"in7": 7,
"bfly0": 4,
"bfly8": 5,
"bfly11": 6,
"bfly6": 7,
"cmul2": 8,
"cmul0": 9,
"bfly1": 9,
"cmul3": 10,
"bfly7": 10,
"bfly2": 11,
"bfly5": 12,
"cmul4": 13,
"bfly9": 13,
"bfly10": 15,
"cmul1": 15,
"bfly3": 16,
"bfly4": 17,
"out0": 18,
"out1": 19,
"out2": 20,
"out3": 1,
"out4": 2,
"out5": 3,
"out6": 4,
"out7": 5,
}
assert schedule.schedule_time == 20
def test_radix_2_fft_8_points_specified_IO_times_non_cyclic(self):
sfg = radix_2_dif_fft(points=8)
sfg.set_latency_of_type(Butterfly.type_name(), 3)
sfg.set_latency_of_type(ConstantMultiplication.type_name(), 2)
sfg.set_execution_time_of_type(Butterfly.type_name(), 1)
sfg.set_execution_time_of_type(ConstantMultiplication.type_name(), 1)
resources = {Butterfly.type_name(): 1, ConstantMultiplication.type_name(): 1}
input_times = {
"in0": 0,
"in1": 1,
"in2": 2,
"in3": 3,
"in4": 4,
"in5": 5,
"in6": 6,
"in7": 7,
}
output_times = {
"out0": -2,
"out1": -1,
"out2": 0,
"out3": 1,
"out4": 2,
"out5": 3,
"out6": 4,
"out7": 5,
}
schedule = Schedule(
sfg,
scheduler=HybridScheduler(resources, input_times, output_times),
cyclic=False,
)
assert schedule.start_times == {
"in0": 0,
"in1": 1,
"in2": 2,
"in3": 3,
"in4": 4,
"in5": 5,
"in6": 6,
"in7": 7,
"bfly0": 4,
"bfly8": 5,
"bfly11": 6,
"bfly6": 7,
"cmul2": 8,
"cmul0": 9,
"bfly1": 9,
"cmul3": 10,
"bfly7": 10,
"bfly2": 11,
"bfly5": 12,
"cmul4": 13,
"bfly9": 13,
"bfly10": 15,
"cmul1": 15,
"bfly3": 16,
"bfly4": 17,
"out0": 18,
"out1": 19,
"out2": 20,
"out3": 21,
"out4": 22,
"out5": 23,
"out6": 24,
"out7": 25,
}
assert schedule.schedule_time == 25
def test_ldlt_inverse_2x2(self):
sfg = ldlt_matrix_inverse(N=2)
sfg.set_latency_of_type(MADS.type_name(), 3)
sfg.set_latency_of_type(Reciprocal.type_name(), 2)
sfg.set_execution_time_of_type(MADS.type_name(), 1)
sfg.set_execution_time_of_type(Reciprocal.type_name(), 1)
resources = {MADS.type_name(): 1, Reciprocal.type_name(): 1}
schedule = Schedule(
sfg,
scheduler=HybridScheduler(resources),
)
assert schedule.start_times == {
"in0": 0,
"rec0": 0,
"in1": 2,
"dontcare1": 2,
"mads0": 2,
"in2": 5,
"mads3": 5,
"rec1": 8,
"dontcare0": 10,
"mads2": 10,
"mads1": 13,
"out2": 10,
"out1": 13,
"out0": 16,
}
assert schedule.schedule_time == 16
def test_ldlt_inverse_2x2_specified_IO_times_cyclic(self):
sfg = ldlt_matrix_inverse(N=2)
sfg.set_latency_of_type(MADS.type_name(), 3)
sfg.set_latency_of_type(Reciprocal.type_name(), 2)
sfg.set_execution_time_of_type(MADS.type_name(), 1)
sfg.set_execution_time_of_type(Reciprocal.type_name(), 1)
resources = {MADS.type_name(): 1, Reciprocal.type_name(): 1}
input_times = {
"in0": 0,
"in1": 1,
"in2": 2,
}
output_times = {
"out0": 0,
"out1": 1,
"out2": 2,
}
schedule = Schedule(
sfg,
scheduler=HybridScheduler(resources, input_times, output_times),
cyclic=True,
)
assert schedule.start_times == {
"in0": 0,
"in1": 1,
"in2": 2,
"rec0": 0,
"dontcare1": 2,
"mads0": 2,
"mads3": 5,
"rec1": 8,
"dontcare0": 10,
"mads2": 10,
"mads1": 13,
"out0": 16,
"out1": 1,
"out2": 2,
}
assert schedule.schedule_time == 16
def test_max_invalid_resources(self):
sfg = ldlt_matrix_inverse(N=2)
sfg.set_latency_of_type(MADS.type_name(), 3)
sfg.set_latency_of_type(Reciprocal.type_name(), 2)
sfg.set_execution_time_of_type(MADS.type_name(), 1)
sfg.set_execution_time_of_type(Reciprocal.type_name(), 1)
resources = 2
with pytest.raises(ValueError, match="max_resources must be a dictionary."):
Schedule(sfg, scheduler=HybridScheduler(resources))
resources = "test"
with pytest.raises(ValueError, match="max_resources must be a dictionary."):
Schedule(sfg, scheduler=HybridScheduler(resources))
resources = []
with pytest.raises(ValueError, match="max_resources must be a dictionary."):
Schedule(sfg, scheduler=HybridScheduler(resources))
resources = {1: 1}
with pytest.raises(
ValueError, match="max_resources key must be a valid type_name."
):
Schedule(sfg, scheduler=HybridScheduler(resources))
resources = {MADS.type_name(): "test"}
with pytest.raises(ValueError, match="max_resources value must be an integer."):
Schedule(sfg, scheduler=HybridScheduler(resources))
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment