Newer
Older
Angus Lothian
committed
"""@package docstring
This module contains the Schema class.
TODO: More info
"""
angloth
committed
from io import StringIO
from typing import Dict, List, Set
import math
Angus Lothian
committed
from b_asic.signal_flow_graph import SFG
from b_asic.graph_component import GraphID
from b_asic.operation import Operation
class Schema:
"""A class that represents an SFG with scheduled Operations."""
_sfg: SFG
_start_times: Dict[GraphID, int]
_laps: Dict[GraphID, List[int]]
_schedule_time: int
_cyclic: bool
_resolution: int
angloth
committed
_max_end_op_id: GraphID
_max_end_time: int
_non_schedulable_ops: Set[GraphID]
Angus Lothian
committed
def __init__(self, sfg: SFG, schedule_time: int = None, cyclic: bool = False, resolution: int = 1, scheduling_alg: str = "ASAP"):
self._sfg = sfg
self._start_times = dict()
self._laps = dict()
self._cyclic = cyclic
self._resolution = resolution
if scheduling_alg == "ASAP":
self._schedule_asap()
else:
angloth
committed
raise ValueError(f"No algorithm with name: {scheduling_alg} defined.")
Angus Lothian
committed
angloth
committed
# Set latest time of the schema.
self._latest_op_time = 0
Angus Lothian
committed
for op_id, op_start_time in self._start_times.items():
op = self._sfg.find_by_id(op_id)
for outport in op.outputs:
angloth
committed
if op_start_time + outport.latency_offset > self._latest_op_time:
self._max_end_op_id = op_id
self._max_end_time = op_start_time + outport.latency_offset
Angus Lothian
committed
if not self._cyclic:
if schedule_time is None:
angloth
committed
self.set_schedule_time(self._max_end_time)
Angus Lothian
committed
else:
angloth
committed
self.set_schedule_time(schedule_time)
else:
raise NotImplementedError("Cyclic schedules not implemented yet.")
def __str__(self):
string_io = StringIO()
for op in self._sfg.get_operations_topological_order():
if op.graph_id in self._start_times:
string_io.write("name: " + op.name + ", \tid: " + op.graph_id + ", \t")
string_io.write("start_time: " + str(self._start_times[op.graph_id]) + ", \t")
string_io.write("backwards_slack: " + str(self.backwards_slack(op.graph_id)) + ", \t")
string_io.write("forwards_slack: " + str(self.forwards_slack(op.graph_id)) + "\n")
return string_io.getvalue()
@property
def schedule_time(self) -> int:
"""Get the schedule time of the Schema."""
return self._schedule_time
def set_schedule_time(self, schedule_time: int) -> None:
"""Set the schedule time to the specified value, if the specified value is shorter then
what is required by the schedule then raises a ValueError."""
if not self._cyclic:
if schedule_time >= self._max_end_time:
Angus Lothian
committed
self._schedule_time = schedule_time
angloth
committed
else:
raise ValueError("Specified schedule time is shorter then the max length of the ")
else:
raise NotImplementedError("Cyclic schedules not implemented yet.")
Angus Lothian
committed
angloth
committed
def get_start_time_of_operation(self, op_id: GraphID) -> int:
Angus Lothian
committed
"""Get the start time of the operation with the specified by the op_id."""
assert op_id in self._start_times, "No operation with the specified op_id in this schema."
return self._start_times[op_id]
angloth
committed
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
def get_laps_of_operation_input_port(self, op_id: GraphID, input_index: int) -> int:
"""Get the laps of input port with the entered index of the operation with the specified op_id."""
assert op_id in self._laps, "No operation with the specidied op_id in this schema."
assert input_index >= 0 and input_index < len(
self._laps[op_id]), "Input index for the specified operation out of bounds."
return self._laps[op_id][input_index]
def change_start_time_of_operation(self, op_id: GraphID, relative_change: int = None, new_start_time: int = None) -> None:
"""Changes the start time of the operation with the specified op_id either by the
relative_change if specified or to the new_start_time if that is specified. If both
are specified then the new_start_time value is used.
A negative relative_change represents decreasing the operations start time.
If the change to the start time would break dependencies then the change is cancelled
and a message is printed to the user in the terminal.
"""
assert op_id in self._start_times, "No operation with the specified op_id in this schema."
if new_start_time is not None:
relative_change = new_start_time - self._start_times[op_id]
if relative_change is not None:
if relative_change > 0:
forwards_slack = self.forwards_slack(op_id)
if relative_change > forwards_slack:
print(f"The specified relative change of {relative_change} is larger than the forwards \
slack of the operation of {forwards_slack}.")
return
else:
backwards_slack = self.backwards_slack(op_id)
if abs(relative_change) > backwards_slack:
print(f"The specified relative change of {abs(relative_change)} is larger than the forwards \
slack of the operation of {forwards_slack}.")
return
self._start_times[op_id] += relative_change
else:
raise ValueError("At least one of the parameters relative_change or new_start_time has to be specified.")
def backwards_slack(self, op_id: GraphID) -> int:
return 0
op_start_time = self.get_start_time_of_operation(op_id)
min_backwards_slack = math.inf
for i, inp in enumerate(self._sfg.find_by_id(op_id).inputs):
inp_time = op_start_time + inp.latency_offset
Angus Lothian
committed
angloth
committed
source_op_time = self.get_start_time_of_operation(inp.connected_source.operation.graph_id)
source_outp_time = source_op_time + inp.connected_source.latency_offset
Angus Lothian
committed
angloth
committed
def forwards_slack(self, op_id: GraphID) -> int:
return 0
assert op_id in self._start_times, "No operation with specified op_id in this schema."
Angus Lothian
committed
angloth
committed
def _schedule_asap(self) -> None:
Angus Lothian
committed
pl = self._sfg.get_precedence_list()
if len(pl) < 2:
print("Empty signal flow graph cannot be scheduled.")
return
angloth
committed
self._non_schedulable_ops = set((outp.operation.graph_id for outp in pl[0]))
Angus Lothian
committed
for outport in pl[1]:
op = outport.operation
if op not in self._start_times:
# Set start time of all operations in the first iter to 0
self._start_times[op.graph_id] = 0
for outports in pl[2:]:
for outport in outports:
op = outport.operation
if op.graph_id not in self._start_times:
# Schedule the operation if it doesn't have a start time yet.
op_start_time = 0
for inport in op.inputs:
assert len(inport.signals) == 1, "Error in scheduling, dangling input port detected."
assert inport.signals[0].source is not None, "Error in scheduling, signal with no source detected."
source_port = inport.signals[0].source
source_end_time = None
angloth
committed
if source_port.operation.graph_id in self._non_schedulable_ops:
Angus Lothian
committed
source_end_time = 0
else:
source_op_time = self._start_times[source_port.operation.graph_id]
assert source_port.latency_offset is not None, f"Output port: {source_port.index} of operation: \
{source_port.operation.graph_id} has no latency-offset."
assert inport.latency_offset is not None, f"Input port: {inport.index} of operation: \
{inport.operation.graph_id} has no latency-offset."
source_end_time = source_op_time + source_port.latency_offset
op_start_time_from_in = source_end_time - inport.latency_offset
op_start_time = max(op_start_time, op_start_time_from_in)
self._start_times[op.graph_id] = op_start_time