Newer
Older
Angus Lothian
committed
"""Contains the schedule class for scheduling operations in an SFG."""
from collections import defaultdict
from collections.abc import Sequence
from typing import cast
from matplotlib.axes import Axes
from matplotlib.figure import Figure
from matplotlib.patches import PathPatch, Polygon
from matplotlib.path import Path
Angus Lothian
committed
from b_asic._preferences import (
EXECUTION_TIME_COLOR,
LATENCY_COLOR,
SIGNAL_COLOR,
SIGNAL_LINEWIDTH,
Angus Lothian
committed
from b_asic.graph_component import GraphID
from b_asic.operation import Operation
from b_asic.port import InputPort, OutputPort
from b_asic.process import MemoryVariable, OperatorProcess
from b_asic.resources import ProcessCollection
from b_asic.scheduler import Scheduler
from b_asic.special_operations import Delay, Input, Output
# Module-private copies of the preference colors with every component divided
# by 255 (presumably converting 0-255 integer channels to the 0-1 floats used
# by Matplotlib -- confirm against b_asic._preferences).
_EXECUTION_TIME_COLOR: tuple[float, ...] = tuple(
    float(channel / 255) for channel in EXECUTION_TIME_COLOR
)
_LATENCY_COLOR: tuple[float, ...] = tuple(
    float(channel / 255) for channel in LATENCY_COLOR
)
_SIGNAL_COLOR: tuple[float, ...] = tuple(
    float(channel / 255) for channel in SIGNAL_COLOR
)
def _laps_default():
    """
    Return the default value for entries of ``_laps``.

    This is a named module-level function because a lambda cannot be used here
    (presumably so that the mapping using it as default factory can be
    pickled -- confirm).

    Returns
    -------
    int
        Always ``0``.
    """
    return 0
def _y_locations_default():
    """
    Return the default value for entries of ``_y_locations``.

    This is a named module-level function because a lambda cannot be used here
    (presumably so that the ``defaultdict`` built from it can be pickled --
    confirm).

    Returns
    -------
    None
        Always ``None``.
    """
    return None
"""
Schedule of an SFG with scheduled Operations.
Parameters
----------
sfg : :class:`~b_asic.signal_flow_graph.SFG`
    The signal flow graph to schedule.
scheduler : Scheduler, default: None
    The automatic scheduler to be used.
schedule_time : int, optional
    The schedule time. If not provided, it will be determined by the scheduling
    algorithm.
cyclic : bool, default: False
    If the schedule is cyclic.
start_times : dict, optional
    Dictionary with GraphIDs as keys and start times as values.
    Required when no *scheduler* is given.
laps : dict, optional
    Dictionary with GraphIDs as keys and laps as values.
    Required when no *scheduler* is given.
Angus Lothian
committed
_sfg: SFG
_start_times: dict[GraphID, int]
_laps: dict[GraphID, int]
Angus Lothian
committed
_schedule_time: int
_cyclic: bool
Angus Lothian
committed
scheduler: Scheduler | None = None,
schedule_time: int | None = None,
start_times: dict[GraphID, int] | None = None,
laps: dict[GraphID, int] | None = None,
if not isinstance(sfg, SFG):
raise TypeError("An SFG must be provided")
Angus Lothian
committed
self._sfg = sfg
Angus Lothian
committed
self._cyclic = cyclic
self._y_locations = defaultdict(_y_locations_default)
self._schedule_time = schedule_time
if scheduler:
self._scheduler = scheduler
self._scheduler.apply_scheduling(self)
else:
if start_times is None:
raise ValueError("Must provide start_times when using 'provided'")
if laps is None:
raise ValueError("Must provide laps when using 'provided'")
self._start_times = start_times
self._laps.update(laps)
self._remove_delays_no_laps()
self._schedule_time = max_end_time
Angus Lothian
committed
self._validate_schedule()
Simon Bjurek
committed
def __str__(self) -> str:
    """
    Return a string representation of this Schedule.

    The result is a text table with one row per operation in the SFG, sorted
    by graph ID. Each row lists the graph ID, the start time, the backward
    slack and the forward slack, every cell centered in a 15-character wide
    column and columns separated by ``|``.

    Returns
    -------
    str
        The table: a header line, a dashed separator line and one line per
        operation. Every line, including the last, ends with a newline.
    """
    column_width = 15
    header = ["Graph ID", "Start time", "Backward slack", "Forward slack"]

    def format_slack(slack: int) -> str:
        # Slack values of sys.maxsize or more are rendered as "oo".
        text = f"{slack}" if slack < sys.maxsize else "oo"
        return text.rjust(8)

    def format_row(cells) -> str:
        return "|".join(f"{str(cell):^{column_width}}" for cell in cells)

    rows = sorted(
        (
            (
                op.graph_id,
                self.start_time_of_operation(op.graph_id),
                cast(int, self.backward_slack(op.graph_id)),
                self.forward_slack(op.graph_id),
            )
            for op in self._sfg.operations
        ),
        key=lambda row: row[0],
    )

    lines = [
        format_row(header),
        # One dash per column character plus one per "|" separator.
        "-" * (column_width * len(header) + len(header) - 1),
    ]
    lines.extend(
        format_row(
            (graph_id, start, format_slack(backward), format_slack(forward))
        )
        for graph_id, start, backward, forward in rows
    )
    return "".join(f"{line}\n" for line in lines)
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
def _validate_schedule(self) -> None:
    """
    Check that the schedule is consistent and raise if it is not.

    The checks are performed in this order: the schedule time is set, the
    schedule time is an integer, every operation of the SFG has a start time,
    no start time refers to an operation outside the SFG, and finally, per
    operation, that neither slack is negative and the start time does not
    exceed the schedule time.

    Raises
    ------
    ValueError
        On the first failed check.
    """
    schedule_time = self._schedule_time
    if schedule_time is None:
        raise ValueError("Schedule without set scheduling time detected.")
    if not isinstance(schedule_time, int):
        raise ValueError("Schedule with non-integer scheduling time detected.")

    # Compare the graph IDs known to the SFG with those that have a start time.
    sfg_ids = {op.graph_id for op in self._sfg.operations}
    scheduled_ids = set(self._start_times)
    missing_elems = sfg_ids - scheduled_ids
    if missing_elems:
        raise ValueError(
            f"Missing operations detected in start_times: {missing_elems}"
        )
    extra_elems = scheduled_ids - sfg_ids
    if extra_elems:
        raise ValueError(f"Extra operations detected in start_times: {extra_elems}")

    for graph_id, time in self._start_times.items():
        if self.forward_slack(graph_id) < 0 or self.backward_slack(graph_id) < 0:
            raise ValueError(
                f"Negative slack detected in Schedule for operation: {graph_id}."
            )
        if time > schedule_time:
            raise ValueError(
                f"Start time larger than scheduling time detected in Schedule for operation {graph_id}"
            )
def start_time_of_operation(self, graph_id: GraphID) -> int:
    """
    Return the start time of the operation specified by *graph_id*.

    Parameters
    ----------
    graph_id : GraphID
        The graph id of the operation to get the start time for.

    Returns
    -------
    int
        The start time stored for *graph_id*.

    Raises
    ------
    ValueError
        If *graph_id* has no entry among the schedule's start times.
    """
    start_times = self._start_times
    if graph_id in start_times:
        return start_times[graph_id]
    raise ValueError(f"No operation with graph_id {graph_id!r} in schedule")
Loading
Loading full blame...