Newer
Older
from abc import ABC, abstractmethod
from typing import TYPE_CHECKING, Optional, cast
from b_asic.port import OutputPort
from b_asic.special_operations import Delay, Output
from b_asic.types import TypeName
if TYPE_CHECKING:
from b_asic.schedule import Schedule
class Scheduler(ABC):
@abstractmethod
def apply_scheduling(self, schedule: "Schedule") -> None:
"""Applies the scheduling algorithm on the given Schedule.
Parameters
----------
schedule : Schedule
Schedule to apply the scheduling algorithm on.
"""
def _handle_outputs(self, schedule, non_schedulable_ops) -> None:
for output in schedule.sfg.find_by_type_name(Output.type_name()):
output = cast(Output, output)
source_port = cast(OutputPort, output.inputs[0].signals[0].source)
if source_port.operation.graph_id in non_schedulable_ops:
schedule.start_times[output.graph_id] = 0
else:
if source_port.latency_offset is None:
raise ValueError(
f"Output port {source_port.index} of operation"
f" {source_port.operation.graph_id} has no"
" latency-offset."
)
schedule.start_times[output.graph_id] = schedule.start_times[
source_port.operation.graph_id
] + cast(int, source_port.latency_offset)
class ASAPScheduler(Scheduler):
"""Scheduler that implements the as-soon-as-possible (ASAP) algorithm."""
def apply_scheduling(self, schedule: "Schedule") -> None:
"""Applies the scheduling algorithm on the given Schedule.
Parameters
----------
schedule : Schedule
Schedule to apply the scheduling algorithm on.
"""
prec_list = schedule.sfg.get_precedence_list()
if len(prec_list) < 2:
raise ValueError("Empty signal flow graph cannot be scheduled.")
# handle the first set in precedence graph (input and delays)
non_schedulable_ops = set()
for outport in prec_list[0]:
operation = outport.operation
if operation.type_name() == Delay.type_name():
non_schedulable_ops.add(operation.graph_id)
else:
schedule.start_times[operation.graph_id] = 0
# handle second set in precedence graph (first operations)
for outport in prec_list[1]:
operation = outport.operation
schedule.start_times[operation.graph_id] = 0
# handle the remaining sets
for outports in prec_list[2:]:
for outport in outports:
operation = outport.operation
if operation.graph_id not in schedule.start_times:
op_start_time = 0
for current_input in operation.inputs:
if len(current_input.signals) != 1:
raise ValueError(
"Error in scheduling, dangling input port detected."
)
if current_input.signals[0].source is None:
raise ValueError(
"Error in scheduling, signal with no source detected."
)
source_port = current_input.signals[0].source
if source_port.operation.graph_id in non_schedulable_ops:
source_end_time = 0
else:
source_op_time = schedule.start_times[
source_port.operation.graph_id
]
if source_port.latency_offset is None:
raise ValueError(
f"Output port {source_port.index} of"
" operation"
f" {source_port.operation.graph_id} has no"
" latency-offset."
)
source_end_time = (
source_op_time + source_port.latency_offset
)
if current_input.latency_offset is None:
raise ValueError(
f"Input port {current_input.index} of operation"
f" {current_input.operation.graph_id} has no"
" latency-offset."
)
op_start_time_from_in = (
source_end_time - current_input.latency_offset
)
op_start_time = max(op_start_time, op_start_time_from_in)
schedule.start_times[operation.graph_id] = op_start_time
self._handle_outputs(schedule, non_schedulable_ops)
schedule.remove_delays()
class ALAPScheduler(Scheduler):
"""Scheduler that implements the as-late-as-possible (ALAP) algorithm."""
def apply_scheduling(self, schedule: "Schedule") -> None:
"""Applies the scheduling algorithm on the given Schedule.
Parameters
----------
schedule : Schedule
Schedule to apply the scheduling algorithm on.
"""
ASAPScheduler().apply_scheduling(schedule)
max_end_time = schedule.get_max_end_time()
if schedule.schedule_time is None:
schedule.set_schedule_time(max_end_time)
elif schedule.schedule_time < max_end_time:
raise ValueError(f"Too short schedule time. Minimum is {max_end_time}.")
# move all outputs ALAP before operations
for output in schedule.sfg.find_by_type_name(Output.type_name()):
schedule.move_operation_alap(output.graph_id)
for step in reversed(schedule.sfg.get_precedence_list()):
for outport in step:
if not isinstance(outport.operation, Delay):
schedule.move_operation_alap(outport.operation.graph_id)
class EarliestDeadlineScheduler(Scheduler):
"""
Scheduler that implements the earliest-deadline-first algorithm.
Parameters
----------
max_resources : dict, optional
Dictionary like ``{Addition.type_name(): 2}`` denoting the maximum number of
resources for a given operation type if the scheduling algorithm considers
that. If not provided, or an operation type is not provided, at most one
resource is used.
"""
def __init__(self, max_resources: Optional[dict[TypeName, int]]) -> None:
self._max_resources = max_resources
def apply_scheduling(self, schedule: "Schedule") -> None:
"""Applies the scheduling algorithm on the given Schedule.
Parameters
----------
schedule : Schedule
Schedule to apply the scheduling algorithm on.
"""
# ACT BASED ON THE NUMBER OF PEs!
prec_list = schedule.sfg.get_precedence_list()
if len(prec_list) < 2:
raise ValueError("Empty signal flow graph cannot be scheduled.")
# handle the first set in precedence graph (input and delays)
non_schedulable_ops = set()
for outport in prec_list[0]:
operation = outport.operation
if operation.type_name() == Delay.type_name():
non_schedulable_ops.add(operation.graph_id)
elif operation.graph_id not in schedule.start_times:
schedule.start_times[operation.graph_id] = 0
current_time = 0
sorted_outports = sorted(
prec_list[1], key=lambda outport: outport.operation.latency
)
for outport in sorted_outports:
op = outport.operation
schedule.start_times[op.graph_id] = current_time
current_time += 1
for outports in prec_list[2:]:
current_time -= 1
for outport in outports:
op = outport.operation
candidates = []
while not candidates:
current_time += 1
for op_input in op.inputs:
source_op_time = schedule.start_times[source_op.graph_id]
source_end_time = source_op_time + source_op.latency
if source_end_time > current_time:
op_is_ready_to_be_scheduled = False
if op_is_ready_to_be_scheduled:
candidates.append(op)
sorted_candidates = sorted(
candidates, key=lambda candidate: candidate.latency
)
# schedule the best candidate to current time
schedule.start_times[sorted_candidates[0].graph_id] = current_time
self._handle_outputs(schedule, non_schedulable_ops)
schedule.remove_delays()