Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • da/B-ASIC
  • lukja239/B-ASIC
  • robal695/B-ASIC
3 results
Show changes
Commits on Source (11)
......@@ -34,7 +34,6 @@ from b_asic.GUI._preferences import GAP, GRID, MINBUTTONSIZE, PORTHEIGHT
from b_asic.GUI.arrow import Arrow
from b_asic.GUI.drag_button import DragButton
from b_asic.GUI.gui_interface import Ui_main_window
from b_asic.GUI.plot_window import PlotWindow
from b_asic.GUI.port_button import PortButton
from b_asic.GUI.select_sfg_window import SelectSFGWindow
from b_asic.GUI.show_pc_window import ShowPCWindow
......@@ -44,6 +43,7 @@ from b_asic.GUI.simulate_sfg_window import SimulateSFGWindow
from b_asic.GUI.util_dialogs import FaqWindow, KeybindsWindow
from b_asic.GUI.utils import decorate_class, handle_error
from b_asic.gui_utils.about_window import AboutWindow
from b_asic.gui_utils.plot_window import PlotWindow
from b_asic.operation import Operation
from b_asic.port import InputPort, OutputPort
from b_asic.save_load_structure import python_to_sfg, sfg_to_python
......
"""PlotWindow is a window in which simulation results are plotted."""
# TODO's:
# * Solve the legend update. That isn't working at all.
# * Zoom etc. Might need to change FigureCanvas. Or just something very little.
# * Add a function to run this as a "stand-alone".
import re
import sys
from typing import Dict, List, Optional, Tuple
from matplotlib.backends.backend_qt5agg import (
FigureCanvasQTAgg as FigureCanvas,
)
from matplotlib.backends.backend_qt5agg import FigureCanvasQTAgg as FigureCanvas
from matplotlib.backends.backend_qt5agg import NavigationToolbar2QT as NavigationToolbar
from matplotlib.figure import Figure
from matplotlib.ticker import MaxNLocator
from qtpy.QtCore import Qt
from qtpy.QtGui import QKeySequence
# Intereme imports for the Plot class:
from qtpy.QtWidgets import ( # QFrame,; QScrollArea,; QLineEdit,; QSizePolicy,; QLabel,
from qtpy.QtWidgets import ( # QFrame,; QScrollArea,; QLineEdit,; QSizePolicy,; QLabel,; QFileDialog,; QShortcut,
QApplication,
QCheckBox,
QDialog,
QFileDialog,
QHBoxLayout,
QListWidget,
QListWidgetItem,
QPushButton,
QShortcut,
QSizePolicy,
QVBoxLayout,
)
class PlotCanvas(FigureCanvas):
"""PlotCanvas is used as a part in the PlotWindow."""
def __init__(self, logger, parent=None, width=5, height=4, dpi=100):
fig = Figure(figsize=(width, height), dpi=dpi)
super().__init__(fig)
self.axes = fig.add_subplot(111)
self.axes.xaxis.set_major_locator(MaxNLocator(integer=True))
self.legend = None
self.logger = logger
FigureCanvas.updateGeometry(self)
self.save_figure = QShortcut(QKeySequence("Ctrl+S"), self)
self.save_figure.activated.connect(self._save_plot_figure)
def _save_plot_figure(self):
self.logger.info(f"Saving plot of figure: {self.sfg.name}.")
file_choices = "PNG (*.png)|*.png"
path, ext = QFileDialog.getSaveFileName(
self, "Save file", "", file_choices
)
path = path.encode("utf-8")
if not path[-4:] == file_choices[-4:].encode("utf-8"):
path += file_choices[-4:].encode("utf-8")
if path:
self.print_figure(path.decode(), dpi=self.dpi)
self.logger.info(f"Saved plot: {self.sfg.name} to path: {path}.")
class PlotWindow(QDialog):
"""Dialog for plotting the result of a simulation."""
def __init__(
self,
sim_result,
# sfg_name="{sfg_name}",
# window=None,
sim_result: Dict[str, List[complex]],
logger=print,
sfg_name: Optional[str] = None,
parent=None,
# width=5,
# height=4,
# dpi=100,
):
super().__init__(parent=parent)
# self._window = window
self.setWindowFlags(
Qt.WindowTitleHint
| Qt.WindowCloseButtonHint
| Qt.WindowMinimizeButtonHint
| Qt.WindowMaximizeButtonHint
| Qt.WindowStaysOnTopHint
)
self.setWindowTitle("Simulation result")
self.sim_result = sim_result
title = (
f"Simulation results: {sfg_name}"
if sfg_name is not None
else "Simulation results"
)
self.setWindowTitle(title)
self._auto_redraw = False
# Categorise sim_results into inputs, outputs, delays, others
......@@ -102,7 +70,7 @@ class PlotWindow(QDialog):
sim_res_others[key] = sim_result[key]
# Layout: ############################################
# | list | |
# | list | icons |
# | ... | plot |
# | misc | |
......@@ -110,25 +78,30 @@ class PlotWindow(QDialog):
self.setLayout(self.dialog_layout)
listlayout = QVBoxLayout()
self.plotcanvas = PlotCanvas(
logger=logger, parent=self, width=5, height=4, dpi=100
)
plotlayout = QVBoxLayout()
self.dialog_layout.addLayout(listlayout)
self.dialog_layout.addWidget(self.plotcanvas)
self.dialog_layout.addLayout(plotlayout)
########### Plot: ##############
# Do this before the list layout, as the list layout will re/set visibility
# Note: The order is of importens. Interesting lines last, to be on top.
# self.plotcanvas = PlotCanvas(
# logger=logger, parent=self, width=5, height=4, dpi=100
# )
self._plot_fig = Figure(figsize=(5, 4), layout="compressed")
self._plot_axes = self._plot_fig.add_subplot(111)
self._plot_axes.xaxis.set_major_locator(MaxNLocator(integer=True))
self._lines = {}
for key in (
sim_res_others | sim_res_delays | sim_res_ins | sim_res_outs
):
line = self.plotcanvas.axes.plot(
sim_result[key], visible=False, label=key
)
self._lines[key] = line
self.plotcanvas.legend = self.plotcanvas.axes.legend()
for key in sim_res_others | sim_res_delays | sim_res_ins | sim_res_outs:
line = self._plot_axes.plot(sim_result[key], label=key)
self._lines[key] = line[0]
self._plot_canvas = FigureCanvas(self._plot_fig)
plotlayout.addWidget(NavigationToolbar(self._plot_canvas, self))
plotlayout.addWidget(self._plot_canvas)
########### List layout: ##############
......@@ -144,11 +117,10 @@ class PlotWindow(QDialog):
# Add the entire list
self.checklist = QListWidget()
self.checklist.setSizePolicy(QSizePolicy.Preferred, QSizePolicy.Preferred)
self.checklist.itemChanged.connect(self._item_change)
listitems = {}
for key in (
sim_res_ins | sim_res_outs | sim_res_delays | sim_res_others
):
for key in sim_res_ins | sim_res_outs | sim_res_delays | sim_res_others:
listitem = QListWidgetItem(key)
listitems[key] = listitem
self.checklist.addItem(listitem)
......@@ -157,10 +129,11 @@ class PlotWindow(QDialog):
)
for key in sim_res_outs:
listitems[key].setCheckState(Qt.CheckState.Checked)
self.checklist.setFixedWidth(150)
# self.checklist.setFixedWidth(150)
listlayout.addWidget(self.checklist)
# Add additional checkboxes
self._legend = self._plot_axes.legend()
self.legend_checkbox = QCheckBox("&Legend")
self.legend_checkbox.stateChanged.connect(self._legend_checkbox_change)
self.legend_checkbox.setCheckState(Qt.CheckState.Checked)
......@@ -180,13 +153,11 @@ class PlotWindow(QDialog):
self._auto_redraw = True
def _legend_checkbox_change(self, checkState):
self.plotcanvas.legend.set(
visible=(checkState == Qt.CheckState.Checked)
)
self._legend.set(visible=(checkState == Qt.CheckState.Checked))
if self._auto_redraw:
if checkState == Qt.CheckState.Checked:
self.plotcanvas.legend = self.plotcanvas.axes.legend()
self.plotcanvas.draw()
self._legend = self._plot_axes.legend()
self._plot_canvas.draw()
# def _ontop_checkbox_change(self, checkState):
# Bugg: It seems the window closes if you change the WindowStaysOnTopHint.
......@@ -200,30 +171,50 @@ class PlotWindow(QDialog):
for x in range(self.checklist.count()):
self.checklist.item(x).setCheckState(Qt.CheckState.Checked)
self._auto_redraw = True
self.plotcanvas.draw()
self._update_legend()
def _update_legend(self):
self._legend = self._plot_axes.legend()
self._plot_canvas.draw()
def _button_none_click(self, event):
self._auto_redraw = False
for x in range(self.checklist.count()):
self.checklist.item(x).setCheckState(Qt.CheckState.Unchecked)
self._auto_redraw = True
self.plotcanvas.draw()
self._update_legend()
def _item_change(self, listitem):
key = listitem.text()
self._lines[key][0].set(
visible=(listitem.checkState() == Qt.CheckState.Checked)
)
if listitem.checkState() == Qt.CheckState.Checked:
self._plot_axes.add_line(self._lines[key])
else:
self._lines[key].remove()
if self._auto_redraw:
if self.legend_checkbox.checkState == Qt.CheckState.Checked:
self.plotcanvas.legend = self.plotcanvas.axes.legend()
self.plotcanvas.draw()
self._update_legend()
def start_simulation_dialog(
sim_results: Dict[str, List[complex]], sfg_name: Optional[str] = None
):
"""
Display the simulation results window.
Parameters
----------
sim_results : dict
Simulation results of the form obtained from :attr:`~b_asic.simulation.Simulation.results`.
sfg_name : str, optional
DESCRIPTION. The default is None.
"""
app = QApplication(sys.argv)
win = PlotWindow(sim_result=sim_results, sfg_name=sfg_name)
win.exec_()
# Simple test of the dialog
if __name__ == "__main__":
app = QApplication(sys.argv)
# sim_res = {"c1": [3, 6, 7], "c2": [4, 5, 5], "bfly1.0": [7, 0, 0], "bfly1.1": [-1, 0, 2], "0": [1, 2, 3]}
sim_res = {
'0': [0.5, 0.5, 0, 0],
'add1': [0.5, 0.5, 0, 0],
......@@ -232,9 +223,4 @@ if __name__ == "__main__":
'in1': [1, 0, 0, 0],
't1': [0, 1, 0, 0],
}
win = PlotWindow(
# window=None, sim_result=sim_res, sfg_name="hej", logger=print
sim_result=sim_res,
)
win.exec_()
# win.show()
start_simulation_dialog(sim_res, "Test data")
......@@ -120,7 +120,7 @@ class ProcessCollection:
return self._collection
def __len__(self):
return len(self.__collection__)
return len(self._collection)
def add_process(self, process: Process):
"""
......
"""
B-ASIC Save/Load Structure Module.
Contains functions for saving/loading SFGs to/from strings that can be stored
as files.
Contains functions for saving/loading SFGs and Schedules to/from strings that can be
stored as files.
"""
from datetime import datetime
......@@ -11,11 +11,12 @@ from typing import Dict, Optional, Tuple, cast
from b_asic.graph_component import GraphComponent
from b_asic.port import InputPort
from b_asic.schedule import Schedule
from b_asic.signal_flow_graph import SFG
def sfg_to_python(
sfg: SFG, counter: int = 0, suffix: Optional[str] = None
sfg: SFG, counter: int = 0, suffix: Optional[str] = None, schedule=False
) -> str:
"""
Given an SFG structure try to serialize it for saving to a file.
......@@ -23,15 +24,23 @@ def sfg_to_python(
Parameters
==========
sfg : SFG
The SFG to serialize
The SFG to serialize.
counter : int, default: 0
Number used for naming the SFG. Enables SFGs in SFGs.
suffix : str, optional
String to append at the end of the result.
schedule : bool, default: False
True if printing a schedule.
"""
if not isinstance(sfg, SFG):
raise TypeError("An SFG must be provided")
_type = "Schedule" if schedule else "SFG"
result = (
'\n"""\nB-ASIC automatically generated SFG file.\n'
'\n"""\n'
+ f"B-ASIC automatically generated {_type} file.\n"
+ "Name: "
+ f"{sfg.name}"
+ "\n"
......@@ -44,6 +53,8 @@ def sfg_to_python(
result += "\nfrom b_asic import SFG, Signal, Input, Output"
for op_type in {type(op) for op in sfg.operations}:
result += f", {op_type.__name__}"
if schedule:
result += ", Schedule"
def kwarg_unpacker(comp: GraphComponent, params=None) -> str:
if params is None:
......@@ -61,56 +72,51 @@ def sfg_to_python(
params = {k: v for k, v in params.items() if v}
if params.get("latency_offsets", None) is not None:
params["latency_offsets"] = {
k: v
for k, v in params["latency_offsets"].items()
if v is not None
k: v for k, v in params["latency_offsets"].items() if v is not None
}
if not params["latency_offsets"]:
del params["latency_offsets"]
return ", ".join(
[f"{param}={value}" for param, value in params.items()]
)
return ", ".join([f"{param}={value}" for param, value in params.items()])
# No need to redefined I/Os
io_ops = [*sfg._input_operations, *sfg._output_operations]
io_ops = [*sfg.input_operations, *sfg.output_operations]
result += "\n# Inputs:\n"
for input_op in sfg._input_operations:
for input_op in sfg.input_operations:
result += f"{input_op.graph_id} = Input({kwarg_unpacker(input_op)})\n"
result += "\n# Outputs:\n"
for output_op in sfg._output_operations:
result += (
f"{output_op.graph_id} = Output({kwarg_unpacker(output_op)})\n"
)
for output_op in sfg.output_operations:
result += f"{output_op.graph_id} = Output({kwarg_unpacker(output_op)})\n"
result += "\n# Operations:\n"
for op in sfg.split():
if op in io_ops:
for operation in sfg.split():
if operation in io_ops:
continue
if isinstance(op, SFG):
if isinstance(operation, SFG):
counter += 1
result = sfg_to_python(op, counter) + result
result = sfg_to_python(operation, counter) + result
continue
result += (
f"{op.graph_id} = {op.__class__.__name__}({kwarg_unpacker(op)})\n"
f"{operation.graph_id} ="
f" {operation.__class__.__name__}({kwarg_unpacker(operation)})\n"
)
result += "\n# Signals:\n"
# Keep track of already existing connections to avoid adding duplicates
connections = []
for op in sfg.split():
for out in op.outputs:
for operation in sfg.split():
for out in operation.outputs:
for signal in out.signals:
destination = cast(InputPort, signal.destination)
dest_op = destination.operation
connection = (
f"\nSignal(source={op.graph_id}."
f"output({op.outputs.index(signal.source)}),"
f"Signal(source={operation.graph_id}."
f"output({operation.outputs.index(signal.source)}),"
f" destination={dest_op.graph_id}."
f"input({dest_op.inputs.index(destination)}))"
f"input({dest_op.inputs.index(destination)}))\n"
)
if connection in connections:
continue
......@@ -119,20 +125,14 @@ def sfg_to_python(
connections.append(connection)
inputs = "[" + ", ".join(op.graph_id for op in sfg.input_operations) + "]"
outputs = (
"[" + ", ".join(op.graph_id for op in sfg.output_operations) + "]"
)
sfg_name = (
sfg.name if sfg.name else f"sfg{counter}" if counter > 0 else "sfg"
)
sfg_name_var = sfg_name.replace(" ", "_")
outputs = "[" + ", ".join(op.graph_id for op in sfg.output_operations) + "]"
sfg_name = sfg.name if sfg.name else f"sfg{counter}" if counter > 0 else "sfg"
sfg_name_var = sfg_name.replace(" ", "_").replace("-", "_")
result += "\n# Signal flow graph:\n"
result += (
f"\n{sfg_name_var} = SFG(inputs={inputs}, outputs={outputs},"
f" name='{sfg_name}')\n"
)
result += (
"\n# SFG Properties:\n" + "prop = {'name':" + f"{sfg_name_var}" + "}"
f"{sfg_name_var} = SFG(inputs={inputs}, outputs={outputs}, name='{sfg_name}')\n"
)
result += "\n# SFG Properties:\n" + "prop = {'name':" + f"{sfg_name_var}" + "}\n"
if suffix is not None:
result += "\n" + suffix + "\n"
......@@ -142,15 +142,15 @@ def sfg_to_python(
def python_to_sfg(path: str) -> Tuple[SFG, Dict[str, Tuple[int, int]]]:
"""
Given a serialized file try to deserialize it and load it to the library.
Given a serialized file, try to deserialize it and load it to the library.
Parameters
==========
path : str
Path to file to read and deserialize.
"""
with open(path) as f:
code = compile(f.read(), path, "exec")
with open(path) as file:
code = compile(file.read(), path, "exec")
exec(code, globals(), locals())
return (
......@@ -159,3 +159,29 @@ def python_to_sfg(path: str) -> Tuple[SFG, Dict[str, Tuple[int, int]]]:
else [v for k, v in locals().items() if isinstance(v, SFG)][0],
locals()["positions"] if "positions" in locals() else {},
)
def schedule_to_python(schedule: Schedule) -> str:
"""
Given a schedule structure try to serialize it for saving to a file.
Parameters
==========
schedule : Schedule
The schedule to serialize.
"""
if not isinstance(schedule, Schedule):
raise TypeError("A Schedule must be provided")
sfg_name = (
schedule.sfg.name.replace(" ", "_").replace("-", "_")
if schedule.sfg.name
else "sfg"
)
result = "\n# Schedule:\n"
nonzerolaps = {gid: val for gid, val in dict(schedule.laps).items() if val}
result += (
f"{sfg_name}_schedule = Schedule({sfg_name}, {schedule.schedule_time},"
f" {schedule.cyclic}, 'provided', {schedule.start_times},"
f" {nonzerolaps})\n"
)
return sfg_to_python(schedule.sfg, schedule=True) + result
......@@ -55,8 +55,15 @@ class Schedule:
algorithm.
cyclic : bool, default: False
If the schedule is cyclic.
scheduling_algorithm : {'ASAP'}, optional
scheduling_algorithm : {'ASAP', 'provided'}, optional
The scheduling algorithm to use. Currently, only "ASAP" is supported.
If 'provided', use provided *start_times* and *laps* dictionaries.
start_times : dict, optional
Dictionary with GraphIDs as keys and start times as values.
Used when *scheduling_algorithm* is 'provided'.
laps : dict, optional
Dictionary with GraphIDs as keys and laps as values.
Used when *scheduling_algorithm* is 'provided'.
"""
_sfg: SFG
......@@ -72,8 +79,14 @@ class Schedule:
schedule_time: Optional[int] = None,
cyclic: bool = False,
scheduling_algorithm: str = "ASAP",
start_times: Optional[Dict[GraphID, int]] = None,
laps: Optional[Dict[GraphID, int]] = None,
):
"""Construct a Schedule from an SFG."""
if not isinstance(sfg, SFG):
raise TypeError("An SFG must be provided")
self._original_sfg = sfg() # Make a copy
self._sfg = sfg
self._start_times = {}
self._laps = defaultdict(lambda: 0)
......@@ -81,6 +94,14 @@ class Schedule:
self._y_locations = defaultdict(lambda: None)
if scheduling_algorithm == "ASAP":
self._schedule_asap()
elif scheduling_algorithm == "provided":
if start_times is None:
raise ValueError("Must provide start_times when using 'provided'")
if laps is None:
raise ValueError("Must provide laps when using 'provided'")
self._start_times = start_times
self._laps.update(laps)
self._remove_delays_no_laps()
else:
raise NotImplementedError(
f"No algorithm with name: {scheduling_algorithm} defined."
......@@ -107,8 +128,8 @@ class Schedule:
"""Return the current maximum end time among all operations."""
max_end_time = 0
for graph_id, op_start_time in self._start_times.items():
op = cast(Operation, self._sfg.find_by_id(graph_id))
for outport in op.outputs:
operation = cast(Operation, self._sfg.find_by_id(graph_id))
for outport in operation.outputs:
max_end_time = max(
max_end_time,
op_start_time + cast(int, outport.latency_offset),
......@@ -149,8 +170,8 @@ class Schedule:
) -> Dict["OutputPort", Dict["Signal", int]]:
ret = {}
start_time = self._start_times[graph_id]
op = cast(Operation, self._sfg.find_by_id(graph_id))
for output_port in op.outputs:
operation = cast(Operation, self._sfg.find_by_id(graph_id))
for output_port in operation.outputs:
output_slacks = {}
available_time = start_time + cast(int, output_port.latency_offset)
......@@ -200,8 +221,8 @@ class Schedule:
def _backward_slacks(self, graph_id: GraphID) -> Dict[InputPort, Dict[Signal, int]]:
ret = {}
start_time = self._start_times[graph_id]
op = cast(Operation, self._sfg.find_by_id(graph_id))
for input_port in op.inputs:
operation = cast(Operation, self._sfg.find_by_id(graph_id))
for input_port in operation.inputs:
input_slacks = {}
usage_time = start_time + cast(int, input_port.latency_offset)
......@@ -270,14 +291,19 @@ class Schedule:
@property
def sfg(self) -> SFG:
return self._sfg
"""The SFG of the current schedule."""
return self._original_sfg
@property
def start_times(self) -> Dict[GraphID, int]:
"""The start times of the operations in the current schedule."""
return self._start_times
@property
def laps(self) -> Dict[GraphID, int]:
"""
The number of laps for the start times of the operations in the current schedule.
"""
return self._laps
@property
......@@ -317,8 +343,11 @@ class Schedule:
ret = [self._schedule_time, *self._start_times.values()]
# Loop over operations
for graph_id in self._start_times:
op = cast(Operation, self._sfg.find_by_id(graph_id))
ret += [cast(int, op.execution_time), *op.latency_offsets.values()]
operation = cast(Operation, self._sfg.find_by_id(graph_id))
ret += [
cast(int, operation.execution_time),
*operation.latency_offsets.values(),
]
# Remove not set values (None)
ret = [v for v in ret if v is not None]
return ret
......@@ -381,10 +410,10 @@ class Schedule:
"""
if insert:
for gid, y_location in self._y_locations.items():
if y_location >= new_y:
self._y_locations[gid] += 1
self._y_locations[graph_id] = new_y
for gid in self._y_locations:
if self.get_y_location(gid) >= new_y:
self.set_y_location(gid, self.get_y_location(gid) + 1)
self.set_y_location(graph_id, new_y)
used_locations = {*self._y_locations.values()}
possible_locations = set(range(max(used_locations) + 1))
if not possible_locations - used_locations:
......@@ -535,7 +564,16 @@ class Schedule:
self._start_times[graph_id] = new_start
return self
def _remove_delays_no_laps(self) -> None:
"""Remove delay elements without updating laps. Used when loading schedule."""
delay_list = self._sfg.find_by_type_name(Delay.type_name())
while delay_list:
delay_op = cast(Delay, delay_list[0])
self._sfg = cast(SFG, self._sfg.remove_operation(delay_op.graph_id))
delay_list = self._sfg.find_by_type_name(Delay.type_name())
def _remove_delays(self) -> None:
"""Remove delay elements and update laps. Used after scheduling algorithm."""
delay_list = self._sfg.find_by_type_name(Delay.type_name())
while delay_list:
delay_op = cast(Delay, delay_list[0])
......@@ -549,35 +587,35 @@ class Schedule:
def _schedule_asap(self) -> None:
"""Schedule the operations using as-soon-as-possible scheduling."""
pl = self._sfg.get_precedence_list()
precedence_list = self._sfg.get_precedence_list()
if len(pl) < 2:
if len(precedence_list) < 2:
print("Empty signal flow graph cannot be scheduled.")
return
non_schedulable_ops = set()
for outport in pl[0]:
op = outport.operation
if op.type_name() not in [Delay.type_name()]:
if op.graph_id not in self._start_times:
for outport in precedence_list[0]:
operation = outport.operation
if operation.type_name() not in [Delay.type_name()]:
if operation.graph_id not in self._start_times:
# Set start time of all operations in the first iter to 0
self._start_times[op.graph_id] = 0
self._start_times[operation.graph_id] = 0
else:
non_schedulable_ops.add(op.graph_id)
non_schedulable_ops.add(operation.graph_id)
for outport in pl[1]:
op = outport.operation
if op.graph_id not in self._start_times:
for outport in precedence_list[1]:
operation = outport.operation
if operation.graph_id not in self._start_times:
# Set start time of all operations in the first iter to 0
self._start_times[op.graph_id] = 0
self._start_times[operation.graph_id] = 0
for outports in pl[2:]:
for outports in precedence_list[2:]:
for outport in outports:
op = outport.operation
if op.graph_id not in self._start_times:
operation = outport.operation
if operation.graph_id not in self._start_times:
# Schedule the operation if it does not have a start time yet.
op_start_time = 0
for inport in op.inputs:
for inport in operation.inputs:
if len(inport.signals) != 1:
raise ValueError(
"Error in scheduling, dangling input port detected."
......@@ -617,7 +655,7 @@ class Schedule:
op_start_time_from_in = source_end_time - inport.latency_offset
op_start_time = max(op_start_time, op_start_time_from_in)
self._start_times[op.graph_id] = op_start_time
self._start_times[operation.graph_id] = op_start_time
for output in self._sfg.find_by_type_name(Output.type_name()):
output = cast(Output, output)
source_port = cast(OutputPort, output.inputs[0].signals[0].source)
......@@ -722,7 +760,7 @@ class Schedule:
line_cache.append(start)
elif end[0] == start[0]:
p = Path(
path = Path(
[
start,
[start[0] + SPLINE_OFFSET, start[1]],
......@@ -742,16 +780,16 @@ class Schedule:
Path.CURVE4,
],
)
pp = PathPatch(
p,
path_patch = PathPatch(
path,
fc='none',
ec=_SIGNAL_COLOR,
lw=SIGNAL_LINEWIDTH,
zorder=10,
)
ax.add_patch(pp)
ax.add_patch(path_patch)
else:
p = Path(
path = Path(
[
start,
[(start[0] + end[0]) / 2, start[1]],
......@@ -760,14 +798,14 @@ class Schedule:
],
[Path.MOVETO, Path.CURVE4, Path.CURVE4, Path.CURVE4],
)
pp = PathPatch(
p,
path_patch = PathPatch(
path,
fc='none',
ec=_SIGNAL_COLOR,
lw=SIGNAL_LINEWIDTH,
zorder=10,
)
ax.add_patch(pp)
ax.add_patch(path_patch)
def _draw_offset_arrow(start, end, start_offset, end_offset, name="", laps=0):
"""Draw an arrow from *start* to *end*, but with an offset."""
......@@ -784,12 +822,12 @@ class Schedule:
ax.grid()
for graph_id, op_start_time in self._start_times.items():
y_pos = self._get_y_position(graph_id, operation_gap=operation_gap)
op = cast(Operation, self._sfg.find_by_id(graph_id))
operation = cast(Operation, self._sfg.find_by_id(graph_id))
# Rewrite to make better use of NumPy
(
latency_coordinates,
execution_time_coordinates,
) = op.get_plot_coordinates()
) = operation.get_plot_coordinates()
_x, _y = zip(*latency_coordinates)
x = np.array(_x)
y = np.array(_y)
......@@ -809,11 +847,11 @@ class Schedule:
yticklabels.append(cast(Operation, self._sfg.find_by_id(graph_id)).name)
for graph_id, op_start_time in self._start_times.items():
op = cast(Operation, self._sfg.find_by_id(graph_id))
out_coordinates = op.get_output_coordinates()
operation = cast(Operation, self._sfg.find_by_id(graph_id))
out_coordinates = operation.get_output_coordinates()
source_y_pos = self._get_y_position(graph_id, operation_gap=operation_gap)
for output_port in op.outputs:
for output_port in operation.outputs:
for output_signal in output_port.signals:
destination = cast(InputPort, output_signal.destination)
destination_op = destination.operation
......@@ -858,7 +896,7 @@ class Schedule:
def _reset_y_locations(self) -> None:
"""Reset all the y-locations in the schedule to None"""
self._y_locations = self._y_locations = defaultdict(lambda: None)
self._y_locations = defaultdict(lambda: None)
def plot_in_axes(self, ax: Axes, operation_gap: Optional[float] = None) -> None:
"""
......@@ -911,7 +949,7 @@ class Schedule:
"""
fig, ax = plt.subplots()
self._plot_schedule(ax)
f = io.StringIO()
fig.savefig(f, format="svg")
buffer = io.StringIO()
fig.savefig(buffer, format="svg")
return f.getvalue()
return buffer.getvalue()
......@@ -257,7 +257,7 @@ class SchedulerItem(SchedulerEvent, QGraphicsItemGroup): # PySide2 / PyQt5
"""Make a new graph out of the stored attributes."""
# build components
for graph_id in self.schedule.start_times.keys():
operation = cast(Operation, self.schedule.sfg.find_by_id(graph_id))
operation = cast(Operation, self.schedule._sfg.find_by_id(graph_id))
component = OperationItem(operation, height=OPERATION_HEIGHT, parent=self)
self._operation_items[graph_id] = component
self._set_position(graph_id)
......
......@@ -59,6 +59,9 @@ class Simulation:
input_providers: Optional[Sequence[Optional[InputProvider]]] = None,
):
"""Construct a Simulation of an SFG."""
if not isinstance(sfg, SFG):
raise TypeError("An SFG must be provided")
# Copy the SFG to make sure it's not modified from the outside.
self._sfg = sfg()
self._results = defaultdict(list)
......@@ -214,3 +217,10 @@ class Simulation:
Clear all current state of the simulation, except for the results and iteration.
"""
self._delays.clear()
def show(self) -> None:
"""Show the simulation results."""
# import here to avoid cyclic imports
from b_asic.gui_utils.plot_window import start_simulation_dialog
start_simulation_dialog(self.results, self._sfg.name)
......@@ -11,6 +11,7 @@ API
port.rst
process.rst
resources.rst
save_load_structure.rst
schedule.rst
sfg_generators.rst
signal.rst
......
******************************
``b_asic.save_load_structure``
******************************
.. automodule:: b_asic.save_load_structure
:members:
......@@ -7,3 +7,11 @@ gui\_utils.about\_window module
.. automodule:: b_asic.gui_utils.about_window
:members:
:undoc-members:
gui\_utils.plot\_window module
-------------------------------
.. automodule:: b_asic.gui_utils.plot_window
:members:
:undoc-members:
import pickle
import matplotlib.pyplot as plt
import networkx as nx
import pytest
from b_asic.process import Process
from b_asic.research.interleaver import (
generate_matrix_transposer,
generate_random_interleaver,
)
from b_asic.resources import ProcessCollection, draw_exclusion_graph_coloring
from b_asic.resources import ProcessCollection
class TestProcessCollectionPlainMemoryVariable:
......@@ -44,3 +42,6 @@ class TestProcessCollectionPlainMemoryVariable:
assert len(collection.split_ports(read_ports=1, write_ports=1)) == 1
if any(var.execution_time for var in collection.collection):
assert len(collection.split_ports(total_ports=1)) == 2
def test_len_process_collection(self, simple_collection: ProcessCollection):
assert len(simple_collection) == 7
......@@ -495,6 +495,8 @@ class TestProcesses:
def test__get_memory_variables_list(self, secondorder_iir_schedule):
mvl = secondorder_iir_schedule._get_memory_variables_list()
assert len(mvl) == 12
pc = secondorder_iir_schedule.get_memory_variables()
assert len(pc) == 12
class TestFigureGeneration:
......