"""
B-ASIC Schedule Module.

Contains the schedule class for scheduling operations in an SFG.
"""

import io
import sys
from collections import defaultdict
from itertools import chain
from typing import Dict, List, Optional, Sequence, Tuple, Union, cast

import matplotlib.pyplot as plt
import numpy as np
from matplotlib.axes import Axes
from matplotlib.figure import Figure
from matplotlib.lines import Line2D
from matplotlib.patches import PathPatch, Polygon
from matplotlib.path import Path
from matplotlib.ticker import MaxNLocator

from b_asic import Signal
from b_asic._preferences import (
    EXECUTION_TIME_COLOR,
    LATENCY_COLOR,
    SIGNAL_COLOR,
    SIGNAL_LINEWIDTH,
)
from b_asic.graph_component import GraphID
from b_asic.operation import Operation
from b_asic.port import InputPort, OutputPort
from b_asic.process import MemoryVariable, OperatorProcess
from b_asic.resources import ProcessCollection
from b_asic.signal_flow_graph import SFG
from b_asic.special_operations import Delay, Input, Output
from b_asic.types import TypeName

# The preference colors are rescaled channel by channel (division by 255) so
# that every RGB(A) component becomes a float from 0 to 1.
_EXECUTION_TIME_COLOR: Union[
    Tuple[float, float, float], Tuple[float, float, float, float]
] = tuple([float(channel / 255) for channel in EXECUTION_TIME_COLOR])
_LATENCY_COLOR: Union[
    Tuple[float, float, float], Tuple[float, float, float, float]
] = tuple([float(channel / 255) for channel in LATENCY_COLOR])
_SIGNAL_COLOR: Union[
    Tuple[float, float, float], Tuple[float, float, float, float]
] = tuple([float(channel / 255) for channel in SIGNAL_COLOR])
def _laps_default():
    """Default value for _laps. Cannot use lambda."""
    return 0


def _y_locations_default():
    """Default value for _y_locations. Cannot use lambda."""
    return None


class Schedule:
    """
    Schedule of an SFG with scheduled Operations.

    Parameters
    ----------
    sfg : :class:`~b_asic.signal_flow_graph.SFG`
        The signal flow graph to schedule.
    schedule_time : int, optional
        The schedule time. If not provided, it will be determined by the scheduling
        algorithm.
    cyclic : bool, default: False
        If the schedule is cyclic.
    algorithm : {'ASAP', 'ALAP', 'provided'}, default: 'ASAP'
        The scheduling algorithm to use. The following algorithms are available:

        * ``'ASAP'``: As-soon-as-possible scheduling.
        * ``'ALAP'``: As-late-as-possible scheduling.

        If 'provided', use provided *start_times* and *laps* dictionaries.
    start_times : dict, optional
        Dictionary with GraphIDs as keys and start times as values.
        Used when *algorithm* is 'provided'.
    laps : dict, optional
        Dictionary with GraphIDs as keys and laps as values.
        Used when *algorithm* is 'provided'.
    max_resources : dict, optional
        Dictionary like ``{Addition.type_name(): 2}`` denoting the maximum number of
        resources for a given operation type if the scheduling algorithm considers
        that. If not provided, or an operation type is not provided, at most one
        resource is used.
    """

    # Number of laps per graph id; missing keys default to 0.
    _laps: Dict[GraphID, int]
    # Y-location per graph id; missing keys default to None.
    _y_locations: Dict[GraphID, Optional[int]]

    def __init__(
        self,
        sfg: SFG,
        schedule_time: Optional[int] = None,
        cyclic: bool = False,
        algorithm: str = "ASAP",
        start_times: Optional[Dict[GraphID, int]] = None,
        laps: Optional[Dict[GraphID, int]] = None,
        max_resources: Optional[Dict[TypeName, int]] = None,
    ):
        """
        Construct a Schedule from an SFG.

        Raises
        ------
        TypeError
            If *sfg* is not an SFG.
        ValueError
            If *algorithm* is 'provided' and *start_times* or *laps* is missing,
            or if *schedule_time* is shorter than the maximum end time.
        NotImplementedError
            If *algorithm* is not one of 'ASAP', 'ALAP' or 'provided'.
        """
        if not isinstance(sfg, SFG):
            raise TypeError("An SFG must be provided")

        # NOTE(review): ``_sfg`` and ``_cyclic`` are read by other methods of
        # this class, so they are assigned here - confirm against upstream
        # that no further attributes are initialised at this point.
        self._sfg = sfg
        self._cyclic = cyclic
        self._start_times = {}
        self._laps = defaultdict(_laps_default)
        self._y_locations = defaultdict(_y_locations_default)
        self._schedule_time = schedule_time
        if algorithm == "ASAP":
            # NOTE(review): assumed counterpart of ``_schedule_alap`` - confirm.
            self._schedule_asap()
        elif algorithm == "ALAP":
            self._schedule_alap()
        elif algorithm == "provided":
            if start_times is None:
                raise ValueError("Must provide start_times when using 'provided'")
            if laps is None:
                raise ValueError("Must provide laps when using 'provided'")
            self._start_times = start_times
            self._laps.update(laps)
            self._remove_delays_no_laps()
        else:
            raise NotImplementedError(f"No algorithm with name: {algorithm} defined.")

        # Without an explicit schedule time, use the shortest one that fits.
        max_end_time = self.get_max_end_time()
        if schedule_time is None:
            self._schedule_time = max_end_time
        elif schedule_time < max_end_time:
            raise ValueError(f"Too short schedule time. Minimum is {max_end_time}.")

    def start_time_of_operation(self, graph_id: GraphID) -> int:
        Return the start time of the operation with the specified by *graph_id*.

        Parameters
        ----------
        graph_id : GraphID
            The graph id of the operation to get the start time for.
        if graph_id not in self._start_times:
            raise ValueError(f"No operation with graph_id {graph_id!r} in schedule")
        return self._start_times[graph_id]
Andreas Bolin's avatar
Andreas Bolin committed
    def get_max_end_time(self) -> int:
        """
        Return the current maximum end time among all operations.

        For every scheduled operation, the start time plus the latency offset
        of each of its output ports is considered. The result is never less
        than 0.
        """
        max_end_time = 0
        for graph_id, op_start_time in self._start_times.items():
            operation = cast(Operation, self._sfg.find_by_id(graph_id))
            for outport in operation.outputs:
                max_end_time = max(
                    max_end_time,
                    op_start_time + cast(int, outport.latency_offset),
                )
        return max_end_time

    def forward_slack(self, graph_id: GraphID) -> int:
Oscar Gustafsson's avatar
Oscar Gustafsson committed
        Return how much an operation can be moved forward in time.

        Parameters
        ----------
Oscar Gustafsson's avatar
Oscar Gustafsson committed
        graph_id : GraphID
            The graph id of the operation.
            The number of time steps the operation with *graph_id* can be moved
            forward in time.
Oscar Gustafsson's avatar
Oscar Gustafsson committed
        --------
        backward_slack
        slacks
        if graph_id not in self._start_times:
            raise ValueError(f"No operation with graph_id {graph_id!r} in schedule")
        output_slacks = self._forward_slacks(graph_id)
        return cast(
            int,
            min(
                sum(
                    (
                        list(signal_slacks.values())
                        for signal_slacks in output_slacks.values()
                    ),
                    [sys.maxsize],
Oscar Gustafsson's avatar
Oscar Gustafsson committed
    def _forward_slacks(
        self, graph_id: GraphID
    ) -> Dict["OutputPort", Dict["Signal", int]]:
        """
        Return the slack of every signal leaving the operation *graph_id*.

        The result maps each output port of the operation to the dictionary
        returned by :meth:`_output_slacks` for that port, evaluated at the
        current start time of the operation.
        """
        start_time = self._start_times[graph_id]
        operation = cast(Operation, self._sfg.find_by_id(graph_id))
        return {
            output_port: self._output_slacks(output_port, start_time)
            for output_port in operation.outputs
        }

    def _output_slacks(
        self, output_port: "OutputPort", start_time: Optional[int] = None
    ) -> Dict[Signal, int]:
        """
        Return the slack of every signal connected to *output_port*.

        The slack of a signal is the time at which its destination port uses
        the value, moved forward by one schedule time per lap of the signal,
        minus the time at which *output_port* makes the value available.

        Parameters
        ----------
        output_port : OutputPort
            The output port to compute slacks for.
        start_time : int, optional
            Start time to assume for the operation owning *output_port*. If
            not provided, the currently scheduled start time is used.
        """
        if start_time is None:
            start_time = self._start_times[output_port.operation.graph_id]
        produced_at = start_time + cast(int, output_port.latency_offset)
        # A value produced after the end of the schedule wraps around once.
        if produced_at > self._schedule_time:
            produced_at -= self._schedule_time
        slack_per_signal = {}
        for signal in output_port.signals:
            consumer = cast(InputPort, signal.destination)
            consumed_at = (
                cast(int, consumer.latency_offset)
                + self._start_times[consumer.operation.graph_id]
                + self._schedule_time * self._laps[signal.graph_id]
            )
            slack_per_signal[signal] = consumed_at - produced_at
        return slack_per_signal

    def backward_slack(self, graph_id: GraphID) -> int:
Oscar Gustafsson's avatar
Oscar Gustafsson committed
        Return how much an operation can be moved backward in time.

        Parameters
        ----------
Oscar Gustafsson's avatar
Oscar Gustafsson committed
        graph_id : GraphID
            The graph id of the operation.
            The number of time steps the operation with *graph_id* can be moved
            backward in time.
        .. note:: The backward slack is positive, but a call to
            :func:`move_operation` should be negative to move the operation
            backward.
Oscar Gustafsson's avatar
Oscar Gustafsson committed
        --------
        forward_slack
        slacks
        if graph_id not in self._start_times:
            raise ValueError(f"No operation with graph_id {graph_id!r} in schedule")
        input_slacks = self._backward_slacks(graph_id)
        return cast(
            int,
            min(
                sum(
                    (
                        list(signal_slacks.values())
                        for signal_slacks in input_slacks.values()
                    ),
                    [sys.maxsize],
                )
            ),
Oscar Gustafsson's avatar
Oscar Gustafsson committed
    def _backward_slacks(self, graph_id: GraphID) -> Dict[InputPort, Dict[Signal, int]]:
        """
        Return the slack of every signal entering the operation *graph_id*.

        The result maps each input port of the operation to the dictionary
        returned by :meth:`_input_slacks` for that port, evaluated at the
        current start time of the operation.
        """
        start_time = self._start_times[graph_id]
        operation = cast(Operation, self._sfg.find_by_id(graph_id))
        return {
            input_port: self._input_slacks(input_port, start_time)
            for input_port in operation.inputs
        }

    def _input_slacks(
        self, input_port: InputPort, start_time: Optional[int] = None
    ) -> Dict[Signal, int]:
        """
        Return the slack of every signal connected to *input_port*.

        The slack of a signal is the time at which *input_port* uses the value
        minus the time at which the source port makes it available, where the
        source time is moved back by one schedule time per lap of the signal.

        Parameters
        ----------
        input_port : InputPort
            The input port to compute slacks for.
        start_time : int, optional
            Start time to assume for the operation owning *input_port*. If not
            provided, the currently scheduled start time is used.
        """
        if start_time is None:
            start_time = self._start_times[input_port.operation.graph_id]
        consumed_at = start_time + cast(int, input_port.latency_offset)
        slack_per_signal = {}
        for signal in input_port.signals:
            producer = cast(OutputPort, signal.source)
            produced_at = (
                cast(int, producer.latency_offset)
                + self._start_times[producer.operation.graph_id]
                - self._schedule_time * self._laps[signal.graph_id]
            )
            # A value produced after the end of the schedule wraps around once.
            if produced_at > self._schedule_time:
                produced_at -= self._schedule_time
            slack_per_signal[signal] = consumed_at - produced_at
        return slack_per_signal

    def slacks(self, graph_id: GraphID) -> Tuple[int, int]:
Oscar Gustafsson's avatar
Oscar Gustafsson committed
        """
        Return the backward and forward slacks of operation *graph_id*.

        That is, how much the operation can be moved backward and forward in time.
Oscar Gustafsson's avatar
Oscar Gustafsson committed

        Parameters
        ----------
        graph_id : GraphID
            The graph id of the operation.

        Returns
        -------
        tuple(int, int)
            The backward and forward slacks, respectively.
        .. note:: The backward slack is positive, but a call to :func:`move_operation`
            should be negative to move the operation backward.
Oscar Gustafsson's avatar
Oscar Gustafsson committed
        --------
        backward_slack
        forward_slack
        """
        if graph_id not in self._start_times:
            raise ValueError(f"No operation with graph_id {graph_id!r} in schedule")
        return self.backward_slack(graph_id), self.forward_slack(graph_id)
    def print_slacks(self, order: int = 0) -> None:
        """
        Print the slack times for all operations in the schedule.

        Parameters
        ----------
        order : int, default: 0
            Sorting order.

            * 0: alphabetical on Graph ID
            * 1: backward slack
            * 2: forward slack
        """
        res: List[Tuple[GraphID, int, int]] = [
Oscar Gustafsson's avatar
Oscar Gustafsson committed
            (
                op.graph_id,
                cast(int, self.backward_slack(op.graph_id)),
Oscar Gustafsson's avatar
Oscar Gustafsson committed
                self.forward_slack(op.graph_id),
            )
            for op in self._sfg.operations
        ]
        res.sort(key=lambda tup: tup[order])
        res_str = [
Oscar Gustafsson's avatar
Oscar Gustafsson committed
            (
                r[0],
                f"{r[1]}".rjust(8) if r[1] < sys.maxsize else "oo".rjust(8),
                f"{r[2]}".rjust(8) if r[2] < sys.maxsize else "oo".rjust(8),
            )
            for r in res
        ]
        print("Graph ID | Backward |  Forward")
        print("---------|----------|---------")
        for r in res_str:
Oscar Gustafsson's avatar
Oscar Gustafsson committed
            print(f"{r[0]:8} | {r[1]} | {r[2]}")
    def set_schedule_time(self, time: int) -> "Schedule":
        """
        Set a new schedule time.

        Parameters
        ----------
        time : int
            The new schedule time. If it is too short, a ValueError will be raised.

        --------
        get_max_time
        """
        max_end_time = self.get_max_end_time()
        if time < max_end_time:
Oscar Gustafsson's avatar
Oscar Gustafsson committed
            raise ValueError(
                f"New schedule time ({time}) too short, minimum: {max_end_time}."
Oscar Gustafsson's avatar
Oscar Gustafsson committed
            )
        self._schedule_time = time
        return self

    def swap_io_of_operation(self, operation_id: GraphID) -> None:
        """
        Swap the inputs (and outputs) of operation.

        Parameters
        ----------
        operation_id : GraphID
            The GraphID of the operation to swap.
        """
        self._sfg.swap_io_of_operation(operation_id)

Andreas Bolin's avatar
Andreas Bolin committed
    @property
    def sfg(self) -> SFG:
        """The SFG corresponding to the current schedule."""
        reconstructed_sfg = self._reintroduce_delays()
        simplified_sfg = reconstructed_sfg.simplify_delay_element_placement()
        return simplified_sfg
Oscar Gustafsson's avatar
Oscar Gustafsson committed

Andreas Bolin's avatar
Andreas Bolin committed
    @property
Oscar Gustafsson's avatar
Oscar Gustafsson committed
    def start_times(self) -> Dict[GraphID, int]:
        """The start times of the operations in the schedule."""
Andreas Bolin's avatar
Andreas Bolin committed
        return self._start_times
Oscar Gustafsson's avatar
Oscar Gustafsson committed

Andreas Bolin's avatar
Andreas Bolin committed
    @property
Oscar Gustafsson's avatar
Oscar Gustafsson committed
    def laps(self) -> Dict[GraphID, int]:
        The number of laps for the start times of the operations in the schedule.
Andreas Bolin's avatar
Andreas Bolin committed
        return self._laps
Oscar Gustafsson's avatar
Oscar Gustafsson committed

    @property
    def schedule_time(self) -> int:
        """The schedule time of the current schedule."""
        return self._schedule_time
Oscar Gustafsson's avatar
Oscar Gustafsson committed

Andreas Bolin's avatar
Andreas Bolin committed
    @property
    def cyclic(self) -> bool:
        """If the current schedule is cyclic."""
Andreas Bolin's avatar
Andreas Bolin committed
        return self._cyclic
Oscar Gustafsson's avatar
Oscar Gustafsson committed

    def edit(self, inplace=False) -> "Schedule":
        """
        Open the schedule in the scheduler GUI and return the edited schedule.

        Parameters
        ----------
        inplace : bool, default: False
            If True, the start times, laps, schedule time and y-locations of
            this schedule are replaced by those of the edited schedule.

        Returns
        -------
        Schedule
            The schedule returned by the GUI, regardless of *inplace*.
        """
        # Imported here rather than at module level, so the GUI module is only
        # loaded when editing is actually requested.
        from b_asic.scheduler_gui.main_window import start_scheduler

        edited = start_scheduler(self)
        if not inplace:
            return edited
        self._start_times = edited._start_times
        self._laps = edited._laps
        self._schedule_time = edited._schedule_time
        self._y_locations = edited._y_locations
        return edited
    def increase_time_resolution(self, factor: int) -> "Schedule":
Loading
Loading full blame...