Skip to content
Snippets Groups Projects
schedule.py 37.4 KiB
Newer Older
"""
B-ASIC Schedule Module.
Contains the schedule class for scheduling operations in an SFG.
Oscar Gustafsson's avatar
Oscar Gustafsson committed
import io
import sys
from collections import defaultdict
Oscar Gustafsson's avatar
Oscar Gustafsson committed
from typing import Dict, List, Optional, Sequence, Tuple, cast
Oscar Gustafsson's avatar
Oscar Gustafsson committed

import matplotlib.pyplot as plt
import numpy as np
from matplotlib.axes import Axes
from matplotlib.figure import Figure
from matplotlib.lines import Line2D
from matplotlib.patches import PathPatch, Polygon
from matplotlib.path import Path
Oscar Gustafsson's avatar
Oscar Gustafsson committed
from matplotlib.ticker import MaxNLocator
from b_asic import Signal
from b_asic._preferences import (
    EXECUTION_TIME_COLOR,
    LATENCY_COLOR,
    SIGNAL_COLOR,
    SIGNAL_LINEWIDTH,
from b_asic.graph_component import GraphID
from b_asic.operation import Operation
from b_asic.port import InputPort, OutputPort
from b_asic.process import MemoryVariable, OperatorProcess
from b_asic.resources import ProcessCollection
Oscar Gustafsson's avatar
Oscar Gustafsson committed
from b_asic.signal_flow_graph import SFG
Oscar Gustafsson's avatar
Oscar Gustafsson committed
from b_asic.special_operations import Delay, Input, Output
Oscar Gustafsson's avatar
Oscar Gustafsson committed

# Need RGB from 0 to 1
_EXECUTION_TIME_COLOR = tuple(c / 255 for c in EXECUTION_TIME_COLOR)
_LATENCY_COLOR = tuple(c / 255 for c in LATENCY_COLOR)
_SIGNAL_COLOR = tuple(c / 255 for c in SIGNAL_COLOR)


def _laps_default():
    """Default value for _laps. Cannot use lambda."""
    return 0


def _y_locations_default():
    """Default value for _y_locations. Cannot use lambda."""
    return None


class Schedule:
    """
    Schedule of an SFG with scheduled Operations.

    Parameters
    ----------
    sfg : :class:`~b_asic.signal_flow_graph.SFG`
        The signal flow graph to schedule.
    schedule_time : int, optional
        The schedule time. If not provided, it will be determined by the scheduling
        algorithm.
    cyclic : bool, default: False
        If the schedule is cyclic.
    scheduling_algorithm : {'ASAP', 'provided'}, optional
        The scheduling algorithm to use. Currently, only "ASAP" is supported.
        If 'provided', use provided *start_times*  and *laps* dictionaries.
    start_times : dict, optional
        Dictionary with GraphIDs as keys and start times as values.
        Used when *scheduling_algorithm* is 'provided'.
    laps : dict, optional
        Dictionary with GraphIDs as keys and laps as values.
        Used when *scheduling_algorithm* is 'provided'.
    _laps: Dict[GraphID, int]
    _y_locations: Dict[GraphID, Optional[int]]
Oscar Gustafsson's avatar
Oscar Gustafsson committed
    def __init__(
        self,
        sfg: SFG,
        schedule_time: Optional[int] = None,
        cyclic: bool = False,
        scheduling_algorithm: str = "ASAP",
Oscar Gustafsson's avatar
Oscar Gustafsson committed
        start_times: Optional[Dict[GraphID, int]] = None,
        laps: Optional[Dict[GraphID, int]] = None,
Oscar Gustafsson's avatar
Oscar Gustafsson committed
    ):
        """Construct a Schedule from an SFG."""
        if not isinstance(sfg, SFG):
            raise TypeError("An SFG must be provided")

        self._original_sfg = sfg()  # Make a copy
Oscar Gustafsson's avatar
Oscar Gustafsson committed
        self._start_times = {}
        self._laps = defaultdict(_laps_default)
        self._y_locations = defaultdict(_y_locations_default)
        if scheduling_algorithm == "ASAP":
        elif scheduling_algorithm == "provided":
Oscar Gustafsson's avatar
Oscar Gustafsson committed
            if start_times is None:
                raise ValueError("Must provide start_times when using 'provided'")
            if laps is None:
                raise ValueError("Must provide laps when using 'provided'")
            self._start_times = start_times
            self._laps.update(laps)
            self._remove_delays_no_laps()
                f"No algorithm with name: {scheduling_algorithm} defined."
Oscar Gustafsson's avatar
Oscar Gustafsson committed
            )
Andreas Bolin's avatar
Andreas Bolin committed
        max_end_time = self.get_max_end_time()
        if schedule_time is None:
            self._schedule_time = max_end_time
        elif schedule_time < max_end_time:
Oscar Gustafsson's avatar
Oscar Gustafsson committed
            raise ValueError(f"Too short schedule time. Minimum is {max_end_time}.")
        else:
            self._schedule_time = schedule_time
    def start_time_of_operation(self, graph_id: GraphID) -> int:
        Return the start time of the operation with the specified by *graph_id*.
        if graph_id not in self._start_times:
Oscar Gustafsson's avatar
Oscar Gustafsson committed
            raise ValueError(f"No operation with graph_id {graph_id} in schedule")
        return self._start_times[graph_id]
Andreas Bolin's avatar
Andreas Bolin committed
    def get_max_end_time(self) -> int:
        """Return the current maximum end time among all operations."""
        max_end_time = 0
        for graph_id, op_start_time in self._start_times.items():
            operation = cast(Operation, self._sfg.find_by_id(graph_id))
            for outport in operation.outputs:
                max_end_time = max(
                    max_end_time,
                    op_start_time + cast(int, outport.latency_offset),
Oscar Gustafsson's avatar
Oscar Gustafsson committed
                )
        return max_end_time

    def forward_slack(self, graph_id: GraphID) -> int:
Oscar Gustafsson's avatar
Oscar Gustafsson committed
        Return how much an operation can be moved forward in time.

        Parameters
        ----------
Oscar Gustafsson's avatar
Oscar Gustafsson committed
        graph_id : GraphID
            The graph id of the operation.
        The number of time steps the operation with *graph_id* can ba moved
        forward in time.
Oscar Gustafsson's avatar
Oscar Gustafsson committed
        --------
        backward_slack
        slacks
        if graph_id not in self._start_times:
Oscar Gustafsson's avatar
Oscar Gustafsson committed
            raise ValueError(f"No operation with graph_id {graph_id} in schedule")
        slack = sys.maxsize
        output_slacks = self._forward_slacks(graph_id)
        # Make more pythonic
        for signal_slacks in output_slacks.values():
            for signal_slack in signal_slacks.values():
                slack = min(slack, signal_slack)
        return slack

Oscar Gustafsson's avatar
Oscar Gustafsson committed
    def _forward_slacks(
Oscar Gustafsson's avatar
Oscar Gustafsson committed
    ) -> Dict["OutputPort", Dict["Signal", int]]:
Oscar Gustafsson's avatar
Oscar Gustafsson committed
        ret = {}
        start_time = self._start_times[graph_id]
        operation = cast(Operation, self._sfg.find_by_id(graph_id))
        for output_port in operation.outputs:
            ret[output_port] = self._output_slacks(output_port, start_time)
    def _output_slacks(
        self, output_port: "OutputPort", start_time: Optional[int] = None
    ) -> Dict[Signal, int]:
        if start_time is None:
            start_time = self._start_times[output_port.operation.graph_id]
        output_slacks = {}
        available_time = start_time + cast(int, output_port.latency_offset)
        if available_time > self._schedule_time:
            available_time -= self._schedule_time
        for signal in output_port.signals:
            destination = cast(InputPort, signal.destination)
            usage_time = (
                cast(int, destination.latency_offset)
                + self._start_times[destination.operation.graph_id]
                + self._schedule_time * self._laps[signal.graph_id]
            )
            output_slacks[signal] = usage_time - available_time
        return output_slacks

    def backward_slack(self, graph_id: GraphID) -> int:
Oscar Gustafsson's avatar
Oscar Gustafsson committed
        Return how much an operation can be moved backward in time.

        Parameters
        ----------
Oscar Gustafsson's avatar
Oscar Gustafsson committed
        graph_id : GraphID
            The graph id of the operation.
        The number of time steps the operation with *graph_id* can ba moved
            backward in time.
        .. note:: The backward slack is positive, but a call to :func:`move_operation`
            should be negative to move the operation backward.
Oscar Gustafsson's avatar
Oscar Gustafsson committed
        --------
        forward_slack
        slacks
        if graph_id not in self._start_times:
Oscar Gustafsson's avatar
Oscar Gustafsson committed
            raise ValueError(f"No operation with graph_id {graph_id} in schedule")
        slack = sys.maxsize
        input_slacks = self._backward_slacks(graph_id)
        # Make more pythonic
        for signal_slacks in input_slacks.values():
            for signal_slack in signal_slacks.values():
                slack = min(slack, signal_slack)
        return slack

Oscar Gustafsson's avatar
Oscar Gustafsson committed
    def _backward_slacks(self, graph_id: GraphID) -> Dict[InputPort, Dict[Signal, int]]:
Oscar Gustafsson's avatar
Oscar Gustafsson committed
        ret = {}
        start_time = self._start_times[graph_id]
        operation = cast(Operation, self._sfg.find_by_id(graph_id))
        for input_port in operation.inputs:
            ret[input_port] = self._input_slacks(input_port, start_time)
    def _input_slacks(
        self, input_port: InputPort, start_time: Optional[int] = None
    ) -> Dict[Signal, int]:
        if start_time is None:
            start_time = self._start_times[input_port.operation.graph_id]
        input_slacks = {}
        usage_time = start_time + cast(int, input_port.latency_offset)
        for signal in input_port.signals:
            source = cast(OutputPort, signal.source)
            available_time = (
                cast(int, source.latency_offset)
                + self._start_times[source.operation.graph_id]
                - self._schedule_time * self._laps[signal.graph_id]
            )
            if available_time > self._schedule_time:
                available_time -= self._schedule_time
            input_slacks[signal] = usage_time - available_time
        return input_slacks

    def slacks(self, graph_id: GraphID) -> Tuple[int, int]:
Oscar Gustafsson's avatar
Oscar Gustafsson committed
        """
        Return the backward and forward slacks of operation *graph_id*. That is, how
        much the operation can be moved backward and forward in time.
Oscar Gustafsson's avatar
Oscar Gustafsson committed

        Parameters
        ----------
        graph_id : GraphID
            The graph id of the operation.

        Returns
        -------
        A tuple as ``(backward_slack, forward_slack)``.
        .. note:: The backward slack is positive, but a call to :func:`move_operation`
            should be negative to move the operation backward.
Oscar Gustafsson's avatar
Oscar Gustafsson committed
        --------
        backward_slack
        forward_slack

        """
        if graph_id not in self._start_times:
Oscar Gustafsson's avatar
Oscar Gustafsson committed
            raise ValueError(f"No operation with graph_id {graph_id} in schedule")
        return self.backward_slack(graph_id), self.forward_slack(graph_id)
    def print_slacks(self, order: int = 0) -> None:
        """
        Print the slack times for all operations in the schedule.

        Parameters
        ----------
        order : int, default: 0
            Sorting order.

            * 0: alphabetical on Graph ID
            * 1: backward slack
            * 2: forward slack

        """
Oscar Gustafsson's avatar
Oscar Gustafsson committed
        res = [
            (
                op.graph_id,
                self.backward_slack(op.graph_id),
                self.forward_slack(op.graph_id),
            )
            for op in self._sfg.operations
        ]
        res = [
            (
                r[0],
                f"{r[1]}".rjust(8) if r[1] < sys.maxsize else "oo".rjust(8),
                f"{r[2]}".rjust(8) if r[2] < sys.maxsize else "oo".rjust(8),
            )
            for r in res
        ]
        res.sort(key=lambda tup: tup[order])
Oscar Gustafsson's avatar
Oscar Gustafsson committed
        print("Graph ID | Backward |  Forward")
        print("---------|----------|---------")
        for r in res:
            print(f"{r[0]:8} | {r[1]} | {r[2]}")
    def set_schedule_time(self, time: int) -> "Schedule":
Oscar Gustafsson's avatar
Loading
Loading full blame...