"""
B-ASIC Schedule Module.

Contains the schedule class for scheduling operations in an SFG.
"""

import io
import sys
from collections import defaultdict
from typing import Dict, List, Optional, Sequence, Tuple, Union, cast
Oscar Gustafsson's avatar
Oscar Gustafsson committed

import matplotlib.pyplot as plt
import numpy as np
from matplotlib.axes import Axes
from matplotlib.figure import Figure
from matplotlib.lines import Line2D
from matplotlib.patches import PathPatch, Polygon
from matplotlib.path import Path
Oscar Gustafsson's avatar
Oscar Gustafsson committed
from matplotlib.ticker import MaxNLocator
from b_asic import Signal
from b_asic._preferences import (
    EXECUTION_TIME_COLOR,
    LATENCY_COLOR,
    SIGNAL_COLOR,
    SIGNAL_LINEWIDTH,
from b_asic.graph_component import GraphID
from b_asic.operation import Operation
from b_asic.port import InputPort, OutputPort
from b_asic.process import MemoryVariable, OperatorProcess
from b_asic.resources import ProcessCollection
Oscar Gustafsson's avatar
Oscar Gustafsson committed
from b_asic.signal_flow_graph import SFG
Oscar Gustafsson's avatar
Oscar Gustafsson committed
from b_asic.special_operations import Delay, Input, Output
from b_asic.types import TypeName
Oscar Gustafsson's avatar
Oscar Gustafsson committed

# The colour constants are divided by 255 channel by channel because RGB(A)
# values from 0 to 1 are needed (presumably the preferences store 0-255
# integers -- confirm in b_asic._preferences).
_EXECUTION_TIME_COLOR: Union[
    Tuple[float, float, float], Tuple[float, float, float, float]
] = tuple(float(channel / 255) for channel in EXECUTION_TIME_COLOR)
_LATENCY_COLOR: Union[
    Tuple[float, float, float], Tuple[float, float, float, float]
] = tuple(float(channel / 255) for channel in LATENCY_COLOR)
_SIGNAL_COLOR: Union[
    Tuple[float, float, float], Tuple[float, float, float, float]
] = tuple(float(channel / 255) for channel in SIGNAL_COLOR)
def _laps_default():
    """Default value for _laps. Cannot use lambda."""
    return 0


def _y_locations_default():
    """Default value for _y_locations. Cannot use lambda."""
    return None


class Schedule:
    """
    Schedule of an SFG with scheduled Operations.

    Parameters
    ----------
    sfg : :class:`~b_asic.signal_flow_graph.SFG`
        The signal flow graph to schedule.
    schedule_time : int, optional
        The schedule time. If not provided, it will be determined by the scheduling
        algorithm.
    cyclic : bool, default: False
        If the schedule is cyclic.
    algorithm : {'ASAP', 'ALAP', 'provided'}, default: 'ASAP'
        The scheduling algorithm to use. The following algorithms are available:

        * ``'ASAP'``: As-soon-as-possible scheduling.
        * ``'ALAP'``: As-late-as-possible scheduling.

        If 'provided', use the provided *start_times* and *laps* dictionaries.
    start_times : dict, optional
        Dictionary with GraphIDs as keys and start times as values.
        Used when *algorithm* is 'provided'.
    laps : dict, optional
        Dictionary with GraphIDs as keys and laps as values.
        Used when *algorithm* is 'provided'.
    max_resources : dict, optional
        Dictionary like ``{Addition.type_name(): 2}`` denoting the maximum number of
        resources for a given operation type if the scheduling algorithm considers
        that. If not provided, or an operation type is not provided, at most one
        resource is used.
    """

    _laps: Dict[GraphID, int]
    _y_locations: Dict[GraphID, Optional[int]]
Oscar Gustafsson's avatar
Oscar Gustafsson committed
    def __init__(
        self,
        sfg: SFG,
        schedule_time: Optional[int] = None,
        cyclic: bool = False,
        algorithm: str = "ASAP",
        start_times: Optional[Dict[GraphID, int]] = None,
        laps: Optional[Dict[GraphID, int]] = None,
        max_resources: Optional[Dict[TypeName, int]] = None,
    ):
        """
        Construct a Schedule from an SFG.

        The operations are scheduled according to *algorithm*, after which the
        schedule time is either derived from the resulting maximum end time or
        validated against it.

        Raises
        ------
        TypeError
            If *sfg* is not an SFG.
        ValueError
            If *algorithm* is 'provided' and *start_times* or *laps* is missing,
            or if *schedule_time* is shorter than the maximum end time.
        NotImplementedError
            If *algorithm* is not one of 'ASAP', 'ALAP' or 'provided'.
        """
        if not isinstance(sfg, SFG):
            raise TypeError("An SFG must be provided")

        self._original_sfg = sfg()  # Make a copy
        # NOTE(review): restored by inference -- get_max_end_time() reads
        # self._sfg, so it has to be assigned before scheduling. Confirm that
        # the SFG passed in (not the copy) is the intended object.
        self._sfg = sfg
        self._start_times = {}
        self._laps = defaultdict(_laps_default)
        # NOTE(review): restored by inference -- *cyclic* was otherwise unused
        # in this constructor. Confirm the attribute name.
        self._cyclic = cyclic
        self._y_locations = defaultdict(_y_locations_default)
        self._schedule_time = schedule_time
        # NOTE(review): *max_resources* is not consumed in this constructor as
        # visible here; confirm whether a scheduling helper should receive it.

        if algorithm == "ASAP":
            # NOTE(review): branch body restored by analogy with the ALAP
            # branch; confirm the helper name.
            self._schedule_asap()
        elif algorithm == "ALAP":
            self._schedule_alap()
        elif algorithm == "provided":
            if start_times is None:
                raise ValueError("Must provide start_times when using 'provided'")
            if laps is None:
                raise ValueError("Must provide laps when using 'provided'")
            # The caller's dictionary is stored as-is (no copy is made).
            self._start_times = start_times
            self._laps.update(laps)
            self._remove_delays_no_laps()
        else:
            raise NotImplementedError(f"No algorithm with name: {algorithm} defined.")

        # With no explicit schedule time, use the shortest one that fits every
        # operation; otherwise reject a schedule time that is too short.
        max_end_time = self.get_max_end_time()
        if schedule_time is None:
            self._schedule_time = max_end_time
        elif schedule_time < max_end_time:
            raise ValueError(f"Too short schedule time. Minimum is {max_end_time}.")

    def start_time_of_operation(self, graph_id: GraphID) -> int:
        """
        Return the start time of the operation specified by *graph_id*.

        Parameters
        ----------
        graph_id : GraphID
            The graph id of the operation to get the start time for.

        Returns
        -------
        int
            The start time stored for *graph_id*.

        Raises
        ------
        ValueError
            If no operation with *graph_id* is present in the schedule.
        """
        # Explicit membership test rather than try/except: a lookup must not
        # insert a key should the start-time mapping be a defaultdict.
        if graph_id not in self._start_times:
            raise ValueError(f"No operation with graph_id {graph_id} in schedule")
        return self._start_times[graph_id]

    def get_max_end_time(self) -> int:
        """
        Return the current maximum end time among all operations.

        For every scheduled operation, each output port contributes the
        operation's start time plus that port's latency offset; the largest
        such value is returned.

        Returns
        -------
        int
            The maximum end time. The result is never below 0, which is also
            the value returned when nothing is scheduled.
        """
        max_end_time = 0
        for graph_id, op_start_time in self._start_times.items():
            operation = cast(Operation, self._sfg.find_by_id(graph_id))
            for outport in operation.outputs:
                max_end_time = max(
                    max_end_time,
                    op_start_time + cast(int, outport.latency_offset),
                )
        return max_end_time

    def forward_slack(self, graph_id: GraphID) -> int:
Oscar Gustafsson's avatar
Oscar Gustafsson committed
        Return how much an operation can be moved forward in time.

        Parameters
        ----------
Oscar Gustafsson's avatar
Oscar Gustafsson committed
        graph_id : GraphID
            The graph id of the operation.
        int
            The number of time steps the operation with *graph_id* can be moved
            forward in time.
Oscar Gustafsson's avatar
Oscar Gustafsson committed
        --------
        backward_slack
        slacks
        if graph_id not in self._start_times:
Oscar Gustafsson's avatar
Oscar Gustafsson committed
            raise ValueError(f"No operation with graph_id {graph_id} in schedule")
        output_slacks = self._forward_slacks(graph_id)
        return cast(
            int,
            min(
                sum(
                    (
                        list(signal_slacks.values())
                        for signal_slacks in output_slacks.values()
                    ),
Loading
Loading full blame...