Skip to content
Snippets Groups Projects
schedule.py 47.7 KiB
Newer Older
"""
B-ASIC Schedule Module.
Contains the schedule class for scheduling operations in an SFG.
Oscar Gustafsson's avatar
Oscar Gustafsson committed
import io
import sys
from collections import defaultdict
from collections.abc import Sequence
from typing import cast
Oscar Gustafsson's avatar
Oscar Gustafsson committed

import matplotlib.pyplot as plt
import numpy as np
from matplotlib.axes import Axes
from matplotlib.figure import Figure
from matplotlib.lines import Line2D
from matplotlib.patches import PathPatch, Polygon
from matplotlib.path import Path
Oscar Gustafsson's avatar
Oscar Gustafsson committed
from matplotlib.ticker import MaxNLocator
from b_asic import Signal
from b_asic._preferences import (
    EXECUTION_TIME_COLOR,
    LATENCY_COLOR,
    SIGNAL_COLOR,
    SIGNAL_LINEWIDTH,
Simon Bjurek's avatar
Simon Bjurek committed
from b_asic.core_operations import DontCare, Sink
from b_asic.graph_component import GraphID
from b_asic.operation import Operation
from b_asic.port import InputPort, OutputPort
from b_asic.process import MemoryVariable, OperatorProcess
from b_asic.resources import ProcessCollection
from b_asic.scheduler import Scheduler
Oscar Gustafsson's avatar
Oscar Gustafsson committed
from b_asic.signal_flow_graph import SFG
Oscar Gustafsson's avatar
Oscar Gustafsson committed
from b_asic.special_operations import Delay, Input, Output
from b_asic.types import TypeName
Oscar Gustafsson's avatar
Oscar Gustafsson committed

# Need RGB from 0 to 1
_EXECUTION_TIME_COLOR: tuple[float, ...] = tuple(
    float(c / 255) for c in EXECUTION_TIME_COLOR
_LATENCY_COLOR: tuple[float, ...] = tuple(float(c / 255) for c in LATENCY_COLOR)
_SIGNAL_COLOR: tuple[float, ...] = tuple(float(c / 255) for c in SIGNAL_COLOR)
def _laps_default():
    """Default value for _laps. Cannot use lambda."""
    return 0


def _y_locations_default():
    """Default value for _y_locations. Cannot use lambda."""
    return None


class Schedule:
    """
    Schedule of an SFG with scheduled Operations.

    Parameters
    ----------
    sfg : :class:`~b_asic.signal_flow_graph.SFG`
        The signal flow graph to schedule.
    scheduler : Scheduler, default: None
        The automatic scheduler to be used.
    schedule_time : int, optional
        The schedule time. If not provided, it will be determined by the scheduling
        algorithm.
    cyclic : bool, default: False
        If the schedule is cyclic.
    start_times : dict, optional
        Dictionary with GraphIDs as keys and start times as values.
        Used when *algorithm* is 'provided'.
    laps : dict, optional
        Dictionary with GraphIDs as keys and laps as values.
        Used when *algorithm* is 'provided'.
    _start_times: dict[GraphID, int]
    _laps: dict[GraphID, int]
    _y_locations: dict[GraphID, int | None]
Oscar Gustafsson's avatar
Oscar Gustafsson committed
    def __init__(
        self,
        sfg: SFG,
        scheduler: Scheduler | None = None,
        schedule_time: int | None = None,
Oscar Gustafsson's avatar
Oscar Gustafsson committed
        cyclic: bool = False,
        start_times: dict[GraphID, int] | None = None,
        laps: dict[GraphID, int] | None = None,
Oscar Gustafsson's avatar
Oscar Gustafsson committed
    ):
        """Construct a Schedule from an SFG."""
        if not isinstance(sfg, SFG):
            raise TypeError("An SFG must be provided")

Oscar Gustafsson's avatar
Oscar Gustafsson committed
        self._start_times = {}
        self._laps = defaultdict(_laps_default)
        self._y_locations = defaultdict(_y_locations_default)
        self._schedule_time = schedule_time
        if scheduler:
            self._scheduler = scheduler
            self._scheduler.apply_scheduling(self)
        else:
Oscar Gustafsson's avatar
Oscar Gustafsson committed
            if start_times is None:
                raise ValueError("Must provide start_times when using 'provided'")
            if laps is None:
                raise ValueError("Must provide laps when using 'provided'")
            self._start_times = start_times
            self._laps.update(laps)
            self._remove_delays_no_laps()
Andreas Bolin's avatar
Andreas Bolin committed
        max_end_time = self.get_max_end_time()
        if not self._schedule_time:
            self._schedule_time = max_end_time
    def __str__(self) -> str:
        """Return a string representation of this Schedule."""
        res: list[tuple[GraphID, int, int, int]] = [
            (
                op.graph_id,
                self.start_time_of_operation(op.graph_id),
                cast(int, self.backward_slack(op.graph_id)),
                self.forward_slack(op.graph_id),
            )
            for op in self._sfg.operations
        ]
        res.sort(key=lambda tup: tup[0])
        res_str = [
            (
                r[0],
                r[1],
                f"{r[2]}".rjust(8) if r[2] < sys.maxsize else "oo".rjust(8),
                f"{r[3]}".rjust(8) if r[3] < sys.maxsize else "oo".rjust(8),
            )
            for r in res
        ]

        string_io = io.StringIO()

        header = ["Graph ID", "Start time", "Backward slack", "Forward slack"]
        string_io.write("|".join(f"{col:^15}" for i, col in enumerate(header)) + "\n")
        string_io.write("-" * (15 * len(header) + len(header) - 1) + "\n")

        for r in res_str:
            row_str = "|".join(f"{str(item):^15}" for i, item in enumerate(r))
            string_io.write(row_str + "\n")

        return string_io.getvalue()

    def _validate_schedule(self) -> None:
        if self._schedule_time is None:
            raise ValueError("Schedule without set scheduling time detected.")
        if not isinstance(self._schedule_time, int):
            raise ValueError("Schedule with non-integer scheduling time detected.")

        ops = {op.graph_id for op in self._sfg.operations}
        missing_elems = ops - set(self._start_times)
        extra_elems = set(self._start_times) - ops
        if missing_elems:
            raise ValueError(
                f"Missing operations detected in start_times: {missing_elems}"
            )
        if extra_elems:
            raise ValueError(f"Extra operations detected in start_times: {extra_elems}")

        for graph_id, time in self._start_times.items():
                    f"Negative forward slack detected in Schedule for operation: {graph_id}, "
                    f"slack: {self.forward_slack(graph_id)}."
                )
            if self.backward_slack(graph_id) < 0:
                raise ValueError(
                    f"Negative backward slack detected in Schedule for operation: {graph_id}, "
                    f"slack: {self.backward_slack(graph_id)}"
Simon Bjurek's avatar
Simon Bjurek committed
            if time > self._schedule_time and not isinstance(
                self._sfg.find_by_id(graph_id), DontCare
            ):
                raise ValueError(
                    f"Start time larger than scheduling time detected in Schedule for operation {graph_id}"
                )

    def start_time_of_operation(self, graph_id: GraphID) -> int:
        Return the start time of the operation with the specified by *graph_id*.
        Parameters
        ----------
        graph_id : GraphID
            The graph id of the operation to get the start time for.
        if graph_id not in self._start_times:
            raise ValueError(f"No operation with graph_id {graph_id!r} in schedule")
        return self._start_times[graph_id]
Andreas Bolin's avatar
Andreas Bolin committed
    def get_max_end_time(self) -> int:
        """Return the current maximum end time among all operations."""
        max_end_time = 0
        for graph_id, op_start_time in self._start_times.items():
            operation = cast(Operation, self._sfg.find_by_id(graph_id))
Simon Bjurek's avatar
Simon Bjurek committed
            if isinstance(operation, Output):
                max_end_time = max(max_end_time, op_start_time)
            else:
                for outport in operation.outputs:
                    max_end_time = max(
                        max_end_time,
                        op_start_time + cast(int, outport.latency_offset),
                    )
        return max_end_time

    def get_max_non_io_end_time(self) -> int:
        """Return the current maximum end time among all non-IO operations."""
        max_end_time = 0
        for graph_id, op_start_time in self._start_times.items():
            operation = cast(Operation, self._sfg.find_by_id(graph_id))
Simon Bjurek's avatar
Simon Bjurek committed
            if isinstance(operation, Output):
                continue
            else:
                for outport in operation.outputs:
                    max_end_time = max(
                        max_end_time,
                        op_start_time + cast(int, outport.latency_offset),
                    )
        return max_end_time

    def forward_slack(self, graph_id: GraphID) -> int:
Oscar Gustafsson's avatar
Oscar Gustafsson committed
        Return how much an operation can be moved forward in time.

        Parameters
        ----------
Oscar Gustafsson's avatar
Oscar Gustafsson committed
        graph_id : GraphID
            The graph id of the operation.
            The number of time steps the operation with *graph_id* can be moved
            forward in time.
Oscar Gustafsson's avatar
Oscar Gustafsson committed
        --------
        backward_slack
        slacks
        if graph_id not in self._start_times:
            raise ValueError(f"No operation with graph_id {graph_id!r} in schedule")
        output_slacks = self._forward_slacks(graph_id)
        return cast(
            int,
            min(
                sum(
                    (
                        list(signal_slacks.values())
                        for signal_slacks in output_slacks.values()
                    ),
                    [sys.maxsize],
                )
            ),
Oscar Gustafsson's avatar
Oscar Gustafsson committed
    def _forward_slacks(
    ) -> dict["OutputPort", dict["Signal", int]]:
Oscar Gustafsson's avatar
Oscar Gustafsson committed
        ret = {}
        start_time = self._start_times[graph_id]
        operation = cast(Operation, self._sfg.find_by_id(graph_id))
        for output_port in operation.outputs:
            ret[output_port] = self._output_slacks(output_port, start_time)
    def _output_slacks(
        self, output_port: "OutputPort", start_time: int | None = None
    ) -> dict[Signal, int]:
        if start_time is None:
            start_time = self._start_times[output_port.operation.graph_id]
        output_slacks = {}
        available_time = start_time + cast(int, output_port.latency_offset)
        if available_time > self._schedule_time:
            available_time 
Loading
Loading full blame...