Newer
Older
Angus Lothian
committed
Contains the schedule class for scheduling operations in an SFG.
from collections import defaultdict
from collections.abc import Sequence
from typing import cast
from matplotlib.axes import Axes
from matplotlib.figure import Figure
from matplotlib.patches import PathPatch, Polygon
from matplotlib.path import Path
from matplotlib.transforms import Bbox, TransformedBbox
Angus Lothian
committed
from b_asic._preferences import (
EXECUTION_TIME_COLOR,
LATENCY_COLOR,
SIGNAL_COLOR,
SIGNAL_LINEWIDTH,
Angus Lothian
committed
from b_asic.graph_component import GraphID
from b_asic.operation import Operation
from b_asic.port import InputPort, OutputPort
from b_asic.process import MemoryVariable, OperatorProcess
from b_asic.resources import ProcessCollection
from b_asic.scheduler import Scheduler
from b_asic.special_operations import Delay, Input, Output
_EXECUTION_TIME_COLOR: tuple[float, ...] = tuple(
float(c / 255) for c in EXECUTION_TIME_COLOR
_LATENCY_COLOR: tuple[float, ...] = tuple(float(c / 255) for c in LATENCY_COLOR)
_SIGNAL_COLOR: tuple[float, ...] = tuple(float(c / 255) for c in SIGNAL_COLOR)
def _laps_default():
"""Default value for _laps. Cannot use lambda."""
return 0
def _y_locations_default():
"""Default value for _y_locations. Cannot use lambda."""
return None
"""
Schedule of an SFG with scheduled Operations.
Parameters
----------
sfg : :class:`~b_asic.signal_flow_graph.SFG`
scheduler : Scheduler, default: None
The automatic scheduler to be used.
The schedule time. If not provided, it will be determined by the scheduling
algorithm.
cyclic : bool, default: False
If the schedule is cyclic.
start_times : dict, optional
Dictionary with GraphIDs as keys and start times as values.
Used when *algorithm* is 'provided'.
laps : dict, optional
Dictionary with GraphIDs as keys and laps as values.
Used when *algorithm* is 'provided'.
Angus Lothian
committed
_sfg: SFG
_start_times: dict[GraphID, int]
_laps: dict[GraphID, int]
Angus Lothian
committed
_schedule_time: int
_cyclic: bool
Angus Lothian
committed
scheduler: Scheduler | None = None,
schedule_time: int | None = None,
start_times: dict[GraphID, int] | None = None,
laps: dict[GraphID, int] | None = None,
if not isinstance(sfg, SFG):
raise TypeError("An SFG must be provided")
Angus Lothian
committed
self._sfg = sfg
Angus Lothian
committed
self._cyclic = cyclic
self._y_locations = defaultdict(_y_locations_default)
self._schedule_time = schedule_time
if scheduler:
self._scheduler = scheduler
self._scheduler.apply_scheduling(self)
else:
if start_times is None:
raise ValueError("Must provide start_times when using 'provided'")
if laps is None:
raise ValueError("Must provide laps when using 'provided'")
self._start_times = start_times
self._laps.update(laps)
self._remove_delays_no_laps()
self._schedule_time = max_end_time
Angus Lothian
committed
self._validate_schedule()
Simon Bjurek
committed
def __str__(self) -> str:
"""Return a string representation of this Schedule."""
res: list[tuple[GraphID, int, int, int]] = [
Simon Bjurek
committed
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
(
op.graph_id,
self.start_time_of_operation(op.graph_id),
cast(int, self.backward_slack(op.graph_id)),
self.forward_slack(op.graph_id),
)
for op in self._sfg.operations
]
res.sort(key=lambda tup: tup[0])
res_str = [
(
r[0],
r[1],
f"{r[2]}".rjust(8) if r[2] < sys.maxsize else "oo".rjust(8),
f"{r[3]}".rjust(8) if r[3] < sys.maxsize else "oo".rjust(8),
)
for r in res
]
string_io = io.StringIO()
header = ["Graph ID", "Start time", "Backward slack", "Forward slack"]
string_io.write("|".join(f"{col:^15}" for i, col in enumerate(header)) + "\n")
string_io.write("-" * (15 * len(header) + len(header) - 1) + "\n")
for r in res_str:
row_str = "|".join(f"{str(item):^15}" for i, item in enumerate(r))
string_io.write(row_str + "\n")
return string_io.getvalue()
def _validate_schedule(self) -> None:
if self._schedule_time is None:
raise ValueError("Schedule without set scheduling time detected.")
if not isinstance(self._schedule_time, int):
raise ValueError("Schedule with non-integer scheduling time detected.")
ops = {op.graph_id for op in self._sfg.operations}
missing_elems = ops - set(self._start_times)
extra_elems = set(self._start_times) - ops
if missing_elems:
raise ValueError(
f"Missing operations detected in start_times: {missing_elems}"
)
if extra_elems:
raise ValueError(f"Extra operations detected in start_times: {extra_elems}")
for graph_id, time in self._start_times.items():
if self.forward_slack(graph_id) < 0:
f"Negative forward slack detected in Schedule for operation: {graph_id}, "
f"slack: {self.forward_slack(graph_id)}."
)
if self.backward_slack(graph_id) < 0:
raise ValueError(
f"Negative backward slack detected in Schedule for operation: {graph_id}, "
f"slack: {self.backward_slack(graph_id)}"
if time > self._schedule_time and not isinstance(
self._sfg.find_by_id(graph_id), DontCare
):
raise ValueError(
f"Start time larger than scheduling time detected in Schedule for operation {graph_id}"
)
def start_time_of_operation(self, graph_id: GraphID) -> int:
Return the start time of the operation with the specified by *graph_id*.
Parameters
----------
graph_id : GraphID
The graph id of the operation to get the start time for.
if graph_id not in self._start_times:
raise ValueError(f"No operation with graph_id {graph_id!r} in schedule")
return self._start_times[graph_id]
Angus Lothian
committed
"""Return the current maximum end time among all operations."""
for graph_id, op_start_time in self._start_times.items():
operation = cast(Operation, self._sfg.find_by_id(graph_id))
max_end_time = max(max_end_time, op_start_time)
else:
for outport in operation.outputs:
max_end_time = max(
max_end_time,
op_start_time + cast(int, outport.latency_offset),
)
def get_max_non_io_end_time(self) -> int:
"""Return the current maximum end time among all non-IO operations."""
max_end_time = 0
for graph_id, op_start_time in self._start_times.items():
operation = cast(Operation, self._sfg.find_by_id(graph_id))
continue
else:
for outport in operation.outputs:
max_end_time = max(
max_end_time,
op_start_time + cast(int, outport.latency_offset),
)
return max_end_time
def forward_slack(self, graph_id: GraphID) -> int:
Return how much an operation can be moved forward in time.
graph_id : GraphID
The graph id of the operation.
The number of time steps the operation with *graph_id* can be moved
if graph_id not in self._start_times:
raise ValueError(f"No operation with graph_id {graph_id!r} in schedule")
output_slacks = self._forward_slacks(graph_id)
return cast(
int,
min(
sum(
(
list(signal_slacks.values())
for signal_slacks in output_slacks.values()
),
[sys.maxsize],
)
),
self, graph_id: GraphID
) -> dict["OutputPort", dict["Signal", int]]:
start_time = self._start_times[graph_id]
operation = cast(Operation, self._sfg.find_by_id(graph_id))
for output_port in operation.outputs:
ret[output_port] = self._output_slacks(output_port, start_time)
Angus Lothian
committed
self, output_port: "OutputPort", start_time: int | None = None
) -> dict[Signal, int]:
if start_time is None:
start_time = self._start_times[output_port.operation.graph_id]
output_slacks = {}
available_time = start_time + cast(int, output_port.latency_offset)
if available_time > self._schedule_time:
available_time -= self._schedule_time
for signal in output_port.signals:
destination = cast(InputPort, signal.destination)
usage_time = (
cast(int, destination.latency_offset)
+ self._start_times[destination.operation.graph_id]
+ self._schedule_time * self._laps[signal.graph_id]
)
output_slacks[signal] = usage_time - available_time
return output_slacks
def backward_slack(self, graph_id: GraphID) -> int:
Return how much an operation can be moved backward in time.
graph_id : GraphID
The graph id of the operation.
The number of time steps the operation with *graph_id* can be moved
.. note:: The backward slack is positive, but a call to
:func:`move_operation` should be negative to move the operation
backward.
if graph_id not in self._start_times:
raise ValueError(f"No operation with graph_id {graph_id!r} in schedule")
input_slacks = self._backward_slacks(graph_id)
return cast(
int,
min(
sum(
(
list(signal_slacks.values())
for signal_slacks in input_slacks.values()
),
[sys.maxsize],
)
),
def _backward_slacks(self, graph_id: GraphID) -> dict[InputPort, dict[Signal, int]]:
start_time = self._start_times[graph_id]
operation = cast(Operation, self._sfg.find_by_id(graph_id))
for input_port in operation.inputs:
ret[input_port] = self._input_slacks(input_port, start_time)
self, input_port: InputPort, start_time: int | None = None
) -> dict[Signal, int]:
if start_time is None:
start_time = self._start_times[input_port.operation.graph_id]
input_slacks = {}
usage_time = start_time + cast(int, input_port.latency_offset)
for signal in input_port.signals:
source = cast(OutputPort, signal.source)
if isinstance(source.operation, (DontCare, Delay)):
Simon Bjurek
committed
if self._schedule_time is not None:
available_time = (
cast(int, source.latency_offset)
+ self._start_times[source.operation.graph_id]
)
if available_time > self._schedule_time:
available_time -= self._schedule_time
Simon Bjurek
committed
available_time -= self._schedule_time * self._laps[signal.graph_id]
Simon Bjurek
committed
else:
available_time = (
cast(int, source.latency_offset)
+ self._start_times[source.operation.graph_id]
)
input_slacks[signal] = usage_time - available_time
return input_slacks
def slacks(self, graph_id: GraphID) -> tuple[int, int]:
Return the backward and forward slacks of operation *graph_id*.
That is, how much the operation can be moved backward and forward in time.
Parameters
----------
graph_id : GraphID
The graph id of the operation.
Returns
-------
tuple(int, int)
The backward and forward slacks, respectively.
.. note:: The backward slack is positive, but a call to :func:`move_operation`
should be negative to move the operation backward.
--------
backward_slack
forward_slack
"""
if graph_id not in self._start_times:
raise ValueError(f"No operation with graph_id {graph_id!r} in schedule")
return self.backward_slack(graph_id), self.forward_slack(graph_id)
Angus Lothian
committed
def print_slacks(self, order: int = 0) -> None:
"""
Print the slack times for all operations in the schedule.
Parameters
----------
order : int, default: 0
Sorting order.
* 0: alphabetical on Graph ID
* 1: backward slack
* 2: forward slack
"""
cast(int, self.backward_slack(op.graph_id)),
self.forward_slack(op.graph_id),
)
for op in self._sfg.operations
]
res.sort(key=lambda tup: tup[order])
res_str = [
(
r[0],
f"{r[1]}".rjust(8) if r[1] < sys.maxsize else "oo".rjust(8),
f"{r[2]}".rjust(8) if r[2] < sys.maxsize else "oo".rjust(8),
)
for r in res
]
print("Graph ID | Backward | Forward")
print("---------|----------|---------")
Angus Lothian
committed
def set_schedule_time(self, time: int) -> "Schedule":
"""
Set a new schedule time.
Parameters
----------
time : int
The new schedule time. If it is too short, a ValueError will be raised.
max_end_time = self.get_max_end_time()
if time < max_end_time:
f"New schedule time ({time}) too short, minimum: {max_end_time}."
Simon Bjurek
committed
# if updating the scheduling time -> update laps due to operations
# reading and writing in different iterations (across the edge)
if self._schedule_time is not None:
for signal_id in self._laps.keys():
port = self._sfg.find_by_id(signal_id).destination
source_port = port.signals[0].source
source_op = source_port.operation
source_port_start_time = self._start_times[source_op.graph_id]
source_port_latency_offset = source_op.latency_offsets[
f"out{source_port.index}"
]
if (
source_port_start_time + source_port_latency_offset
> self._schedule_time
and source_port_start_time + source_port_latency_offset <= time
):
self._laps[signal_id] += 1
self._schedule_time = time
return self
def swap_io_of_operation(self, operation_id: GraphID) -> None:
"""
Swap the inputs (and outputs) of operation.
Parameters
----------
operation_id : GraphID
The GraphID of the operation to swap.
"""
self._sfg.swap_io_of_operation(operation_id)
"""The SFG corresponding to the current schedule."""
reconstructed_sfg = self._reintroduce_delays()
simplified_sfg = reconstructed_sfg.simplify_delay_element_placement()
return simplified_sfg
def start_times(self) -> dict[GraphID, int]:
"""The start times of the operations in the schedule."""
@start_times.setter
def start_times(self, start_times: dict[GraphID, int]) -> None:
if not isinstance(start_times, dict):
raise TypeError("start_times must be a dict")
for key, value in start_times.items():
if not isinstance(key, str) or not isinstance(value, int):
raise TypeError("start_times must be a dict[GraphID, int]")
self._start_times = start_times
The number of laps for the start times of the operations in the schedule.
@property
def schedule_time(self) -> int:
"""The schedule time of the current schedule."""
"""If the current schedule is cyclic."""
def edit(self, inplace=False) -> "Schedule":
"""
Edit schedule in GUI and return new schedule.
Parameters
----------
inplace : bool, default: False
If True, replace the current schedule.
"""
from b_asic.scheduler_gui.main_window import start_scheduler
new_schedule = start_scheduler(self)
if inplace:
self._start_times = new_schedule._start_times
self._laps = new_schedule._laps
self._schedule_time = new_schedule._schedule_time
self._y_locations = new_schedule._y_locations
def increase_time_resolution(self, factor: int) -> "Schedule":
"""
Increase time resolution for a schedule.
Parameters
factor : int
The time resolution increment.
"""
self._start_times = {k: factor * v for k, v in self._start_times.items()}
for graph_id in self._start_times:
cast(Operation, self._sfg.find_by_id(graph_id))._increase_time_resolution(
factor
)
self._schedule_time *= factor
Return a list of all times for the schedule.
Used to check how the resolution can be modified.
"""
# Local values
ret = [self._schedule_time, *self._start_times.values()]
# Loop over operations
for graph_id in self._start_times:
operation = cast(Operation, self._sfg.find_by_id(graph_id))
ret += [
cast(int, operation.execution_time),
*operation.latency_offsets.values(),
]
# Remove not set values (None)
ret = [v for v in ret if v is not None]
return ret
def get_possible_time_resolution_decrements(self) -> list[int]:
"""Return a list with possible factors to reduce time resolution."""
vals = self._get_all_times()
max_loop = min(val for val in vals if val)
if max_loop <= 1:
for candidate in range(2, max_loop + 1):
if not any(val % candidate for val in vals):
ret.append(candidate)
return ret
def decrease_time_resolution(self, factor: int) -> "Schedule":
"""
Decrease time resolution for a schedule.
Parameters
factor : int
The time resolution decrement.
get_possible_time_resolution_decrements
"""
possible_values = self.get_possible_time_resolution_decrements()
if factor not in possible_values:
raise ValueError(
f"Not possible to decrease resolution with {factor}. Possible"
f" values are {possible_values}"
)
self._start_times = {k: v // factor for k, v in self._start_times.items()}
for graph_id in self._start_times:
cast(Operation, self._sfg.find_by_id(graph_id))._decrease_time_resolution(
factor
)
self._schedule_time = self._schedule_time // factor
return self
def set_execution_time_of_type(
self, type_name: TypeName, execution_time: int
) -> None:
"""
Set the execution time of all operations with the given type name.
Parameters
----------
type_name : TypeName
The type name of the operation. For example, obtained as
``Addition.type_name()``.
execution_time : int
The execution time of the operation.
"""
self._sfg.set_execution_time_of_type(type_name, execution_time)
def set_latency_of_type(self, type_name: TypeName, latency: int) -> None:
"""
Set the latency of all operations with the given type name.
Parameters
----------
type_name : TypeName
The type name of the operation. For example, obtained as
``Addition.type_name()``.
latency : int
The latency of the operation.
"""
passed = True
for op in self._sfg.operations:
if type_name == op.type_name() or type_name == op.graph_id:
change_in_latency = latency - op.latency
if change_in_latency > (self.forward_slack(op.graph_id)):
passed = False
raise ValueError(
Simon Bjurek
committed
f"Error: Can't increase latency for all components. Try increasing forward slack time by rescheduling. "
)
break
if change_in_latency < 0 or passed:
for op in self._sfg.operations:
if type_name == op.type_name() or type_name == op.graph_id:
cast(Operation, op).set_latency(latency)
def move_y_location(
self, graph_id: GraphID, new_y: int, insert: bool = False
) -> None:
"""
Move operation in y-direction and remove any empty rows.
Parameters
----------
graph_id : GraphID
The GraphID of the operation to move.
new_y : int
The new y-position of the operation.
insert : bool, optional
If True, all operations on that y-position will be moved one position.
The default is False.
"""
if insert:
for gid in self._y_locations:
if self.get_y_location(gid) >= new_y:
self.set_y_location(gid, self.get_y_location(gid) + 1)
self.set_y_location(graph_id, new_y)
used_locations = {*self._y_locations.values()}
possible_locations = set(range(round(max(used_locations)) + 1))
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
if not possible_locations - used_locations:
return
remapping = {}
offset = 0
for loc in possible_locations:
if loc in used_locations:
remapping[loc] = loc - offset
else:
offset += 1
for gid, y_location in self._y_locations.items():
self._y_locations[gid] = remapping[self._y_locations[gid]]
def get_y_location(self, graph_id: GraphID) -> int:
"""
Get the y-position of the Operation with GraphID *graph_id*.
Parameters
----------
graph_id : GraphID
The GraphID of the operation.
Returns
-------
int
The y-position of the operation.
"""
return self._y_locations[graph_id]
def set_y_location(self, graph_id: GraphID, y_location: int) -> None:
"""
Set the y-position of the Operation with GraphID *graph_id* to *y_location*.
Parameters
----------
graph_id : GraphID
The GraphID of the operation to move.
y_location : int
The new y-position of the operation.
"""
self._y_locations[graph_id] = y_location
def _get_minimum_height(
self, operation_height: float = 1.0, operation_gap: float = OPERATION_GAP
):
max_pos_graph_id = max(self._y_locations, key=self._y_locations.get)
return self._get_y_position(max_pos_graph_id, operation_height, operation_gap)
def place_operation(
self, op: Operation, time: int, op_laps: dict[GraphID, int]
) -> None:
"""Schedule the given operation in given time.
Parameters
----------
op : Operation
Operation to schedule.
time : int
Time slot to schedule the operation in.
If time > schedule_time -> schedule cyclically.
op_laps : dict[GraphID, int]
Laps of all scheduled operations.
"""
start = time % self._schedule_time if self._schedule_time else time
self._start_times[op.graph_id] = start
if not self.schedule_time:
return
# update input laps
for input_port in op.inputs:
laps = 0
if self._schedule_time is not None:
current_lap = time // self._schedule_time
source_port = source_op = input_port.signals[0].source
source_op = source_port.operation
if not isinstance(source_op, (Delay, DontCare)):
if op_laps[source_op.graph_id] < current_lap:
laps += current_lap - op_laps[source_op.graph_id]
source_available_time = (
self._start_times[source_op.graph_id]
+ source_op.latency_offsets[f"out{source_port.index}"]
)
if source_available_time > self.schedule_time:
laps -= 1
self._laps[input_port.signals[0].graph_id] = laps
if (
start == 0
and isinstance(op, Output)
and self._laps[op.input(0).signals[0].graph_id] != 0
):
start = self._schedule_time
self._laps[op.input(0).signals[0].graph_id] -= 1
Simon Bjurek
committed
if (
start == 0
and isinstance(op, DontCare)
and self._laps[op.output(0).signals[0].graph_id] == 0
):
start = self._schedule_time
if (
time > self._schedule_time
and isinstance(op, DontCare)
and self._laps[op.output(0).signals[0].graph_id] == 0
):
start = time % self._schedule_time
Simon Bjurek
committed
self._start_times[op.graph_id] = start
def move_operation(self, graph_id: GraphID, time: int) -> "Schedule":
"""
Move an operation in the schedule.
Parameters
----------
graph_id : GraphID
The graph id of the operation to move.
time : int
The time to move. If positive move forward, if negative move backward.
"""
if graph_id not in self._start_times:
raise ValueError(f"No operation with graph_id {graph_id!r} in schedule")
(backward_slack, forward_slack) = self.slacks(graph_id)
if not -backward_slack <= time <= forward_slack:
raise ValueError(
f"Operation {graph_id!r} got incorrect move: {time}. Must be"
f" between {-backward_slack} and {forward_slack}."
)
old_start = self._start_times[graph_id]
tmp_start = old_start + time
new_start = tmp_start % self._schedule_time
# Update input laps
input_slacks = self._backward_slacks(graph_id)
for in_port, signal_slacks in input_slacks.items():
tmp_usage = tmp_start + cast(int, in_port.latency_offset)
new_usage = tmp_usage % self._schedule_time
for signal, signal_slack in signal_slacks.items():
# Compute a lower limit on laps
laps = new_slack // self._schedule_time
# Compensate for cases where above is not correct
tmp_prev_available = tmp_usage - new_slack
prev_available = tmp_prev_available % self._schedule_time
# If prev_available == 0 it will come from previous lap, unless it comes
# from an Input
source_op = signal.source_operation
if prev_available == 0 and not isinstance(source_op, Input):
prev_available = self._schedule_time
# Usage time (new_usage) < available time (prev_available) within a
# schedule period
if new_usage < prev_available:
laps += 1
self._laps[signal.graph_id] = laps
# Update output laps
output_slacks = self._forward_slacks(graph_id)
for out_port, signal_slacks in output_slacks.items():
tmp_available = tmp_start + cast(int, out_port.latency_offset)
new_available = tmp_available % self._schedule_time
for signal, signal_slack in signal_slacks.items():
# Compute a lower limit on laps
laps = new_slack // self._schedule_time
# Compensate for cases where above is not correct
tmp_next_usage = tmp_available + new_slack
next_usage = tmp_next_usage % self._schedule_time
# Usage time (new_usage) < available time (prev_available) within a
# schedule period
if new_available == 0 and (new_slack > 0 or next_usage == 0):
if (
next_usage < new_available
and self._start_times[signal.destination_operation.graph_id]
!= self.schedule_time
):
laps += 1
self._laps[signal.graph_id] = laps
# Outputs should not start at 0, but at schedule time
op = self._sfg.find_by_id(graph_id)
if (
new_start == 0
and isinstance(op, Output)
and self._laps[op.input(0).signals[0].graph_id] != 0
):
new_start = self._schedule_time
self._laps[op.input(0).signals[0].graph_id] -= 1
if new_start == 0 and isinstance(op, Input):
Simon Bjurek
committed
new_start = 0
for signal in op.output(0).signals:
if self._laps[signal.graph_id] != 0:
self._laps[signal.graph_id] -= 1
self._start_times[graph_id] = new_start
def move_operation_alap(self, graph_id: GraphID) -> "Schedule":
"""
Move an operation as late as possible in the schedule.
This is basically the same as::
schedule.move_operation(graph_id, schedule.forward_slack(graph_id))
but operations with no succeeding operation (Outputs) will only move to the end
of the schedule.
Parameters
----------
graph_id : GraphID
The graph id of the operation to move.
"""
forward_slack = self.forward_slack(graph_id)
if forward_slack == sys.maxsize:
self.move_operation(
graph_id, self.schedule_time - self._start_times[graph_id]
)
else:
self.move_operation(graph_id, forward_slack)
return self
def move_operation_asap(self, graph_id: GraphID) -> "Schedule":
"""
Move an operation as soon as possible in the schedule.
This is basically the same as::
schedule.move_operation(graph_id, -schedule.backward_slack(graph_id))
but operations that do not have a preceding operation (Inputs and Constants)
will only move to the start of the schedule.
Parameters
----------
graph_id : GraphID
The graph id of the operation to move.
"""
backward_slack = self.backward_slack(graph_id)
if backward_slack == sys.maxsize:
self.move_operation(graph_id, -self._start_times[graph_id])
else:
self.move_operation(graph_id, -backward_slack)
def _remove_delays_no_laps(self) -> None:
"""Remove delay elements without updating laps. Used when loading schedule."""
delay_list = self._sfg.find_by_type_name(Delay.type_name())
while delay_list:
delay_op = cast(Delay, delay_list[0])
self._sfg = cast(SFG, self._sfg.remove_operation(delay_op.graph_id))
delay_list = self._sfg.find_by_type_name(Delay.type_name())
"""Remove delay elements and update laps. Used after scheduling algorithm."""
delay_list = self._sfg.find_by_type_name(Delay.type_name())
while delay_list:
delay_input_id = delay_op.input(0).signals[0].graph_id
delay_output_ids = [sig.graph_id for sig in delay_op.output(0).signals]
self._sfg = cast(SFG, self._sfg.remove_operation(delay_op.graph_id))
for output_id in delay_output_ids:
self._laps[output_id] += 1 + self._laps[delay_input_id]
del self._laps[delay_input_id]
delay_list = self._sfg.find_by_type_name(Delay.type_name())
def _reintroduce_delays(self) -> SFG:
"""
Reintroduce delay elements to each signal according to the ``_laps`` variable.
"""
new_sfg = self._sfg()
Hugo Winbladh
committed
destination_laps = []
Hugo Winbladh
committed
port = new_sfg.find_by_id(signal_id).destination
Simon Bjurek
committed
# if an operation lies across the scheduling time, place a delay after it
source_port = port.signals[0].source
source_op = source_port.operation
source_port_start_time = self._start_times[source_op.graph_id]
source_port_latency_offset = source_op.latency_offsets[
f"out{source_port.index}"
]
if (
source_port_start_time + source_port_latency_offset
> self._schedule_time
):
Simon Bjurek
committed
Hugo Winbladh
committed
destination_laps.append((port.operation.graph_id, port.index, lap))
Simon Bjurek
committed
Hugo Winbladh
committed
new_sfg = new_sfg.insert_operation_before(op, Delay(), port)
def _get_memory_variables_list(self) -> list[MemoryVariable]:
ret: list[MemoryVariable] = []
for graph_id, start_time in self._start_times.items():