Skip to content
Snippets Groups Projects

Add cyclic scheduling and improve resource

Merged Simon Bjurek requested to merge improve-resource-constraint into master
2 files
+ 25
42
Compare changes
  • Side-by-side
  • Inline
Files
2
+ 184
144
import copy
import sys
from abc import ABC, abstractmethod
from math import ceil
from typing import TYPE_CHECKING, Optional, cast
from b_asic.core_operations import DontCare
@@ -11,7 +12,6 @@ from b_asic.types import TypeName
if TYPE_CHECKING:
from b_asic.operation import Operation
from b_asic.schedule import Schedule
from b_asic.signal_flow_graph import SFG
from b_asic.types import GraphID
@@ -58,6 +58,7 @@ class ASAPScheduler(Scheduler):
schedule : Schedule
Schedule to apply the scheduling algorithm on.
"""
prec_list = schedule.sfg.get_precedence_list()
if len(prec_list) < 2:
raise ValueError("Empty signal flow graph cannot be scheduled.")
@@ -120,6 +121,13 @@ class ASAPScheduler(Scheduler):
self._handle_outputs(schedule, non_schedulable_ops)
schedule.remove_delays()
max_end_time = schedule.get_max_end_time()
if schedule.schedule_time is None:
schedule.set_schedule_time(max_end_time)
elif schedule.schedule_time < max_end_time:
raise ValueError(f"Too short schedule time. Minimum is {max_end_time}.")
schedule.sort_y_locations_on_start_times()
@@ -135,12 +143,6 @@ class ALAPScheduler(Scheduler):
Schedule to apply the scheduling algorithm on.
"""
ASAPScheduler().apply_scheduling(schedule)
max_end_time = schedule.get_max_end_time()
if schedule.schedule_time is None:
schedule.set_schedule_time(max_end_time)
elif schedule.schedule_time < max_end_time:
raise ValueError(f"Too short schedule time. Minimum is {max_end_time}.")
# move all outputs ALAP before operations
for output in schedule.sfg.find_by_type_name(Output.type_name()):
@@ -157,6 +159,9 @@ class ALAPScheduler(Scheduler):
class ListScheduler(Scheduler, ABC):
TIME_OUT_COUNTER_LIMIT = 100
def __init__(
self,
max_resources: Optional[dict[TypeName, int]] = None,
@@ -179,6 +184,11 @@ class ListScheduler(Scheduler, ABC):
else:
self._max_resources = {}
if Input.type_name() not in self._max_resources:
self._max_resources[Input.type_name()] = 1
if Output.type_name() not in self._max_resources:
self._max_resources[Output.type_name()] = 1
self._max_concurrent_reads = max_concurrent_reads or sys.maxsize
self._max_concurrent_writes = max_concurrent_writes or sys.maxsize
@@ -198,127 +208,138 @@ class ListScheduler(Scheduler, ABC):
schedule : Schedule
Schedule to apply the scheduling algorithm on.
"""
sfg = schedule.sfg
self._schedule = schedule
self._sfg = schedule.sfg
alap_schedule = copy.copy(schedule)
if self._schedule.cyclic and self._schedule.schedule_time is None:
raise ValueError("Scheduling time must be provided when cyclic = True.")
for resource_type, resource_amount in self._max_resources.items():
total_exec_time = sum(
[op.execution_time for op in self._sfg.find_by_type_name(resource_type)]
)
if self._schedule.schedule_time is not None:
resource_lower_bound = ceil(
total_exec_time / self._schedule.schedule_time
)
if resource_amount < resource_lower_bound:
raise ValueError(
f"Amount of resource: {resource_type} is not enough to "
f"realize schedule for scheduling time: {self._schedule.schedule_time}"
)
alap_schedule = copy.copy(self._schedule)
alap_schedule._schedule_time = None
ALAPScheduler().apply_scheduling(alap_schedule)
alap_start_times = alap_schedule.start_times
schedule.start_times = {}
used_resources_ready_times = {}
remaining_resources = self._max_resources.copy()
if Input.type_name() not in remaining_resources:
remaining_resources[Input.type_name()] = 1
if Output.type_name() not in remaining_resources:
remaining_resources[Output.type_name()] = 1
remaining_ops = (
sfg.operations
+ sfg.find_by_type_name(Input.type_name())
+ sfg.find_by_type_name(Output.type_name())
)
self._schedule.start_times = {}
if not self._schedule.cyclic and self._schedule.schedule_time:
if alap_schedule.schedule_time > self._schedule.schedule_time:
raise ValueError(
f"Provided scheduling time {schedule.schedule_time} cannot be reached, "
"try to enable the cyclic property or increase the time to at least "
f"{alap_schedule.schedule_time}."
)
self._remaining_resources = self._max_resources.copy()
remaining_ops = self._sfg.operations
remaining_ops = [op.graph_id for op in remaining_ops]
schedule.start_times = {}
remaining_reads = self._max_concurrent_reads
self._schedule.start_times = {}
self.remaining_reads = self._max_concurrent_reads
self._current_time = 0
self._time_out_counter = 0
self._op_laps = {}
# initial input placement
if self._input_times:
for input_id in self._input_times:
schedule.start_times[input_id] = self._input_times[input_id]
self._schedule.start_times[input_id] = self._input_times[input_id]
self._op_laps[input_id] = 0
remaining_ops = [
elem for elem in remaining_ops if not elem.startswith("in")
]
remaining_ops = [op for op in remaining_ops if not op.startswith("dontcare")]
remaining_ops = [op for op in remaining_ops if not op.startswith("t")]
remaining_ops = [
op
for op in remaining_ops
if not (op.startswith("out") and op in self._output_delta_times)
]
current_time = 0
time_out_counter = 0
while remaining_ops:
ready_ops_priority_table = self._get_ready_ops_priority_table(
sfg,
schedule.start_times,
current_time,
alap_start_times,
remaining_ops,
remaining_resources,
remaining_reads,
)
while ready_ops_priority_table:
next_op = sfg.find_by_id(self._get_next_op_id(ready_ops_priority_table))
if next_op.type_name() in remaining_resources:
remaining_resources[next_op.type_name()] -= 1
if (
next_op.type_name() == Input.type_name()
or next_op.type_name() == Output.type_name()
):
used_resources_ready_times[next_op] = current_time + 1
else:
used_resources_ready_times[next_op] = (
current_time + next_op.execution_time
)
remaining_reads -= next_op.input_count
next_op = self._sfg.find_by_id(
self._get_next_op_id(ready_ops_priority_table)
)
self.remaining_reads -= next_op.input_count
remaining_ops = [
op_id for op_id in remaining_ops if op_id != next_op.graph_id
]
schedule.start_times[next_op.graph_id] = current_time
self._time_out_counter = 0
self._schedule.place_operation(next_op, self._current_time)
self._op_laps[next_op.graph_id] = (
(self._current_time) // self._schedule.schedule_time
if self._schedule.schedule_time
else 0
)
if not self._schedule.cyclic and self._schedule.schedule_time:
if self._current_time > self._schedule.schedule_time:
raise ValueError(
f"Provided scheduling time {schedule.schedule_time} cannot be reached, "
"try to enable the cyclic property or increase the time."
)
ready_ops_priority_table = self._get_ready_ops_priority_table(
sfg,
schedule.start_times,
current_time,
alap_start_times,
remaining_ops,
remaining_resources,
remaining_reads,
)
current_time += 1
time_out_counter += 1
if time_out_counter >= 100:
raise TimeoutError(
"Algorithm did not schedule any operation for 10 time steps, "
"try relaxing constraints."
)
self._go_to_next_time_step()
ready_ops_priority_table = self._get_ready_ops_priority_table(
sfg,
schedule.start_times,
current_time,
alap_start_times,
remaining_ops,
remaining_resources,
remaining_reads,
)
# update available reads and operators
remaining_reads = self._max_concurrent_reads
for operation, ready_time in used_resources_ready_times.items():
if ready_time == current_time:
remaining_resources[operation.type_name()] += 1
current_time -= 1
self.remaining_reads = self._max_concurrent_reads
self._handle_outputs(schedule)
self._current_time -= 1
if not schedule.cyclic:
max_start_time = max(schedule.start_times.values())
if current_time < max_start_time:
current_time = max_start_time
current_time = max(current_time, schedule.get_max_end_time())
schedule.set_schedule_time(current_time)
self._handle_outputs()
schedule.remove_delays()
if self._schedule.schedule_time is None:
self._schedule.set_schedule_time(self._schedule.get_max_end_time())
self._schedule.remove_delays()
# schedule all dont cares ALAP
for dc_op in sfg.find_by_type_name(DontCare.type_name()):
for dc_op in self._sfg.find_by_type_name(DontCare.type_name()):
dc_op = cast(DontCare, dc_op)
schedule.start_times[dc_op.graph_id] = 0
schedule.move_operation_alap(dc_op.graph_id)
self._schedule.start_times[dc_op.graph_id] = 0
self._schedule.move_operation_alap(dc_op.graph_id)
schedule.sort_y_locations_on_start_times()
self._schedule.sort_y_locations_on_start_times()
def _go_to_next_time_step(self):
self._time_out_counter += 1
if self._time_out_counter >= self.TIME_OUT_COUNTER_LIMIT:
raise TimeoutError(
"Algorithm did not manage to schedule any operation for 10 time steps, "
"try relaxing the constraints."
)
self._current_time += 1
def _get_next_op_id(
self, ready_ops_priority_table: list[tuple["GraphID", int, ...]]
@@ -334,35 +355,18 @@ class ListScheduler(Scheduler, ABC):
def _get_ready_ops_priority_table(
self,
sfg: "SFG",
start_times: dict["GraphID", int],
current_time: int,
alap_start_times: dict["GraphID", int],
remaining_ops: list["GraphID"],
remaining_resources: dict["GraphID", int],
remaining_reads: int,
) -> list[tuple["GraphID", int, int, int]]:
ready_ops = [
op_id
for op_id in remaining_ops
if self._op_is_schedulable(
start_times,
sfg,
sfg.find_by_id(op_id),
current_time,
remaining_resources,
remaining_reads,
self._max_concurrent_writes,
remaining_ops,
)
if self._op_is_schedulable(self._sfg.find_by_id(op_id), remaining_ops)
]
deadlines = self._calculate_deadlines(sfg, alap_start_times)
output_slacks = self._calculate_alap_output_slacks(
current_time, alap_start_times
)
fan_outs = self._calculate_fan_outs(sfg, alap_start_times)
deadlines = self._calculate_deadlines(alap_start_times)
output_slacks = self._calculate_alap_output_slacks(alap_start_times)
fan_outs = self._calculate_fan_outs(alap_start_times)
ready_ops_priority_table = []
for op_id in ready_ops:
@@ -372,58 +376,68 @@ class ListScheduler(Scheduler, ABC):
return ready_ops_priority_table
def _calculate_deadlines(
self, sfg, alap_start_times: dict["GraphID", int]
self, alap_start_times: dict["GraphID", int]
) -> dict["GraphID", int]:
return {
op_id: start_time + sfg.find_by_id(op_id).latency
op_id: start_time + self._sfg.find_by_id(op_id).latency
for op_id, start_time in alap_start_times.items()
}
def _calculate_alap_output_slacks(
self, current_time: int, alap_start_times: dict["GraphID", int]
self, alap_start_times: dict["GraphID", int]
) -> dict["GraphID", int]:
return {
op_id: start_time - current_time
op_id: start_time - self._current_time
for op_id, start_time in alap_start_times.items()
}
def _calculate_fan_outs(
self, sfg: "SFG", alap_start_times: dict["GraphID", int]
self, alap_start_times: dict["GraphID", int]
) -> dict["GraphID", int]:
return {
op_id: len(sfg.find_by_id(op_id).output_signals)
op_id: len(self._sfg.find_by_id(op_id).output_signals)
for op_id, start_time in alap_start_times.items()
}
@staticmethod
def _op_satisfies_resource_constraints(self, op: "Operation") -> bool:
if self._schedule.schedule_time is not None:
time_slot = self._current_time % self._schedule.schedule_time
else:
time_slot = self._current_time
count = 0
for op_id, start_time in self._schedule.start_times.items():
if self._schedule.schedule_time is not None:
start_time = start_time % self._schedule.schedule_time
if time_slot >= start_time:
if time_slot < start_time + max(
self._sfg.find_by_id(op_id).execution_time, 1
):
if op_id.startswith(op.type_name()):
if op.graph_id != op_id:
count += 1
return count < self._remaining_resources[op.type_name()]
def _op_is_schedulable(
start_times: dict["GraphID"],
sfg: "SFG",
op: "Operation",
current_time: int,
remaining_resources: dict["GraphID", int],
remaining_reads: int,
max_concurrent_writes: int,
remaining_ops: list["GraphID"],
self, op: "Operation", remaining_ops: list["GraphID"]
) -> bool:
if (
op.type_name() in remaining_resources
and remaining_resources[op.type_name()] == 0
):
if not self._op_satisfies_resource_constraints(op):
return False
op_finish_time = current_time + op.latency
op_finish_time = self._current_time + op.latency
future_ops = [
sfg.find_by_id(item[0])
for item in start_times.items()
if item[1] + sfg.find_by_id(item[0]).latency == op_finish_time
self._sfg.find_by_id(item[0])
for item in self._schedule.start_times.items()
if item[1] + self._sfg.find_by_id(item[0]).latency == op_finish_time
]
future_ops_writes = sum([op.input_count for op in future_ops])
if (
not op.graph_id.startswith("out")
and future_ops_writes >= max_concurrent_writes
and future_ops_writes >= self._max_concurrent_writes
):
return False
@@ -439,32 +453,58 @@ class ListScheduler(Scheduler, ABC):
if source_op_graph_id in remaining_ops:
return False
if start_times[source_op_graph_id] != current_time - 1:
if self._schedule.start_times[source_op_graph_id] != self._current_time - 1:
# not a direct connection -> memory read required
read_counter += 1
if read_counter > remaining_reads:
if read_counter > self.remaining_reads:
return False
proceeding_op_start_time = start_times.get(source_op_graph_id)
proceeding_op_finish_time = proceeding_op_start_time + source_op.latency
if self._schedule.schedule_time is not None:
proceeding_op_start_time = (
self._schedule.start_times.get(source_op_graph_id)
+ self._op_laps[source_op.graph_id] * self._schedule.schedule_time
)
proceeding_op_finish_time = proceeding_op_start_time + source_op.latency
else:
proceeding_op_start_time = self._schedule.start_times.get(
source_op_graph_id
)
proceeding_op_finish_time = proceeding_op_start_time + source_op.latency
earliest_start_time = max(earliest_start_time, proceeding_op_finish_time)
return earliest_start_time <= current_time
def _handle_outputs(
self, schedule: "Schedule", non_schedulable_ops: Optional[list["GraphID"]] = []
) -> None:
schedule.set_schedule_time(schedule.get_max_end_time())
return earliest_start_time <= self._current_time
for output in schedule.sfg.find_by_type_name(Output.type_name()):
def _handle_outputs(self) -> None:
if self._schedule.cyclic:
end = self._schedule.schedule_time
else:
end = self._schedule.get_max_end_time()
for output in self._sfg.find_by_type_name(Output.type_name()):
output = cast(Output, output)
if output.graph_id in self._output_delta_times:
delta_time = self._output_delta_times[output.graph_id]
if schedule.cyclic:
schedule.start_times[output.graph_id] = schedule.schedule_time
schedule.move_operation(output.graph_id, delta_time)
new_time = end + delta_time
if self._schedule.cyclic and self._schedule.schedule_time is not None:
self._schedule.place_operation(output, new_time)
else:
schedule.start_times[output.graph_id] = (
schedule.schedule_time + delta_time
self._schedule.start_times[output.graph_id] = new_time
count = -1
for op_id, time in self._schedule.start_times.items():
if time == new_time and op_id.startswith("out"):
count += 1
self._remaining_resources = self._max_resources
self._remaining_resources[Output.type_name()] -= count
self._current_time = new_time
if not self._op_is_schedulable(output, {}):
raise ValueError(
"Cannot schedule outputs according to the provided output_delta_times. "
f"Failed output: {output.graph_id}, "
f"at time: { self._schedule.start_times[output.graph_id]}, "
"try relaxing the constraints."
)
Loading