Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • da/B-ASIC
  • lukja239/B-ASIC
  • robal695/B-ASIC
3 results
Show changes
Commits on Source (15)
Showing
with 1005 additions and 119 deletions
......@@ -115,3 +115,4 @@ TODO.txt
*.log
b_asic/_version.py
docs_sphinx/_build/
docs_sphinx/examples
......@@ -24,6 +24,7 @@ The following packages are required in order to build the library:
- [NumPy](https://numpy.org/)
- [QtPy](https://github.com/spyder-ide/qtpy)
- [setuptools_scm](https://github.com/pypa/setuptools_scm/)
- [NetworkX](https://networkx.org/)
- Qt 5 or 6, with Python bindings, one of:
- pyside2
- pyqt5
......
......@@ -11,7 +11,7 @@ import sys
from pprint import pprint
from qtpy.QtCore import QFileInfo, QSize, Qt
from qtpy.QtGui import QIcon, QKeySequence, QPainter
from qtpy.QtGui import QCursor, QIcon, QKeySequence, QPainter
from qtpy.QtWidgets import (
QAction,
QApplication,
......@@ -33,10 +33,13 @@ from b_asic.GUI._preferences import GAP, GRID, MINBUTTONSIZE, PORTHEIGHT
from b_asic.GUI.arrow import Arrow
from b_asic.GUI.drag_button import DragButton
from b_asic.GUI.gui_interface import Ui_main_window
from b_asic.GUI.plot_window import PlotWindow
from b_asic.GUI.port_button import PortButton
from b_asic.GUI.select_sfg_window import SelectSFGWindow
from b_asic.GUI.show_pc_window import ShowPCWindow
from b_asic.GUI.simulate_sfg_window import Plot, SimulateSFGWindow
# from b_asic.GUI.simulate_sfg_window import Plot, SimulateSFGWindow
from b_asic.GUI.simulate_sfg_window import SimulateSFGWindow
from b_asic.GUI.util_dialogs import FaqWindow, KeybindsWindow
from b_asic.GUI.utils import decorate_class, handle_error
from b_asic.gui_utils.about_window import AboutWindow
......@@ -67,7 +70,8 @@ class MainWindow(QMainWindow):
self.operationItemSceneList = []
self.signalList = []
self.mouse_pressed = False
self.mouse_draggin = False
self.mouse_dragging = False
self.starting_port = []
self.pressed_operations = []
self.portDict = {}
self.signalPortDict = {}
......@@ -134,6 +138,8 @@ class MainWindow(QMainWindow):
"section on the toolbar."
)
self.cursor = QCursor()
def init_ui(self):
self.create_toolbar_view()
self.create_graphics_view()
......@@ -713,7 +719,8 @@ class MainWindow(QMainWindow):
self.logger.info(
"To save the plot press 'Ctrl+S' when the plot is focused."
)
self.plot = Plot(simulation, sfg, self)
# self.plot = Plot(simulation, sfg, self)
self.plot = PlotWindow(simulation.results)
self.plot.show()
def simulate_sfg(self, event=None):
......
# TODO's:
# * Solve the legend update. That isn't working at all.
# * Zoom etc. Might need to change FigureCanvas. Or just something very little.
import re
import sys
from matplotlib.backends.backend_qt5agg import (
FigureCanvasQTAgg as FigureCanvas,
)
from matplotlib.figure import Figure
from matplotlib.ticker import MaxNLocator
from qtpy.QtCore import Qt
from qtpy.QtGui import QKeySequence
# Intereme imports for the Plot class:
from qtpy.QtWidgets import ( # QFrame,; QScrollArea,; QLineEdit,; QSizePolicy,; QLabel,
QApplication,
QCheckBox,
QDialog,
QFileDialog,
QHBoxLayout,
QListWidget,
QListWidgetItem,
QPushButton,
QShortcut,
QVBoxLayout,
)
class PlotCanvas(FigureCanvas):
"""PlotCanvas is used as a part in the PlotWindow."""
def __init__(self, logger, parent=None, width=5, height=4, dpi=100):
fig = Figure(figsize=(width, height), dpi=dpi)
super().__init__(fig)
self.axes = fig.add_subplot(111)
self.axes.xaxis.set_major_locator(MaxNLocator(integer=True))
self.legend = None
self.logger = logger
FigureCanvas.updateGeometry(self)
self.save_figure = QShortcut(QKeySequence("Ctrl+S"), self)
self.save_figure.activated.connect(self._save_plot_figure)
def _save_plot_figure(self):
self.logger.info(f"Saving plot of figure: {self.sfg.name}.")
file_choices = "PNG (*.png)|*.png"
path, ext = QFileDialog.getSaveFileName(
self, "Save file", "", file_choices
)
path = path.encode("utf-8")
if not path[-4:] == file_choices[-4:].encode("utf-8"):
path += file_choices[-4:].encode("utf-8")
if path:
self.print_figure(path.decode(), dpi=self.dpi)
self.logger.info(f"Saved plot: {self.sfg.name} to path: {path}.")
class PlotWindow(QDialog):
"""Dialog for plotting the result of a simulation."""
def __init__(
self,
sim_result,
# sfg_name="{sfg_name}",
# window=None,
logger=print,
parent=None,
# width=5,
# height=4,
# dpi=100,
):
super().__init__(parent=parent)
# self._window = window
self.setWindowFlags(
Qt.WindowTitleHint
| Qt.WindowCloseButtonHint
| Qt.WindowMinimizeButtonHint
| Qt.WindowMaximizeButtonHint
| Qt.WindowStaysOnTopHint
)
self.setWindowTitle("Simulation result")
self.sim_result = sim_result
self._auto_redraw = False
# Categorise sim_results into inputs, outputs, delays, others
sim_res_ins = {}
sim_res_outs = {}
sim_res_delays = {}
sim_res_others = {}
for key in sim_result:
if re.fullmatch("in[0-9]+", key):
sim_res_ins[key] = sim_result[key]
elif re.fullmatch("[0-9]+", key):
sim_res_outs[key] = sim_result[key]
elif re.fullmatch("t[0-9]+", key):
sim_res_delays[key] = sim_result[key]
else:
sim_res_others[key] = sim_result[key]
# Layout: ############################################
# | list | |
# | ... | plot |
# | misc | |
self.dialog_layout = QHBoxLayout()
self.setLayout(self.dialog_layout)
listlayout = QVBoxLayout()
self.plotcanvas = PlotCanvas(
logger=logger, parent=self, width=5, height=4, dpi=100
)
self.dialog_layout.addLayout(listlayout)
self.dialog_layout.addWidget(self.plotcanvas)
########### Plot: ##############
# Do this before the list layout, as the list layout will re/set visibility
# Note: The order is of importens. Interesting lines last, to be on top.
self._lines = {}
for key in (
sim_res_others | sim_res_delays | sim_res_ins | sim_res_outs
):
line = self.plotcanvas.axes.plot(
sim_result[key], visible=False, label=key
)
self._lines[key] = line
self.plotcanvas.legend = self.plotcanvas.axes.legend()
########### List layout: ##############
# Add two buttons for selecting all/none:
hlayout = QHBoxLayout()
button_all = QPushButton("&All")
button_all.clicked.connect(self._button_all_click)
hlayout.addWidget(button_all)
button_none = QPushButton("&None")
button_none.clicked.connect(self._button_none_click)
hlayout.addWidget(button_none)
listlayout.addLayout(hlayout)
# Add the entire list
self.checklist = QListWidget()
self.checklist.itemChanged.connect(self._item_change)
listitems = {}
for key in (
sim_res_ins | sim_res_outs | sim_res_delays | sim_res_others
):
listitem = QListWidgetItem(key)
listitems[key] = listitem
self.checklist.addItem(listitem)
listitem.setCheckState(
Qt.CheckState.Unchecked # CheckState: Qt.CheckState.{Unchecked, PartiallyChecked, Checked}
)
for key in sim_res_outs:
listitems[key].setCheckState(Qt.CheckState.Checked)
self.checklist.setFixedWidth(150)
listlayout.addWidget(self.checklist)
# Add additional checkboxes
self.legend_checkbox = QCheckBox("&Legend")
self.legend_checkbox.stateChanged.connect(self._legend_checkbox_change)
self.legend_checkbox.setCheckState(Qt.CheckState.Checked)
listlayout.addWidget(self.legend_checkbox)
# self.ontop_checkbox = QCheckBox("&On top")
# self.ontop_checkbox.stateChanged.connect(self._ontop_checkbox_change)
# self.ontop_checkbox.setCheckState(Qt.CheckState.Unchecked)
# listlayout.addWidget(self.ontop_checkbox)
# Add "Close" buttons
buttonClose = QPushButton("&Close", self)
buttonClose.clicked.connect(self.close)
listlayout.addWidget(buttonClose)
# Done. Tell the functions below to redraw the canvas when needed.
# self.plotcanvas.draw()
self._auto_redraw = True
def _legend_checkbox_change(self, checkState):
self.plotcanvas.legend.set(
visible=(checkState == Qt.CheckState.Checked)
)
if self._auto_redraw:
if checkState == Qt.CheckState.Checked:
self.plotcanvas.legend = self.plotcanvas.axes.legend()
self.plotcanvas.draw()
# def _ontop_checkbox_change(self, checkState):
# Bugg: It seems the window closes if you change the WindowStaysOnTopHint.
# (Nothing happens if "changing" from False to False or True to True)
# self.setWindowFlag(Qt.WindowStaysOnTopHint, on = (checkState == Qt.CheckState.Checked))
# self.setWindowFlag(Qt.WindowStaysOnTopHint, on = True)
# print("_ontop_checkbox_change")
def _button_all_click(self, event):
self._auto_redraw = False
for x in range(self.checklist.count()):
self.checklist.item(x).setCheckState(Qt.CheckState.Checked)
self._auto_redraw = True
self.plotcanvas.draw()
def _button_none_click(self, event):
self._auto_redraw = False
for x in range(self.checklist.count()):
self.checklist.item(x).setCheckState(Qt.CheckState.Unchecked)
self._auto_redraw = True
self.plotcanvas.draw()
def _item_change(self, listitem):
key = listitem.text()
self._lines[key][0].set(
visible=(listitem.checkState() == Qt.CheckState.Checked)
)
if self._auto_redraw:
if self.legend_checkbox.checkState == Qt.CheckState.Checked:
self.plotcanvas.legend = self.plotcanvas.axes.legend()
self.plotcanvas.draw()
# Simple test of the dialog
if __name__ == "__main__":
app = QApplication(sys.argv)
# sim_res = {"c1": [3, 6, 7], "c2": [4, 5, 5], "bfly1.0": [7, 0, 0], "bfly1.1": [-1, 0, 2], "0": [1, 2, 3]}
sim_res = {
'0': [0.5, 0.5, 0, 0],
'add1': [0.5, 0.5, 0, 0],
'cmul1': [0, 0.5, 0, 0],
'cmul2': [0.5, 0, 0, 0],
'in1': [1, 0, 0, 0],
't1': [0, 1, 0, 0],
}
win = PlotWindow(
# window=None, sim_result=sim_res, sfg_name="hej", logger=print
sim_result=sim_res,
)
win.exec_()
# win.show()
"""
B-ASIC port button module.
"""
from qtpy.QtCore import Qt, Signal
from qtpy.QtCore import QMimeData, Qt, Signal
from qtpy.QtGui import QDrag
from qtpy.QtWidgets import QMenu, QPushButton
......@@ -30,9 +31,11 @@ class PortButton(QPushButton):
self.clicked = 0
self._m_drag = False
self._m_press = False
self.setAcceptDrops(True)
self.setCursor(Qt.ArrowCursor)
self.setStyleSheet("background-color: white")
self.connectionRequested.connect(self._window._connect_button)
self.connectionRequested.connect(self._window._connect_callback)
def contextMenuEvent(self, event):
menu = QMenu()
......@@ -41,13 +44,51 @@ class PortButton(QPushButton):
def mousePressEvent(self, event):
if event.button() == Qt.MouseButton.LeftButton:
self._window.mouse_pressed = True
self.select_port(event.modifiers())
super().mousePressEvent(event)
def mouseReleaseEvent(self, event):
if (
event.button() == Qt.MouseButton.LeftButton
and self._window.mouse_pressed
):
self._window.mouse_pressed = False
if self._window.mouse_dragging:
self._window.mouse_dragging = False
super().mouseReleaseEvent(event)
def mouseMoveEvent(self, event):
if self._window.mouse_pressed:
self._window.mouse_draggin = True
self._window.starting_port = self
data = QMimeData()
drag = QDrag(self)
drag.setMimeData(data)
drag.exec()
super().mouseMoveEvent(event)
def dragEnterEvent(self, event):
event.acceptProposedAction()
self.update()
super().dragEnterEvent(event)
def dragLeaveEvent(self, event):
self.update()
super().dragLeaveEvent(event)
def dragMoveEvent(self, event):
event.acceptProposedAction()
super().dragMoveEvent(event)
def dropEvent(self, event):
event.acceptProposedAction()
if self != self._window.starting_port:
self.select_port(Qt.KeyboardModifier.ControlModifier)
self._window._connect_callback()
self.update()
super().dropEvent(event)
def _toggle_port(self, pressed=False):
self.pressed = not pressed
self.setStyleSheet(
......
......@@ -258,12 +258,12 @@ class AddSub(AbstractOperation):
@property
def is_add(self) -> Num:
"""Get if operation is add."""
"""Get if operation is an addition."""
return self.param("is_add")
@is_add.setter
def is_add(self, is_add: bool) -> None:
"""Set if operation is add."""
"""Set if operation is an addition."""
self.set_param("is_add", is_add)
......@@ -774,7 +774,7 @@ class Reciprocal(AbstractOperation):
latency_offsets: Optional[Dict[str, int]] = None,
execution_time: Optional[int] = None,
):
"""Construct an Reciprocal operation."""
"""Construct a Reciprocal operation."""
super().__init__(
input_count=1,
output_count=1,
......
......@@ -9,7 +9,7 @@ from collections import deque
from copy import copy, deepcopy
from typing import Any, Dict, Generator, Iterable, Mapping, cast
from b_asic.types import GraphID, GraphIDNumber, Name, Num, TypeName
from b_asic.types import GraphID, Name, TypeName
class GraphComponent(ABC):
......
......@@ -11,7 +11,6 @@ from abc import abstractmethod
from numbers import Number
from typing import (
TYPE_CHECKING,
Any,
Dict,
Iterable,
List,
......@@ -34,7 +33,7 @@ from b_asic.graph_component import (
)
from b_asic.port import InputPort, OutputPort, SignalSourceProvider
from b_asic.signal import Signal
from b_asic.types import Num, NumRuntime
from b_asic.types import Num
if TYPE_CHECKING:
# Conditionally imported to avoid circular imports
......@@ -593,7 +592,7 @@ class AbstractOperation(Operation, AbstractGraphComponent):
# Import here to avoid circular imports.
from b_asic.core_operations import Addition, Constant
if isinstance(src, NumRuntime):
if isinstance(src, Number):
return Addition(self, Constant(src))
else:
return Addition(self, src)
......@@ -603,7 +602,7 @@ class AbstractOperation(Operation, AbstractGraphComponent):
from b_asic.core_operations import Addition, Constant
return Addition(
Constant(src) if isinstance(src, NumRuntime) else src, self
Constant(src) if isinstance(src, Number) else src, self
)
def __sub__(self, src: Union[SignalSourceProvider, Num]) -> "Subtraction":
......@@ -611,7 +610,7 @@ class AbstractOperation(Operation, AbstractGraphComponent):
from b_asic.core_operations import Constant, Subtraction
return Subtraction(
self, Constant(src) if isinstance(src, NumRuntime) else src
self, Constant(src) if isinstance(src, Number) else src
)
def __rsub__(self, src: Union[SignalSourceProvider, Num]) -> "Subtraction":
......@@ -619,7 +618,7 @@ class AbstractOperation(Operation, AbstractGraphComponent):
from b_asic.core_operations import Constant, Subtraction
return Subtraction(
Constant(src) if isinstance(src, NumRuntime) else src, self
Constant(src) if isinstance(src, Number) else src, self
)
def __mul__(
......@@ -633,7 +632,7 @@ class AbstractOperation(Operation, AbstractGraphComponent):
return (
ConstantMultiplication(src, self)
if isinstance(src, NumRuntime)
if isinstance(src, Number)
else Multiplication(self, src)
)
......@@ -648,7 +647,7 @@ class AbstractOperation(Operation, AbstractGraphComponent):
return (
ConstantMultiplication(src, self)
if isinstance(src, NumRuntime)
if isinstance(src, Number)
else Multiplication(src, self)
)
......@@ -657,7 +656,7 @@ class AbstractOperation(Operation, AbstractGraphComponent):
from b_asic.core_operations import Constant, Division
return Division(
self, Constant(src) if isinstance(src, NumRuntime) else src
self, Constant(src) if isinstance(src, Number) else src
)
def __rtruediv__(
......@@ -666,7 +665,7 @@ class AbstractOperation(Operation, AbstractGraphComponent):
# Import here to avoid circular imports.
from b_asic.core_operations import Constant, Division, Reciprocal
if isinstance(src, NumRuntime):
if isinstance(src, Number):
if src == 1:
return Reciprocal(self)
else:
......
......@@ -2,7 +2,7 @@
B-ASIC classes representing resource usage.
"""
from typing import Dict, Tuple
from typing import Dict, Optional, Tuple
from b_asic.operation import Operation
from b_asic.port import InputPort, OutputPort
......@@ -20,11 +20,20 @@ class Process:
Start time of process.
execution_time : int
Execution time (lifetime) of process.
name : str, optional
The name of the process. If not provided, generate a name.
"""
def __init__(self, start_time: int, execution_time: int):
def __init__(
self, start_time: int, execution_time: int, name: Optional[str] = None
):
self._start_time = start_time
self._execution_time = execution_time
if name is None:
self._name = f"Proc. {PlainMemoryVariable._name_cnt}"
PlainMemoryVariable._name_cnt += 1
else:
self._name = name
def __lt__(self, other):
return self._start_time < other.start_time or (
......@@ -42,6 +51,21 @@ class Process:
"""Return the execution time."""
return self._execution_time
@property
def name(self) -> str:
return self._name
def __str__(self) -> str:
return self._name
def __repr__(self) -> str:
return (
f"Process({self.start_time}, {self.execution_time}, {self.name})"
)
# Static counter for default names
_name_cnt = 0
class OperatorProcess(Process):
"""
......@@ -53,16 +77,27 @@ class OperatorProcess(Process):
Start time of process.
operation : Operation
Operation that the process corresponds to.
name : str, optional
The name of the process.
"""
def __init__(self, start_time: int, operation: Operation):
def __init__(
self,
start_time: int,
operation: Operation,
name: Optional[str] = None,
):
execution_time = operation.execution_time
if execution_time is None:
raise ValueError(
"Operation {operation!r} does not have an execution time"
" specified!"
)
super().__init__(start_time, execution_time)
super().__init__(
start_time,
execution_time,
name=name,
)
self._operation = operation
......@@ -80,6 +115,8 @@ class MemoryVariable(Process):
reads : {InputPort: int, ...}
Dictionary with the InputPorts that reads the memory variable and
for how long after the *write_time* they will read.
name : str, optional
The name of the process.
"""
def __init__(
......@@ -87,12 +124,15 @@ class MemoryVariable(Process):
write_time: int,
write_port: OutputPort,
reads: Dict[InputPort, int],
name: Optional[str] = None,
):
self._read_ports = tuple(reads.keys())
self._life_times = tuple(reads.values())
self._write_port = write_port
super().__init__(
start_time=write_time, execution_time=max(self._life_times)
start_time=write_time,
execution_time=max(self._life_times),
name=name,
)
@property
......@@ -107,6 +147,13 @@ class MemoryVariable(Process):
def write_port(self) -> OutputPort:
return self._write_port
def __repr__(self) -> str:
reads = {k: v for k, v in zip(self._read_ports, self._life_times)}
return (
f"MemoryVariable({self.start_time}, {self.write_port},"
f" {reads!r}, {self.name!r})"
)
class PlainMemoryVariable(Process):
"""
......@@ -123,6 +170,8 @@ class PlainMemoryVariable(Process):
reads : {int: int, ...}
Dictionary where the key is the destination identifier and the value
is the time after *write_time* that the memory variable is read.
name : str, optional
The name of the process.
"""
def __init__(
......@@ -130,12 +179,15 @@ class PlainMemoryVariable(Process):
write_time: int,
write_port: int,
reads: Dict[int, int],
name: Optional[str] = None,
):
self._read_ports = tuple(reads.keys())
self._life_times = tuple(reads.values())
self._write_port = write_port
super().__init__(
start_time=write_time, execution_time=max(self._life_times)
start_time=write_time,
execution_time=max(self._life_times),
name=name,
)
@property
......@@ -149,3 +201,10 @@ class PlainMemoryVariable(Process):
@property
def write_port(self) -> int:
return self._write_port
def __repr__(self) -> str:
reads = {k: v for k, v in zip(self._read_ports, self._life_times)}
return (
f"PlainMemoryVariable({self.start_time}, {self.write_port},"
f" {reads!r}, {self.name!r})"
)
......@@ -6,6 +6,7 @@ import random
from typing import Optional, Set
from b_asic.process import PlainMemoryVariable
from b_asic.resources import ProcessCollection
def _insert_delays(inputorder, outputorder, min_lifetime, cyclic):
......@@ -14,9 +15,7 @@ def _insert_delays(inputorder, outputorder, min_lifetime, cyclic):
outputorder = [o - maxdiff + min_lifetime for o in outputorder]
maxdelay = max(outputorder[i] - inputorder[i] for i in range(size))
if cyclic:
if maxdelay < size:
outputorder = [o % size for o in outputorder]
else:
if maxdelay >= size:
inputorder = inputorder + [i + size for i in inputorder]
outputorder = outputorder + [o + size for o in outputorder]
return inputorder, outputorder
......@@ -24,7 +23,7 @@ def _insert_delays(inputorder, outputorder, min_lifetime, cyclic):
def generate_random_interleaver(
size: int, min_lifetime: int = 0, cyclic: bool = True
) -> Set[PlainMemoryVariable]:
) -> ProcessCollection:
"""
Generate a ProcessCollection with memory variable corresponding to a random
interleaver with length *size*.
......@@ -48,15 +47,19 @@ def generate_random_interleaver(
inputorder = list(range(size))
outputorder = inputorder[:]
random.shuffle(outputorder)
print(inputorder, outputorder)
inputorder, outputorder = _insert_delays(
inputorder, outputorder, min_lifetime, cyclic
)
print(inputorder, outputorder)
return {
PlainMemoryVariable(inputorder[i], 0, {0: outputorder[i]})
for i in range(size)
}
return ProcessCollection(
{
PlainMemoryVariable(
inputorder[i], 0, {0: outputorder[i] - inputorder[i]}
)
for i in range(len(inputorder))
},
len(inputorder),
cyclic,
)
def generate_matrix_transposer(
......@@ -64,7 +67,7 @@ def generate_matrix_transposer(
width: Optional[int] = None,
min_lifetime: int = 0,
cyclic: bool = True,
) -> Set[PlainMemoryVariable]:
) -> ProcessCollection:
r"""
Generate a ProcessCollection with memory variable corresponding to transposing a
matrix of size *height* :math:`\times` *width*. If *width* is not provided, a
......@@ -101,12 +104,19 @@ def generate_matrix_transposer(
for col in range(height):
outputorder.append(col * width + row)
print(inputorder, outputorder)
inputorder, outputorder = _insert_delays(
inputorder, outputorder, min_lifetime, cyclic
)
print(inputorder, outputorder)
return {
PlainMemoryVariable(inputorder[i], 0, {0: outputorder[i]})
for i in range(width * height)
}
return ProcessCollection(
{
PlainMemoryVariable(
inputorder[i],
0,
{0: outputorder[i] - inputorder[i]},
name=f"{inputorder[i]}",
)
for i in range(len(inputorder))
},
len(inputorder),
cyclic,
)
import io
import re
from typing import Dict, Iterable, List, Optional, Set, Tuple, TypeVar, Union
import matplotlib.pyplot as plt
import networkx as nx
from matplotlib.axes import Axes
from matplotlib.ticker import MaxNLocator
from b_asic._preferences import LATENCY_COLOR
from b_asic.process import Process
# Default latency coloring RGB tuple
_LATENCY_COLOR = tuple(c / 255 for c in LATENCY_COLOR)
#
# Human-intuitive sorting:
# https://stackoverflow.com/questions/2669059/how-to-sort-alpha-numeric-set-in-python
#
# Typing '_T' to help Pyright propagate type-information
#
_T = TypeVar('_T')
def _sorted_nicely(to_be_sorted: Iterable[_T]) -> List[_T]:
"""Sort the given iterable in the way that humans expect."""
convert = lambda text: int(text) if text.isdigit() else text
alphanum_key = lambda key: [
convert(c) for c in re.split('([0-9]+)', str(key))
]
return sorted(to_be_sorted, key=alphanum_key)
def draw_exclusion_graph_coloring(
exclusion_graph: nx.Graph,
color_dict: Dict[Process, int],
ax: Optional[Axes] = None,
color_list: Optional[
Union[List[str], List[Tuple[float, float, float]]]
] = None,
):
"""
Use matplotlib.pyplot and networkx to draw a colored exclusion graph from the memory assignment
.. code-block:: python
_, ax = plt.subplots(1, 1)
collection = ProcessCollection(...)
exclusion_graph = collection.create_exclusion_graph_from_overlap()
color_dict = nx.greedy_color(exclusion_graph)
draw_exclusion_graph_coloring(exclusion_graph, color_dict, ax=ax[0])
plt.show()
Parameters
----------
exclusion_graph : nx.Graph
A nx.Graph exclusion graph object that is to be drawn.
color_dict : dictionary
A color dictionary where keys are Process objects and where values are integers representing colors. These
dictionaries are automatically generated by :func:`networkx.algorithms.coloring.greedy_color`.
ax : :class:`matplotlib.axes.Axes`, optional
A Matplotlib Axes object to draw the exclusion graph
color_list : Optional[Union[List[str], List[Tuple[float,float,float]]]]
"""
COLOR_LIST = [
'#aa0000',
'#00aa00',
'#0000ff',
'#ff00aa',
'#ffaa00',
'#00ffaa',
'#aaff00',
'#aa00ff',
'#00aaff',
'#ff0000',
'#00ff00',
'#0000aa',
'#aaaa00',
'#aa00aa',
'#00aaaa',
]
if color_list is None:
node_color_dict = {k: COLOR_LIST[v] for k, v in color_dict.items()}
else:
node_color_dict = {k: color_list[v] for k, v in color_dict.items()}
node_color_list = [node_color_dict[node] for node in exclusion_graph]
nx.draw_networkx(
exclusion_graph,
node_color=node_color_list,
ax=ax,
pos=nx.spring_layout(exclusion_graph, seed=1),
)
class ProcessCollection:
"""
Collection of one or more processes
Parameters
----------
collection : set of :class:`~b_asic.process.Process` objects
The Process objects forming this ProcessCollection.
schedule_time : int
Length of the time-axis in the generated graph.
cyclic : bool, default: False
If the processes operates cyclically, i.e., if time 0 == time *schedule_time*.
"""
def __init__(
self,
collection: Set[Process],
schedule_time: int,
cyclic: bool = False,
):
self._collection = collection
self._schedule_time = schedule_time
self._cyclic = cyclic
def add_process(self, process: Process):
"""
Add a new process to this process collection.
Parameters
----------
process : Process
The process object to be added to the collection
"""
self._collection.add(process)
def draw_lifetime_chart(
self,
ax: Optional[Axes] = None,
show_name: bool = True,
bar_color: Union[str, Tuple[float, ...]] = _LATENCY_COLOR,
marker_color: Union[str, Tuple[float, ...]] = "black",
marker_read: str = "X",
marker_write: str = "o",
show_markers: bool = True,
):
"""
Use matplotlib.pyplot to generate a process variable lifetime chart from this process collection.
Parameters
----------
ax : :class:`matplotlib.axes.Axes`, optional
Matplotlib Axes object to draw this lifetime chart onto. If not provided (i.e., set to None),
this method will return a new axes object on return.
show_name : bool, default: True
Show name of all processes in the lifetime chart.
bar_color : color, optional
Bar color in lifetime chart.
marker_color : color, default 'black'
Color for read and write marker.
marker_write : str, default 'x'
Marker at write time in the lifetime chart.
marker_read : str, default 'o'
Marker at read time in the lifetime chart.
show_markers : bool, default True
Show markers at read and write times.
Returns
-------
ax: Associated Matplotlib Axes (or array of Axes) object
"""
# Set up the Axes object
if ax is None:
_, _ax = plt.subplots()
else:
_ax = ax
# Lifetime chart left and right padding
PAD_L, PAD_R = 0.05, 0.05
max_execution_time = max(
process.execution_time for process in self._collection
)
if max_execution_time > self._schedule_time:
# Schedule time needs to be greater than or equal to the maximum process lifetime
raise KeyError(
f'Error: Schedule time: {self._schedule_time} < Max execution'
f' time: {max_execution_time}'
)
# Generate the life-time chart
for i, process in enumerate(_sorted_nicely(self._collection)):
bar_start = process.start_time % self._schedule_time
bar_end = (
process.start_time + process.execution_time
) % self._schedule_time
bar_end = self._schedule_time if bar_end == 0 else bar_end
if show_markers:
_ax.scatter(
x=bar_start,
y=i + 1,
marker=marker_write,
color=marker_color,
zorder=10,
)
_ax.scatter(
x=bar_end,
y=i + 1,
marker=marker_read,
color=marker_color,
zorder=10,
)
if bar_end >= bar_start:
_ax.broken_barh(
[(PAD_L + bar_start, bar_end - bar_start - PAD_L - PAD_R)],
(i + 0.55, 0.9),
color=bar_color,
)
else: # bar_end < bar_start
_ax.broken_barh(
[
(
PAD_L + bar_start,
self._schedule_time - bar_start - PAD_L,
)
],
(i + 0.55, 0.9),
color=bar_color,
)
_ax.broken_barh(
[(0, bar_end - PAD_R)], (i + 0.55, 0.9), color=bar_color
)
if show_name:
_ax.annotate(
str(process),
(bar_start + PAD_L + 0.025, i + 1.00),
va="center",
)
_ax.grid(True)
_ax.xaxis.set_major_locator(MaxNLocator(integer=True))
_ax.yaxis.set_major_locator(MaxNLocator(integer=True))
_ax.set_xlim(0, self._schedule_time)
_ax.set_ylim(0.25, len(self._collection) + 0.75)
return _ax
def create_exclusion_graph_from_overlap(
self, add_name: bool = True
) -> nx.Graph:
"""
Generate exclusion graph based on processes overlapping in time
Parameters
----------
add_name : bool, default: True
Add name of all processes as a node attribute in the exclusion graph.
Returns
-------
An nx.Graph exclusion graph where nodes are processes and arcs
between two processes indicated overlap in time
"""
exclusion_graph = nx.Graph()
exclusion_graph.add_nodes_from(self._collection)
for process1 in self._collection:
for process2 in self._collection:
if process1 == process2:
continue
else:
t1 = set(
range(
process1.start_time,
process1.start_time + process1.execution_time,
)
)
t2 = set(
range(
process2.start_time,
process2.start_time + process2.execution_time,
)
)
if t1.intersection(t2):
exclusion_graph.add_edge(process1, process2)
return exclusion_graph
def split(
self,
heuristic: str = "graph_color",
read_ports: Optional[int] = None,
write_ports: Optional[int] = None,
total_ports: Optional[int] = None,
) -> Set["ProcessCollection"]:
"""
Split this process storage based on some heuristic.
Parameters
----------
heuristic : str, default: "graph_color"
The heuristic used when splitting this ProcessCollection.
Valid options are:
* "graph_color"
* "..."
read_ports : int, optional
The number of read ports used when splitting process collection based on memory variable access.
write_ports : int, optional
The number of write ports used when splitting process collection based on memory variable access.
total_ports : int, optional
The total number of ports used when splitting process collection based on memory variable access.
Returns
-------
A set of new ProcessCollection objects with the process splitting.
"""
if total_ports is None:
if read_ports is None or write_ports is None:
raise ValueError("inteligent quote")
else:
total_ports = read_ports + write_ports
else:
read_ports = total_ports if read_ports is None else read_ports
write_ports = total_ports if write_ports is None else write_ports
if heuristic == "graph_color":
return self._split_graph_color(
read_ports, write_ports, total_ports
)
else:
raise ValueError("Invalid heuristic provided")
def _split_graph_color(
self, read_ports: int, write_ports: int, total_ports: int
) -> Set["ProcessCollection"]:
"""
Parameters
----------
read_ports : int, optional
The number of read ports used when splitting process collection based on memory variable access.
write_ports : int, optional
The number of write ports used when splitting process collection based on memory variable access.
total_ports : int, optional
The total number of ports used when splitting process collection based on memory variable access.
"""
if read_ports != 1 or write_ports != 1:
raise ValueError(
"Splitting with read and write ports not equal to one with the"
" graph coloring heuristic does not make sense."
)
if total_ports not in (1, 2):
raise ValueError(
"Total ports should be either 1 (non-concurrent reads/writes)"
" or 2 (concurrent read/writes) for graph coloring heuristic."
)
# Create new exclusion graph. Nodes are Processes
exclusion_graph = nx.Graph()
exclusion_graph.add_nodes_from(self._collection)
# Add exclusions (arcs) between processes in the exclusion graph
for node1 in exclusion_graph:
for node2 in exclusion_graph:
if node1 == node2:
continue
else:
node1_stop_time = node1.start_time + node1.execution_time
node2_stop_time = node2.start_time + node2.execution_time
if total_ports == 1:
# Single-port assignment
if node1.start_time == node2.start_time:
exclusion_graph.add_edge(node1, node2)
elif node1_stop_time == node2_stop_time:
exclusion_graph.add_edge(node1, node2)
elif node1.start_time == node2_stop_time:
exclusion_graph.add_edge(node1, node2)
elif node1_stop_time == node2.start_time:
exclusion_graph.add_edge(node1, node2)
else:
# Dual-port assignment
if node1.start_time == node2.start_time:
exclusion_graph.add_edge(node1, node2)
elif node1_stop_time == node2_stop_time:
exclusion_graph.add_edge(node1, node2)
# Perform assignment
coloring = nx.coloring.greedy_color(exclusion_graph)
draw_exclusion_graph_coloring(exclusion_graph, coloring)
# process_collection_list = [ProcessCollection()]*(max(coloring.values()) + 1)
process_collection_set_list = [
set() for _ in range(max(coloring.values()) + 1)
]
for process, color in coloring.items():
process_collection_set_list[color].add(process)
return {
ProcessCollection(
process_collection_set, self._schedule_time, self._cyclic
)
for process_collection_set in process_collection_set_list
}
def _repr_svg_(self) -> str:
"""
Generate an SVG_ of the resource collection. This is automatically displayed in e.g.
Jupyter Qt console.
"""
fig, ax = plt.subplots()
self.draw_lifetime_chart(ax, show_markers=False)
f = io.StringIO()
fig.savefig(f, format="svg")
return f.getvalue()
......@@ -32,6 +32,7 @@ from b_asic.graph_component import GraphID
from b_asic.operation import Operation
from b_asic.port import InputPort, OutputPort
from b_asic.process import MemoryVariable, Process
from b_asic.resources import ProcessCollection
from b_asic.signal_flow_graph import SFG
from b_asic.special_operations import Delay, Output
......@@ -583,8 +584,8 @@ class Schedule:
] + cast(int, source_port.latency_offset)
self._remove_delays()
def _get_memory_variables_list(self) -> List['Process']:
ret: List['Process'] = []
def _get_memory_variables_list(self) -> List['MemoryVariable']:
ret: List['MemoryVariable'] = []
for graph_id, start_time in self._start_times.items():
slacks = self._forward_slacks(graph_id)
for outport, signals in slacks.items():
......@@ -597,13 +598,28 @@ class Schedule:
start_time + cast(int, outport.latency_offset),
outport,
reads,
outport.operation.graph_id,
)
)
return ret
def get_memory_variables(self) -> ProcessCollection:
"""
Return a :class:`~b_asic.resources.ProcessCollection` containing all
memory variables.
Returns
-------
ProcessCollection
"""
return ProcessCollection(
set(self._get_memory_variables_list()), self.schedule_time
)
def _get_y_position(
self, graph_id, operation_height=1.0, operation_gap=None
):
) -> float:
if operation_gap is None:
operation_gap = OPERATION_GAP
y_location = self._y_locations[graph_id]
......@@ -617,11 +633,15 @@ class Schedule:
self._y_locations[graph_id] = y_location
return operation_gap + y_location * (operation_height + operation_gap)
def _plot_schedule(self, ax, operation_gap: Optional[float] = None):
def _plot_schedule(
self, ax: Axes, operation_gap: Optional[float] = None
) -> None:
"""Draw the schedule."""
line_cache = []
def _draw_arrow(start, end, name="", laps=0):
def _draw_arrow(
start: List[float], end: List[float], name: str = "", laps: int = 0
):
"""Draw an arrow from *start* to *end*."""
if end[0] < start[0] or laps > 0: # Wrap around
if start not in line_cache:
......@@ -848,7 +868,7 @@ class Schedule:
self._plot_schedule(ax, operation_gap=operation_gap)
return fig
def _repr_svg_(self):
def _repr_svg_(self) -> str:
"""
Generate an SVG of the schedule. This is automatically displayed in e.g.
Jupyter Qt console.
......
......@@ -72,9 +72,8 @@ def wdf_allpass(
odd_order = order % 2
if odd_order:
# First-order section
coeff = np_coefficients[0]
adaptor0 = SymmetricTwoportAdaptor(
coeff,
np_coefficients[0],
input_op,
latency=latency,
latency_offsets=latency_offsets,
......@@ -185,10 +184,11 @@ def direct_form_fir(
prev_add = None
for i, coeff in enumerate(np_coefficients):
tmp_mul = ConstantMultiplication(coeff, prev_delay, **mult_properties)
if prev_add is None:
prev_add = tmp_mul
else:
prev_add = Addition(tmp_mul, prev_add, **add_properties)
prev_add = (
tmp_mul
if prev_add is None
else Addition(tmp_mul, prev_add, **add_properties)
)
if i < taps - 1:
prev_delay = Delay(prev_delay)
......@@ -266,10 +266,11 @@ def transposed_direct_form_fir(
prev_add = None
for i, coeff in enumerate(reversed(np_coefficients)):
tmp_mul = ConstantMultiplication(coeff, input_op, **mult_properties)
if prev_delay is None:
tmp_add = tmp_mul
else:
tmp_add = Addition(tmp_mul, prev_delay, **add_properties)
tmp_add = (
tmp_mul
if prev_delay is None
else Addition(tmp_mul, prev_delay, **add_properties)
)
if i < taps - 1:
prev_delay = Delay(tmp_add)
......
......@@ -27,13 +27,7 @@ from typing import (
from graphviz import Digraph
from b_asic.graph_component import (
GraphComponent,
GraphID,
GraphIDNumber,
Name,
TypeName,
)
from b_asic.graph_component import GraphComponent
from b_asic.operation import (
AbstractOperation,
MutableDelayMap,
......@@ -44,10 +38,21 @@ from b_asic.operation import (
from b_asic.port import InputPort, OutputPort, SignalSourceProvider
from b_asic.signal import Signal
from b_asic.special_operations import Delay, Input, Output
from b_asic.types import GraphID, GraphIDNumber, Name, Num, TypeName
DelayQueue = List[Tuple[str, ResultKey, OutputPort]]
_OPERATION_SHAPE: DefaultDict[TypeName, str] = defaultdict(lambda: "ellipse")
_OPERATION_SHAPE.update(
{
Input.type_name(): "cds",
Output.type_name(): "cds",
Delay.type_name(): "square",
}
)
class GraphIDGenerator:
"""Generates Graph IDs for objects."""
......@@ -367,7 +372,7 @@ class SFG(AbstractOperation):
def evaluate_output(
self,
index: int,
input_values: Sequence[Number],
input_values: Sequence[Num],
results: Optional[MutableResultMap] = None,
delays: Optional[MutableDelayMap] = None,
prefix: str = "",
......@@ -455,11 +460,11 @@ class SFG(AbstractOperation):
for input_port, input_operation in zip(
self.inputs, self.input_operations
):
dest = input_operation.output(0).signals[0].destination
if dest is None:
destination = input_operation.output(0).signals[0].destination
if destination is None:
raise ValueError("Missing destination in signal.")
dest.clear()
input_port.signals[0].set_destination(dest)
destination.clear()
input_port.signals[0].set_destination(destination)
# For each output_signal, connect it to the corresponding operation
for output_port, output_operation in zip(
self.outputs, self.output_operations
......@@ -565,7 +570,7 @@ class SFG(AbstractOperation):
@property
def operations(self) -> List[Operation]:
"""Get all operations of this graph in depth-first order."""
return self._operations_dfs_order
return list(self._operations_dfs_order)
def find_by_type_name(
self, type_name: TypeName
......@@ -813,39 +818,53 @@ class SFG(AbstractOperation):
for i in range(len(p_list)):
ports = p_list[i]
with pg.subgraph(name=f"cluster_{i}") as sub:
sub.attr(label=f"N{i+1}")
sub.attr(label=f"N{i}")
for port in ports:
portstr = f"{port.operation.graph_id}.{port.index}"
port_string = f"{port.operation.graph_id}.{port.index}"
if port.operation.output_count > 1:
sub.node(portstr)
sub.node(port_string)
else:
sub.node(
portstr,
port_string,
shape='rectangle',
label=port.operation.graph_id,
height="0.1",
width="0.1",
)
# Creates edges for each output port and creates nodes for each operation
# and edges for them as well
for i in range(len(p_list)):
ports = p_list[i]
for port in ports:
source_label = port.operation.graph_id
node_node = f"{source_label}.{port.index}"
for signal in port.signals:
destination = cast(InputPort, signal.destination)
if destination.operation.type_name() == Delay.type_name():
dest_node = destination.operation.graph_id + "In"
else:
dest_node = destination.operation.graph_id
dest_label = destination.operation.graph_id
node_node = f"{port.operation.graph_id}.{port.index}"
pg.edge(node_node, dest_node)
pg.node(dest_node, label=dest_label, shape="square")
if port.operation.type_name() == Delay.type_name():
source_node = port.operation.graph_id + "Out"
else:
source_node = port.operation.graph_id
source_label = port.operation.graph_id
node_node = f"{port.operation.graph_id}.{port.index}"
destination_label = destination.operation.graph_id
destination_node = (
destination_label + "In"
if isinstance(destination.operation, Delay)
else destination_label
)
pg.edge(node_node, destination_node)
pg.node(
destination_node,
label=destination_label,
shape=_OPERATION_SHAPE[
destination.operation.type_name()
],
)
source_node = (
source_label + "Out"
if port.operation.type_name() == Delay.type_name()
else source_label
)
pg.edge(source_node, node_node)
pg.node(source_node, label=source_label, shape="square")
pg.node(
source_node,
label=source_label,
shape=_OPERATION_SHAPE[port.operation.type_name()],
)
return pg
......@@ -864,8 +883,8 @@ class SFG(AbstractOperation):
printed_ops = set()
for iter_num, iter in enumerate(precedence_list, start=1):
for outport_num, outport in enumerate(iter, start=1):
for iter_num, iterable in enumerate(precedence_list, start=1):
for outport_num, outport in enumerate(iterable, start=1):
if outport not in printed_ops:
# Only print once per operation, even if it has multiple outports
out_str.write("\n")
......@@ -1137,7 +1156,8 @@ class SFG(AbstractOperation):
self._components_dfs_order.extend(
[new_signal, source.operation]
)
self._operations_dfs_order.append(source.operation)
if source.operation not in self._operations_dfs_order:
self._operations_dfs_order.append(source.operation)
# Check if the signal has not been added before.
elif (
......@@ -1307,7 +1327,7 @@ class SFG(AbstractOperation):
bits_override: Optional[int],
truncate: bool,
deferred_delays: DelayQueue,
) -> Number:
) -> Num:
key_base = (
(prefix + "." + src.operation.graph_id)
if prefix
......@@ -1352,7 +1372,7 @@ class SFG(AbstractOperation):
bits_override: Optional[int],
truncate: bool,
deferred_delays: DelayQueue,
) -> Number:
) -> Num:
input_values = [
self._evaluate_source(
input_port.signals[0].source,
......@@ -1416,12 +1436,7 @@ class SFG(AbstractOperation):
destination.operation.graph_id,
)
else:
if isinstance(op, Delay):
dg.node(op.graph_id, shape="square")
elif isinstance(op, (Input, Output)):
dg.node(op.graph_id, shape="cds")
else:
dg.node(op.graph_id)
dg.node(op.graph_id, shape=_OPERATION_SHAPE[op.type_name()])
return dg
def _repr_mimebundle_(self, include=None, exclude=None):
......@@ -1439,13 +1454,13 @@ class SFG(AbstractOperation):
"image/png"
]
def show(self, format=None, show_id=False, engine=None) -> None:
def show(self, fmt=None, show_id=False, engine=None) -> None:
"""
Shows a visual representation of the SFG using the default system viewer.
Parameters
----------
format : string, optional
fmt : string, optional
File format of the generated graph. Output formats can be found at
https://www.graphviz.org/doc/info/output.html
Most common are "pdf", "eps", "png", and "svg". Default is None which
......@@ -1462,8 +1477,8 @@ class SFG(AbstractOperation):
dg = self.sfg_digraph(show_id=show_id)
if engine is not None:
dg.engine = engine
if format is not None:
dg.format = format
if fmt is not None:
dg.format = fmt
dg.view()
def critical_path(self):
......
......@@ -8,6 +8,7 @@ from collections import defaultdict
from numbers import Number
from typing import (
Callable,
List,
Mapping,
MutableMapping,
MutableSequence,
......@@ -20,11 +21,12 @@ import numpy as np
from b_asic.operation import MutableDelayMap, ResultKey
from b_asic.signal_flow_graph import SFG
from b_asic.types import Num
ResultArrayMap = Mapping[ResultKey, Sequence[Number]]
MutableResultArrayMap = MutableMapping[ResultKey, MutableSequence[Number]]
InputFunction = Callable[[int], Number]
InputProvider = Union[Number, Sequence[Number], InputFunction]
ResultArrayMap = Mapping[ResultKey, Sequence[Num]]
MutableResultArrayMap = MutableMapping[ResultKey, MutableSequence[Num]]
InputFunction = Callable[[int], Num]
InputProvider = Union[Num, Sequence[Num], InputFunction]
class Simulation:
......@@ -39,7 +41,7 @@ class Simulation:
_results: MutableResultArrayMap
_delays: MutableDelayMap
_iteration: int
_input_functions: Sequence[InputFunction]
_input_functions: List[InputFunction]
_input_length: Optional[int]
def __init__(
......@@ -105,7 +107,7 @@ class Simulation:
save_results: bool = True,
bits_override: Optional[int] = None,
truncate: bool = True,
) -> Sequence[Number]:
) -> Sequence[Num]:
"""
Run one iteration of the simulation and return the resulting output values.
"""
......@@ -117,12 +119,12 @@ class Simulation:
save_results: bool = True,
bits_override: Optional[int] = None,
truncate: bool = True,
) -> Sequence[Number]:
) -> Sequence[Num]:
"""
Run the simulation until its iteration is greater than or equal to the given
iteration and return the output values of the last iteration.
"""
result: Sequence[Number] = []
result: Sequence[Num] = []
while self._iteration < iteration:
input_values = [
self._input_functions[i](self._iteration)
......@@ -149,7 +151,7 @@ class Simulation:
save_results: bool = True,
bits_override: Optional[int] = None,
truncate: bool = True,
) -> Sequence[Number]:
) -> Sequence[Num]:
"""
Run a given number of iterations of the simulation and return the output
values of the last iteration.
......@@ -163,7 +165,7 @@ class Simulation:
save_results: bool = True,
bits_override: Optional[int] = None,
truncate: bool = True,
) -> Sequence[Number]:
) -> Sequence[Num]:
"""
Run the simulation until the end of its input arrays and return the output
values of the last iteration.
......
......@@ -5,9 +5,8 @@ Contains operations with special purposes that may be treated differently from
normal operations in an SFG.
"""
from typing import List, Optional, Sequence, Tuple
from typing import Optional, Sequence, Tuple
from b_asic.graph_component import Name, TypeName
from b_asic.operation import (
AbstractOperation,
DelayMap,
......
......@@ -10,6 +10,7 @@ API
operation.rst
port.rst
process.rst
resources.rst
schedule.rst
sfg_generators.rst
signal.rst
......
********************
``b_asic.resources``
********************
.. automodule:: b_asic.resources
:members:
:undoc-members:
......@@ -39,6 +39,7 @@ intersphinx_mapping = {
'matplotlib': ('https://matplotlib.org/stable/', None),
'numpy': ('https://numpy.org/doc/stable/', None),
'PyQt5': ("https://www.riverbankcomputing.com/static/Docs/PyQt5", None),
'networkx': ('https://networkx.org/documentation/stable', None),
}
numpydoc_show_class_members = False
......
......@@ -64,10 +64,87 @@ firstorderiir = SFG([input], [output])
firstorderiir
# %%
# This will look something like
#
# .. graphviz::
#
# digraph {
# rankdir=LR
# in1 [shape=cds]
# in1 -> add1
# out1 [shape=cds]
# add2 -> out1
# add1 [shape=ellipse]
# cmul1 -> add1
# cmul1 [shape=ellipse]
# add1 -> t1
# t1 [shape=square]
# add1 -> add2
# add2 [shape=ellipse]
# cmul2 -> add2
# cmul2 [shape=ellipse]
# t1 -> cmul2
# t1 -> cmul1
# }
#
# For now, we can print the precendence relations of the SFG
firstorderiir.print_precedence_graph()
# %%
# Executing ``firstorderiir.precedence_graph()`` will show something like
#
# .. graphviz::
#
# digraph {
# rankdir=LR
# subgraph cluster_0 {
# label=N0
# "in1.0" [label=in1 height=0.1 shape=rectangle width=0.1]
# "t1.0" [label=t1 height=0.1 shape=rectangle width=0.1]
# }
# subgraph cluster_1 {
# label=N1
# "cmul2.0" [label=cmul2 height=0.1 shape=rectangle width=0.1]
# "cmul1.0" [label=cmul1 height=0.1 shape=rectangle width=0.1]
# }
# subgraph cluster_2 {
# label=N2
# "add1.0" [label=add1 height=0.1 shape=rectangle width=0.1]
# }
# subgraph cluster_3 {
# label=N3
# "add2.0" [label=add2 height=0.1 shape=rectangle width=0.1]
# }
# "in1.0" -> add1
# add1 [label=add1 shape=ellipse]
# in1 -> "in1.0"
# in1 [label=in1 shape=cds]
# "t1.0" -> cmul2
# cmul2 [label=cmul2 shape=ellipse]
# "t1.0" -> cmul1
# cmul1 [label=cmul1 shape=ellipse]
# t1Out -> "t1.0"
# t1Out [label=t1 shape=square]
# "cmul2.0" -> add2
# add2 [label=add2 shape=ellipse]
# cmul2 -> "cmul2.0"
# cmul2 [label=cmul2 shape=ellipse]
# "cmul1.0" -> add1
# add1 [label=add1 shape=ellipse]
# cmul1 -> "cmul1.0"
# cmul1 [label=cmul1 shape=ellipse]
# "add1.0" -> t1In
# t1In [label=t1 shape=square]
# "add1.0" -> add2
# add2 [label=add2 shape=ellipse]
# add1 -> "add1.0"
# add1 [label=add1 shape=ellipse]
# "add2.0" -> out1
# out1 [label=out1 shape=cds]
# add2 -> "add2.0"
# add2 [label=add2 shape=ellipse]
# }
#
# As seen, each operation has an id, in addition to the optional name. This can be used to access the operation.
# For example,
firstorderiir.find_by_id('cmul1')
......