Skip to content
Snippets Groups Projects
scheduler.py 10.9 KiB
Newer Older
  • Learn to ignore specific revisions
  • from abc import ABC, abstractmethod
    
    from collections import defaultdict
    
    from typing import TYPE_CHECKING, Optional, cast
    
    Simon Bjurek's avatar
    Simon Bjurek committed
    
    from b_asic.port import OutputPort
    
    from b_asic.special_operations import Delay, Input, Output
    
    from b_asic.types import TypeName
    
    Simon Bjurek's avatar
    Simon Bjurek committed
    
    if TYPE_CHECKING:
        from b_asic.schedule import Schedule
    
    
    
    class Scheduler(ABC):
        @abstractmethod
        def apply_scheduling(self, schedule: "Schedule") -> None:
    
            """Applies the scheduling algorithm on the given Schedule.
    
            Parameters
            ----------
            schedule : Schedule
                Schedule to apply the scheduling algorithm on.
            """
    
        def _handle_outputs(self, schedule, non_schedulable_ops=set()) -> None:
    
            for output in schedule.sfg.find_by_type_name(Output.type_name()):
                output = cast(Output, output)
                source_port = cast(OutputPort, output.inputs[0].signals[0].source)
                if source_port.operation.graph_id in non_schedulable_ops:
                    schedule.start_times[output.graph_id] = 0
                else:
                    if source_port.latency_offset is None:
                        raise ValueError(
                            f"Output port {source_port.index} of operation"
                            f" {source_port.operation.graph_id} has no"
                            " latency-offset."
                        )
                    schedule.start_times[output.graph_id] = schedule.start_times[
                        source_port.operation.graph_id
                    ] + cast(int, source_port.latency_offset)
    
    
    class ASAPScheduler(Scheduler):
    
        """Scheduler that implements the as-soon-as-possible (ASAP) algorithm."""
    
    
        def apply_scheduling(self, schedule: "Schedule") -> None:
    
            """Applies the scheduling algorithm on the given Schedule.
    
            Parameters
            ----------
            schedule : Schedule
                Schedule to apply the scheduling algorithm on.
            """
    
            prec_list = schedule.sfg.get_precedence_list()
    
    Simon Bjurek's avatar
    Simon Bjurek committed
            if len(prec_list) < 2:
                raise ValueError("Empty signal flow graph cannot be scheduled.")
    
            # handle the first set in precedence graph (input and delays)
            non_schedulable_ops = set()
            for outport in prec_list[0]:
                operation = outport.operation
                if operation.type_name() == Delay.type_name():
                    non_schedulable_ops.add(operation.graph_id)
                else:
    
                    schedule.start_times[operation.graph_id] = 0
    
    Simon Bjurek's avatar
    Simon Bjurek committed
    
            # handle second set in precedence graph (first operations)
            for outport in prec_list[1]:
                operation = outport.operation
    
                schedule.start_times[operation.graph_id] = 0
    
    Simon Bjurek's avatar
    Simon Bjurek committed
    
            # handle the remaining sets
            for outports in prec_list[2:]:
                for outport in outports:
                    operation = outport.operation
    
                    if operation.graph_id not in schedule.start_times:
    
    Simon Bjurek's avatar
    Simon Bjurek committed
                        op_start_time = 0
                        for current_input in operation.inputs:
                            if len(current_input.signals) != 1:
                                raise ValueError(
                                    "Error in scheduling, dangling input port detected."
                                )
                            if current_input.signals[0].source is None:
                                raise ValueError(
                                    "Error in scheduling, signal with no source detected."
                                )
                            source_port = current_input.signals[0].source
    
                            if source_port.operation.graph_id in non_schedulable_ops:
                                source_end_time = 0
                            else:
    
                                source_op_time = schedule.start_times[
    
    Simon Bjurek's avatar
    Simon Bjurek committed
                                    source_port.operation.graph_id
                                ]
    
                                if source_port.latency_offset is None:
                                    raise ValueError(
                                        f"Output port {source_port.index} of"
                                        " operation"
                                        f" {source_port.operation.graph_id} has no"
                                        " latency-offset."
                                    )
    
                                source_end_time = (
                                    source_op_time + source_port.latency_offset
                                )
    
                            if current_input.latency_offset is None:
                                raise ValueError(
                                    f"Input port {current_input.index} of operation"
                                    f" {current_input.operation.graph_id} has no"
                                    " latency-offset."
                                )
                            op_start_time_from_in = (
                                source_end_time - current_input.latency_offset
                            )
                            op_start_time = max(op_start_time, op_start_time_from_in)
    
    
                        schedule.start_times[operation.graph_id] = op_start_time
    
            self._handle_outputs(schedule, non_schedulable_ops)
            schedule.remove_delays()
    
    class ALAPScheduler(Scheduler):
    
        """Scheduler that implements the as-late-as-possible (ALAP) algorithm."""
    
    
        def apply_scheduling(self, schedule: "Schedule") -> None:
    
            """Applies the scheduling algorithm on the given Schedule.
    
            Parameters
            ----------
            schedule : Schedule
                Schedule to apply the scheduling algorithm on.
            """
    
            ASAPScheduler().apply_scheduling(schedule)
            max_end_time = schedule.get_max_end_time()
    
            if schedule.schedule_time is None:
                schedule.set_schedule_time(max_end_time)
            elif schedule.schedule_time < max_end_time:
    
    Simon Bjurek's avatar
    Simon Bjurek committed
                raise ValueError(f"Too short schedule time. Minimum is {max_end_time}.")
    
            # move all outputs ALAP before operations
    
            for output in schedule.sfg.find_by_type_name(Output.type_name()):
    
    Simon Bjurek's avatar
    Simon Bjurek committed
                output = cast(Output, output)
    
                schedule.move_operation_alap(output.graph_id)
    
    Simon Bjurek's avatar
    Simon Bjurek committed
    
            # move all operations ALAP
    
            for step in reversed(schedule.sfg.get_precedence_list()):
    
    Simon Bjurek's avatar
    Simon Bjurek committed
                for outport in step:
                    if not isinstance(outport.operation, Delay):
    
                        schedule.move_operation_alap(outport.operation.graph_id)
    
    class EarliestDeadlineScheduler(Scheduler):
    
        """
        Scheduler that implements the earliest-deadline-first algorithm.
    
        Parameters
        ----------
        max_resources : dict, optional
            Dictionary like ``{Addition.type_name(): 2}`` denoting the maximum number of
            resources for a given operation type if the scheduling algorithm considers
            that. If not provided, or an operation type is not provided, at most one
            resource is used.
        """
    
    
        def __init__(self, max_resources: Optional[dict[TypeName, int]] = None) -> None:
            if max_resources:
                self._max_resources = max_resources
            else:
                self._max_resources = {}
    
        def apply_scheduling(self, schedule: "Schedule") -> None:
    
            """Applies the scheduling algorithm on the given Schedule.
    
            Parameters
            ----------
            schedule : Schedule
                Schedule to apply the scheduling algorithm on.
            """
    
            # TODO: Solve bug where operation (ADD) is scheduled before proceeding (MUL) -> pipelining
    
            ALAPScheduler().apply_scheduling(schedule)
    
            # move all inputs ASAP to ensure correct operation
            for input_op in schedule.sfg.find_by_type_name(Input.type_name()):
                input_op = cast(Input, input_op)
                schedule.move_operation_asap(input_op.graph_id)
    
    
            # construct the set of remaining operations, excluding inputs
    
            remaining_ops = set(schedule.start_times.keys())
            remaining_ops = {elem for elem in remaining_ops if not elem.startswith("in")}
    
    
            # construct a dictionarry for storing how many times until a resource is available again
            used_resources_ready_times = {}
    
            # iterate through all remaining operations and schedule them
            # while not exceeding the available resources
    
            remaining_resources = self._max_resources.copy()
    
    Simon Bjurek's avatar
    Simon Bjurek committed
            current_time = 0
    
            while remaining_ops:
                best_candidate = self._find_best_candidate(
                    schedule, remaining_ops, remaining_resources, current_time
                )
    
                if not best_candidate:
                    current_time += 1
    
    
                    # update available operators
                    for operation, ready_time in used_resources_ready_times.items():
                        print(ready_time, current_time)
                        if ready_time == current_time:
                            remaining_resources[operation.type_name()] += 1
                    # remaining_resources = self._max_resources.copy()
    
                # if the resource is constrained, update remaining resources
    
                if best_candidate.type_name() in remaining_resources:
                    remaining_resources[best_candidate.type_name()] -= 1
    
                    used_resources_ready_times[best_candidate] = (
                        current_time + best_candidate.execution_time
                    )
    
                # schedule the best candidate to the current time
    
                remaining_ops.remove(best_candidate.graph_id)
                schedule.start_times[best_candidate.graph_id] = current_time
    
            # move all inputs and outputs ALAP now that operations have moved
            for input_op in schedule.sfg.find_by_type_name(Input.type_name()):
                input_op = cast(Input, input_op)
                schedule.move_operation_alap(input_op.graph_id)
            self._handle_outputs(schedule)
    
        @staticmethod
        def _find_best_candidate(
            schedule, remaining_ops, remaining_resources, current_time
        ):
            sfg = schedule.sfg
            source_end_times = defaultdict(float)
    
    
            # find the best candidate
            best_candidate = None
            best_deadline = float('inf')
    
            for op_id in remaining_ops:
                operation = sfg.find_by_id(op_id)
    
    
                # compute maximum end times of preceding operations
    
                for op_input in operation.inputs:
                    source_op = op_input.signals[0].source.operation
                    if not isinstance(source_op, Delay):
                        source_end_times[op_id] = max(
                            source_end_times[op_id],
                            schedule.start_times[source_op.graph_id] + source_op.latency,
                        )
    
                # check resource constraints
                if operation.type_name() in remaining_resources:
                    if remaining_resources[operation.type_name()] == 0:
                        continue
    
                # check if all inputs are available
                if source_end_times[op_id] <= current_time:
                    operation_deadline = schedule.start_times[op_id] + operation.latency
                    if operation_deadline < best_deadline:
                        best_candidate = operation
                        best_deadline = operation_deadline
    
            return best_candidate