Skip to content
Snippets Groups Projects

Add basic functionality for saving schedules

Merged Oscar Gustafsson requested to merge saveschedule into master
2 files
+ 128
83
Compare changes
  • Side-by-side
  • Inline
Files
2
@@ -11,11 +11,12 @@ from typing import Dict, Optional, Tuple, cast
from b_asic.graph_component import GraphComponent
from b_asic.port import InputPort
from b_asic.schedule import Schedule
from b_asic.signal_flow_graph import SFG
def sfg_to_python(
sfg: SFG, counter: int = 0, suffix: Optional[str] = None
sfg: SFG, counter: int = 0, suffix: Optional[str] = None, schedule=False
) -> str:
"""
Given an SFG structure try to serialize it for saving to a file.
@@ -23,15 +24,20 @@ def sfg_to_python(
Parameters
==========
sfg : SFG
The SFG to serialize
The SFG to serialize.
counter : int, default: 0
Number used for naming the SFG. Enables SFGs in SFGs.
suffix : str, optional
String to append at the end of the result.
schedule : bool, default: False
True if printing a schedule.
"""
_type = "Schedule" if schedule else "SFG"
result = (
'\n"""\nB-ASIC automatically generated SFG file.\n'
'\n"""\n'
+ f"B-ASIC automatically generated {_type} file.\n"
+ "Name: "
+ f"{sfg.name}"
+ "\n"
@@ -44,6 +50,8 @@ def sfg_to_python(
result += "\nfrom b_asic import SFG, Signal, Input, Output"
for op_type in {type(op) for op in sfg.operations}:
result += f", {op_type.__name__}"
if schedule:
result += ", Schedule"
def kwarg_unpacker(comp: GraphComponent, params=None) -> str:
if params is None:
@@ -61,56 +69,51 @@ def sfg_to_python(
params = {k: v for k, v in params.items() if v}
if params.get("latency_offsets", None) is not None:
params["latency_offsets"] = {
k: v
for k, v in params["latency_offsets"].items()
if v is not None
k: v for k, v in params["latency_offsets"].items() if v is not None
}
if not params["latency_offsets"]:
del params["latency_offsets"]
return ", ".join(
[f"{param}={value}" for param, value in params.items()]
)
return ", ".join([f"{param}={value}" for param, value in params.items()])
# No need to redefined I/Os
io_ops = [*sfg._input_operations, *sfg._output_operations]
io_ops = [*sfg.input_operations, *sfg.output_operations]
result += "\n# Inputs:\n"
for input_op in sfg._input_operations:
for input_op in sfg.input_operations:
result += f"{input_op.graph_id} = Input({kwarg_unpacker(input_op)})\n"
result += "\n# Outputs:\n"
for output_op in sfg._output_operations:
result += (
f"{output_op.graph_id} = Output({kwarg_unpacker(output_op)})\n"
)
for output_op in sfg.output_operations:
result += f"{output_op.graph_id} = Output({kwarg_unpacker(output_op)})\n"
result += "\n# Operations:\n"
for op in sfg.split():
if op in io_ops:
for operation in sfg.split():
if operation in io_ops:
continue
if isinstance(op, SFG):
if isinstance(operation, SFG):
counter += 1
result = sfg_to_python(op, counter) + result
result = sfg_to_python(operation, counter) + result
continue
result += (
f"{op.graph_id} = {op.__class__.__name__}({kwarg_unpacker(op)})\n"
f"{operation.graph_id} ="
f" {operation.__class__.__name__}({kwarg_unpacker(operation)})\n"
)
result += "\n# Signals:\n"
# Keep track of already existing connections to avoid adding duplicates
connections = []
for op in sfg.split():
for out in op.outputs:
for operation in sfg.split():
for out in operation.outputs:
for signal in out.signals:
destination = cast(InputPort, signal.destination)
dest_op = destination.operation
connection = (
f"\nSignal(source={op.graph_id}."
f"output({op.outputs.index(signal.source)}),"
f"Signal(source={operation.graph_id}."
f"output({operation.outputs.index(signal.source)}),"
f" destination={dest_op.graph_id}."
f"input({dest_op.inputs.index(destination)}))"
f"input({dest_op.inputs.index(destination)}))\n"
)
if connection in connections:
continue
@@ -119,20 +122,14 @@ def sfg_to_python(
connections.append(connection)
inputs = "[" + ", ".join(op.graph_id for op in sfg.input_operations) + "]"
outputs = (
"[" + ", ".join(op.graph_id for op in sfg.output_operations) + "]"
)
sfg_name = (
sfg.name if sfg.name else f"sfg{counter}" if counter > 0 else "sfg"
)
sfg_name_var = sfg_name.replace(" ", "_")
result += (
f"\n{sfg_name_var} = SFG(inputs={inputs}, outputs={outputs},"
f" name='{sfg_name}')\n"
)
outputs = "[" + ", ".join(op.graph_id for op in sfg.output_operations) + "]"
sfg_name = sfg.name if sfg.name else f"sfg{counter}" if counter > 0 else "sfg"
sfg_name_var = sfg_name.replace(" ", "_").replace("-", "_")
result += "\n# Signal flow graph:\n"
result += (
"\n# SFG Properties:\n" + "prop = {'name':" + f"{sfg_name_var}" + "}"
f"{sfg_name_var} = SFG(inputs={inputs}, outputs={outputs}, name='{sfg_name}')\n"
)
result += "\n# SFG Properties:\n" + "prop = {'name':" + f"{sfg_name_var}" + "}\n"
if suffix is not None:
result += "\n" + suffix + "\n"
@@ -149,8 +146,8 @@ def python_to_sfg(path: str) -> Tuple[SFG, Dict[str, Tuple[int, int]]]:
path : str
Path to file to read and deserialize.
"""
with open(path) as f:
code = compile(f.read(), path, "exec")
with open(path) as file:
code = compile(file.read(), path, "exec")
exec(code, globals(), locals())
return (
@@ -159,3 +156,22 @@ def python_to_sfg(path: str) -> Tuple[SFG, Dict[str, Tuple[int, int]]]:
else [v for k, v in locals().items() if isinstance(v, SFG)][0],
locals()["positions"] if "positions" in locals() else {},
)
def schedule_to_python(schedule: Schedule):
"""
Given a schedule structure try to serialize it for saving to a file.
Parameters
==========
schedule : Schedule
The schedule to serialize.
"""
sfg_name = schedule.sfg.name.replace(" ", "_").replace("-", "_")
result = "\n# Schedule:\n"
result += (
f"{sfg_name}_schedule = Schedule({sfg_name}, {schedule.schedule_time},"
f" {schedule.cyclic}, 'provided', {schedule.start_times},"
f" {dict(schedule.laps)})\n"
)
return sfg_to_python(schedule.sfg, schedule=True) + result
Loading