From d1b07e69130547815148a8dce57194e805093539 Mon Sep 17 00:00:00 2001 From: Simon Bjurek <simbj106@student.liu.se> Date: Tue, 25 Feb 2025 17:14:57 +0000 Subject: [PATCH] Add cyclic scheduling and improve resource --- b_asic/schedule.py | 71 +++- b_asic/scheduler.py | 328 ++++++++++-------- .../auto_scheduling_with_custom_io_times.py | 33 +- examples/ldlt_matrix_inverse.py | 1 + test/unit/test_list_schedulers.py | 4 +- 5 files changed, 276 insertions(+), 161 deletions(-) diff --git a/b_asic/schedule.py b/b_asic/schedule.py index 1d72ce0b..181c426e 100644 --- a/b_asic/schedule.py +++ b/b_asic/schedule.py @@ -117,12 +117,9 @@ class Schedule: self._start_times = start_times self._laps.update(laps) self._remove_delays_no_laps() - max_end_time = self.get_max_end_time() if not self._schedule_time: self._schedule_time = max_end_time - elif self._schedule_time < max_end_time: - raise ValueError(f"Too short schedule time. Minimum is {max_end_time}.") def __str__(self) -> str: """Return a string representation of this Schedule.""" @@ -176,11 +173,14 @@ class Schedule: max_end_time = 0 for graph_id, op_start_time in self._start_times.items(): operation = cast(Operation, self._sfg.find_by_id(graph_id)) - for outport in operation.outputs: - max_end_time = max( - max_end_time, - op_start_time + cast(int, outport.latency_offset), - ) + if graph_id.startswith("out"): + max_end_time = max(max_end_time, op_start_time) + else: + for outport in operation.outputs: + max_end_time = max( + max_end_time, + op_start_time + cast(int, outport.latency_offset), + ) return max_end_time def forward_slack(self, graph_id: GraphID) -> int: @@ -304,13 +304,16 @@ class Schedule: usage_time = start_time + cast(int, input_port.latency_offset) for signal in input_port.signals: source = cast(OutputPort, signal.source) - available_time = ( - cast(int, source.latency_offset) - + self._start_times[source.operation.graph_id] - - self._schedule_time * self._laps[signal.graph_id] - ) - if available_time > self._schedule_time: - available_time -= self._schedule_time + if source.operation.graph_id.startswith("dontcare"): + available_time = 0 + else: + available_time = ( + cast(int, source.latency_offset) + + self._start_times[source.operation.graph_id] + - self._schedule_time * self._laps[signal.graph_id] + ) + if available_time > self._schedule_time: + available_time -= self._schedule_time input_slacks[signal] = usage_time - available_time return input_slacks @@ -655,6 +658,44 @@ class Schedule: max_pos_graph_id = max(self._y_locations, key=self._y_locations.get) return self._get_y_position(max_pos_graph_id, operation_height, operation_gap) + def place_operation(self, op: Operation, time: int) -> None: + """Schedule the given operation in given time. + + Parameters + ---------- + op : Operation + Operation to schedule. + time : int + Time slot to schedule the operation in. + If time > schedule_time -> schedule cyclically. 
+ """ + start = time % self._schedule_time if self._schedule_time else time + self._start_times[op.graph_id] = start + + if not self.schedule_time: + return + + # Update input laps + input_slacks = self._backward_slacks(op.graph_id) + for in_port, signal_slacks in input_slacks.items(): + for signal, signal_slack in signal_slacks.items(): + new_slack = signal_slack + laps = 0 + while new_slack < 0: + laps += 1 + new_slack += self._schedule_time + self._laps[signal.graph_id] = laps + + if ( + start == 0 + and isinstance(op, Output) + and self._laps[op.input(0).signals[0].graph_id] != 0 + ): + start = self._schedule_time + self._laps[op.input(0).signals[0].graph_id] -= 1 + + self._start_times[op.graph_id] = start + def move_operation(self, graph_id: GraphID, time: int) -> "Schedule": """ Move an operation in the schedule. diff --git a/b_asic/scheduler.py b/b_asic/scheduler.py index 311c0634..74a892c0 100644 --- a/b_asic/scheduler.py +++ b/b_asic/scheduler.py @@ -1,6 +1,7 @@ import copy import sys from abc import ABC, abstractmethod +from math import ceil from typing import TYPE_CHECKING, Optional, cast from b_asic.core_operations import DontCare @@ -11,7 +12,6 @@ from b_asic.types import TypeName if TYPE_CHECKING: from b_asic.operation import Operation from b_asic.schedule import Schedule - from b_asic.signal_flow_graph import SFG from b_asic.types import GraphID @@ -58,6 +58,7 @@ class ASAPScheduler(Scheduler): schedule : Schedule Schedule to apply the scheduling algorithm on. """ + prec_list = schedule.sfg.get_precedence_list() if len(prec_list) < 2: raise ValueError("Empty signal flow graph cannot be scheduled.") @@ -120,6 +121,13 @@ class ASAPScheduler(Scheduler): self._handle_outputs(schedule, non_schedulable_ops) schedule.remove_delays() + max_end_time = schedule.get_max_end_time() + + if schedule.schedule_time is None: + schedule.set_schedule_time(max_end_time) + elif schedule.schedule_time < max_end_time: + raise ValueError(f"Too short schedule time. Minimum is {max_end_time}.") + schedule.sort_y_locations_on_start_times() @@ -135,12 +143,6 @@ class ALAPScheduler(Scheduler): Schedule to apply the scheduling algorithm on. """ ASAPScheduler().apply_scheduling(schedule) - max_end_time = schedule.get_max_end_time() - - if schedule.schedule_time is None: - schedule.set_schedule_time(max_end_time) - elif schedule.schedule_time < max_end_time: - raise ValueError(f"Too short schedule time. Minimum is {max_end_time}.") # move all outputs ALAP before operations for output in schedule.sfg.find_by_type_name(Output.type_name()): @@ -157,6 +159,9 @@ class ALAPScheduler(Scheduler): class ListScheduler(Scheduler, ABC): + + TIME_OUT_COUNTER_LIMIT = 100 + def __init__( self, max_resources: Optional[dict[TypeName, int]] = None, @@ -179,6 +184,11 @@ class ListScheduler(Scheduler, ABC): else: self._max_resources = {} + if Input.type_name() not in self._max_resources: + self._max_resources[Input.type_name()] = 1 + if Output.type_name() not in self._max_resources: + self._max_resources[Output.type_name()] = 1 + self._max_concurrent_reads = max_concurrent_reads or sys.maxsize self._max_concurrent_writes = max_concurrent_writes or sys.maxsize @@ -198,127 +208,138 @@ class ListScheduler(Scheduler, ABC): schedule : Schedule Schedule to apply the scheduling algorithm on. 
""" - sfg = schedule.sfg + self._schedule = schedule + self._sfg = schedule.sfg - alap_schedule = copy.copy(schedule) + if self._schedule.cyclic and self._schedule.schedule_time is None: + raise ValueError("Scheduling time must be provided when cyclic = True.") + + for resource_type, resource_amount in self._max_resources.items(): + total_exec_time = sum( + [op.execution_time for op in self._sfg.find_by_type_name(resource_type)] + ) + if self._schedule.schedule_time is not None: + resource_lower_bound = ceil( + total_exec_time / self._schedule.schedule_time + ) + if resource_amount < resource_lower_bound: + raise ValueError( + f"Amount of resource: {resource_type} is not enough to " + f"realize schedule for scheduling time: {self._schedule.schedule_time}" + ) + + alap_schedule = copy.copy(self._schedule) + alap_schedule._schedule_time = None ALAPScheduler().apply_scheduling(alap_schedule) alap_start_times = alap_schedule.start_times - schedule.start_times = {} - - used_resources_ready_times = {} - remaining_resources = self._max_resources.copy() - if Input.type_name() not in remaining_resources: - remaining_resources[Input.type_name()] = 1 - if Output.type_name() not in remaining_resources: - remaining_resources[Output.type_name()] = 1 - - remaining_ops = ( - sfg.operations - + sfg.find_by_type_name(Input.type_name()) - + sfg.find_by_type_name(Output.type_name()) - ) + self._schedule.start_times = {} + + if not self._schedule.cyclic and self._schedule.schedule_time: + if alap_schedule.schedule_time > self._schedule.schedule_time: + raise ValueError( + f"Provided scheduling time {schedule.schedule_time} cannot be reached, " + "try to enable the cyclic property or increase the time to at least " + f"{alap_schedule.schedule_time}." + ) + + self._remaining_resources = self._max_resources.copy() + + remaining_ops = self._sfg.operations remaining_ops = [op.graph_id for op in remaining_ops] - schedule.start_times = {} - remaining_reads = self._max_concurrent_reads + self._schedule.start_times = {} + self.remaining_reads = self._max_concurrent_reads + + self._current_time = 0 + self._time_out_counter = 0 + self._op_laps = {} # initial input placement if self._input_times: for input_id in self._input_times: - schedule.start_times[input_id] = self._input_times[input_id] + self._schedule.start_times[input_id] = self._input_times[input_id] + self._op_laps[input_id] = 0 remaining_ops = [ elem for elem in remaining_ops if not elem.startswith("in") ] remaining_ops = [op for op in remaining_ops if not op.startswith("dontcare")] remaining_ops = [op for op in remaining_ops if not op.startswith("t")] + remaining_ops = [ + op + for op in remaining_ops + if not (op.startswith("out") and op in self._output_delta_times) + ] - current_time = 0 - time_out_counter = 0 while remaining_ops: ready_ops_priority_table = self._get_ready_ops_priority_table( - sfg, - schedule.start_times, - current_time, alap_start_times, remaining_ops, - remaining_resources, - remaining_reads, ) while ready_ops_priority_table: - next_op = sfg.find_by_id(self._get_next_op_id(ready_ops_priority_table)) - - if next_op.type_name() in remaining_resources: - remaining_resources[next_op.type_name()] -= 1 - if ( - next_op.type_name() == Input.type_name() - or next_op.type_name() == Output.type_name() - ): - used_resources_ready_times[next_op] = current_time + 1 - else: - used_resources_ready_times[next_op] = ( - current_time + next_op.execution_time - ) - remaining_reads -= next_op.input_count + next_op = self._sfg.find_by_id( + 
self._get_next_op_id(ready_ops_priority_table) + ) + + self.remaining_reads -= next_op.input_count remaining_ops = [ op_id for op_id in remaining_ops if op_id != next_op.graph_id ] - schedule.start_times[next_op.graph_id] = current_time + + self._time_out_counter = 0 + self._schedule.place_operation(next_op, self._current_time) + self._op_laps[next_op.graph_id] = ( + (self._current_time) // self._schedule.schedule_time + if self._schedule.schedule_time + else 0 + ) + if not self._schedule.cyclic and self._schedule.schedule_time: + if self._current_time > self._schedule.schedule_time: + raise ValueError( + f"Provided scheduling time {schedule.schedule_time} cannot be reached, " + "try to enable the cyclic property or increase the time." + ) ready_ops_priority_table = self._get_ready_ops_priority_table( - sfg, - schedule.start_times, - current_time, alap_start_times, remaining_ops, - remaining_resources, - remaining_reads, ) - current_time += 1 - time_out_counter += 1 - if time_out_counter >= 100: - raise TimeoutError( - "Algorithm did not schedule any operation for 10 time steps, " - "try relaxing constraints." - ) + self._go_to_next_time_step() ready_ops_priority_table = self._get_ready_ops_priority_table( - sfg, - schedule.start_times, - current_time, alap_start_times, remaining_ops, - remaining_resources, - remaining_reads, ) - # update available reads and operators - remaining_reads = self._max_concurrent_reads - for operation, ready_time in used_resources_ready_times.items(): - if ready_time == current_time: - remaining_resources[operation.type_name()] += 1 - current_time -= 1 + self.remaining_reads = self._max_concurrent_reads - self._handle_outputs(schedule) + self._current_time -= 1 - if not schedule.cyclic: - max_start_time = max(schedule.start_times.values()) - if current_time < max_start_time: - current_time = max_start_time - current_time = max(current_time, schedule.get_max_end_time()) - schedule.set_schedule_time(current_time) + self._handle_outputs() - schedule.remove_delays() + if self._schedule.schedule_time is None: + self._schedule.set_schedule_time(self._schedule.get_max_end_time()) + + self._schedule.remove_delays() # schedule all dont cares ALAP - for dc_op in sfg.find_by_type_name(DontCare.type_name()): + for dc_op in self._sfg.find_by_type_name(DontCare.type_name()): dc_op = cast(DontCare, dc_op) - schedule.start_times[dc_op.graph_id] = 0 - schedule.move_operation_alap(dc_op.graph_id) + self._schedule.start_times[dc_op.graph_id] = 0 + self._schedule.move_operation_alap(dc_op.graph_id) - schedule.sort_y_locations_on_start_times() + self._schedule.sort_y_locations_on_start_times() + + def _go_to_next_time_step(self): + self._time_out_counter += 1 + if self._time_out_counter >= self.TIME_OUT_COUNTER_LIMIT: + raise TimeoutError( + "Algorithm did not manage to schedule any operation for 10 time steps, " + "try relaxing the constraints." 
+ ) + self._current_time += 1 def _get_next_op_id( self, ready_ops_priority_table: list[tuple["GraphID", int, ...]] @@ -334,35 +355,18 @@ class ListScheduler(Scheduler, ABC): def _get_ready_ops_priority_table( self, - sfg: "SFG", - start_times: dict["GraphID", int], - current_time: int, alap_start_times: dict["GraphID", int], remaining_ops: list["GraphID"], - remaining_resources: dict["GraphID", int], - remaining_reads: int, ) -> list[tuple["GraphID", int, int, int]]: - ready_ops = [ op_id for op_id in remaining_ops - if self._op_is_schedulable( - start_times, - sfg, - sfg.find_by_id(op_id), - current_time, - remaining_resources, - remaining_reads, - self._max_concurrent_writes, - remaining_ops, - ) + if self._op_is_schedulable(self._sfg.find_by_id(op_id), remaining_ops) ] - deadlines = self._calculate_deadlines(sfg, alap_start_times) - output_slacks = self._calculate_alap_output_slacks( - current_time, alap_start_times - ) - fan_outs = self._calculate_fan_outs(sfg, alap_start_times) + deadlines = self._calculate_deadlines(alap_start_times) + output_slacks = self._calculate_alap_output_slacks(alap_start_times) + fan_outs = self._calculate_fan_outs(alap_start_times) ready_ops_priority_table = [] for op_id in ready_ops: @@ -372,58 +376,68 @@ class ListScheduler(Scheduler, ABC): return ready_ops_priority_table def _calculate_deadlines( - self, sfg, alap_start_times: dict["GraphID", int] + self, alap_start_times: dict["GraphID", int] ) -> dict["GraphID", int]: return { - op_id: start_time + sfg.find_by_id(op_id).latency + op_id: start_time + self._sfg.find_by_id(op_id).latency for op_id, start_time in alap_start_times.items() } def _calculate_alap_output_slacks( - self, current_time: int, alap_start_times: dict["GraphID", int] + self, alap_start_times: dict["GraphID", int] ) -> dict["GraphID", int]: return { - op_id: start_time - current_time + op_id: start_time - self._current_time for op_id, start_time in alap_start_times.items() } def _calculate_fan_outs( - self, sfg: "SFG", alap_start_times: dict["GraphID", int] + self, alap_start_times: dict["GraphID", int] ) -> dict["GraphID", int]: return { - op_id: len(sfg.find_by_id(op_id).output_signals) + op_id: len(self._sfg.find_by_id(op_id).output_signals) for op_id, start_time in alap_start_times.items() } - @staticmethod + def _op_satisfies_resource_constraints(self, op: "Operation") -> bool: + if self._schedule.schedule_time is not None: + time_slot = self._current_time % self._schedule.schedule_time + else: + time_slot = self._current_time + + count = 0 + for op_id, start_time in self._schedule.start_times.items(): + if self._schedule.schedule_time is not None: + start_time = start_time % self._schedule.schedule_time + + if time_slot >= start_time: + if time_slot < start_time + max( + self._sfg.find_by_id(op_id).execution_time, 1 + ): + if op_id.startswith(op.type_name()): + if op.graph_id != op_id: + count += 1 + + return count < self._remaining_resources[op.type_name()] + def _op_is_schedulable( - start_times: dict["GraphID"], - sfg: "SFG", - op: "Operation", - current_time: int, - remaining_resources: dict["GraphID", int], - remaining_reads: int, - max_concurrent_writes: int, - remaining_ops: list["GraphID"], + self, op: "Operation", remaining_ops: list["GraphID"] ) -> bool: - if ( - op.type_name() in remaining_resources - and remaining_resources[op.type_name()] == 0 - ): + if not self._op_satisfies_resource_constraints(op): return False - op_finish_time = current_time + op.latency + op_finish_time = self._current_time + op.latency 
future_ops = [ - sfg.find_by_id(item[0]) - for item in start_times.items() - if item[1] + sfg.find_by_id(item[0]).latency == op_finish_time + self._sfg.find_by_id(item[0]) + for item in self._schedule.start_times.items() + if item[1] + self._sfg.find_by_id(item[0]).latency == op_finish_time ] future_ops_writes = sum([op.input_count for op in future_ops]) if ( not op.graph_id.startswith("out") - and future_ops_writes >= max_concurrent_writes + and future_ops_writes >= self._max_concurrent_writes ): return False @@ -439,32 +453,58 @@ class ListScheduler(Scheduler, ABC): if source_op_graph_id in remaining_ops: return False - if start_times[source_op_graph_id] != current_time - 1: + if self._schedule.start_times[source_op_graph_id] != self._current_time - 1: # not a direct connection -> memory read required read_counter += 1 - if read_counter > remaining_reads: + if read_counter > self.remaining_reads: return False - proceeding_op_start_time = start_times.get(source_op_graph_id) - proceeding_op_finish_time = proceeding_op_start_time + source_op.latency + if self._schedule.schedule_time is not None: + proceeding_op_start_time = ( + self._schedule.start_times.get(source_op_graph_id) + + self._op_laps[source_op.graph_id] * self._schedule.schedule_time + ) + proceeding_op_finish_time = proceeding_op_start_time + source_op.latency + else: + proceeding_op_start_time = self._schedule.start_times.get( + source_op_graph_id + ) + proceeding_op_finish_time = proceeding_op_start_time + source_op.latency earliest_start_time = max(earliest_start_time, proceeding_op_finish_time) - return earliest_start_time <= current_time - - def _handle_outputs( - self, schedule: "Schedule", non_schedulable_ops: Optional[list["GraphID"]] = [] - ) -> None: - schedule.set_schedule_time(schedule.get_max_end_time()) + return earliest_start_time <= self._current_time - for output in schedule.sfg.find_by_type_name(Output.type_name()): + def _handle_outputs(self) -> None: + if self._schedule.cyclic: + end = self._schedule.schedule_time + else: + end = self._schedule.get_max_end_time() + for output in self._sfg.find_by_type_name(Output.type_name()): output = cast(Output, output) if output.graph_id in self._output_delta_times: delta_time = self._output_delta_times[output.graph_id] - if schedule.cyclic: - schedule.start_times[output.graph_id] = schedule.schedule_time - schedule.move_operation(output.graph_id, delta_time) + + new_time = end + delta_time + + if self._schedule.cyclic and self._schedule.schedule_time is not None: + self._schedule.place_operation(output, new_time) else: - schedule.start_times[output.graph_id] = ( - schedule.schedule_time + delta_time + self._schedule.start_times[output.graph_id] = new_time + + count = -1 + for op_id, time in self._schedule.start_times.items(): + if time == new_time and op_id.startswith("out"): + count += 1 + + self._remaining_resources = self._max_resources + self._remaining_resources[Output.type_name()] -= count + + self._current_time = new_time + if not self._op_is_schedulable(output, {}): + raise ValueError( + "Cannot schedule outputs according to the provided output_delta_times. " + f"Failed output: {output.graph_id}, " + f"at time: { self._schedule.start_times[output.graph_id]}, " + "try relaxing the constraints." 
) diff --git a/examples/auto_scheduling_with_custom_io_times.py b/examples/auto_scheduling_with_custom_io_times.py index b99f5b04..2064b9ec 100644 --- a/examples/auto_scheduling_with_custom_io_times.py +++ b/examples/auto_scheduling_with_custom_io_times.py @@ -34,7 +34,7 @@ schedule.show() # Generate a non-cyclic Schedule from HybridScheduler with custom IO times. resources = {Butterfly.type_name(): 1, ConstantMultiplication.type_name(): 1} input_times = {f"in{i}": i for i in range(points)} -output_delta_times = {f"out{i}": i - 2 for i in range(points)} +output_delta_times = {f"out{i}": i for i in range(points)} schedule = Schedule( sfg, scheduler=HybridScheduler( @@ -55,6 +55,37 @@ schedule = Schedule( input_times=input_times, output_delta_times=output_delta_times, ), + schedule_time=14, + cyclic=True, +) +schedule.show() + +# %% +# Generate a new Schedule with even less scheduling time +output_delta_times = {f"out{i}": i + 1 for i in range(points)} +schedule = Schedule( + sfg, + scheduler=HybridScheduler( + resources, + input_times=input_times, + output_delta_times=output_delta_times, + ), + schedule_time=13, + cyclic=True, +) +schedule.show() + +# %% +# Try scheduling for 12 cycles, which gives full butterfly usage +output_delta_times = {f"out{i}": i + 2 for i in range(points)} +schedule = Schedule( + sfg, + scheduler=HybridScheduler( + resources, + input_times=input_times, + output_delta_times=output_delta_times, + ), + schedule_time=12, cyclic=True, ) schedule.show() diff --git a/examples/ldlt_matrix_inverse.py b/examples/ldlt_matrix_inverse.py index ffe74c83..a6525b2a 100644 --- a/examples/ldlt_matrix_inverse.py +++ b/examples/ldlt_matrix_inverse.py @@ -86,6 +86,7 @@ schedule = Schedule( scheduler=HybridScheduler( resources, input_times=input_times, output_delta_times=output_delta_times ), + schedule_time=32, cyclic=True, ) print("Scheduling time:", schedule.schedule_time) diff --git a/test/unit/test_list_schedulers.py b/test/unit/test_list_schedulers.py index 136e285a..098e20bd 100644 --- a/test/unit/test_list_schedulers.py +++ b/test/unit/test_list_schedulers.py @@ -635,6 +635,7 @@ class TestHybridScheduler: scheduler=HybridScheduler( resources, input_times=input_times, output_delta_times=output_times ), + schedule_time=20, cyclic=True, ) @@ -810,6 +811,7 @@ class TestHybridScheduler: scheduler=HybridScheduler( resources, input_times=input_times, output_delta_times=output_times ), + schedule_time=16, cyclic=True, ) @@ -895,7 +897,7 @@ class TestHybridScheduler: resources = {MADS.type_name(): 1, Reciprocal.type_name(): 1} with pytest.raises( TimeoutError, - match="Algorithm did not schedule any operation for 10 time steps, try relaxing constraints.", + match="Algorithm did not manage to schedule any operation for 10 time steps, try relaxing the constraints.", ): Schedule( sfg, -- GitLab
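The lap bookkeeping introduced in Schedule.place_operation above can be illustrated with a standalone sketch; the schedule time and slack value below are made-up numbers chosen for illustration, not taken from the patch.

# Standalone sketch of the lap update loop in Schedule.place_operation:
# a negative backward slack means the producing value is not available within
# the same schedule period, so whole periods (laps) are added until the
# slack becomes non-negative.
schedule_time = 10
signal_slack = -13          # made-up backward slack for one input signal
laps = 0
new_slack = signal_slack
while new_slack < 0:
    laps += 1
    new_slack += schedule_time
assert (laps, new_slack) == (2, 7)   # the value is read two laps later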
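The new resource lower-bound check in ListScheduler.apply_scheduling follows the same arithmetic as this small sketch; the operation count, execution times, and provided resource amount are hypothetical.

from math import ceil

# Hypothetical numbers: sixteen operations of one resource type, each with
# execution time 1, scheduled into a period of 5 time units.
schedule_time = 5
total_exec_time = sum([1] * 16)
resource_lower_bound = ceil(total_exec_time / schedule_time)   # ceil(16 / 5) = 4
provided_amount = 3
# With fewer units than the lower bound, apply_scheduling raises ValueError.
assert provided_amount < resource_lower_bound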
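An end-to-end usage sketch of the cyclic flow enabled by this patch, modeled on the updated examples/ldlt_matrix_inverse.py; the generator function, latency-setting helpers, import locations, and I/O time values are assumptions carried over from the existing example scripts rather than part of this diff.

# Usage sketch only (assumed APIs marked below); schedule_time=32 and
# cyclic=True mirror the change to examples/ldlt_matrix_inverse.py.
from b_asic.core_operations import MADS, Reciprocal
from b_asic.list_schedulers import HybridScheduler      # assumed import location
from b_asic.schedule import Schedule
from b_asic.sfg_generators import ldlt_matrix_inverse   # assumed generator

sfg = ldlt_matrix_inverse(N=3)                          # 3x3 LDLT inverse (assumed signature)
sfg.set_latency_of_type(MADS.type_name(), 3)            # illustrative latencies
sfg.set_latency_of_type(Reciprocal.type_name(), 2)
sfg.set_execution_time_of_type(MADS.type_name(), 1)
sfg.set_execution_time_of_type(Reciprocal.type_name(), 1)

resources = {MADS.type_name(): 1, Reciprocal.type_name(): 1}
input_times = {f"in{i}": i for i in range(6)}           # illustrative I/O timing
output_delta_times = {f"out{i}": i for i in range(6)}

schedule = Schedule(
    sfg,
    scheduler=HybridScheduler(
        resources,
        input_times=input_times,
        output_delta_times=output_delta_times,
    ),
    schedule_time=32,
    cyclic=True,
)
print("Scheduling time:", schedule.schedule_time)
schedule.show()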