Skip to content
Snippets Groups Projects
Commit 05b3d917 authored by Simon Bjurek's avatar Simon Bjurek
Browse files

almost working implementation, resource allocation / freeing needs to be rework for cyclic though

parent 1e14a771
No related branches found
No related tags found
1 merge request!478Add cyclic scheduling and improve resource
Pipeline #157083 failed
This commit is part of merge request !478. Comments created here will be created in the context of that merge request.
......@@ -120,8 +120,6 @@ class Schedule:
max_end_time = self.get_max_end_time()
if not self._schedule_time:
self._schedule_time = max_end_time
elif self._schedule_time < max_end_time:
raise ValueError(f"Too short schedule time. Minimum is {max_end_time}.")
def __str__(self) -> str:
"""Return a string representation of this Schedule."""
......@@ -306,13 +304,16 @@ class Schedule:
usage_time = start_time + cast(int, input_port.latency_offset)
for signal in input_port.signals:
source = cast(OutputPort, signal.source)
available_time = (
cast(int, source.latency_offset)
+ self._start_times[source.operation.graph_id]
- self._schedule_time * self._laps[signal.graph_id]
)
if available_time > self._schedule_time:
available_time -= self._schedule_time
if source.operation.graph_id.startswith("dontcare"):
available_time = 0
else:
available_time = (
cast(int, source.latency_offset)
+ self._start_times[source.operation.graph_id]
- self._schedule_time * self._laps[signal.graph_id]
)
if available_time > self._schedule_time:
available_time -= self._schedule_time
input_slacks[signal] = usage_time - available_time
return input_slacks
......
......@@ -58,6 +58,7 @@ class ASAPScheduler(Scheduler):
schedule : Schedule
Schedule to apply the scheduling algorithm on.
"""
prec_list = schedule.sfg.get_precedence_list()
if len(prec_list) < 2:
raise ValueError("Empty signal flow graph cannot be scheduled.")
......@@ -120,6 +121,13 @@ class ASAPScheduler(Scheduler):
self._handle_outputs(schedule, non_schedulable_ops)
schedule.remove_delays()
max_end_time = schedule.get_max_end_time()
if schedule.schedule_time is None:
schedule.set_schedule_time(max_end_time)
elif schedule.schedule_time < max_end_time:
raise ValueError(f"Too short schedule time. Minimum is {max_end_time}.")
schedule.sort_y_locations_on_start_times()
......@@ -135,12 +143,12 @@ class ALAPScheduler(Scheduler):
Schedule to apply the scheduling algorithm on.
"""
ASAPScheduler().apply_scheduling(schedule)
max_end_time = schedule.get_max_end_time()
# max_end_time = schedule.get_max_end_time()
if schedule.schedule_time is None:
schedule.set_schedule_time(max_end_time)
elif schedule.schedule_time < max_end_time:
raise ValueError(f"Too short schedule time. Minimum is {max_end_time}.")
# if schedule.schedule_time is None:
# schedule.set_schedule_time(max_end_time)
# elif schedule.schedule_time < max_end_time:
# raise ValueError(f"Too short schedule time. Minimum is {max_end_time}.")
# move all outputs ALAP before operations
for output in schedule.sfg.find_by_type_name(Output.type_name()):
......@@ -227,6 +235,7 @@ class ListScheduler(Scheduler, ABC):
)
alap_schedule = copy.copy(self._schedule)
alap_schedule.set_schedule_time(sys.maxsize)
ALAPScheduler().apply_scheduling(alap_schedule)
alap_start_times = alap_schedule.start_times
self._schedule.start_times = {}
......@@ -240,22 +249,27 @@ class ListScheduler(Scheduler, ABC):
)
used_resources_ready_times = {}
remaining_resources = self._max_resources.copy()
self._remaining_resources = self._max_resources.copy()
remaining_ops = (
self._sfg.operations
+ self._sfg.find_by_type_name(Input.type_name())
+ self._sfg.find_by_type_name(Output.type_name())
# + self._sfg.find_by_type_name(Input.type_name())
# + self._sfg.find_by_type_name(Output.type_name())
)
remaining_ops = [op.graph_id for op in remaining_ops]
self._schedule.start_times = {}
self.remaining_reads = self._max_concurrent_reads
self._current_time = 0
self._time_out_counter = 0
self._op_laps = {}
# initial input placement
if self._input_times:
for input_id in self._input_times:
self._schedule.start_times[input_id] = self._input_times[input_id]
self._op_laps[input_id] = 0
remaining_ops = [
elem for elem in remaining_ops if not elem.startswith("in")
]
......@@ -268,24 +282,26 @@ class ListScheduler(Scheduler, ABC):
if not (op.startswith("out") and op in self._output_delta_times)
]
self._current_time = 0
self._time_out_counter = 0
self._op_laps = {}
while remaining_ops:
ready_ops_priority_table = self._get_ready_ops_priority_table(
alap_start_times,
remaining_ops,
remaining_resources,
)
while ready_ops_priority_table:
next_op = self._sfg.find_by_id(
self._get_next_op_id(ready_ops_priority_table)
)
if next_op.type_name() in remaining_resources:
remaining_resources[next_op.type_name()] -= 1
used_resources_ready_times[next_op] = self._current_time + 1
if next_op.type_name() in self._remaining_resources:
self._remaining_resources[next_op.type_name()] -= 1
if self._schedule.schedule_time is not None:
used_resources_ready_times[next_op] = (
self._current_time + max(next_op.execution_time, 1)
) % self._schedule.schedule_time
else:
used_resources_ready_times[next_op] = self._current_time + max(
next_op.execution_time, 1
)
self.remaining_reads -= next_op.input_count
......@@ -293,10 +309,11 @@ class ListScheduler(Scheduler, ABC):
op_id for op_id in remaining_ops if op_id != next_op.graph_id
]
print("Next:", next_op.graph_id, self._current_time)
self._time_out_counter = 0
self._schedule.place_operation(next_op, self._current_time)
self._op_laps[next_op.graph_id] = (
(self._current_time + next_op.latency)
// self._schedule.schedule_time
(self._current_time) // self._schedule.schedule_time
if self._schedule.schedule_time
else 0
)
......@@ -310,7 +327,6 @@ class ListScheduler(Scheduler, ABC):
ready_ops_priority_table = self._get_ready_ops_priority_table(
alap_start_times,
remaining_ops,
remaining_resources,
)
self._go_to_next_time_step()
......@@ -318,21 +334,21 @@ class ListScheduler(Scheduler, ABC):
ready_ops_priority_table = self._get_ready_ops_priority_table(
alap_start_times,
remaining_ops,
remaining_resources,
)
# update available reads and operators
if self._schedule.schedule_time is not None:
time = self._current_time % self._schedule.schedule_time
else:
time = self._current_time
self.remaining_reads = self._max_concurrent_reads
for operation, ready_time in used_resources_ready_times.items():
if ready_time >= self._current_time:
remaining_resources[operation.type_name()] += 1
if ready_time >= time:
self._remaining_resources[operation.type_name()] += 1
used_resources_ready_times = dict(
[
pair
for pair in used_resources_ready_times.items()
if pair[1] > self._current_time
]
[pair for pair in used_resources_ready_times.items() if pair[1] > time]
)
self._current_time -= 1
......@@ -377,17 +393,11 @@ class ListScheduler(Scheduler, ABC):
self,
alap_start_times: dict["GraphID", int],
remaining_ops: list["GraphID"],
remaining_resources: dict["GraphID", int],
) -> list[tuple["GraphID", int, int, int]]:
ready_ops = [
op_id
for op_id in remaining_ops
if self._op_is_schedulable(
self._sfg.find_by_id(op_id),
remaining_resources,
remaining_ops,
)
if self._op_is_schedulable(self._sfg.find_by_id(op_id), remaining_ops)
]
deadlines = self._calculate_deadlines(alap_start_times)
......@@ -426,14 +436,11 @@ class ListScheduler(Scheduler, ABC):
}
def _op_is_schedulable(
self,
op: "Operation",
remaining_resources: dict["GraphID", int],
remaining_ops: list["GraphID"],
self, op: "Operation", remaining_ops: list["GraphID"]
) -> bool:
if (
op.type_name() in remaining_resources
and remaining_resources[op.type_name()] == 0
op.type_name() in self._remaining_resources
and self._remaining_resources[op.type_name()] == 0
):
return False
......@@ -487,7 +494,10 @@ class ListScheduler(Scheduler, ABC):
return earliest_start_time <= self._current_time
def _handle_outputs(self) -> None:
end = self._schedule.get_max_end_time()
if self._schedule.cyclic:
end = self._schedule.schedule_time
else:
end = self._schedule.get_max_end_time()
for output in self._sfg.find_by_type_name(Output.type_name()):
output = cast(Output, output)
if output.graph_id in self._output_delta_times:
......@@ -496,10 +506,7 @@ class ListScheduler(Scheduler, ABC):
new_time = end + delta_time
if self._schedule.cyclic and self._schedule.schedule_time is not None:
self._schedule.start_times[output.graph_id] = (
self._schedule.schedule_time
)
self._schedule.move_operation(output.graph_id, delta_time)
self._schedule.place_operation(output, new_time)
else:
self._schedule.start_times[output.graph_id] = new_time
......@@ -508,11 +515,11 @@ class ListScheduler(Scheduler, ABC):
if time == new_time and op_id.startswith("out"):
count += 1
remaining_resources = self._max_resources
remaining_resources[Output.type_name()] -= count
self._remaining_resources = self._max_resources
self._remaining_resources[Output.type_name()] -= count
self._current_time = new_time
if not self._op_is_schedulable(output, remaining_resources, {}):
if not self._op_is_schedulable(output, {}):
raise ValueError(
"Cannot schedule outputs according to the provided output_delta_times. "
f"Failed output: {output.graph_id}, "
......
......@@ -34,7 +34,7 @@ schedule.show()
# Generate a non-cyclic Schedule from HybridScheduler with custom IO times.
resources = {Butterfly.type_name(): 1, ConstantMultiplication.type_name(): 1}
input_times = {f"in{i}": i for i in range(points)}
output_delta_times = {f"out{i}": i - 2 for i in range(points)}
output_delta_times = {f"out{i}": i for i in range(points)}
schedule = Schedule(
sfg,
scheduler=HybridScheduler(
......@@ -55,6 +55,37 @@ schedule = Schedule(
input_times=input_times,
output_delta_times=output_delta_times,
),
schedule_time=14,
cyclic=True,
)
schedule.show()
# %%
# Generate a new Schedule with even less scheduling time
output_delta_times = {f"out{i}": i + 1 for i in range(points)}
schedule = Schedule(
sfg,
scheduler=HybridScheduler(
resources,
input_times=input_times,
output_delta_times=output_delta_times,
),
schedule_time=13,
cyclic=True,
)
schedule.show()
# %%
# Try scheduling for 12 cycles, which gives full butterfly usage
output_delta_times = {f"out{i}": i + 2 for i in range(points)}
schedule = Schedule(
sfg,
scheduler=HybridScheduler(
resources,
input_times=input_times,
output_delta_times=output_delta_times,
),
schedule_time=12,
cyclic=True,
)
schedule.show()
......@@ -635,6 +635,7 @@ class TestHybridScheduler:
scheduler=HybridScheduler(
resources, input_times=input_times, output_delta_times=output_times
),
schedule_time=20,
cyclic=True,
)
......@@ -810,6 +811,7 @@ class TestHybridScheduler:
scheduler=HybridScheduler(
resources, input_times=input_times, output_delta_times=output_times
),
schedule_time=16,
cyclic=True,
)
......@@ -895,7 +897,7 @@ class TestHybridScheduler:
resources = {MADS.type_name(): 1, Reciprocal.type_name(): 1}
with pytest.raises(
TimeoutError,
match="Algorithm did not manage to schedule any operation for 10 time steps, try relaxing constraints.",
match="Algorithm did not manage to schedule any operation for 10 time steps, try relaxing the constraints.",
):
Schedule(
sfg,
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment