diff --git a/b_asic/schedule.py b/b_asic/schedule.py index 1d72ce0b1ee2d3f7889c29f1fd418122f118431c..114b44961c2dd5104ecd2565632091e88f6a26d1 100644 --- a/b_asic/schedule.py +++ b/b_asic/schedule.py @@ -117,7 +117,6 @@ class Schedule: self._start_times = start_times self._laps.update(laps) self._remove_delays_no_laps() - max_end_time = self.get_max_end_time() if not self._schedule_time: self._schedule_time = max_end_time @@ -176,11 +175,14 @@ class Schedule: max_end_time = 0 for graph_id, op_start_time in self._start_times.items(): operation = cast(Operation, self._sfg.find_by_id(graph_id)) - for outport in operation.outputs: - max_end_time = max( - max_end_time, - op_start_time + cast(int, outport.latency_offset), - ) + if graph_id.startswith("out"): + max_end_time = max(max_end_time, op_start_time) + else: + for outport in operation.outputs: + max_end_time = max( + max_end_time, + op_start_time + cast(int, outport.latency_offset), + ) return max_end_time def forward_slack(self, graph_id: GraphID) -> int: @@ -655,6 +657,44 @@ class Schedule: max_pos_graph_id = max(self._y_locations, key=self._y_locations.get) return self._get_y_position(max_pos_graph_id, operation_height, operation_gap) + def place_operation(self, op: Operation, time: int) -> None: + """Schedule the given operation in given time. + + Parameters + ---------- + op : Operation + Operation to schedule. + time : int + Time slot to schedule the operation in. + If time > schedule_time -> schedule cyclically. + """ + start = time % self._schedule_time if self._schedule_time else time + self._start_times[op.graph_id] = start + + if not self.schedule_time: + return + + # Update input laps + input_slacks = self._backward_slacks(op.graph_id) + for in_port, signal_slacks in input_slacks.items(): + for signal, signal_slack in signal_slacks.items(): + new_slack = signal_slack + laps = 0 + while new_slack < 0: + laps += 1 + new_slack += self._schedule_time + self._laps[signal.graph_id] = laps + + if ( + start == 0 + and isinstance(op, Output) + and self._laps[op.input(0).signals[0].graph_id] != 0 + ): + start = self._schedule_time + self._laps[op.input(0).signals[0].graph_id] -= 1 + + self._start_times[op.graph_id] = start + def move_operation(self, graph_id: GraphID, time: int) -> "Schedule": """ Move an operation in the schedule. diff --git a/b_asic/scheduler.py b/b_asic/scheduler.py index 311c0634a01af55a394eb92649d262327c68871f..1b8477ed256dbc635fc17c6ee4e118cb1ceaa791 100644 --- a/b_asic/scheduler.py +++ b/b_asic/scheduler.py @@ -1,6 +1,7 @@ import copy import sys from abc import ABC, abstractmethod +from math import ceil from typing import TYPE_CHECKING, Optional, cast from b_asic.core_operations import DontCare @@ -11,7 +12,6 @@ from b_asic.types import TypeName if TYPE_CHECKING: from b_asic.operation import Operation from b_asic.schedule import Schedule - from b_asic.signal_flow_graph import SFG from b_asic.types import GraphID @@ -157,6 +157,9 @@ class ALAPScheduler(Scheduler): class ListScheduler(Scheduler, ABC): + + TIME_OUT_COUNTER_LIMIT = 100 + def __init__( self, max_resources: Optional[dict[TypeName, int]] = None, @@ -179,6 +182,11 @@ class ListScheduler(Scheduler, ABC): else: self._max_resources = {} + if Input.type_name() not in self._max_resources: + self._max_resources[Input.type_name()] = 1 + if Output.type_name() not in self._max_resources: + self._max_resources[Output.type_name()] = 1 + self._max_concurrent_reads = max_concurrent_reads or sys.maxsize self._max_concurrent_writes = max_concurrent_writes or sys.maxsize @@ -198,127 +206,160 @@ class ListScheduler(Scheduler, ABC): schedule : Schedule Schedule to apply the scheduling algorithm on. """ - sfg = schedule.sfg + self._schedule = schedule + self._sfg = schedule.sfg - alap_schedule = copy.copy(schedule) + if self._schedule.cyclic and self._schedule.schedule_time is None: + raise ValueError("Scheduling time must be provided when cyclic = True.") + + for resource_type, resource_amount in self._max_resources.items(): + total_exec_time = sum( + [op.execution_time for op in self._sfg.find_by_type_name(resource_type)] + ) + if self._schedule.schedule_time is not None: + resource_lower_bound = ceil( + total_exec_time / self._schedule.schedule_time + ) + if resource_amount < resource_lower_bound: + raise ValueError( + f"Amount of resource: {resource_type} is not enough to " + f"realize schedule for scheduling time: {self._schedule.schedule_time}" + ) + + alap_schedule = copy.copy(self._schedule) ALAPScheduler().apply_scheduling(alap_schedule) alap_start_times = alap_schedule.start_times - schedule.start_times = {} + self._schedule.start_times = {} + + if not self._schedule.cyclic and self._schedule.schedule_time: + if alap_schedule.schedule_time > self._schedule.schedule_time: + raise ValueError( + f"Provided scheduling time {schedule.schedule_time} cannot be reached, " + "try to enable the cyclic property or increase the time to at least " + f"{alap_schedule.schedule_time}." + ) used_resources_ready_times = {} remaining_resources = self._max_resources.copy() - if Input.type_name() not in remaining_resources: - remaining_resources[Input.type_name()] = 1 - if Output.type_name() not in remaining_resources: - remaining_resources[Output.type_name()] = 1 remaining_ops = ( - sfg.operations - + sfg.find_by_type_name(Input.type_name()) - + sfg.find_by_type_name(Output.type_name()) + self._sfg.operations + + self._sfg.find_by_type_name(Input.type_name()) + + self._sfg.find_by_type_name(Output.type_name()) ) remaining_ops = [op.graph_id for op in remaining_ops] - schedule.start_times = {} - remaining_reads = self._max_concurrent_reads + self._schedule.start_times = {} + self.remaining_reads = self._max_concurrent_reads # initial input placement if self._input_times: for input_id in self._input_times: - schedule.start_times[input_id] = self._input_times[input_id] + self._schedule.start_times[input_id] = self._input_times[input_id] remaining_ops = [ elem for elem in remaining_ops if not elem.startswith("in") ] remaining_ops = [op for op in remaining_ops if not op.startswith("dontcare")] remaining_ops = [op for op in remaining_ops if not op.startswith("t")] + remaining_ops = [ + op + for op in remaining_ops + if not (op.startswith("out") and op in self._output_delta_times) + ] + + self._current_time = 0 + self._time_out_counter = 0 + self._op_laps = {} - current_time = 0 - time_out_counter = 0 while remaining_ops: ready_ops_priority_table = self._get_ready_ops_priority_table( - sfg, - schedule.start_times, - current_time, alap_start_times, remaining_ops, remaining_resources, - remaining_reads, ) while ready_ops_priority_table: - next_op = sfg.find_by_id(self._get_next_op_id(ready_ops_priority_table)) + next_op = self._sfg.find_by_id( + self._get_next_op_id(ready_ops_priority_table) + ) if next_op.type_name() in remaining_resources: remaining_resources[next_op.type_name()] -= 1 - if ( - next_op.type_name() == Input.type_name() - or next_op.type_name() == Output.type_name() - ): - used_resources_ready_times[next_op] = current_time + 1 - else: - used_resources_ready_times[next_op] = ( - current_time + next_op.execution_time - ) - remaining_reads -= next_op.input_count + used_resources_ready_times[next_op] = self._current_time + 1 + + self.remaining_reads -= next_op.input_count remaining_ops = [ op_id for op_id in remaining_ops if op_id != next_op.graph_id ] - schedule.start_times[next_op.graph_id] = current_time + + self._schedule.place_operation(next_op, self._current_time) + self._op_laps[next_op.graph_id] = ( + (self._current_time + next_op.latency) + // self._schedule.schedule_time + if self._schedule.schedule_time + else 0 + ) + if not self._schedule.cyclic and self._schedule.schedule_time: + if self._current_time > self._schedule.schedule_time: + raise ValueError( + f"Provided scheduling time {schedule.schedule_time} cannot be reached, " + "try to enable the cyclic property or increase the time." + ) ready_ops_priority_table = self._get_ready_ops_priority_table( - sfg, - schedule.start_times, - current_time, alap_start_times, remaining_ops, remaining_resources, - remaining_reads, ) - current_time += 1 - time_out_counter += 1 - if time_out_counter >= 100: - raise TimeoutError( - "Algorithm did not schedule any operation for 10 time steps, " - "try relaxing constraints." - ) + self._go_to_next_time_step() ready_ops_priority_table = self._get_ready_ops_priority_table( - sfg, - schedule.start_times, - current_time, alap_start_times, remaining_ops, remaining_resources, - remaining_reads, ) + # update available reads and operators - remaining_reads = self._max_concurrent_reads + self.remaining_reads = self._max_concurrent_reads for operation, ready_time in used_resources_ready_times.items(): - if ready_time == current_time: + if ready_time >= self._current_time: remaining_resources[operation.type_name()] += 1 - current_time -= 1 + used_resources_ready_times = dict( + [ + pair + for pair in used_resources_ready_times.items() + if pair[1] > self._current_time + ] + ) + + self._current_time -= 1 - self._handle_outputs(schedule) + self._handle_outputs() - if not schedule.cyclic: - max_start_time = max(schedule.start_times.values()) - if current_time < max_start_time: - current_time = max_start_time - current_time = max(current_time, schedule.get_max_end_time()) - schedule.set_schedule_time(current_time) + if self._schedule.schedule_time is None: + self._schedule.set_schedule_time(self._schedule.get_max_end_time()) - schedule.remove_delays() + self._schedule.remove_delays() # schedule all dont cares ALAP - for dc_op in sfg.find_by_type_name(DontCare.type_name()): + for dc_op in self._sfg.find_by_type_name(DontCare.type_name()): dc_op = cast(DontCare, dc_op) - schedule.start_times[dc_op.graph_id] = 0 - schedule.move_operation_alap(dc_op.graph_id) + self._schedule.start_times[dc_op.graph_id] = 0 + self._schedule.move_operation_alap(dc_op.graph_id) - schedule.sort_y_locations_on_start_times() + self._schedule.sort_y_locations_on_start_times() + + def _go_to_next_time_step(self): + self._time_out_counter += 1 + if self._time_out_counter >= self.TIME_OUT_COUNTER_LIMIT: + raise TimeoutError( + "Algorithm did not manage to schedule any operation for 10 time steps, " + "try relaxing the constraints." + ) + self._current_time += 1 def _get_next_op_id( self, ready_ops_priority_table: list[tuple["GraphID", int, ...]] @@ -334,35 +375,24 @@ class ListScheduler(Scheduler, ABC): def _get_ready_ops_priority_table( self, - sfg: "SFG", - start_times: dict["GraphID", int], - current_time: int, alap_start_times: dict["GraphID", int], remaining_ops: list["GraphID"], remaining_resources: dict["GraphID", int], - remaining_reads: int, ) -> list[tuple["GraphID", int, int, int]]: ready_ops = [ op_id for op_id in remaining_ops if self._op_is_schedulable( - start_times, - sfg, - sfg.find_by_id(op_id), - current_time, + self._sfg.find_by_id(op_id), remaining_resources, - remaining_reads, - self._max_concurrent_writes, remaining_ops, ) ] - deadlines = self._calculate_deadlines(sfg, alap_start_times) - output_slacks = self._calculate_alap_output_slacks( - current_time, alap_start_times - ) - fan_outs = self._calculate_fan_outs(sfg, alap_start_times) + deadlines = self._calculate_deadlines(alap_start_times) + output_slacks = self._calculate_alap_output_slacks(alap_start_times) + fan_outs = self._calculate_fan_outs(alap_start_times) ready_ops_priority_table = [] for op_id in ready_ops: @@ -372,38 +402,33 @@ class ListScheduler(Scheduler, ABC): return ready_ops_priority_table def _calculate_deadlines( - self, sfg, alap_start_times: dict["GraphID", int] + self, alap_start_times: dict["GraphID", int] ) -> dict["GraphID", int]: return { - op_id: start_time + sfg.find_by_id(op_id).latency + op_id: start_time + self._sfg.find_by_id(op_id).latency for op_id, start_time in alap_start_times.items() } def _calculate_alap_output_slacks( - self, current_time: int, alap_start_times: dict["GraphID", int] + self, alap_start_times: dict["GraphID", int] ) -> dict["GraphID", int]: return { - op_id: start_time - current_time + op_id: start_time - self._current_time for op_id, start_time in alap_start_times.items() } def _calculate_fan_outs( - self, sfg: "SFG", alap_start_times: dict["GraphID", int] + self, alap_start_times: dict["GraphID", int] ) -> dict["GraphID", int]: return { - op_id: len(sfg.find_by_id(op_id).output_signals) + op_id: len(self._sfg.find_by_id(op_id).output_signals) for op_id, start_time in alap_start_times.items() } - @staticmethod def _op_is_schedulable( - start_times: dict["GraphID"], - sfg: "SFG", + self, op: "Operation", - current_time: int, remaining_resources: dict["GraphID", int], - remaining_reads: int, - max_concurrent_writes: int, remaining_ops: list["GraphID"], ) -> bool: if ( @@ -412,18 +437,18 @@ class ListScheduler(Scheduler, ABC): ): return False - op_finish_time = current_time + op.latency + op_finish_time = self._current_time + op.latency future_ops = [ - sfg.find_by_id(item[0]) - for item in start_times.items() - if item[1] + sfg.find_by_id(item[0]).latency == op_finish_time + self._sfg.find_by_id(item[0]) + for item in self._schedule.start_times.items() + if item[1] + self._sfg.find_by_id(item[0]).latency == op_finish_time ] future_ops_writes = sum([op.input_count for op in future_ops]) if ( not op.graph_id.startswith("out") - and future_ops_writes >= max_concurrent_writes + and future_ops_writes >= self._max_concurrent_writes ): return False @@ -439,32 +464,58 @@ class ListScheduler(Scheduler, ABC): if source_op_graph_id in remaining_ops: return False - if start_times[source_op_graph_id] != current_time - 1: + if self._schedule.start_times[source_op_graph_id] != self._current_time - 1: # not a direct connection -> memory read required read_counter += 1 - if read_counter > remaining_reads: + if read_counter > self.remaining_reads: return False - proceeding_op_start_time = start_times.get(source_op_graph_id) - proceeding_op_finish_time = proceeding_op_start_time + source_op.latency + if self._schedule.schedule_time is not None: + proceeding_op_start_time = ( + self._schedule.start_times.get(source_op_graph_id) + + self._op_laps[source_op.graph_id] * self._schedule.schedule_time + ) + proceeding_op_finish_time = proceeding_op_start_time + source_op.latency + else: + proceeding_op_start_time = self._schedule.start_times.get( + source_op_graph_id + ) + proceeding_op_finish_time = proceeding_op_start_time + source_op.latency earliest_start_time = max(earliest_start_time, proceeding_op_finish_time) - return earliest_start_time <= current_time - - def _handle_outputs( - self, schedule: "Schedule", non_schedulable_ops: Optional[list["GraphID"]] = [] - ) -> None: - schedule.set_schedule_time(schedule.get_max_end_time()) + return earliest_start_time <= self._current_time - for output in schedule.sfg.find_by_type_name(Output.type_name()): + def _handle_outputs(self) -> None: + end = self._schedule.get_max_end_time() + for output in self._sfg.find_by_type_name(Output.type_name()): output = cast(Output, output) if output.graph_id in self._output_delta_times: delta_time = self._output_delta_times[output.graph_id] - if schedule.cyclic: - schedule.start_times[output.graph_id] = schedule.schedule_time - schedule.move_operation(output.graph_id, delta_time) + + new_time = end + delta_time + + if self._schedule.cyclic and self._schedule.schedule_time is not None: + self._schedule.start_times[output.graph_id] = ( + self._schedule.schedule_time + ) + self._schedule.move_operation(output.graph_id, delta_time) else: - schedule.start_times[output.graph_id] = ( - schedule.schedule_time + delta_time + self._schedule.start_times[output.graph_id] = new_time + + count = -1 + for op_id, time in self._schedule.start_times.items(): + if time == new_time and op_id.startswith("out"): + count += 1 + + remaining_resources = self._max_resources + remaining_resources[Output.type_name()] -= count + + self._current_time = new_time + if not self._op_is_schedulable(output, remaining_resources, {}): + raise ValueError( + "Cannot schedule outputs according to the provided output_delta_times. " + f"Failed output: {output.graph_id}, " + f"at time: { self._schedule.start_times[output.graph_id]}, " + "try relaxing the constraints." ) diff --git a/test/unit/test_list_schedulers.py b/test/unit/test_list_schedulers.py index 136e285ad4622edc4f0efeeebc4a86b95a7ccb19..56fb6e40d5fc1a4bc9e40371b13e6eb85b915bdb 100644 --- a/test/unit/test_list_schedulers.py +++ b/test/unit/test_list_schedulers.py @@ -895,7 +895,7 @@ class TestHybridScheduler: resources = {MADS.type_name(): 1, Reciprocal.type_name(): 1} with pytest.raises( TimeoutError, - match="Algorithm did not schedule any operation for 10 time steps, try relaxing constraints.", + match="Algorithm did not manage to schedule any operation for 10 time steps, try relaxing constraints.", ): Schedule( sfg,