Newer
Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
import copy
from typing import TYPE_CHECKING, cast
from b_asic.scheduler import ListScheduler, Scheduler
from b_asic.special_operations import Delay, Output
if TYPE_CHECKING:
from b_asic.schedule import Schedule
from b_asic.types import GraphID
class ASAPScheduler(Scheduler):
"""Scheduler that implements the as-soon-as-possible (ASAP) algorithm."""
def apply_scheduling(self, schedule: "Schedule") -> None:
"""Applies the scheduling algorithm on the given Schedule.
Parameters
----------
schedule : Schedule
Schedule to apply the scheduling algorithm on.
"""
prec_list = schedule.sfg.get_precedence_list()
if len(prec_list) < 2:
raise ValueError("Empty signal flow graph cannot be scheduled.")
# handle the first set in precedence graph (input and delays)
non_schedulable_ops = []
for outport in prec_list[0]:
operation = outport.operation
if operation.type_name() == Delay.type_name():
non_schedulable_ops.append(operation.graph_id)
else:
schedule.start_times[operation.graph_id] = 0
# handle second set in precedence graph (first operations)
for outport in prec_list[1]:
operation = outport.operation
schedule.start_times[operation.graph_id] = 0
# handle the remaining sets
for outports in prec_list[2:]:
for outport in outports:
operation = outport.operation
if operation.graph_id not in schedule.start_times:
op_start_time = 0
for current_input in operation.inputs:
source_port = current_input.signals[0].source
if source_port.operation.graph_id in non_schedulable_ops:
source_end_time = 0
else:
source_op_time = schedule.start_times[
source_port.operation.graph_id
]
if source_port.latency_offset is None:
raise ValueError(
f"Output port {source_port.index} of"
" operation"
f" {source_port.operation.graph_id} has no"
" latency-offset."
)
source_end_time = (
source_op_time + source_port.latency_offset
)
if current_input.latency_offset is None:
raise ValueError(
f"Input port {current_input.index} of operation"
f" {current_input.operation.graph_id} has no"
" latency-offset."
)
op_start_time_from_in = (
source_end_time - current_input.latency_offset
)
op_start_time = max(op_start_time, op_start_time_from_in)
schedule.start_times[operation.graph_id] = op_start_time
self._handle_outputs(schedule, non_schedulable_ops)
schedule.remove_delays()
class ALAPScheduler(Scheduler):
"""Scheduler that implements the as-late-as-possible (ALAP) algorithm."""
def apply_scheduling(self, schedule: "Schedule") -> None:
"""Applies the scheduling algorithm on the given Schedule.
Parameters
----------
schedule : Schedule
Schedule to apply the scheduling algorithm on.
"""
ASAPScheduler().apply_scheduling(schedule)
max_end_time = schedule.get_max_end_time()
if schedule.schedule_time is None:
schedule.set_schedule_time(max_end_time)
elif schedule.schedule_time < max_end_time:
raise ValueError(f"Too short schedule time. Minimum is {max_end_time}.")
# move all outputs ALAP before operations
for output in schedule.sfg.find_by_type_name(Output.type_name()):
output = cast(Output, output)
schedule.move_operation_alap(output.graph_id)
# move all operations ALAP
for step in reversed(schedule.sfg.get_precedence_list()):
for outport in step:
if not isinstance(outport.operation, Delay):
schedule.move_operation_alap(outport.operation.graph_id)
class EarliestDeadlineScheduler(ListScheduler):
"""Scheduler that implements the earliest-deadline-first algorithm."""
@staticmethod
def _get_sorted_operations(schedule: "Schedule") -> list["GraphID"]:
ALAPScheduler().apply_scheduling(schedule_copy)
deadlines = {}
for op_id, start_time in schedule_copy.start_times.items():
deadlines[op_id] = start_time + schedule.sfg.find_by_id(op_id).latency
return sorted(deadlines, key=deadlines.get)
class LeastSlackTimeScheduler(ListScheduler):
"""Scheduler that implements the least slack time first algorithm."""
@staticmethod
def _get_sorted_operations(schedule: "Schedule") -> list["GraphID"]:
schedule_copy = copy.copy(schedule)
ALAPScheduler().apply_scheduling(schedule_copy)
return sorted(schedule_copy.start_times, key=schedule_copy.start_times.get)
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
class MaxFanOutScheduler(ListScheduler):
"""Scheduler that implements the maximum fan-out algorithm."""
@staticmethod
def _get_sorted_operations(schedule: "Schedule") -> list["GraphID"]:
schedule_copy = copy.copy(schedule)
ALAPScheduler().apply_scheduling(schedule_copy)
fan_outs = {}
for op_id, start_time in schedule_copy.start_times.items():
fan_outs[op_id] = len(schedule.sfg.find_by_id(op_id).output_signals)
return sorted(fan_outs, key=fan_outs.get, reverse=True)
class HybridScheduler(ListScheduler):
"""Scheduler that implements a hybrid algorithm. Will receive a new name once finalized."""
@staticmethod
def _get_sorted_operations(schedule: "Schedule") -> list["GraphID"]:
# sort least-slack and then resort ties according to max-fan-out
schedule_copy = copy.copy(schedule)
ALAPScheduler().apply_scheduling(schedule_copy)
sorted_items = sorted(
schedule_copy.start_times.items(), key=lambda item: item[1]
)
fan_out_sorted_items = []
last_value = sorted_items[0][0]
current_group = []
for key, value in sorted_items:
if value != last_value:
# the group is completed, sort it internally
sorted_group = sorted(
current_group,
key=lambda pair: len(
schedule.sfg.find_by_id(pair[0]).output_signals
),
reverse=True,
)
fan_out_sorted_items += sorted_group
current_group = []
current_group.append((key, value))
last_value = value
sorted_group = sorted(
current_group,
key=lambda pair: len(schedule.sfg.find_by_id(pair[0]).output_signals),
reverse=True,
)
fan_out_sorted_items += sorted_group
sorted_op_list = [pair[0] for pair in fan_out_sorted_items]
return sorted_op_list