Commit 3189fe94 authored by Simon Bjurek

greatly improved speed of ListScheduler and added more tests

parent c8bb844e
1 merge request: !479 Misc improvements and added tests
Pipeline #157324 passed
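The main speed-up in this commit comes from looking up each operation's latency and execution time once, up front, instead of calling find_by_id() on the signal flow graph inside every iteration of the scheduling loop (see the _cached_latencies and _cached_execution_times dictionaries in the ListScheduler diff below). The following is a minimal, self-contained sketch of that caching pattern, not code from the repository; FakeOp, FakeSFG and build_caches are illustrative stand-ins.

from dataclasses import dataclass


@dataclass
class FakeOp:
    graph_id: str
    latency: int
    execution_time: int


class FakeSFG:
    """Illustrative stand-in for a signal flow graph supporting find_by_id()."""

    def __init__(self, ops):
        self._ops = {op.graph_id: op for op in ops}

    def find_by_id(self, graph_id):
        return self._ops[graph_id]


def build_caches(sfg, remaining_ops):
    # One graph lookup per operation, done once; every later query is a
    # plain dictionary access instead of a find_by_id() call.
    cached_latencies = {
        op_id: sfg.find_by_id(op_id).latency for op_id in remaining_ops
    }
    cached_execution_times = {
        op_id: sfg.find_by_id(op_id).execution_time for op_id in remaining_ops
    }
    return cached_latencies, cached_execution_times


latencies, exec_times = build_caches(
    FakeSFG([FakeOp("bfly0", 1, 1), FakeOp("cmul0", 3, 1)]), ["bfly0", "cmul0"]
)
assert latencies == {"bfly0": 1, "cmul0": 3}
assert exec_times == {"bfly0": 1, "cmul0": 1}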
@@ -4,7 +4,7 @@ stages:
before_script:
- apt-get update --yes
# - apt-get install --yes build-essential cmake graphviz xvfb xdg-utils lcov
# - apt-get install --yes build-essential cmake graphviz python3-pyqt5 xvfb xdg-utils lcov
- apt-get install --yes graphviz python3-pyqt5 xvfb xdg-utils
- apt-get install -y libxcb-cursor-dev
- python -m pip install --upgrade pip
@@ -69,7 +69,7 @@ class Constant(AbstractOperation):
@property
def latency(self) -> int:
return self.latency_offsets["out0"]
return 0
def __repr__(self) -> str:
return f"Constant({self.value})"
@@ -1689,7 +1689,7 @@ class DontCare(AbstractOperation):
@property
def latency(self) -> int:
return self.latency_offsets["out0"]
return 0
def __repr__(self) -> str:
return "DontCare()"
@@ -1766,7 +1766,7 @@ class Sink(AbstractOperation):
@property
def latency(self) -> int:
return self.latency_offsets["in0"]
return 0
def __repr__(self) -> str:
return "Sink()"
@@ -212,6 +212,21 @@ class Schedule:
)
return max_end_time
def get_max_non_io_end_time(self) -> int:
"""Return the current maximum end time among all non-IO operations."""
max_end_time = 0
for graph_id, op_start_time in self._start_times.items():
operation = cast(Operation, self._sfg.find_by_id(graph_id))
if graph_id.startswith("out"):
continue
else:
for outport in operation.outputs:
max_end_time = max(
max_end_time,
op_start_time + cast(int, outport.latency_offset),
)
return max_end_time
def forward_slack(self, graph_id: GraphID) -> int:
"""
Return how much an operation can be moved forward in time.
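For reference, the new Schedule.get_max_non_io_end_time() added above returns the latest finish time among operations that are not output sinks; the FFT tests added further down use it to check that outputs are placed right after the last non-IO operation finishes. Below is a simplified, self-contained illustration of that computation; plain dictionaries stand in for the schedule and the operations' output latency offsets, so this is not the repository code.

def max_non_io_end_time(start_times, output_latency_offsets):
    # Largest start time plus output latency offset over all operations,
    # skipping output sinks ("out..."), exactly as the method above does.
    max_end_time = 0
    for graph_id, start_time in start_times.items():
        if graph_id.startswith("out"):
            continue
        for offset in output_latency_offsets[graph_id]:
            max_end_time = max(max_end_time, start_time + offset)
    return max_end_time


# bfly2 finishing at 5 + 1 = 6 sets the bound; out0 is ignored.
assert max_non_io_end_time(
    {"bfly2": 5, "cmul0": 2, "out0": 5},
    {"bfly2": [1], "cmul0": [3], "out0": []},
) == 6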
@@ -226,7 +226,7 @@ class ListScheduler(Scheduler, ABC):
if resource_amount < resource_lower_bound:
raise ValueError(
f"Amount of resource: {resource_type} is not enough to "
f"realize schedule for scheduling time: {self._schedule.schedule_time}"
f"realize schedule for scheduling time: {self._schedule.schedule_time}."
)
alap_schedule = copy.copy(self._schedule)
@@ -245,8 +245,20 @@ class ListScheduler(Scheduler, ABC):
self._remaining_resources = self._max_resources.copy()
remaining_ops = self._sfg.operations
remaining_ops = [op.graph_id for op in remaining_ops]
self._remaining_ops = self._sfg.operations
self._remaining_ops = [op.graph_id for op in self._remaining_ops]
self._cached_latencies = {
op_id: self._sfg.find_by_id(op_id).latency for op_id in self._remaining_ops
}
self._cached_execution_times = {
op_id: self._sfg.find_by_id(op_id).execution_time
for op_id in self._remaining_ops
}
self._deadlines = self._calculate_deadlines(alap_start_times)
self._output_slacks = self._calculate_alap_output_slacks(alap_start_times)
self._fan_outs = self._calculate_fan_outs(alap_start_times)
self._schedule.start_times = {}
self.remaining_reads = self._max_concurrent_reads
@@ -260,23 +272,24 @@ class ListScheduler(Scheduler, ABC):
for input_id in self._input_times:
self._schedule.start_times[input_id] = self._input_times[input_id]
self._op_laps[input_id] = 0
remaining_ops = [
elem for elem in remaining_ops if not elem.startswith("in")
self._remaining_ops = [
elem for elem in self._remaining_ops if not elem.startswith("in")
]
remaining_ops = [op for op in remaining_ops if not op.startswith("dontcare")]
remaining_ops = [op for op in remaining_ops if not op.startswith("t")]
remaining_ops = [
self._remaining_ops = [
op for op in self._remaining_ops if not op.startswith("dontcare")
]
self._remaining_ops = [
op for op in self._remaining_ops if not op.startswith("t")
]
self._remaining_ops = [
op
for op in remaining_ops
for op in self._remaining_ops
if not (op.startswith("out") and op in self._output_delta_times)
]
while remaining_ops:
ready_ops_priority_table = self._get_ready_ops_priority_table(
alap_start_times,
remaining_ops,
)
while self._remaining_ops:
ready_ops_priority_table = self._get_ready_ops_priority_table()
while ready_ops_priority_table:
next_op = self._sfg.find_by_id(
self._get_next_op_id(ready_ops_priority_table)
@@ -284,8 +297,8 @@ class ListScheduler(Scheduler, ABC):
self.remaining_reads -= next_op.input_count
remaining_ops = [
op_id for op_id in remaining_ops if op_id != next_op.graph_id
self._remaining_ops = [
op_id for op_id in self._remaining_ops if op_id != next_op.graph_id
]
self._time_out_counter = 0
@@ -295,25 +308,10 @@ class ListScheduler(Scheduler, ABC):
if self._schedule.schedule_time
else 0
)
if not self._schedule.cyclic and self._schedule.schedule_time:
if self._current_time > self._schedule.schedule_time:
raise ValueError(
f"Provided scheduling time {schedule.schedule_time} cannot be reached, "
"try to enable the cyclic property or increase the time."
)
ready_ops_priority_table = self._get_ready_ops_priority_table(
alap_start_times,
remaining_ops,
)
ready_ops_priority_table = self._get_ready_ops_priority_table()
self._go_to_next_time_step()
ready_ops_priority_table = self._get_ready_ops_priority_table(
alap_start_times,
remaining_ops,
)
self.remaining_reads = self._max_concurrent_reads
self._current_time -= 1
@@ -354,43 +352,35 @@ class ListScheduler(Scheduler, ABC):
sorted_table = sorted(ready_ops_priority_table, key=sort_key)
return sorted_table[0][0]
def _get_ready_ops_priority_table(
self,
alap_start_times: dict["GraphID", int],
remaining_ops: list["GraphID"],
) -> list[tuple["GraphID", int, int, int]]:
def _get_ready_ops_priority_table(self) -> list[tuple["GraphID", int, int, int]]:
ready_ops = [
op_id
for op_id in remaining_ops
if self._op_is_schedulable(self._sfg.find_by_id(op_id), remaining_ops)
for op_id in self._remaining_ops
if self._op_is_schedulable(self._sfg.find_by_id(op_id))
]
deadlines = self._calculate_deadlines(alap_start_times)
output_slacks = self._calculate_alap_output_slacks(alap_start_times)
fan_outs = self._calculate_fan_outs(alap_start_times)
ready_ops_priority_table = []
for op_id in ready_ops:
ready_ops_priority_table.append(
(op_id, deadlines[op_id], output_slacks[op_id], fan_outs[op_id])
return [
(
op_id,
self._deadlines[op_id],
self._output_slacks[op_id],
self._fan_outs[op_id],
)
return ready_ops_priority_table
for op_id in ready_ops
]
def _calculate_deadlines(
self, alap_start_times: dict["GraphID", int]
) -> dict["GraphID", int]:
return {
op_id: start_time + self._sfg.find_by_id(op_id).latency
op_id: start_time + self._cached_latencies[op_id]
for op_id, start_time in alap_start_times.items()
}
def _calculate_alap_output_slacks(
self, alap_start_times: dict["GraphID", int]
) -> dict["GraphID", int]:
return {
op_id: start_time - self._current_time
for op_id, start_time in alap_start_times.items()
}
return {op_id: start_time for op_id, start_time in alap_start_times.items()}
def _calculate_fan_outs(
self, alap_start_times: dict["GraphID", int]
@@ -412,26 +402,22 @@ class ListScheduler(Scheduler, ABC):
start_time = start_time % self._schedule.schedule_time
if time_slot >= start_time:
if time_slot < start_time + max(
self._sfg.find_by_id(op_id).execution_time, 1
):
if time_slot < start_time + max(self._cached_execution_times[op_id], 1):
if op_id.startswith(op.type_name()):
if op.graph_id != op_id:
count += 1
return count < self._remaining_resources[op.type_name()]
def _op_is_schedulable(
self, op: "Operation", remaining_ops: list["GraphID"]
) -> bool:
def _op_is_schedulable(self, op: "Operation") -> bool:
if not self._op_satisfies_resource_constraints(op):
return False
op_finish_time = self._current_time + op.latency
op_finish_time = self._current_time + self._cached_latencies[op.graph_id]
future_ops = [
self._sfg.find_by_id(item[0])
for item in self._schedule.start_times.items()
if item[1] + self._sfg.find_by_id(item[0]).latency == op_finish_time
if item[1] + self._cached_latencies[item[0]] == op_finish_time
]
future_ops_writes = sum([op.input_count for op in future_ops])
@@ -451,7 +437,7 @@ class ListScheduler(Scheduler, ABC):
source_op_graph_id = source_op.graph_id
if source_op_graph_id in remaining_ops:
if source_op_graph_id in self._remaining_ops:
return False
if self._schedule.start_times[source_op_graph_id] != self._current_time - 1:
@@ -466,12 +452,18 @@ class ListScheduler(Scheduler, ABC):
self._schedule.start_times.get(source_op_graph_id)
+ self._op_laps[source_op.graph_id] * self._schedule.schedule_time
)
proceeding_op_finish_time = proceeding_op_start_time + source_op.latency
proceeding_op_finish_time = (
proceeding_op_start_time
+ self._cached_latencies[source_op.graph_id]
)
else:
proceeding_op_start_time = self._schedule.start_times.get(
source_op_graph_id
)
proceeding_op_finish_time = proceeding_op_start_time + source_op.latency
proceeding_op_finish_time = (
proceeding_op_start_time
+ self._cached_latencies[source_op.graph_id]
)
earliest_start_time = max(earliest_start_time, proceeding_op_finish_time)
return earliest_start_time <= self._current_time
@@ -502,7 +494,7 @@ class ListScheduler(Scheduler, ABC):
self._remaining_resources[Output.type_name()] -= count
self._current_time = new_time
if not self._op_is_schedulable(output, {}):
if not self._op_is_schedulable(output):
raise ValueError(
"Cannot schedule outputs according to the provided output_delta_times. "
f"Failed output: {output.graph_id}, "
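A second part of the speed-up, visible in the ListScheduler diff above, is that the priority metrics (deadlines, ALAP output slacks and fan-outs) are now computed once from the ALAP start times instead of being recomputed every time the ready-operations table is rebuilt. Below is a minimal stand-alone sketch of the deadline precomputation, mirroring _calculate_deadlines with the cached latencies; the names are simplified and this is not the repository code.

def precompute_deadlines(alap_start_times, cached_latencies):
    # Deadline of an operation = its ALAP start time plus its latency;
    # the resulting dict is reused for every ready-ops table afterwards.
    return {
        op_id: start_time + cached_latencies[op_id]
        for op_id, start_time in alap_start_times.items()
    }


assert precompute_deadlines({"bfly0": 3, "cmul0": 2}, {"bfly0": 1, "cmul0": 3}) == {
    "bfly0": 4,
    "cmul0": 5,
}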
@@ -53,7 +53,7 @@ class Input(AbstractOperation):
@property
def latency(self) -> int:
return self.latency_offsets["out0"]
return 0
@property
def value(self) -> Num:
@@ -157,7 +157,7 @@ class Output(AbstractOperation):
@property
def latency(self) -> int:
return self.latency_offsets["in0"]
return 0
class Delay(AbstractOperation):
@@ -906,3 +906,408 @@ class TestHybridScheduler:
max_concurrent_reads=2,
),
)
def test_32_point_fft_custom_io_times(self):
POINTS = 32
sfg = radix_2_dif_fft(POINTS)
sfg.set_latency_of_type(Butterfly.type_name(), 1)
sfg.set_latency_of_type(ConstantMultiplication.type_name(), 3)
sfg.set_execution_time_of_type(Butterfly.type_name(), 1)
sfg.set_execution_time_of_type(ConstantMultiplication.type_name(), 1)
resources = {Butterfly.type_name(): 1, ConstantMultiplication.type_name(): 1}
input_times = {f"in{i}": i for i in range(POINTS)}
output_delta_times = {f"out{i}": i for i in range(POINTS)}
schedule = Schedule(
sfg,
scheduler=HybridScheduler(
resources,
input_times=input_times,
output_delta_times=output_delta_times,
),
)
for i in range(POINTS):
assert schedule.start_times[f"in{i}"] == i
assert (
schedule.start_times[f"out{i}"]
== schedule.get_max_non_io_end_time() + i
)
# Too slow for pipeline right now
# def test_64_point_fft_custom_io_times(self):
# POINTS = 64
# sfg = radix_2_dif_fft(POINTS)
# sfg.set_latency_of_type(Butterfly.type_name(), 1)
# sfg.set_latency_of_type(ConstantMultiplication.type_name(), 3)
# sfg.set_execution_time_of_type(Butterfly.type_name(), 1)
# sfg.set_execution_time_of_type(ConstantMultiplication.type_name(), 1)
# resources = {Butterfly.type_name(): 1, ConstantMultiplication.type_name(): 1}
# input_times = {f"in{i}": i for i in range(POINTS)}
# output_delta_times = {f"out{i}": i for i in range(POINTS)}
# schedule = Schedule(
# sfg,
# scheduler=HybridScheduler(
# resources,
# input_times=input_times,
# output_delta_times=output_delta_times,
# ),
# )
# for i in range(POINTS):
# assert schedule.start_times[f"in{i}"] == i
# assert (
# schedule.start_times[f"out{i}"]
# == schedule.get_max_non_io_end_time() + i
# )
def test_32_point_fft_custom_io_times_cyclic(self):
POINTS = 32
sfg = radix_2_dif_fft(POINTS)
sfg.set_latency_of_type(Butterfly.type_name(), 1)
sfg.set_latency_of_type(ConstantMultiplication.type_name(), 3)
sfg.set_execution_time_of_type(Butterfly.type_name(), 1)
sfg.set_execution_time_of_type(ConstantMultiplication.type_name(), 1)
resources = {Butterfly.type_name(): 1, ConstantMultiplication.type_name(): 1}
input_times = {f"in{i}": i for i in range(POINTS)}
output_delta_times = {f"out{i}": i for i in range(POINTS)}
schedule = Schedule(
sfg,
scheduler=HybridScheduler(
resources,
input_times=input_times,
output_delta_times=output_delta_times,
),
schedule_time=96,
cyclic=True,
)
for i in range(POINTS):
assert schedule.start_times[f"in{i}"] == i
assert schedule.start_times[f"out{i}"] == (96 if i == 0 else i)
def test_cyclic_scheduling(self):
sfg = radix_2_dif_fft(points=4)
sfg.set_latency_of_type(Butterfly.type_name(), 1)
sfg.set_latency_of_type(ConstantMultiplication.type_name(), 3)
sfg.set_execution_time_of_type(Butterfly.type_name(), 1)
sfg.set_execution_time_of_type(ConstantMultiplication.type_name(), 1)
resources = {
Butterfly.type_name(): 1,
ConstantMultiplication.type_name(): 1,
}
schedule_1 = Schedule(sfg, scheduler=HybridScheduler(resources))
schedule_2 = Schedule(
sfg, scheduler=HybridScheduler(resources), schedule_time=6, cyclic=True
)
schedule_3 = Schedule(
sfg, scheduler=HybridScheduler(resources), schedule_time=5, cyclic=True
)
schedule_4 = Schedule(
sfg, scheduler=HybridScheduler(resources), schedule_time=4, cyclic=True
)
assert schedule_1.start_times == {
"in1": 0,
"in3": 1,
"bfly3": 1,
"cmul0": 2,
"in0": 2,
"in2": 3,
"bfly0": 3,
"bfly1": 4,
"bfly2": 5,
"out0": 5,
"out1": 6,
"out3": 7,
"out2": 8,
}
assert schedule_1.laps == {
"s4": 0,
"s6": 0,
"s5": 0,
"s7": 0,
"s8": 0,
"s12": 0,
"s10": 0,
"s9": 0,
"s0": 0,
"s2": 0,
"s11": 0,
"s1": 0,
"s3": 0,
}
assert schedule_1.schedule_time == 8
assert schedule_2.start_times == {
"in1": 0,
"in3": 1,
"bfly3": 1,
"cmul0": 2,
"in0": 2,
"in2": 3,
"bfly0": 3,
"bfly1": 4,
"bfly2": 5,
"out0": 5,
"out1": 6,
"out3": 1,
"out2": 2,
}
assert schedule_2.laps == {
"s4": 0,
"s6": 1,
"s5": 0,
"s7": 1,
"s8": 0,
"s12": 0,
"s10": 0,
"s9": 0,
"s0": 0,
"s2": 0,
"s11": 0,
"s1": 0,
"s3": 0,
}
assert schedule_2.schedule_time == 6
assert schedule_3.start_times == {
"in1": 0,
"in3": 1,
"bfly3": 1,
"cmul0": 2,
"in0": 2,
"in2": 3,
"bfly0": 3,
"bfly1": 4,
"bfly2": 0,
"out0": 5,
"out1": 1,
"out3": 2,
"out2": 3,
}
assert schedule_3.laps == {
"s4": 0,
"s6": 1,
"s5": 0,
"s7": 0,
"s8": 0,
"s12": 0,
"s10": 1,
"s9": 1,
"s0": 0,
"s2": 0,
"s11": 0,
"s1": 0,
"s3": 0,
}
assert schedule_3.schedule_time == 5
assert schedule_4.start_times == {
"in1": 0,
"in3": 1,
"bfly3": 1,
"cmul0": 2,
"in0": 2,
"in2": 3,
"bfly0": 3,
"bfly1": 0,
"out0": 1,
"bfly2": 2,
"out2": 2,
"out1": 3,
"out3": 4,
}
assert schedule_4.laps == {
"s4": 0,
"s6": 0,
"s5": 0,
"s7": 0,
"s8": 1,
"s12": 1,
"s10": 0,
"s9": 1,
"s0": 0,
"s2": 0,
"s11": 0,
"s1": 0,
"s3": 0,
}
assert schedule_4.schedule_time == 4
def test_cyclic_scheduling_time_not_provided(self):
sfg = ldlt_matrix_inverse(N=2)
sfg.set_latency_of_type(MADS.type_name(), 3)
sfg.set_latency_of_type(Reciprocal.type_name(), 2)
sfg.set_execution_time_of_type(MADS.type_name(), 1)
sfg.set_execution_time_of_type(Reciprocal.type_name(), 1)
resources = {MADS.type_name(): 1, Reciprocal.type_name(): 1}
with pytest.raises(
ValueError,
match="Scheduling time must be provided when cyclic = True.",
):
Schedule(
sfg,
scheduler=HybridScheduler(
max_resources=resources,
),
cyclic=True,
)
def test_resources_not_enough(self):
sfg = ldlt_matrix_inverse(N=3)
sfg.set_latency_of_type(MADS.type_name(), 3)
sfg.set_latency_of_type(Reciprocal.type_name(), 2)
sfg.set_execution_time_of_type(MADS.type_name(), 1)
sfg.set_execution_time_of_type(Reciprocal.type_name(), 1)
resources = {MADS.type_name(): 1, Reciprocal.type_name(): 1}
with pytest.raises(
ValueError,
match="Amount of resource: mads is not enough to realize schedule for scheduling time: 5.",
):
Schedule(
sfg,
scheduler=HybridScheduler(
max_resources=resources,
),
schedule_time=5,
)
def test_scheduling_time_not_enough(self):
sfg = ldlt_matrix_inverse(N=3)
sfg.set_latency_of_type(MADS.type_name(), 3)
sfg.set_latency_of_type(Reciprocal.type_name(), 2)
sfg.set_execution_time_of_type(MADS.type_name(), 1)
sfg.set_execution_time_of_type(Reciprocal.type_name(), 1)
resources = {MADS.type_name(): 10, Reciprocal.type_name(): 10}
with pytest.raises(
ValueError,
match="Provided scheduling time 5 cannot be reached, try to enable the cyclic property or increase the time to at least 30.",
):
Schedule(
sfg,
scheduler=HybridScheduler(
max_resources=resources,
),
schedule_time=5,
)
def test_cyclic_scheduling_write_and_read_constrained(self):
sfg = radix_2_dif_fft(points=4)
sfg.set_latency_of_type(Butterfly.type_name(), 1)
sfg.set_latency_of_type(ConstantMultiplication.type_name(), 3)
sfg.set_execution_time_of_type(Butterfly.type_name(), 1)
sfg.set_execution_time_of_type(ConstantMultiplication.type_name(), 1)
resources = {
Butterfly.type_name(): 1,
ConstantMultiplication.type_name(): 1,
}
schedule = Schedule(
sfg,
scheduler=HybridScheduler(
resources, max_concurrent_reads=2, max_concurrent_writes=2
),
schedule_time=6,
cyclic=True,
)
assert schedule.start_times == {
"in1": 0,
"in3": 1,
"bfly3": 1,
"cmul0": 2,
"in0": 3,
"in2": 4,
"bfly0": 4,
"bfly1": 5,
"bfly2": 0,
"out0": 6,
"out1": 1,
"out3": 2,
"out2": 3,
}
assert schedule.laps == {
"s4": 0,
"s6": 1,
"s5": 0,
"s7": 0,
"s8": 0,
"s12": 0,
"s10": 1,
"s9": 1,
"s0": 0,
"s2": 0,
"s11": 0,
"s1": 0,
"s3": 0,
}
assert schedule.schedule_time == 6
direct, mem_vars = schedule.get_memory_variables().split_on_length()
assert mem_vars.read_ports_bound() == 2
assert mem_vars.write_ports_bound() == 2
def test_cyclic_scheduling_several_inputs_and_outputs(self):
sfg = radix_2_dif_fft(points=4)
sfg.set_latency_of_type(Butterfly.type_name(), 1)
sfg.set_latency_of_type(ConstantMultiplication.type_name(), 3)
sfg.set_execution_time_of_type(Butterfly.type_name(), 1)
sfg.set_execution_time_of_type(ConstantMultiplication.type_name(), 1)
resources = {
Butterfly.type_name(): 1,
ConstantMultiplication.type_name(): 1,
Input.type_name(): 2,
Output.type_name(): 2,
}
schedule = Schedule(
sfg, scheduler=HybridScheduler(resources), schedule_time=4, cyclic=True
)
assert schedule.start_times == {
'in1': 0,
'in3': 0,
'bfly3': 0,
'cmul0': 1,
'in0': 1,
"in2": 1,
'bfly0': 1,
'bfly1': 2,
'out0': 3,
'out2': 3,
'bfly2': 3,
'out1': 4,
'out3': 4,
}
assert schedule.laps == {
's4': 0,
's6': 0,
's5': 0,
's7': 0,
's8': 0,
's12': 0,
's10': 1,
's9': 0,
's0': 0,
's2': 0,
's11': 0,
's1': 0,
's3': 0,
}
assert schedule.schedule_time == 4