From be377261569416c24d6ae582f38867fff2f09030 Mon Sep 17 00:00:00 2001 From: Simon Bjurek <simbj106@student.liu.se> Date: Thu, 13 Mar 2025 15:35:21 +0000 Subject: [PATCH] Added RecursiveListScheduler and other small improvements/fixes --- b_asic/schedule.py | 52 ++- b_asic/scheduler.py | 344 +++++++++++---- b_asic/scheduler_gui/main_window.py | 2 +- b_asic/scheduler_gui/scheduler_item.py | 2 +- .../auto_scheduling_with_custom_io_times.py | 15 - examples/latency_offset_scheduling.py | 51 +-- test/unit/test_list_schedulers.py | 404 ++++++++---------- 7 files changed, 465 insertions(+), 405 deletions(-) diff --git a/b_asic/schedule.py b/b_asic/schedule.py index 7658fdb3..0fc22917 100644 --- a/b_asic/schedule.py +++ b/b_asic/schedule.py @@ -184,7 +184,7 @@ class Schedule: ) if self.backward_slack(graph_id) < 0: raise ValueError( - f"Negative backward forward slack detected in Schedule for operation: {graph_id}, " + f"Negative backward slack detected in Schedule for operation: {graph_id}, " f"slack: {self.backward_slack(graph_id)}" ) if time > self._schedule_time and not graph_id.startswith("dontcare"): @@ -739,7 +739,9 @@ class Schedule: max_pos_graph_id = max(self._y_locations, key=self._y_locations.get) return self._get_y_position(max_pos_graph_id, operation_height, operation_gap) - def place_operation(self, op: Operation, time: int) -> None: + def place_operation( + self, op: Operation, time: int, op_laps: dict[GraphID, int] + ) -> None: """Schedule the given operation in given time. Parameters @@ -749,6 +751,8 @@ class Schedule: time : int Time slot to schedule the operation in. If time > schedule_time -> schedule cyclically. + op_laps : dict[GraphID, int] + Laps of all scheduled operations. """ start = time % self._schedule_time if self._schedule_time else time self._start_times[op.graph_id] = start @@ -756,16 +760,26 @@ class Schedule: if not self.schedule_time: return - # Update input laps - input_slacks = self._backward_slacks(op.graph_id) - for in_port, signal_slacks in input_slacks.items(): - for signal, signal_slack in signal_slacks.items(): - new_slack = signal_slack - laps = 0 - while new_slack < 0: - laps += 1 - new_slack += self._schedule_time - self._laps[signal.graph_id] = laps + # update input laps + for input_port in op.inputs: + laps = 0 + if self._schedule_time is not None: + current_lap = time // self._schedule_time + source_port = source_op = input_port.signals[0].source + source_op = source_port.operation + + if not isinstance(source_op, Delay) and not isinstance( + source_op, DontCare + ): + if op_laps[source_op.graph_id] < current_lap: + laps += current_lap - op_laps[source_op.graph_id] + source_available_time = ( + self._start_times[source_op.graph_id] + + source_op.latency_offsets[f"out{source_port.index}"] + ) + if source_available_time > self.schedule_time: + laps -= 1 + self._laps[input_port.signals[0].graph_id] = laps if ( start == 0 @@ -875,13 +889,11 @@ class Schedule: ): new_start = self._schedule_time self._laps[op.input(0).signals[0].graph_id] -= 1 - if ( - new_start == 0 - and isinstance(op, Input) - and self._laps[op.output(0).signals[0].graph_id] != 0 - ): + if new_start == 0 and isinstance(op, Input): new_start = 0 - self._laps[op.output(0).signals[0].graph_id] -= 1 + for signal in op.output(0).signals: + if self._laps[signal.graph_id] != 0: + self._laps[signal.graph_id] -= 1 # Set new start time self._start_times[graph_id] = new_start return self @@ -975,9 +987,7 @@ class Schedule: source_port_start_time + source_port_latency_offset > 
self._schedule_time ): - lap += ( - source_port_start_time + source_port_latency_offset - ) // self._schedule_time + lap += 1 destination_laps.append((port.operation.graph_id, port.index, lap)) diff --git a/b_asic/scheduler.py b/b_asic/scheduler.py index 745be1a9..86e16798 100644 --- a/b_asic/scheduler.py +++ b/b_asic/scheduler.py @@ -264,57 +264,26 @@ class ListScheduler(Scheduler): schedule : Schedule Schedule to apply the scheduling algorithm on. """ + self._logger.debug("--- Scheduler initializing ---") self._initialize_scheduler(schedule) if self._input_times: self._place_inputs_on_given_times() - self._logger.debug("--- Operation scheduling starting ---") - while self._remaining_ops: - ready_ops_priority_table = self._get_ready_ops_priority_table() - while ready_ops_priority_table: - next_op = self._sfg.find_by_id( - self._get_next_op_id(ready_ops_priority_table) - ) - - self._update_port_reads(next_op) - - self._remaining_ops = [ - op_id for op_id in self._remaining_ops if op_id != next_op.graph_id - ] - - self._schedule.place_operation(next_op, self._current_time) - self._op_laps[next_op.graph_id] = ( - (self._current_time) // self._schedule.schedule_time - if self._schedule.schedule_time - else 0 - ) - - self._log_scheduled_op(next_op) - - ready_ops_priority_table = self._get_ready_ops_priority_table() - - self._current_time += 1 - - self._logger.debug("--- Operation scheduling completed ---") - - self._current_time -= 1 + self._schedule_nonrecursive_ops() if self._output_delta_times: self._handle_outputs() if self._schedule.schedule_time is None: self._schedule.set_schedule_time(self._schedule.get_max_end_time()) - self._schedule.remove_delays() - self._handle_dont_cares() - self._schedule.sort_y_locations_on_start_times() self._logger.debug("--- Scheduling completed ---") def _get_next_op_id( - self, ready_ops_priority_table: list[tuple["GraphID", int, ...]] + self, priority_table: list[tuple["GraphID", int, ...]] ) -> "GraphID": def sort_key(item): return tuple( @@ -322,10 +291,10 @@ class ListScheduler(Scheduler): for index, asc in self._sort_order ) - sorted_table = sorted(ready_ops_priority_table, key=sort_key) + sorted_table = sorted(priority_table, key=sort_key) return sorted_table[0][0] - def _get_ready_ops_priority_table(self) -> list[tuple["GraphID", int, int, int]]: + def _get_priority_table(self) -> list[tuple["GraphID", int, int, int]]: ready_ops = [ op_id for op_id in self._remaining_ops @@ -344,11 +313,9 @@ class ListScheduler(Scheduler): for op_id in ready_ops ] - def _calculate_deadlines( - self, alap_start_times: dict["GraphID", int] - ) -> dict["GraphID", int]: + def _calculate_deadlines(self) -> dict["GraphID", int]: deadlines = {} - for op_id, start_time in alap_start_times.items(): + for op_id, start_time in self._alap_start_times.items(): output_offsets = [ pair[1] for pair in self._cached_latency_offsets[op_id].items() @@ -359,17 +326,15 @@ class ListScheduler(Scheduler): ) return deadlines - def _calculate_alap_output_slacks( - self, alap_start_times: dict["GraphID", int] - ) -> dict["GraphID", int]: - return {op_id: start_time for op_id, start_time in alap_start_times.items()} + def _calculate_alap_output_slacks(self) -> dict["GraphID", int]: + return { + op_id: start_time for op_id, start_time in self._alap_start_times.items() + } - def _calculate_fan_outs( - self, alap_start_times: dict["GraphID", int] - ) -> dict["GraphID", int]: + def _calculate_fan_outs(self) -> dict["GraphID", int]: return { op_id: len(self._sfg.find_by_id(op_id).output_signals) - 
for op_id, start_time in alap_start_times.items() + for op_id in self._alap_start_times.keys() } def _calculate_memory_reads( @@ -393,23 +358,27 @@ class ListScheduler(Scheduler): op_reads[op_id] = reads return op_reads - def _op_satisfies_resource_constraints(self, op: "Operation") -> bool: - if self._schedule.schedule_time is not None: - time_slot = self._current_time % self._schedule.schedule_time - else: - time_slot = self._current_time - + def _execution_times_in_time(self, op: "Operation", time: int) -> int: count = 0 - for op_id, start_time in self._schedule.start_times.items(): + for other_op_id, start_time in self._schedule.start_times.items(): if self._schedule.schedule_time is not None: start_time = start_time % self._schedule.schedule_time - if time_slot >= start_time: - if time_slot < start_time + max(self._cached_execution_times[op_id], 1): - if op_id.startswith(op.type_name()): - if op.graph_id != op_id: + if time >= start_time: + if time < start_time + max( + self._cached_execution_times[other_op_id], 1 + ): + if other_op_id.startswith(op.type_name()): + if other_op_id != op.graph_id: count += 1 + return count + def _op_satisfies_resource_constraints(self, op: "Operation") -> bool: + if self._schedule.schedule_time is not None: + time_slot = self._current_time % self._schedule.schedule_time + else: + time_slot = self._current_time + count = self._execution_times_in_time(op, time_slot) return count < self._remaining_resources[op.type_name()] def _op_satisfies_concurrent_writes(self, op: "Operation") -> bool: @@ -484,13 +453,9 @@ class ListScheduler(Scheduler): return True def _op_satisfies_data_dependencies(self, op: "Operation") -> bool: - for input_port_index, op_input in enumerate(op.inputs): - source_port = source_op = op_input.signals[0].source + for op_input in op.inputs: + source_port = op_input.signals[0].source source_op = source_port.operation - for i, port in enumerate(source_op.outputs): - if port == source_port: - source_port_index = i - break if isinstance(source_op, Delay) or isinstance(source_op, DontCare): continue @@ -505,20 +470,20 @@ class ListScheduler(Scheduler): self._schedule.start_times.get(source_op_graph_id) + self._op_laps[source_op.graph_id] * self._schedule.schedule_time + self._cached_latency_offsets[source_op.graph_id][ - f"out{source_port_index}" + f"out{source_port.index}" ] ) else: available_time = ( self._schedule.start_times.get(source_op_graph_id) + self._cached_latency_offsets[source_op.graph_id][ - f"out{source_port_index}" + f"out{source_port.index}" ] ) required_time = ( self._current_time - + self._cached_latency_offsets[op.graph_id][f"in{input_port_index}"] + + self._cached_latency_offsets[op.graph_id][f"in{op_input.index}"] ) if available_time > required_time: return False @@ -526,17 +491,15 @@ class ListScheduler(Scheduler): def _op_is_schedulable(self, op: "Operation") -> bool: return ( - self._op_satisfies_data_dependencies(op) - and self._op_satisfies_resource_constraints(op) + self._op_satisfies_resource_constraints(op) + and self._op_satisfies_data_dependencies(op) and self._op_satisfies_concurrent_writes(op) and self._op_satisfies_concurrent_reads(op) ) def _initialize_scheduler(self, schedule: "Schedule") -> None: - self._logger.debug("--- Scheduler initializing ---") - self._schedule = schedule - self._sfg = schedule.sfg + self._sfg = schedule._sfg for resource_type in self._max_resources.keys(): if not self._sfg.find_by_type_name(resource_type): @@ -588,7 +551,7 @@ class ListScheduler(Scheduler): alap_schedule = 
copy.copy(self._schedule) alap_schedule._schedule_time = None ALAPScheduler().apply_scheduling(alap_schedule) - alap_start_times = alap_schedule.start_times + self._alap_start_times = alap_schedule.start_times self._schedule.start_times = {} for key in self._schedule._laps.keys(): self._schedule._laps[key] = 0 @@ -615,9 +578,9 @@ class ListScheduler(Scheduler): for op_id in self._remaining_ops } - self._deadlines = self._calculate_deadlines(alap_start_times) - self._output_slacks = self._calculate_alap_output_slacks(alap_start_times) - self._fan_outs = self._calculate_fan_outs(alap_start_times) + self._deadlines = self._calculate_deadlines() + self._output_slacks = self._calculate_alap_output_slacks() + self._fan_outs = self._calculate_fan_outs() self._schedule.start_times = {} self._used_reads = {0: 0} @@ -637,6 +600,36 @@ class ListScheduler(Scheduler): if not (op.startswith("out") and op in self._output_delta_times) ] + def _schedule_nonrecursive_ops(self) -> None: + self._logger.debug("--- Non-Recursive Operation scheduling starting ---") + while self._remaining_ops: + prio_table = self._get_priority_table() + while prio_table: + next_op = self._sfg.find_by_id(self._get_next_op_id(prio_table)) + + self._update_port_reads(next_op) + + self._remaining_ops = [ + op_id for op_id in self._remaining_ops if op_id != next_op.graph_id + ] + + self._schedule.place_operation( + next_op, self._current_time, self._op_laps + ) + self._op_laps[next_op.graph_id] = ( + (self._current_time) // self._schedule.schedule_time + if self._schedule.schedule_time + else 0 + ) + + self._log_scheduled_op(next_op) + + prio_table = self._get_priority_table() + + self._current_time += 1 + self._current_time -= 1 + self._logger.debug("--- Non-Recursive Operation scheduling completed ---") + def _log_scheduled_op(self, next_op: "Operation") -> None: if self._schedule.schedule_time is not None: self._logger.debug(f" Op: {next_op.graph_id}, time: {self._current_time}") @@ -690,7 +683,7 @@ class ListScheduler(Scheduler): new_time = end + delta_time if self._schedule._cyclic and self._schedule.schedule_time is not None: - self._schedule.place_operation(output, new_time) + self._schedule.place_operation(output, new_time, self._op_laps) else: self._schedule.start_times[output.graph_id] = new_time @@ -712,7 +705,7 @@ class ListScheduler(Scheduler): self._schedule.backward_slack(op.graph_id) for op in self._sfg.find_by_type_name(Output.type_name()) ) - if min_slack > 0: + if min_slack != 0: for output in self._sfg.find_by_type_name(Output.type_name()): if self._schedule._cyclic and self._schedule.schedule_time is not None: self._schedule.move_operation(output.graph_id, -min_slack) @@ -741,5 +734,196 @@ class ListScheduler(Scheduler): for dc_op in self._sfg.find_by_type_name(DontCare.type_name()): self._schedule.start_times[dc_op.graph_id] = 0 self._schedule.place_operation( - dc_op, self._schedule.forward_slack(dc_op.graph_id) + dc_op, self._schedule.forward_slack(dc_op.graph_id), self._op_laps ) + + +class RecursiveListScheduler(ListScheduler): + def __init__( + self, + sort_order: tuple[tuple[int, bool], ...], + max_resources: dict[TypeName, int] | None = None, + max_concurrent_reads: int | None = None, + max_concurrent_writes: int | None = None, + input_times: dict["GraphID", int] | None = None, + output_delta_times: dict["GraphID", int] | None = None, + ) -> None: + super().__init__( + sort_order=sort_order, + max_resources=max_resources, + max_concurrent_reads=max_concurrent_reads, + 
max_concurrent_writes=max_concurrent_writes, + input_times=input_times, + output_delta_times=output_delta_times, + ) + + def apply_scheduling(self, schedule: "Schedule") -> None: + self._logger.debug("--- Scheduler initializing ---") + self._initialize_scheduler(schedule) + + if self._input_times: + self._place_inputs_on_given_times() + + loops = self._sfg.loops + if loops: + self._schedule_recursive_ops(loops) + + self._schedule_nonrecursive_ops() + + if self._output_delta_times: + self._handle_outputs() + + if self._schedule.schedule_time is None: + self._schedule.set_schedule_time(self._schedule.get_max_end_time()) + self._schedule.remove_delays() + self._handle_dont_cares() + self._schedule.sort_y_locations_on_start_times() + self._logger.debug("--- Scheduling completed ---") + + def _get_recursive_ops(self, loops: list[list["GraphID"]]) -> list["GraphID"]: + recursive_ops = [] + seen = [] + for loop in loops: + for op_id in loop: + if op_id not in seen: + if not isinstance(self._sfg.find_by_id(op_id), Delay): + recursive_ops.append(op_id) + seen.append(op_id) + return recursive_ops + + def _recursive_op_satisfies_data_dependencies(self, op: "Operation") -> bool: + for input_port_index, op_input in enumerate(op.inputs): + source_port = source_op = op_input.signals[0].source + source_op = source_port.operation + if isinstance(source_op, Delay) or isinstance(source_op, DontCare): + continue + if ( + source_op.graph_id in self._recursive_ops + and source_op.graph_id in self._remaining_ops + ): + return False + return True + + def _get_recursive_priority_table(self): + ready_ops = [ + op_id + for op_id in self._remaining_recursive_ops + if self._recursive_op_satisfies_data_dependencies( + self._sfg.find_by_id(op_id) + ) + ] + return [(op_id, self._deadlines[op_id]) for op_id in ready_ops] + + def _schedule_recursive_ops(self, loops: list[list["GraphID"]]) -> None: + saved_sched_time = self._schedule.schedule_time + self._schedule._schedule_time = None + + self._logger.debug("--- Scheduling of recursive loops starting ---") + self._recursive_ops = self._get_recursive_ops(loops) + self._remaining_recursive_ops = self._recursive_ops.copy() + prio_table = self._get_recursive_priority_table() + while prio_table: + op = self._get_next_recursive_op(prio_table) + op_sched_time = 0 + for input_port in op.inputs: + source_port = input_port.signals[0].source + source_op = source_port.operation + if isinstance(source_op, Delay): + continue + source_start_time = self._schedule.start_times.get(source_op.graph_id) + if source_start_time is None: + continue + source_latency = self._cached_latency_offsets[source_op.graph_id][ + f"out{source_port.index}" + ] + op_sched_time = max(op_sched_time, source_start_time + source_latency) + + exec_count = self._execution_times_in_time(op, op_sched_time) + while exec_count >= self._remaining_resources[op.type_name()]: + op_sched_time += 1 + exec_count = self._execution_times_in_time(op, op_sched_time) + + self._schedule.place_operation(op, op_sched_time, self._op_laps) + self._op_laps[op.graph_id] = 0 + self._logger.debug(f" Op: {op.graph_id} time: {op_sched_time}") + self._remaining_recursive_ops.remove(op.graph_id) + self._remaining_ops.remove(op.graph_id) + prio_table = self._get_recursive_priority_table() + + self._schedule._schedule_time = self._schedule.get_max_end_time() + if ( + saved_sched_time is not None + and saved_sched_time < self._schedule.schedule_time + ): + raise ValueError( + f"Requested schedule time {saved_sched_time} cannot be reached, increase 
to {self._schedule.schedule_time} or assign more resources." + ) + self._logger.debug("--- Scheduling of recursive loops completed ---") + + def _get_next_recursive_op( + self, priority_table: list[tuple["GraphID", int, ...]] + ) -> "GraphID": + sorted_table = sorted(priority_table, key=lambda row: row[1]) + return self._sfg.find_by_id(sorted_table[0][0]) + + def _pipeline_input_to_recursive_sections(self) -> None: + for op_id in self._recursive_ops: + op = self._sfg.find_by_id(op_id) + for input_port in op.inputs: + signal = input_port.signals[0] + source_op = signal.source.operation + if ( + not isinstance(source_op, Delay) + and source_op.graph_id not in self._recursive_ops + ): + # non-recursive to recursive edge found -> pipeline + self._schedule.laps[signal.graph_id] += 1 + + def _op_satisfies_data_dependencies(self, op: "Operation") -> bool: + for output_port in op.outputs: + destination_port = output_port.signals[0].destination + destination_op = destination_port.operation + if destination_op.graph_id not in self._remaining_ops: + # spotted a recursive operation -> check if ok + op_available_time = ( + self._current_time + op.latency_offsets[f"out{output_port.index}"] + ) + usage_time = ( + self._schedule.start_times[destination_op.graph_id] + + self._schedule.schedule_time + * self._schedule.laps[output_port.signals[0].graph_id] + ) + if op_available_time > usage_time: + # constraint is not okay, move all recursive operations one lap by pipelining + # across the non-recursive to recursive edge + self._pipeline_input_to_recursive_sections() + + for op_input in op.inputs: + source_port = op_input.signals[0].source + source_op = source_port.operation + if isinstance(source_op, Delay) or isinstance(source_op, DontCare): + continue + if source_op.graph_id in self._remaining_ops: + return False + if self._schedule.schedule_time is not None: + available_time = ( + self._schedule.start_times.get(source_op.graph_id) + + self._op_laps[source_op.graph_id] * self._schedule.schedule_time + + self._cached_latency_offsets[source_op.graph_id][ + f"out{source_port.index}" + ] + ) + else: + available_time = ( + self._schedule.start_times.get(source_op.graph_id) + + self._cached_latency_offsets[source_op.graph_id][ + f"out{source_port.index}" + ] + ) + required_time = ( + self._current_time + + self._cached_latency_offsets[op.graph_id][f"in{op_input.index}"] + ) + if available_time > required_time: + return False + return True diff --git a/b_asic/scheduler_gui/main_window.py b/b_asic/scheduler_gui/main_window.py index 9bb7dad9..ea8cfce7 100644 --- a/b_asic/scheduler_gui/main_window.py +++ b/b_asic/scheduler_gui/main_window.py @@ -893,7 +893,7 @@ class ScheduleMainWindow(QMainWindow, Ui_MainWindow): if self.schedule is None: return op: GraphComponent = cast( - GraphComponent, self.schedule.sfg.find_by_id(graph_id) + GraphComponent, self.schedule._sfg.find_by_id(graph_id) ) si = self.info_table.rowCount() # si = start index diff --git a/b_asic/scheduler_gui/scheduler_item.py b/b_asic/scheduler_gui/scheduler_item.py index 56a6e7fb..be8d8643 100644 --- a/b_asic/scheduler_gui/scheduler_item.py +++ b/b_asic/scheduler_gui/scheduler_item.py @@ -249,7 +249,7 @@ class SchedulerItem(SchedulerEvent, QGraphicsItemGroup): op_start_time = self.schedule.start_time_of_operation(item.graph_id) new_start_time = floor(pos) - floor(self._x_axis_indent) move_time = new_start_time - op_start_time - op = self._schedule.sfg.find_by_id(item.graph_id) + op = self._schedule._sfg.find_by_id(item.graph_id) if ( isinstance(op, 
Output) and op_start_time == self.schedule.schedule_time diff --git a/examples/auto_scheduling_with_custom_io_times.py b/examples/auto_scheduling_with_custom_io_times.py index f85710a0..4df8bb7a 100644 --- a/examples/auto_scheduling_with_custom_io_times.py +++ b/examples/auto_scheduling_with_custom_io_times.py @@ -86,18 +86,3 @@ schedule = Schedule( cyclic=True, ) schedule.show() - -# %% -# Push output times one step to prevent lap for out3. -output_delta_times = {f"out{i}": i + 2 for i in range(points)} -schedule = Schedule( - sfg, - scheduler=HybridScheduler( - resources, - input_times=input_times, - output_delta_times=output_delta_times, - ), - schedule_time=12, - cyclic=True, -) -schedule.show() diff --git a/examples/latency_offset_scheduling.py b/examples/latency_offset_scheduling.py index a9f323db..39c21b72 100644 --- a/examples/latency_offset_scheduling.py +++ b/examples/latency_offset_scheduling.py @@ -3,17 +3,14 @@ Automatic Scheduling for different latency-offsets. ================================ -This example showcases how one can synthesize an architecture where the +This example showcases how one can generate a schedule where the operations have different latency offsets for the different inputs/outputs. """ -from b_asic.architecture import Memory, ProcessingElement -from b_asic.core_operations import MADS, Reciprocal from b_asic.list_schedulers import HybridScheduler from b_asic.schedule import Schedule from b_asic.scheduler import ALAPScheduler, ASAPScheduler from b_asic.sfg_generators import ldlt_matrix_inverse -from b_asic.special_operations import Input, Output sfg = ldlt_matrix_inverse( N=3, @@ -63,49 +60,3 @@ schedule = Schedule( cyclic=True, ) schedule.show() - -# %% -# Leverage the fact that the inputs arrive at different times to limit the amount of concurrent memory accesses to 2 -schedule = Schedule( - sfg, - scheduler=HybridScheduler(max_concurrent_writes=2, max_concurrent_reads=2), - schedule_time=30, - cyclic=True, -) -schedule.show() - -# %% -operations = schedule.get_operations() -mads = operations.get_by_type_name(MADS.type_name()) -mads.show(title="MADS executions") -reciprocals = operations.get_by_type_name(Reciprocal.type_name()) -reciprocals.show(title="Reciprocal executions") -inputs = operations.get_by_type_name(Input.type_name()) -inputs.show(title="Input executions") -outputs = operations.get_by_type_name(Output.type_name()) -outputs.show(title="Output executions") - -mads_pe = ProcessingElement(mads, entity_name="mad") -reciprocal_pe = ProcessingElement(reciprocals, entity_name="rec") - -pe_in = ProcessingElement(inputs, entity_name='input') -pe_out = ProcessingElement(outputs, entity_name='output') - -mem_vars = schedule.get_memory_variables() -mem_vars.show(title="All memory variables") -direct, mem_vars = mem_vars.split_on_length() -mem_vars.show(title="Non-zero time memory variables") -mem_vars_set = mem_vars.split_on_ports( - read_ports=1, write_ports=1, total_ports=2, heuristic="graph_color" -) - -# %% -memories = [] -for i, mem in enumerate(mem_vars_set): - memory = Memory(mem, memory_type="RAM", entity_name=f"memory{i}") - memories.append(memory) - mem.show(title=f"{memory.entity_name}") - memory.assign("left_edge") - memory.show_content(title=f"Assigned {memory.entity_name}") - -direct.show(title="Direct interconnects") diff --git a/test/unit/test_list_schedulers.py b/test/unit/test_list_schedulers.py index d788abd2..5a1c31f4 100644 --- a/test/unit/test_list_schedulers.py +++ b/test/unit/test_list_schedulers.py @@ -2,6 +2,7 @@ import sys 
import numpy as np import pytest +from scipy import signal from b_asic.core_operations import ( MADS, @@ -17,15 +18,17 @@ from b_asic.list_schedulers import ( MaxFanOutScheduler, ) from b_asic.schedule import Schedule +from b_asic.scheduler import RecursiveListScheduler from b_asic.sfg_generators import ( direct_form_1_iir, + direct_form_2_iir, ldlt_matrix_inverse, radix_2_dif_fft, ) from b_asic.signal_flow_graph import SFG from b_asic.signal_generator import Constant, Impulse from b_asic.simulation import Simulation -from b_asic.special_operations import Input, Output +from b_asic.special_operations import Delay, Input, Output class TestEarliestDeadlineScheduler: @@ -1533,7 +1536,7 @@ class TestHybridScheduler: } assert schedule.schedule_time == 6 - direct, mem_vars = schedule.get_memory_variables().split_on_length() + _, mem_vars = schedule.get_memory_variables().split_on_length() assert mem_vars.read_ports_bound() <= 2 assert mem_vars.write_ports_bound() <= 3 @@ -1555,37 +1558,8 @@ class TestHybridScheduler: sfg, scheduler=HybridScheduler(resources), schedule_time=4, cyclic=True ) - assert schedule.start_times == { - "in1": 0, - "in3": 0, - "bfly3": 0, - "cmul0": 1, - "in0": 1, - "in2": 1, - "bfly0": 1, - "bfly1": 2, - "out0": 3, - "out2": 3, - "bfly2": 3, - "out1": 4, - "out3": 4, - } - assert schedule.laps == { - "s4": 0, - "s6": 0, - "s5": 0, - "s7": 0, - "s8": 0, - "s12": 0, - "s10": 1, - "s9": 0, - "s0": 0, - "s2": 0, - "s11": 0, - "s1": 0, - "s3": 0, - } assert schedule.schedule_time == 4 + _validate_recreated_sfg_fft(schedule, points=4, delays=[0, 1, 0, 1]) def test_invalid_output_delta_time(self): sfg = radix_2_dif_fft(points=4) @@ -1708,100 +1682,10 @@ class TestHybridScheduler: cyclic=True, ) - assert schedule.start_times == { - "dontcare0": 49, - "dontcare1": 50, - "dontcare2": 31, - "dontcare3": 6, - "dontcare4": 14, - "dontcare5": 13, - "in0": 0, - "in1": 1, - "in2": 3, - "in3": 2, - "in4": 4, - "in5": 5, - "mads0": 10, - "mads1": 11, - "mads10": 32, - "mads11": 47, - "mads12": 16, - "mads13": 15, - "mads14": 14, - "mads2": 6, - "mads3": 2, - "mads4": 9, - "mads5": 5, - "mads6": 3, - "mads7": 1, - "mads8": 28, - "mads9": 46, - "out0": 13, - "out1": 9, - "out2": 6, - "out3": 5, - "out4": 1, - "out5": 46, - "rec0": 0, - "rec1": 18, - "rec2": 36, - } - assert schedule.laps == { - "s10": 0, - "s11": 0, - "s12": 0, - "s13": 0, - "s14": 0, - "s9": 0, - "s22": 0, - "s20": 0, - "s17": 1, - "s18": 1, - "s19": 0, - "s25": 0, - "s23": 0, - "s50": 1, - "s33": 0, - "s49": 0, - "s38": 0, - "s51": 1, - "s32": 0, - "s28": 0, - "s37": 0, - "s35": 0, - "s36": 0, - "s31": 0, - "s34": 0, - "s27": 1, - "s30": 0, - "s41": 0, - "s26": 1, - "s46": 0, - "s47": 0, - "s40": 0, - "s43": 0, - "s7": 0, - "s3": 0, - "s42": 0, - "s39": 0, - "s8": 0, - "s5": 0, - "s44": 0, - "s21": 1, - "s24": 1, - "s48": 0, - "s4": 0, - "s16": 0, - "s52": 0, - "s15": 0, - "s0": 0, - "s29": 0, - "s1": 0, - "s2": 0, - "s45": 0, - "s6": 0, - "s53": 0, - } + assert schedule.schedule_time == 49 + _validate_recreated_sfg_ldlt_matrix_inverse( + schedule, N=3, delays=[1, 1, 1, 1, 1, 0] + ) def test_latency_offsets_cyclic_min_schedule_time(self): sfg = ldlt_matrix_inverse( @@ -1819,140 +1703,178 @@ class TestHybridScheduler: cyclic=True, ) - assert schedule.start_times == { - "dontcare0": 6, - "dontcare1": 7, - "dontcare2": 16, - "dontcare3": 12, - "dontcare4": 14, - "dontcare5": 13, - "in0": 0, - "in1": 1, - "in2": 3, - "in3": 2, - "in4": 4, - "in5": 5, - "mads0": 10, - "mads1": 11, - "mads10": 2, - "mads11": 4, - "mads12": 1, - 
"mads13": 0, - "mads14": 14, - "mads2": 5, - "mads3": 8, - "mads4": 6, - "mads5": 12, - "mads6": 9, - "mads7": 7, - "mads8": 13, - "mads9": 3, - "out0": 10, - "out1": 2, - "out2": 12, - "out3": 11, - "out4": 7, - "out5": 1, - "rec0": 0, - "rec1": 3, - "rec2": 6, + assert schedule.schedule_time == 15 + _validate_recreated_sfg_ldlt_matrix_inverse( + schedule, N=3, delays=[4, 4, 3, 3, 3, 3] + ) + + +class TestRecursiveListScheduler: + def test_empty_sfg(self, sfg_empty): + with pytest.raises( + ValueError, match="Empty signal flow graph cannot be scheduled." + ): + Schedule( + sfg_empty, + scheduler=RecursiveListScheduler( + sort_order=((1, True), (3, False), (4, False)) + ), + ) + + def test_direct_form_1_iir(self): + N = 3 + Wc = 0.2 + b, a = signal.butter(N, Wc, btype="lowpass", output="ba") + sfg = direct_form_1_iir(b, a) + + sfg.set_latency_of_type(ConstantMultiplication.type_name(), 2) + sfg.set_execution_time_of_type(ConstantMultiplication.type_name(), 1) + sfg.set_latency_of_type(Addition.type_name(), 3) + sfg.set_execution_time_of_type(Addition.type_name(), 1) + + resources = { + Addition.type_name(): 1, + ConstantMultiplication.type_name(): 1, + Input.type_name(): 1, + Output.type_name(): 1, } - assert schedule.laps == { - "s10": 0, - "s11": 0, - "s12": 0, - "s13": 0, - "s14": 0, - "s9": 0, - "s22": 0, - "s20": 0, - "s17": 1, - "s18": 1, - "s19": 1, - "s25": 0, - "s23": 0, - "s50": 1, - "s33": 0, - "s49": 0, - "s38": 0, - "s51": 1, - "s32": 0, - "s28": 0, - "s37": 1, - "s35": 0, - "s36": 0, - "s31": 0, - "s34": 0, - "s27": 0, - "s30": 0, - "s41": 0, - "s26": 1, - "s46": 0, - "s47": 0, - "s40": 1, - "s43": 0, - "s7": 1, - "s3": 1, - "s42": 1, - "s39": 0, - "s8": 1, - "s5": 1, - "s44": 1, - "s21": 1, - "s24": 1, - "s48": 0, - "s4": 0, - "s16": 0, - "s52": 0, - "s15": 0, - "s0": 0, - "s29": 0, - "s1": 0, - "s2": 0, - "s45": 0, - "s6": 0, - "s53": 0, + schedule = Schedule( + sfg, + scheduler=RecursiveListScheduler( + sort_order=((1, True), (3, False), (4, False)), max_resources=resources + ), + ) + _validate_recreated_sfg_filter(sfg, schedule) + + def test_direct_form_2_iir(self): + N = 3 + Wc = 0.2 + b, a = signal.butter(N, Wc, btype="lowpass", output="ba") + sfg = direct_form_2_iir(b, a) + + sfg.set_latency_of_type(ConstantMultiplication.type_name(), 2) + sfg.set_execution_time_of_type(ConstantMultiplication.type_name(), 1) + sfg.set_latency_of_type(Addition.type_name(), 3) + sfg.set_execution_time_of_type(Addition.type_name(), 1) + + resources = { + Addition.type_name(): 1, + ConstantMultiplication.type_name(): 1, + Input.type_name(): 1, + Output.type_name(): 1, } + schedule = Schedule( + sfg, + scheduler=RecursiveListScheduler( + sort_order=((1, True), (3, False), (4, False)), max_resources=resources + ), + ) + _validate_recreated_sfg_filter(sfg, schedule) + + def test_large_direct_form_2_iir(self): + N = 10 + Wc = 0.2 + b, a = signal.butter(N, Wc, btype="lowpass", output="ba") + sfg = direct_form_2_iir(b, a) + + sfg.set_latency_of_type(ConstantMultiplication.type_name(), 2) + sfg.set_execution_time_of_type(ConstantMultiplication.type_name(), 1) + sfg.set_latency_of_type(Addition.type_name(), 3) + sfg.set_execution_time_of_type(Addition.type_name(), 1) + + resources = { + Addition.type_name(): 1, + ConstantMultiplication.type_name(): 1, + Input.type_name(): 1, + Output.type_name(): 1, + } + schedule = Schedule( + sfg, + scheduler=RecursiveListScheduler( + sort_order=((1, True), (3, False), (4, False)), max_resources=resources + ), + ) + _validate_recreated_sfg_filter(sfg, schedule) 
+ + def test_custom_recursive_filter(self): + # Create the SFG for a digital filter (seen in an exam question from TSTE87). + x = Input() + t0 = Delay() + t1 = Delay(t0) + b = ConstantMultiplication(0.5, x) + d = ConstantMultiplication(0.5, t1) + a1 = Addition(x, d) + a = ConstantMultiplication(0.5, a1) + t2 = Delay(a1) + c = ConstantMultiplication(0.5, t2) + a2 = Addition(b, c) + a3 = Addition(a2, a) + t0.input(0).connect(a3) + y = Output(a2) + sfg = SFG([x], [y]) + + sfg.set_latency_of_type(Addition.type_name(), 1) + sfg.set_latency_of_type(ConstantMultiplication.type_name(), 2) + sfg.set_execution_time_of_type(Addition.type_name(), 1) + sfg.set_execution_time_of_type(ConstantMultiplication.type_name(), 1) + + resources = { + Addition.type_name(): 1, + ConstantMultiplication.type_name(): 1, + Input.type_name(): 1, + Output.type_name(): 1, + } + schedule = Schedule( + sfg, + scheduler=RecursiveListScheduler( + sort_order=((1, True), (3, False), (4, False)), max_resources=resources + ), + ) + _validate_recreated_sfg_filter(sfg, schedule) def _validate_recreated_sfg_filter(sfg: SFG, schedule: Schedule) -> None: # compare the impulse response of the original sfg and recreated one sim1 = Simulation(sfg, [Impulse()]) - sim1.run_for(1000) + sim1.run_for(1024) sim2 = Simulation(schedule.sfg, [Impulse()]) - sim2.run_for(1000) + sim2.run_for(1024) spectrum_1 = abs(np.fft.fft(sim1.results['0'])) spectrum_2 = abs(np.fft.fft(sim2.results['0'])) assert np.allclose(spectrum_1, spectrum_2) -def _validate_recreated_sfg_fft(schedule: Schedule, points: int) -> None: +def _validate_recreated_sfg_fft( + schedule: Schedule, points: int, delays: list[int] | None = None +) -> None: + if delays is None: + delays = [0 for i in range(points)] # impulse input -> constant output - sim = Simulation(schedule.sfg, [Impulse()] + [0 for i in range(points - 1)]) - sim.run_for(1) + sim = Simulation(schedule.sfg, [Constant()] + [0 for i in range(points - 1)]) + sim.run_for(128) for i in range(points): - assert np.allclose(sim.results[str(i)], 1) + assert np.all(np.isclose(sim.results[str(i)][delays[i] :], 1)) # constant input -> impulse (with weight=points) output - sim = Simulation(schedule.sfg, [Impulse() for i in range(points)]) - sim.run_for(1) + sim = Simulation(schedule.sfg, [Constant() for i in range(points)]) + sim.run_for(128) assert np.allclose(sim.results["0"], points) for i in range(1, points): - assert np.allclose(sim.results[str(i)], 0) + assert np.all(np.isclose(sim.results[str(i)][delays[i] :], 0)) # sine input -> compare with numpy fft n = np.linspace(0, 2 * np.pi, points) waveform = np.sin(n) input_samples = [Constant(waveform[i]) for i in range(points)] sim = Simulation(schedule.sfg, input_samples) - sim.run_for(1) - exp_res = abs(np.fft.fft(waveform)) + sim.run_for(128) + exp_res = np.fft.fft(waveform) res = sim.results for i in range(points): - a = abs(res[str(i)]) + a = res[str(i)][delays[i] :] b = exp_res[i] - assert np.isclose(a, b) + assert np.all(np.isclose(a, b)) # multi-tone input -> compare with numpy fft n = np.linspace(0, 2 * np.pi, points) @@ -1965,16 +1887,22 @@ def _validate_recreated_sfg_fft(schedule: Schedule, points: int) -> None: ) input_samples = [Constant(waveform[i]) for i in range(points)] sim = Simulation(schedule.sfg, input_samples) - sim.run_for(1) + sim.run_for(128) exp_res = np.fft.fft(waveform) res = sim.results for i in range(points): - a = res[str(i)] + a = res[str(i)][delays[i] :] b = exp_res[i] - assert np.isclose(a, b) + assert np.all(np.isclose(a, b)) -def 
_validate_recreated_sfg_ldlt_matrix_inverse(schedule: Schedule, N: int) -> None: +def _validate_recreated_sfg_ldlt_matrix_inverse( + schedule: Schedule, N: int, delays: list[int] | None = None +) -> None: + if delays is None: + num_of_outputs = N * (N + 1) // 2 + delays = [0 for i in range(num_of_outputs)] + # random real s.p.d matrix A = np.random.rand(N, N) A = np.dot(A, A.T) @@ -1987,11 +1915,13 @@ def _validate_recreated_sfg_ldlt_matrix_inverse(schedule: Schedule, N: int) -> N A_inv = np.linalg.inv(A) sim = Simulation(schedule.sfg, input_signals) - sim.run_for(1) + sim.run_for(128) # iterate through the upper diagonal and check count = 0 for i in range(N): for j in range(i, N): - assert np.isclose(sim.results[str(count)], A_inv[i, j]) + assert np.all( + np.isclose(sim.results[str(count)][delays[count] :], A_inv[i, j]) + ) count += 1 -- GitLab
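
Usage sketch (not part of the patch): the snippet below illustrates how the new
RecursiveListScheduler introduced above can be applied, mirroring the
test_direct_form_2_iir case from the updated test suite. All classes, generator
functions, latencies, resource limits, and the sort-order tuple are taken from
the code in this patch; the final schedule.show() call is assumed from the
repository's example scripts.

    from scipy import signal

    from b_asic.core_operations import Addition, ConstantMultiplication
    from b_asic.schedule import Schedule
    from b_asic.scheduler import RecursiveListScheduler
    from b_asic.sfg_generators import direct_form_2_iir
    from b_asic.special_operations import Input, Output

    # Third-order Butterworth low-pass filter, as in the new unit tests.
    b, a = signal.butter(3, 0.2, btype="lowpass", output="ba")
    sfg = direct_form_2_iir(b, a)

    # Latencies and execution times used by the tests in this patch.
    sfg.set_latency_of_type(ConstantMultiplication.type_name(), 2)
    sfg.set_execution_time_of_type(ConstantMultiplication.type_name(), 1)
    sfg.set_latency_of_type(Addition.type_name(), 3)
    sfg.set_execution_time_of_type(Addition.type_name(), 1)

    resources = {
        Addition.type_name(): 1,
        ConstantMultiplication.type_name(): 1,
        Input.type_name(): 1,
        Output.type_name(): 1,
    }

    # The recursive loops are scheduled first (with the schedule-time bound
    # temporarily lifted), then the remaining operations are list-scheduled
    # around them using the given sort order and resource limits.
    schedule = Schedule(
        sfg,
        scheduler=RecursiveListScheduler(
            sort_order=((1, True), (3, False), (4, False)),
            max_resources=resources,
        ),
    )
    schedule.show()  # assumed from the repository's example scripts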