Skip to content
Snippets Groups Projects
Commit 1567bb76 authored by Oscar Gustafsson's avatar Oscar Gustafsson :bicyclist:
Browse files

Add basic functionality for saving schedules

parent 0be589c8
No related branches found
No related tags found
No related merge requests found
Pipeline #90252 failed
......@@ -11,11 +11,12 @@ from typing import Dict, Optional, Tuple, cast
from b_asic.graph_component import GraphComponent
from b_asic.port import InputPort
from b_asic.schedule import Schedule
from b_asic.signal_flow_graph import SFG
def sfg_to_python(
sfg: SFG, counter: int = 0, suffix: Optional[str] = None
sfg: SFG, counter: int = 0, suffix: Optional[str] = None, schedule=False
) -> str:
"""
Given an SFG structure try to serialize it for saving to a file.
......@@ -23,15 +24,20 @@ def sfg_to_python(
Parameters
==========
sfg : SFG
The SFG to serialize
The SFG to serialize.
counter : int, default: 0
Number used for naming the SFG. Enables SFGs in SFGs.
suffix : str, optional
String to append at the end of the result.
schedule : bool, default: False
True if printing a schedule
"""
_type = "Schedule" if schedule else "SFG"
result = (
'\n"""\nB-ASIC automatically generated SFG file.\n'
'\n"""\n'
+ f"B-ASIC automatically generated {_type} file.\n"
+ "Name: "
+ f"{sfg.name}"
+ "\n"
......@@ -44,6 +50,8 @@ def sfg_to_python(
result += "\nfrom b_asic import SFG, Signal, Input, Output"
for op_type in {type(op) for op in sfg.operations}:
result += f", {op_type.__name__}"
if schedule:
result += ", Schedule"
def kwarg_unpacker(comp: GraphComponent, params=None) -> str:
if params is None:
......@@ -61,16 +69,12 @@ def sfg_to_python(
params = {k: v for k, v in params.items() if v}
if params.get("latency_offsets", None) is not None:
params["latency_offsets"] = {
k: v
for k, v in params["latency_offsets"].items()
if v is not None
k: v for k, v in params["latency_offsets"].items() if v is not None
}
if not params["latency_offsets"]:
del params["latency_offsets"]
return ", ".join(
[f"{param}={value}" for param, value in params.items()]
)
return ", ".join([f"{param}={value}" for param, value in params.items()])
# No need to redefined I/Os
io_ops = [*sfg._input_operations, *sfg._output_operations]
......@@ -81,9 +85,7 @@ def sfg_to_python(
result += "\n# Outputs:\n"
for output_op in sfg._output_operations:
result += (
f"{output_op.graph_id} = Output({kwarg_unpacker(output_op)})\n"
)
result += f"{output_op.graph_id} = Output({kwarg_unpacker(output_op)})\n"
result += "\n# Operations:\n"
for op in sfg.split():
......@@ -94,9 +96,7 @@ def sfg_to_python(
result = sfg_to_python(op, counter) + result
continue
result += (
f"{op.graph_id} = {op.__class__.__name__}({kwarg_unpacker(op)})\n"
)
result += f"{op.graph_id} = {op.__class__.__name__}({kwarg_unpacker(op)})\n"
result += "\n# Signals:\n"
# Keep track of already existing connections to avoid adding duplicates
......@@ -107,10 +107,10 @@ def sfg_to_python(
destination = cast(InputPort, signal.destination)
dest_op = destination.operation
connection = (
f"\nSignal(source={op.graph_id}."
f"Signal(source={op.graph_id}."
f"output({op.outputs.index(signal.source)}),"
f" destination={dest_op.graph_id}."
f"input({dest_op.inputs.index(destination)}))"
f"input({dest_op.inputs.index(destination)}))\n"
)
if connection in connections:
continue
......@@ -119,20 +119,14 @@ def sfg_to_python(
connections.append(connection)
inputs = "[" + ", ".join(op.graph_id for op in sfg.input_operations) + "]"
outputs = (
"[" + ", ".join(op.graph_id for op in sfg.output_operations) + "]"
)
sfg_name = (
sfg.name if sfg.name else f"sfg{counter}" if counter > 0 else "sfg"
)
sfg_name_var = sfg_name.replace(" ", "_")
outputs = "[" + ", ".join(op.graph_id for op in sfg.output_operations) + "]"
sfg_name = sfg.name if sfg.name else f"sfg{counter}" if counter > 0 else "sfg"
sfg_name_var = sfg_name.replace(" ", "_").replace("-", "_")
result += "\n# Signal flow graph:\n"
result += (
f"\n{sfg_name_var} = SFG(inputs={inputs}, outputs={outputs},"
f" name='{sfg_name}')\n"
)
result += (
"\n# SFG Properties:\n" + "prop = {'name':" + f"{sfg_name_var}" + "}"
f"{sfg_name_var} = SFG(inputs={inputs}, outputs={outputs}, name='{sfg_name}')\n"
)
result += "\n# SFG Properties:\n" + "prop = {'name':" + f"{sfg_name_var}" + "}\n"
if suffix is not None:
result += "\n" + suffix + "\n"
......@@ -159,3 +153,22 @@ def python_to_sfg(path: str) -> Tuple[SFG, Dict[str, Tuple[int, int]]]:
else [v for k, v in locals().items() if isinstance(v, SFG)][0],
locals()["positions"] if "positions" in locals() else {},
)
def schedule_to_python(schedule: Schedule):
"""
Given a schedule structure try to serialize it for saving to a file.
Parameters
==========
schedule : Schedule
The schedule to serialize.
"""
sfg_name = schedule.sfg.name.replace(" ", "_").replace("-", "_")
result = "\n# Schedule:\n"
result += (
f"{sfg_name}_schedule = Schedule({sfg_name}, {schedule.schedule_time},"
f" {schedule.cyclic}, 'provided', {schedule.start_times},"
f" {dict(schedule.laps)})\n"
)
return sfg_to_python(schedule.sfg, schedule=True) + result
......@@ -55,8 +55,15 @@ class Schedule:
algorithm.
cyclic : bool, default: False
If the schedule is cyclic.
scheduling_algorithm : {'ASAP'}, optional
scheduling_algorithm : {'ASAP', 'provided'}, optional
The scheduling algorithm to use. Currently, only "ASAP" is supported.
If 'provided', use provided *start_times* and *laps* dictionaries.
start_times : dict, optional
Dictionary with GraphIDs as keys and start times as values.
Used when *scheduling_algorithm* is 'provided'.
laps : dict, optional
Dictionary with GraphIDs as keys and laps as values.
Used when *scheduling_algorithm* is 'provided'.
"""
_sfg: SFG
......@@ -72,15 +79,22 @@ class Schedule:
schedule_time: Optional[int] = None,
cyclic: bool = False,
scheduling_algorithm: str = "ASAP",
start_times: Dict[GraphID, int] = None,
laps: Dict[GraphID, int] = None,
):
"""Construct a Schedule from an SFG."""
self._sfg = sfg
self._original_sfg = sfg
self._sfg = sfg() # Make a copy
self._start_times = {}
self._laps = defaultdict(lambda: 0)
self._cyclic = cyclic
self._y_locations = defaultdict(lambda: None)
if scheduling_algorithm == "ASAP":
self._schedule_asap()
elif scheduling_algorithm == "provided":
self._start_times = start_times
self._laps.update(laps)
self._remove_delays_no_laps()
else:
raise NotImplementedError(
f"No algorithm with name: {scheduling_algorithm} defined."
......@@ -270,7 +284,7 @@ class Schedule:
@property
def sfg(self) -> SFG:
return self._sfg
return self._original_sfg
@property
def start_times(self) -> Dict[GraphID, int]:
......@@ -535,6 +549,15 @@ class Schedule:
self._start_times[graph_id] = new_start
return self
def _remove_delays_no_laps(self) -> None:
delay_list = self._sfg.find_by_type_name(Delay.type_name())
while delay_list:
delay_op = cast(Delay, delay_list[0])
delay_input_id = delay_op.input(0).signals[0].graph_id
delay_output_ids = [sig.graph_id for sig in delay_op.output(0).signals]
self._sfg = cast(SFG, self._sfg.remove_operation(delay_op.graph_id))
delay_list = self._sfg.find_by_type_name(Delay.type_name())
def _remove_delays(self) -> None:
delay_list = self._sfg.find_by_type_name(Delay.type_name())
while delay_list:
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment