From 1567bb76310b43925485acdc9c80a1057baa0336 Mon Sep 17 00:00:00 2001 From: Oscar Gustafsson <oscar.gustafsson@gmail.com> Date: Thu, 23 Feb 2023 13:32:35 +0100 Subject: [PATCH] Add basic functionality for saving schedules --- b_asic/save_load_structure.py | 71 +++++++++++++++++++++-------------- b_asic/schedule.py | 29 ++++++++++++-- 2 files changed, 68 insertions(+), 32 deletions(-) diff --git a/b_asic/save_load_structure.py b/b_asic/save_load_structure.py index bcc10c34..d23020ef 100644 --- a/b_asic/save_load_structure.py +++ b/b_asic/save_load_structure.py @@ -11,11 +11,12 @@ from typing import Dict, Optional, Tuple, cast from b_asic.graph_component import GraphComponent from b_asic.port import InputPort +from b_asic.schedule import Schedule from b_asic.signal_flow_graph import SFG def sfg_to_python( - sfg: SFG, counter: int = 0, suffix: Optional[str] = None + sfg: SFG, counter: int = 0, suffix: Optional[str] = None, schedule=False ) -> str: """ Given an SFG structure try to serialize it for saving to a file. @@ -23,15 +24,20 @@ def sfg_to_python( Parameters ========== sfg : SFG - The SFG to serialize + The SFG to serialize. counter : int, default: 0 Number used for naming the SFG. Enables SFGs in SFGs. suffix : str, optional String to append at the end of the result. + schedule : bool, default: False + True if printing a schedule """ + _type = "Schedule" if schedule else "SFG" + result = ( - '\n"""\nB-ASIC automatically generated SFG file.\n' + '\n"""\n' + + f"B-ASIC automatically generated {_type} file.\n" + "Name: " + f"{sfg.name}" + "\n" @@ -44,6 +50,8 @@ def sfg_to_python( result += "\nfrom b_asic import SFG, Signal, Input, Output" for op_type in {type(op) for op in sfg.operations}: result += f", {op_type.__name__}" + if schedule: + result += ", Schedule" def kwarg_unpacker(comp: GraphComponent, params=None) -> str: if params is None: @@ -61,16 +69,12 @@ def sfg_to_python( params = {k: v for k, v in params.items() if v} if params.get("latency_offsets", None) is not None: params["latency_offsets"] = { - k: v - for k, v in params["latency_offsets"].items() - if v is not None + k: v for k, v in params["latency_offsets"].items() if v is not None } if not params["latency_offsets"]: del params["latency_offsets"] - return ", ".join( - [f"{param}={value}" for param, value in params.items()] - ) + return ", ".join([f"{param}={value}" for param, value in params.items()]) # No need to redefined I/Os io_ops = [*sfg._input_operations, *sfg._output_operations] @@ -81,9 +85,7 @@ def sfg_to_python( result += "\n# Outputs:\n" for output_op in sfg._output_operations: - result += ( - f"{output_op.graph_id} = Output({kwarg_unpacker(output_op)})\n" - ) + result += f"{output_op.graph_id} = Output({kwarg_unpacker(output_op)})\n" result += "\n# Operations:\n" for op in sfg.split(): @@ -94,9 +96,7 @@ def sfg_to_python( result = sfg_to_python(op, counter) + result continue - result += ( - f"{op.graph_id} = {op.__class__.__name__}({kwarg_unpacker(op)})\n" - ) + result += f"{op.graph_id} = {op.__class__.__name__}({kwarg_unpacker(op)})\n" result += "\n# Signals:\n" # Keep track of already existing connections to avoid adding duplicates @@ -107,10 +107,10 @@ def sfg_to_python( destination = cast(InputPort, signal.destination) dest_op = destination.operation connection = ( - f"\nSignal(source={op.graph_id}." + f"Signal(source={op.graph_id}." f"output({op.outputs.index(signal.source)})," f" destination={dest_op.graph_id}." - f"input({dest_op.inputs.index(destination)}))" + f"input({dest_op.inputs.index(destination)}))\n" ) if connection in connections: continue @@ -119,20 +119,14 @@ def sfg_to_python( connections.append(connection) inputs = "[" + ", ".join(op.graph_id for op in sfg.input_operations) + "]" - outputs = ( - "[" + ", ".join(op.graph_id for op in sfg.output_operations) + "]" - ) - sfg_name = ( - sfg.name if sfg.name else f"sfg{counter}" if counter > 0 else "sfg" - ) - sfg_name_var = sfg_name.replace(" ", "_") + outputs = "[" + ", ".join(op.graph_id for op in sfg.output_operations) + "]" + sfg_name = sfg.name if sfg.name else f"sfg{counter}" if counter > 0 else "sfg" + sfg_name_var = sfg_name.replace(" ", "_").replace("-", "_") + result += "\n# Signal flow graph:\n" result += ( - f"\n{sfg_name_var} = SFG(inputs={inputs}, outputs={outputs}," - f" name='{sfg_name}')\n" - ) - result += ( - "\n# SFG Properties:\n" + "prop = {'name':" + f"{sfg_name_var}" + "}" + f"{sfg_name_var} = SFG(inputs={inputs}, outputs={outputs}, name='{sfg_name}')\n" ) + result += "\n# SFG Properties:\n" + "prop = {'name':" + f"{sfg_name_var}" + "}\n" if suffix is not None: result += "\n" + suffix + "\n" @@ -159,3 +153,22 @@ def python_to_sfg(path: str) -> Tuple[SFG, Dict[str, Tuple[int, int]]]: else [v for k, v in locals().items() if isinstance(v, SFG)][0], locals()["positions"] if "positions" in locals() else {}, ) + + +def schedule_to_python(schedule: Schedule): + """ + Given a schedule structure try to serialize it for saving to a file. + + Parameters + ========== + schedule : Schedule + The schedule to serialize. + """ + sfg_name = schedule.sfg.name.replace(" ", "_").replace("-", "_") + result = "\n# Schedule:\n" + result += ( + f"{sfg_name}_schedule = Schedule({sfg_name}, {schedule.schedule_time}," + f" {schedule.cyclic}, 'provided', {schedule.start_times}," + f" {dict(schedule.laps)})\n" + ) + return sfg_to_python(schedule.sfg, schedule=True) + result diff --git a/b_asic/schedule.py b/b_asic/schedule.py index 6eaa7111..e2da6701 100644 --- a/b_asic/schedule.py +++ b/b_asic/schedule.py @@ -55,8 +55,15 @@ class Schedule: algorithm. cyclic : bool, default: False If the schedule is cyclic. - scheduling_algorithm : {'ASAP'}, optional + scheduling_algorithm : {'ASAP', 'provided'}, optional The scheduling algorithm to use. Currently, only "ASAP" is supported. + If 'provided', use provided *start_times* and *laps* dictionaries. + start_times : dict, optional + Dictionary with GraphIDs as keys and start times as values. + Used when *scheduling_algorithm* is 'provided'. + laps : dict, optional + Dictionary with GraphIDs as keys and laps as values. + Used when *scheduling_algorithm* is 'provided'. """ _sfg: SFG @@ -72,15 +79,22 @@ class Schedule: schedule_time: Optional[int] = None, cyclic: bool = False, scheduling_algorithm: str = "ASAP", + start_times: Dict[GraphID, int] = None, + laps: Dict[GraphID, int] = None, ): """Construct a Schedule from an SFG.""" - self._sfg = sfg + self._original_sfg = sfg + self._sfg = sfg() # Make a copy self._start_times = {} self._laps = defaultdict(lambda: 0) self._cyclic = cyclic self._y_locations = defaultdict(lambda: None) if scheduling_algorithm == "ASAP": self._schedule_asap() + elif scheduling_algorithm == "provided": + self._start_times = start_times + self._laps.update(laps) + self._remove_delays_no_laps() else: raise NotImplementedError( f"No algorithm with name: {scheduling_algorithm} defined." @@ -270,7 +284,7 @@ class Schedule: @property def sfg(self) -> SFG: - return self._sfg + return self._original_sfg @property def start_times(self) -> Dict[GraphID, int]: @@ -535,6 +549,15 @@ class Schedule: self._start_times[graph_id] = new_start return self + def _remove_delays_no_laps(self) -> None: + delay_list = self._sfg.find_by_type_name(Delay.type_name()) + while delay_list: + delay_op = cast(Delay, delay_list[0]) + delay_input_id = delay_op.input(0).signals[0].graph_id + delay_output_ids = [sig.graph_id for sig in delay_op.output(0).signals] + self._sfg = cast(SFG, self._sfg.remove_operation(delay_op.graph_id)) + delay_list = self._sfg.find_by_type_name(Delay.type_name()) + def _remove_delays(self) -> None: delay_list = self._sfg.find_by_type_name(Delay.type_name()) while delay_list: -- GitLab