diff --git a/b_asic/schedule.py b/b_asic/schedule.py index a708fe1ad0ed4048143317b00d37827a96ff9259..d1c63041391874c1fb3f8243eef665aad6e10416 100644 --- a/b_asic/schedule.py +++ b/b_asic/schedule.py @@ -33,7 +33,7 @@ from b_asic.operation import Operation from b_asic.port import InputPort, OutputPort from b_asic.process import MemoryVariable, OperatorProcess from b_asic.resources import ProcessCollection -from b_asic.scheduler import Scheduler, SchedulingAlgorithm +from b_asic.scheduler import Scheduler from b_asic.signal_flow_graph import SFG from b_asic.special_operations import Delay, Input, Output from b_asic.types import TypeName @@ -94,12 +94,11 @@ class Schedule: def __init__( self, sfg: SFG, + scheduler: Optional[Scheduler] = None, schedule_time: Optional[int] = None, cyclic: bool = False, - algorithm: SchedulingAlgorithm = "ASAP", start_times: Optional[Dict[GraphID, int]] = None, laps: Optional[Dict[GraphID, int]] = None, - max_resources: Optional[Dict[TypeName, int]] = None, ): """Construct a Schedule from an SFG.""" if not isinstance(sfg, SFG): @@ -112,14 +111,10 @@ class Schedule: self._y_locations = defaultdict(_y_locations_default) self._schedule_time = schedule_time - self.scheduler = Scheduler(self) - if algorithm == "ASAP": - self.scheduler.schedule_asap() - elif algorithm == "ALAP": - self.scheduler.schedule_alap() - elif algorithm == "earliest_deadline": - self.scheduler.schedule_earliest_deadline([]) - elif algorithm == "provided": + if scheduler: + self._scheduler = scheduler + scheduler.apply_scheduling(self) + else: if start_times is None: raise ValueError("Must provide start_times when using 'provided'") if laps is None: @@ -127,11 +122,8 @@ class Schedule: self._start_times = start_times self._laps.update(laps) self._remove_delays_no_laps() - else: - raise NotImplementedError(f"No algorithm with name: {algorithm} defined.") max_end_time = self.get_max_end_time() - if schedule_time is None: self._schedule_time = max_end_time elif schedule_time < max_end_time: @@ -399,6 +391,12 @@ class Schedule: """The start times of the operations in the schedule.""" return self._start_times + @start_times.setter + def start_times(self, start_times: dict[GraphID, int]) -> None: + if not isinstance(start_times, dict[GraphID, int]): + raise TypeError("start_times must be a dict[GraphID, int]") + self._start_times = start_times + @property def laps(self) -> Dict[GraphID, int]: """ @@ -770,7 +768,7 @@ class Schedule: self._sfg = cast(SFG, self._sfg.remove_operation(delay_op.graph_id)) delay_list = self._sfg.find_by_type_name(Delay.type_name()) - def _remove_delays(self) -> None: + def remove_delays(self) -> None: """Remove delay elements and update laps. Used after scheduling algorithm.""" delay_list = self._sfg.find_by_type_name(Delay.type_name()) while delay_list: diff --git a/b_asic/scheduler.py b/b_asic/scheduler.py index 845859c971d98cc50f50a2f3c314a841880d0fcd..30089db2dea91c9752074dab23f7d2b329ef15d7 100644 --- a/b_asic/scheduler.py +++ b/b_asic/scheduler.py @@ -1,30 +1,48 @@ -from enum import Enum +from abc import ABC, abstractmethod from typing import TYPE_CHECKING, cast -from b_asic.operation import Operation from b_asic.port import OutputPort from b_asic.special_operations import Delay, Output +from b_asic.types import TypeName if TYPE_CHECKING: from b_asic.schedule import Schedule -class SchedulingAlgorithm(Enum): - ASAP = "ASAP" - ALAP = "ALAP" - EARLIEST_DEADLINE = "earliest_deadline" - # LEAST_SLACK = "least_slack" # to be implemented - PROVIDED = "provided" +# class SchedulingAlgorithm(Enum): +# ASAP = "ASAP" +# ALAP = "ALAP" +# EARLIEST_DEADLINE = "earliest_deadline" +# # LEAST_SLACK = "least_slack" # to be implemented +# PROVIDED = "provided" -class Scheduler: - def __init__(self, schedule: "Schedule") -> None: - self.schedule = schedule +class Scheduler(ABC): + @abstractmethod + def apply_scheduling(self, schedule: "Schedule") -> None: + pass - def schedule_asap(self) -> None: - """Schedule the operations using as-soon-as-possible scheduling.""" - sched = self.schedule - prec_list = sched.sfg.get_precedence_list() + def _handle_outputs(self, schedule, non_schedulable_ops) -> None: + for output in schedule.sfg.find_by_type_name(Output.type_name()): + output = cast(Output, output) + source_port = cast(OutputPort, output.inputs[0].signals[0].source) + if source_port.operation.graph_id in non_schedulable_ops: + schedule.start_times[output.graph_id] = 0 + else: + if source_port.latency_offset is None: + raise ValueError( + f"Output port {source_port.index} of operation" + f" {source_port.operation.graph_id} has no" + " latency-offset." + ) + schedule.start_times[output.graph_id] = schedule.start_times[ + source_port.operation.graph_id + ] + cast(int, source_port.latency_offset) + + +class ASAPScheduler(Scheduler): + def apply_scheduling(self, schedule: "Schedule") -> None: + prec_list = schedule.sfg.get_precedence_list() if len(prec_list) < 2: raise ValueError("Empty signal flow graph cannot be scheduled.") @@ -34,21 +52,19 @@ class Scheduler: operation = outport.operation if operation.type_name() == Delay.type_name(): non_schedulable_ops.add(operation.graph_id) - # elif operation.graph_id not in sched._start_times: else: - sched._start_times[operation.graph_id] = 0 + schedule.start_times[operation.graph_id] = 0 # handle second set in precedence graph (first operations) for outport in prec_list[1]: operation = outport.operation - # if operation.graph_id not in sched._start_times: - sched._start_times[operation.graph_id] = 0 + schedule.start_times[operation.graph_id] = 0 # handle the remaining sets for outports in prec_list[2:]: for outport in outports: operation = outport.operation - if operation.graph_id not in sched._start_times: + if operation.graph_id not in schedule.start_times: op_start_time = 0 for current_input in operation.inputs: if len(current_input.signals) != 1: @@ -64,7 +80,7 @@ class Scheduler: if source_port.operation.graph_id in non_schedulable_ops: source_end_time = 0 else: - source_op_time = sched._start_times[ + source_op_time = schedule.start_times[ source_port.operation.graph_id ] @@ -91,41 +107,42 @@ class Scheduler: ) op_start_time = max(op_start_time, op_start_time_from_in) - sched._start_times[operation.graph_id] = op_start_time + schedule.start_times[operation.graph_id] = op_start_time + + self._handle_outputs(schedule, non_schedulable_ops) + schedule.remove_delays() - self._handle_outputs_and_delays(non_schedulable_ops) - def schedule_alap(self) -> None: - """Schedule the operations using as-late-as-possible scheduling.""" - self.schedule_asap() - sched = self.schedule - max_end_time = sched.get_max_end_time() +class ALAPScheduler(Scheduler): + def apply_scheduling(self, schedule: "Schedule") -> None: + # self.schedule_asap() + ASAPScheduler().apply_scheduling(schedule) + max_end_time = schedule.get_max_end_time() - if sched.schedule_time is None: - sched.set_schedule_time(max_end_time) - elif sched.schedule_time < max_end_time: + if schedule.schedule_time is None: + schedule.set_schedule_time(max_end_time) + elif schedule.schedule_time < max_end_time: raise ValueError(f"Too short schedule time. Minimum is {max_end_time}.") # move all outputs ALAP before operations - for output in sched.sfg.find_by_type_name(Output.type_name()): + for output in schedule.sfg.find_by_type_name(Output.type_name()): output = cast(Output, output) - sched.move_operation_alap(output.graph_id) + schedule.move_operation_alap(output.graph_id) # move all operations ALAP - for step in reversed(sched.sfg.get_precedence_list()): + for step in reversed(schedule.sfg.get_precedence_list()): for outport in step: if not isinstance(outport.operation, Delay): - sched.move_operation_alap(outport.operation.graph_id) + schedule.move_operation_alap(outport.operation.graph_id) - def schedule_earliest_deadline( - self, process_elements: dict[Operation, int] - ) -> None: - """Schedule the operations using earliest deadline scheduling.""" - # ACT BASED ON THE NUMBER OF PEs! +class EarliestDeadlineScheduler(Scheduler): + def __init__(self, max_resources: dict[TypeName, int]) -> None: + self._max_resources = max_resources - sched = self.schedule - prec_list = sched.sfg.get_precedence_list() + def apply_scheduling(self, schedule: "Schedule") -> None: + # ACT BASED ON THE NUMBER OF PEs! + prec_list = schedule.sfg.get_precedence_list() if len(prec_list) < 2: raise ValueError("Empty signal flow graph cannot be scheduled.") @@ -135,8 +152,8 @@ class Scheduler: operation = outport.operation if operation.type_name() == Delay.type_name(): non_schedulable_ops.add(operation.graph_id) - elif operation.graph_id not in sched._start_times: - sched._start_times[operation.graph_id] = 0 + elif operation.graph_id not in schedule.start_times: + schedule.start_times[operation.graph_id] = 0 current_time = 0 sorted_outports = sorted( @@ -144,50 +161,30 @@ class Scheduler: ) for outport in sorted_outports: op = outport.operation - sched._start_times[op.graph_id] = current_time + schedule.start_times[op.graph_id] = current_time current_time += 1 for outports in prec_list[2:]: - # try all remaining operations for one time step - candidates = [] current_time -= 1 - while len(candidates) == 0: - current_time += 1 - for outport in outports: - remaining_op = outport.operation + for outport in outports: + op = outport.operation + candidates = [] + while not candidates: + current_time += 1 op_is_ready_to_be_scheduled = True - for op_input in remaining_op.inputs: + for op_input in op.inputs: source_op = op_input.signals[0].source.operation - source_op_time = sched.start_times[source_op.graph_id] + source_op_time = schedule.start_times[source_op.graph_id] source_end_time = source_op_time + source_op.latency if source_end_time > current_time: op_is_ready_to_be_scheduled = False if op_is_ready_to_be_scheduled: - candidates.append(remaining_op) - # sched._start_times[remaining_op.graph_id] = current_time - sorted_candidates = sorted( - candidates, key=lambda candidate: candidate.latency - ) - # schedule the best candidate to current time - sched._start_times[sorted_candidates[0].graph_id] = current_time - - self._handle_outputs_and_delays(non_schedulable_ops) - - def _handle_outputs_and_delays(self, non_schedulable_ops) -> None: - sched = self.schedule - for output in sched._sfg.find_by_type_name(Output.type_name()): - output = cast(Output, output) - source_port = cast(OutputPort, output.inputs[0].signals[0].source) - if source_port.operation.graph_id in non_schedulable_ops: - sched._start_times[output.graph_id] = 0 - else: - if source_port.latency_offset is None: - raise ValueError( - f"Output port {source_port.index} of operation" - f" {source_port.operation.graph_id} has no" - " latency-offset." - ) - sched._start_times[output.graph_id] = sched._start_times[ - source_port.operation.graph_id - ] + cast(int, source_port.latency_offset) - sched._remove_delays() + candidates.append(op) + sorted_candidates = sorted( + candidates, key=lambda candidate: candidate.latency + ) + # schedule the best candidate to current time + schedule.start_times[sorted_candidates[0].graph_id] = current_time + + self._handle_outputs(schedule, non_schedulable_ops) + schedule.remove_delays()