Newer
Older
Angus Lothian
committed
Contains the schedule class for scheduling operations in an SFG.
from collections import defaultdict
from collections.abc import Sequence
from typing import cast
from matplotlib.axes import Axes
from matplotlib.figure import Figure
from matplotlib.patches import PathPatch, Polygon
from matplotlib.path import Path
Angus Lothian
committed
from b_asic._preferences import (
EXECUTION_TIME_COLOR,
LATENCY_COLOR,
SIGNAL_COLOR,
SIGNAL_LINEWIDTH,
Angus Lothian
committed
from b_asic.graph_component import GraphID
from b_asic.operation import Operation
from b_asic.port import InputPort, OutputPort
from b_asic.process import MemoryVariable, OperatorProcess
from b_asic.resources import ProcessCollection
from b_asic.scheduler import Scheduler
from b_asic.special_operations import Delay, Input, Output
# Module-level colour constants: each preference colour component is divided
# by 255 and stored as a tuple of floats.
_EXECUTION_TIME_COLOR: tuple[float, ...] = tuple(
    float(c / 255) for c in EXECUTION_TIME_COLOR
)
_LATENCY_COLOR: tuple[float, ...] = tuple(float(c / 255) for c in LATENCY_COLOR)
_SIGNAL_COLOR: tuple[float, ...] = tuple(float(c / 255) for c in SIGNAL_COLOR)
def _laps_default() -> int:
    """
    Default value for _laps. Cannot use lambda.

    Always returns 0.
    """
    # NOTE(review): the "cannot use lambda" restriction is presumably about
    # pickling the defaultdict that uses this factory -- confirm.
    return 0
def _y_locations_default() -> None:
    """
    Default value for _y_locations. Cannot use lambda.

    Always returns None; used as the ``defaultdict`` factory for
    ``_y_locations``.
    """
    # NOTE(review): the "cannot use lambda" restriction is presumably about
    # pickling the defaultdict that uses this factory -- confirm.
    return None
"""
Schedule of an SFG with scheduled Operations.
Parameters
----------
sfg : :class:`~b_asic.signal_flow_graph.SFG`
The signal flow graph to make a schedule for.
scheduler : Scheduler, default: None
The automatic scheduler to be used.
schedule_time : int, optional
The schedule time. If not provided, it will be determined by the scheduling
algorithm.
cyclic : bool, default: False
If the schedule is cyclic.
start_times : dict, optional
Dictionary with GraphIDs as keys and start times as values.
Required when no *scheduler* is provided.
laps : dict, optional
Dictionary with GraphIDs as keys and laps as values.
Required when no *scheduler* is provided.
Angus Lothian
committed
_sfg: SFG
_start_times: dict[GraphID, int]
_laps: dict[GraphID, int]
Angus Lothian
committed
_schedule_time: int
_cyclic: bool
Angus Lothian
committed
scheduler: Scheduler | None = None,
schedule_time: int | None = None,
start_times: dict[GraphID, int] | None = None,
laps: dict[GraphID, int] | None = None,
if not isinstance(sfg, SFG):
raise TypeError("An SFG must be provided")
Angus Lothian
committed
self._sfg = sfg
Angus Lothian
committed
self._cyclic = cyclic
self._y_locations = defaultdict(_y_locations_default)
self._schedule_time = schedule_time
if scheduler:
self._scheduler = scheduler
self._scheduler.apply_scheduling(self)
else:
if start_times is None:
raise ValueError("Must provide start_times when using 'provided'")
if laps is None:
raise ValueError("Must provide laps when using 'provided'")
self._start_times = start_times
self._laps.update(laps)
self._remove_delays_no_laps()
self._schedule_time = max_end_time
Angus Lothian
committed
self._validate_schedule()
Simon Bjurek
committed
def __str__(self) -> str:
    """
    Return a string representation of this Schedule.

    The result is a text table with one row per operation in the SFG, sorted
    by graph ID, with the columns "Graph ID", "Start time", "Backward slack"
    and "Forward slack". Every cell is centered in a 15 character wide column
    and columns are separated by ``|``.

    Returns
    -------
    str
        The formatted table, one line per row, each terminated by a newline.
    """
    rows: list[tuple[GraphID, int, int, int]] = [
        (
            op.graph_id,
            self.start_time_of_operation(op.graph_id),
            cast(int, self.backward_slack(op.graph_id)),
            self.forward_slack(op.graph_id),
        )
        for op in self._sfg.operations
    ]
    rows.sort(key=lambda row: row[0])

    def format_slack(slack: int) -> str:
        # A slack of sys.maxsize (or more) is printed as "oo"; sys.maxsize is
        # the value forward_slack falls back to when no signal slack exists.
        return f"{slack}".rjust(8) if slack < sys.maxsize else "oo".rjust(8)

    header = ["Graph ID", "Start time", "Backward slack", "Forward slack"]
    string_io = io.StringIO()
    string_io.write("|".join(f"{col:^15}" for col in header) + "\n")
    # Separator spans all columns plus the "|" characters between them.
    string_io.write("-" * (15 * len(header) + len(header) - 1) + "\n")
    for graph_id, start_time, backward, forward in rows:
        cells = (graph_id, start_time, format_slack(backward), format_slack(forward))
        string_io.write("|".join(f"{str(cell):^15}" for cell in cells) + "\n")
    return string_io.getvalue()
def _validate_schedule(self) -> None:
    """
    Check that the schedule is consistent and raise if it is not.

    The checks are, in order: the schedule time is set and is an integer, the
    set of graph IDs in ``_start_times`` equals the set of graph IDs of the
    operations in the SFG, and for every scheduled operation the forward and
    backward slack are non-negative and the start time does not exceed the
    schedule time (``DontCare`` operations are exempt from the last check).

    Raises
    ------
    ValueError
        If any of the checks above fails.
    """
    if self._schedule_time is None:
        raise ValueError("Schedule without set scheduling time detected.")
    if not isinstance(self._schedule_time, int):
        raise ValueError("Schedule with non-integer scheduling time detected.")

    ops = {op.graph_id for op in self._sfg.operations}
    scheduled = set(self._start_times)
    missing_elems = ops - scheduled
    extra_elems = scheduled - ops
    if missing_elems:
        raise ValueError(
            f"Missing operations detected in start_times: {missing_elems}"
        )
    if extra_elems:
        raise ValueError(f"Extra operations detected in start_times: {extra_elems}")

    for graph_id, time in self._start_times.items():
        # Compute each slack once and reuse it in the error message.
        forward = self.forward_slack(graph_id)
        if forward < 0:
            raise ValueError(
                f"Negative forward slack detected in Schedule for operation: {graph_id}, "
                f"slack: {forward}."
            )
        backward = self.backward_slack(graph_id)
        if backward < 0:
            raise ValueError(
                f"Negative backward slack detected in Schedule for operation: {graph_id}, "
                f"slack: {backward}"
            )
        if time > self._schedule_time and not isinstance(
            self._sfg.find_by_id(graph_id), DontCare
        ):
            raise ValueError(
                f"Start time larger than scheduling time detected in Schedule for operation {graph_id}"
            )
def start_time_of_operation(self, graph_id: GraphID) -> int:
    """
    Return the start time of the operation specified by *graph_id*.

    Parameters
    ----------
    graph_id : GraphID
        The graph id of the operation to get the start time for.

    Returns
    -------
    int
        The start time stored for *graph_id*.

    Raises
    ------
    ValueError
        If *graph_id* is not present in the schedule's start times.
    """
    if graph_id not in self._start_times:
        raise ValueError(f"No operation with graph_id {graph_id!r} in schedule")
    return self._start_times[graph_id]
Angus Lothian
committed
"""Return the current maximum end time among all operations."""
for graph_id, op_start_time in self._start_times.items():
operation = cast(Operation, self._sfg.find_by_id(graph_id))
max_end_time = max(max_end_time, op_start_time)
else:
for outport in operation.outputs:
max_end_time = max(
max_end_time,
op_start_time + cast(int, outport.latency_offset),
)
def get_max_non_io_end_time(self) -> int:
"""Return the current maximum end time among all non-IO operations."""
max_end_time = 0
for graph_id, op_start_time in self._start_times.items():
operation = cast(Operation, self._sfg.find_by_id(graph_id))
continue
else:
for outport in operation.outputs:
max_end_time = max(
max_end_time,
op_start_time + cast(int, outport.latency_offset),
)
return max_end_time
def forward_slack(self, graph_id: GraphID) -> int:
    """
    Return how much an operation can be moved forward in time.

    Parameters
    ----------
    graph_id : GraphID
        The graph id of the operation.

    Returns
    -------
    int
        The number of time steps the operation with *graph_id* can be moved
        forward in time. This is the smallest slack over all signals of all
        output ports, or ``sys.maxsize`` if there are no such signals.

    Raises
    ------
    ValueError
        If *graph_id* is not present in the schedule's start times.
    """
    if graph_id not in self._start_times:
        raise ValueError(f"No operation with graph_id {graph_id!r} in schedule")
    output_slacks = self._forward_slacks(graph_id)
    # Flatten the per-port slack dicts into one list; the [sys.maxsize] seed
    # guarantees that min() has at least one element to work on.
    return cast(
        int,
        min(
            sum(
                (
                    list(signal_slacks.values())
                    for signal_slacks in output_slacks.values()
                ),
                [sys.maxsize],
            )
        ),
    )
self, graph_id: GraphID
) -> dict["OutputPort", dict["Signal", int]]:
start_time = self._start_times[graph_id]
operation = cast(Operation, self._sfg.find_by_id(graph_id))
for output_port in operation.outputs:
ret[output_port] = self._output_slacks(output_port, start_time)
Angus Lothian
committed
self, output_port: "OutputPort", start_time: int | None = None
) -> dict[Signal, int]:
if start_time is None:
start_time = self._start_times[output_port.operation.graph_id]
output_slacks = {}
available_time = start_time + cast(int, output_port.latency_offset)
if available_time > self._schedule_time:
available_time
Loading
Loading full blame...