Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • da/B-ASIC
  • lukja239/B-ASIC
  • robal695/B-ASIC
3 results
Show changes
Commits on Source (32)
Showing
with 1759 additions and 365 deletions
......@@ -102,6 +102,18 @@ run-test-3.11-pyqt5:
# extends: ".run-test"
# allow_failure: true
run-vhdl-tests:
variables:
QT_API: pyqt5
image: python:3.10
stage: test
script:
- pytest
- pip install vunit_hdl
- apt install -y ghdl
- cd b_asic/codegen/testbench
- python test.py
run-doc-test:
variables:
QT_API: pyside2
......@@ -110,6 +122,9 @@ run-doc-test:
script:
- pip install -r requirements_doc.txt
- sphinx-build -b html docs_sphinx public
# Run linting on doc-strings
- pip install black velin
- velin . --check --black
pages:
variables:
......
......@@ -18,3 +18,7 @@ repos:
hooks:
- id: isort
name: isort (python)
- repo: https://github.com/Carreau/velin
rev: 0.0.11
hooks:
- id: velin
......@@ -55,7 +55,7 @@ To generate the documentation, the following additional packages are required:
- [Furo](https://pradyunsg.me/furo/)
- [numpydoc](https://numpydoc.readthedocs.io/)
- [Sphinx-Gallery](https://sphinx-gallery.github.io/)
- [SciPy](https://scipy.org/)
- [mplsignal](https://mplsignal.readthedocs.io/)
### Using CMake directly
......
......@@ -12,13 +12,12 @@ class Arrow(QGraphicsPathItem):
def __init__(self, source, destination, window, signal=None, parent=None):
"""
Parameters
==========
source :
----------
source
Source operation.
destination :
destination
Destination operation.
window :
window
Window containing signal flow graph.
signal : Signal, optional
Let arrow represent *signal*.
......@@ -47,8 +46,8 @@ class Arrow(QGraphicsPathItem):
self.signal.remove_destination()
self.signal.remove_source()
self._window.scene.removeItem(self)
if self in self._window.signalList:
self._window.signalList.remove(self)
if self in self._window._arrow_list:
self._window._arrow_list.remove(self)
if self in self._window.signalPortDict:
for port1, port2 in self._window.signalPortDict[self]:
......@@ -58,20 +57,20 @@ class Arrow(QGraphicsPathItem):
) in self._window.portDict.items():
if (
port1 in operation_ports or port2 in operation_ports
) and operation in self._window.opToSFG:
) and operation in self._window._operation_to_sfg:
self._window.logger.info(
"Operation detected in existing SFG, removing SFG"
" with name:"
f" {self._window.opToSFG[operation].name}."
f" {self._window._operation_to_sfg[operation].name}."
)
del self._window.sfg_dict[
self._window.opToSFG[operation].name
self._window._operation_to_sfg[operation].name
]
self._window.opToSFG = {
op: self._window.opToSFG[op]
for op in self._window.opToSFG
if self._window.opToSFG[op]
is not self._window.opToSFG[operation]
self._window._operation_to_sfg = {
op: self._window._operation_to_sfg[op]
for op in self._window._operation_to_sfg
if self._window._operation_to_sfg[op]
is not self._window._operation_to_sfg[operation]
}
del self._window.signalPortDict[self]
......@@ -97,11 +96,7 @@ class Arrow(QGraphicsPathItem):
+ self.destination.x()
+ (0 if not destination_flipped else PORTWIDTH)
)
y1 = (
self.destination.operation.y()
+ self.destination.y()
+ PORTHEIGHT / 2
)
y1 = self.destination.operation.y() + self.destination.y() + PORTHEIGHT / 2
xmid = (x0 + x1) / 2
ymid = (y0 + y1) / 2
both_flipped = source_flipped and destination_flipped
......
......@@ -10,16 +10,11 @@ from qtpy.QtCore import QSize, Qt, Signal
from qtpy.QtGui import QIcon
from qtpy.QtWidgets import QAction, QMenu, QPushButton
from b_asic.GUI._preferences import (
GAP,
GRID,
MINBUTTONSIZE,
PORTHEIGHT,
PORTWIDTH,
)
from b_asic.GUI._preferences import GAP, GRID, MINBUTTONSIZE, PORTHEIGHT, PORTWIDTH
from b_asic.GUI.port_button import PortButton
from b_asic.GUI.properties_window import PropertiesWindow
from b_asic.GUI.utils import decorate_class, handle_error
from b_asic.operation import Operation
from b_asic.port import InputPort
......@@ -32,9 +27,8 @@ class DragButton(QPushButton):
Parameters
----------
name
operation
is_show_name
operation : :class:`~b_asic.operation.Operation`
is_show_name : bool
window
parent
"""
......@@ -44,13 +38,12 @@ class DragButton(QPushButton):
def __init__(
self,
name,
operation,
is_show_name,
operation: Operation,
is_show_name: bool,
window,
parent=None,
):
self.name = name
self.name = operation.graph_id
self.ports = []
self.is_show_name = is_show_name
self._window = window
......@@ -64,22 +57,25 @@ class DragButton(QPushButton):
self._flipped = False
self._properties_window = None
self.label = None
self._context_menu = None
super().__init__(parent)
def contextMenuEvent(self, event):
menu = QMenu()
properties = QAction("Properties")
menu.addAction(properties)
properties.triggered.connect(self.show_properties_window)
delete = QAction("Delete")
menu.addAction(delete)
delete.triggered.connect(self.remove)
flip = QAction("Flip horizontal")
menu.addAction(flip)
flip.triggered.connect(self._flip)
menu.exec_(self.cursor().pos())
if self._context_menu is None:
menu = QMenu()
properties = QAction("Properties")
menu.addAction(properties)
properties.triggered.connect(self.show_properties_window)
delete = QAction("Delete")
menu.addAction(delete)
delete.triggered.connect(self.remove)
flip = QAction("Flip horizontal")
menu.addAction(flip)
flip.triggered.connect(self._flip)
self._context_menu = menu
self._context_menu.exec_(self.cursor().pos())
def show_properties_window(self, event=None):
self._properties_window = PropertiesWindow(self, self._window)
......@@ -106,9 +102,7 @@ class DragButton(QPushButton):
if button is self:
continue
button.move(
button.mapToParent(event.pos() - self._mouse_press_pos)
)
button.move(button.mapToParent(event.pos() - self._mouse_press_pos))
self._window.scene.update()
self._window.graphic_view.update()
......@@ -187,17 +181,13 @@ class DragButton(QPushButton):
else:
self._window.pressed_operations.append(self)
for signal in self._window.signalList:
for signal in self._window._arrow_list:
signal.update()
def remove(self, event=None):
"""Remove button/operation from signal flow graph."""
self._window.logger.info(
f"Removing operation with name {self.operation.name}."
)
self._window.scene.removeItem(
self._window.dragOperationSceneDict[self]
)
self._window.logger.info("Removing operation with name " + self.operation.name)
self._window.scene.removeItem(self._window.dragOperationSceneDict[self])
_signals = []
for signal, ports in self._window.signalPortDict.items():
......@@ -208,24 +198,25 @@ class DragButton(QPushButton):
)
):
self._window.logger.info(
f"Removed signal with name: {signal.signal.name} to/from"
f" operation: {self.operation.name}."
"Removed signal with name: %s to/from operation: %s."
% (signal.signal.name, self.operation.name)
)
_signals.append(signal)
for signal in _signals:
signal.remove()
if self in self._window.opToSFG:
if self in self._window._operation_to_sfg:
self._window.logger.info(
"Operation detected in existing SFG, removing SFG with name:"
f" {self._window.opToSFG[self].name}."
"Operation detected in existing SFG, removing SFG with name: "
+ self._window._operation_to_sfg[self].name
)
del self._window.sfg_dict[self._window.opToSFG[self].name]
self._window.opToSFG = {
op: self._window.opToSFG[op]
for op in self._window.opToSFG
if self._window.opToSFG[op] is not self._window.opToSFG[self]
del self._window.sfg_dict[self._window._operation_to_sfg[self].name]
self._window._operation_to_sfg = {
op: self._window._operation_to_sfg[op]
for op in self._window._operation_to_sfg
if self._window._operation_to_sfg[op]
is not self._window._operation_to_sfg[self]
}
for port in self._window.portDict[self]:
......@@ -238,8 +229,8 @@ class DragButton(QPushButton):
if self in self._window.dragOperationSceneDict:
del self._window.dragOperationSceneDict[self]
if self.operation in self._window.operationDragDict:
del self._window.operationDragDict[self.operation]
if self.operation in self._window._operation_drag_buttons:
del self._window._operation_drag_buttons[self.operation]
def add_ports(self):
def _determine_port_distance(opheight, ports):
......@@ -258,14 +249,14 @@ class DragButton(QPushButton):
output_ports_dist = _determine_port_distance(height, op.output_count)
input_ports_dist = _determine_port_distance(height, op.input_count)
for i, dist in enumerate(input_ports_dist):
port = PortButton(">", self, op.input(i), self._window)
port = PortButton(">", self, op.input(i))
port.setFixedSize(PORTWIDTH, PORTHEIGHT)
port.move(0, dist)
port.show()
self.ports.append(port)
for i, dist in enumerate(output_ports_dist):
port = PortButton(">", self, op.output(i), self._window)
port = PortButton(">", self, op.output(i))
port.setFixedSize(PORTWIDTH, PORTHEIGHT)
port.move(MINBUTTONSIZE - PORTWIDTH, dist)
port.show()
......
......@@ -9,7 +9,7 @@ import logging
import os
import sys
from pprint import pprint
from typing import Optional, Tuple
from typing import Dict, List, Optional, Tuple
from qtpy.QtCore import QFileInfo, QSize, Qt
from qtpy.QtGui import QCursor, QIcon, QKeySequence, QPainter
......@@ -40,13 +40,14 @@ from b_asic.GUI.show_pc_window import ShowPCWindow
# from b_asic.GUI.simulate_sfg_window import Plot, SimulateSFGWindow
from b_asic.GUI.simulate_sfg_window import SimulateSFGWindow
from b_asic.GUI.util_dialogs import FaqWindow, KeybindsWindow
from b_asic.GUI.util_dialogs import FaqWindow, KeybindingsWindow
from b_asic.GUI.utils import decorate_class, handle_error
from b_asic.gui_utils.about_window import AboutWindow
from b_asic.gui_utils.plot_window import PlotWindow
from b_asic.operation import Operation
from b_asic.port import InputPort, OutputPort
from b_asic.save_load_structure import python_to_sfg, sfg_to_python
from b_asic.signal import Signal
from b_asic.signal_flow_graph import SFG
# from b_asic import FastSimulation
......@@ -68,21 +69,34 @@ class MainWindow(QMainWindow):
self.zoom = 1
self.sfg_name_i = 0
self.dragOperationSceneDict = {}
self.operationDragDict = {}
self.operationItemSceneList = []
self.signalList = []
self._operation_drag_buttons: Dict[Operation, DragButton] = {}
self._arrow_list: List[Arrow] = []
self.mouse_pressed = False
self.mouse_dragging = False
self.starting_port = []
self.pressed_operations = []
self.portDict = {}
self.signalPortDict = {}
self.opToSFG = {}
self._operation_to_sfg: Dict[DragButton, SFG] = {}
self.pressed_ports = []
self.sfg_dict = {}
self._window = self
self.logger = logging.getLogger(__name__)
self.init_ui()
# Create Graphics View
self.graphic_view = QGraphicsView(self.scene, self)
self.graphic_view.setRenderHint(QPainter.Antialiasing)
self.graphic_view.setGeometry(
self.ui.operation_box.width(), 20, self.width(), self.height()
)
self.graphic_view.setDragMode(QGraphicsView.RubberBandDrag)
# Create toolbar
self.toolbar = self.addToolBar("Toolbar")
self.toolbar.addAction("Create SFG", self.create_sfg_from_toolbar)
self.toolbar.addAction("Clear workspace", self.clear_workspace)
# Add operations
self.add_operations_from_namespace(
b_asic.core_operations, self.ui.core_operations_list
)
......@@ -109,7 +123,7 @@ class MainWindow(QMainWindow):
self.ui.actionSimulateSFG.triggered.connect(self.simulate_sfg)
self.ui.faqBASIC.triggered.connect(self.display_faq_page)
self.ui.aboutBASIC.triggered.connect(self.display_about_page)
self.ui.keybindsBASIC.triggered.connect(self.display_keybinds_page)
self.ui.keybindsBASIC.triggered.connect(self.display_keybindings_page)
self.ui.core_operations_list.itemClicked.connect(
self.on_list_widget_item_clicked
)
......@@ -132,6 +146,10 @@ class MainWindow(QMainWindow):
self.shortcut_signal = QShortcut(QKeySequence(Qt.Key_Space), self)
self.shortcut_signal.activated.connect(self._connect_callback)
self._keybindings_page = None
self._about_page = None
self._faq_page = None
self.logger.info("Finished setting up GUI")
self.logger.info(
"For questions please refer to 'Ctrl+?', or visit the 'Help' "
......@@ -140,31 +158,13 @@ class MainWindow(QMainWindow):
self.cursor = QCursor()
def init_ui(self) -> None:
self.create_toolbar_view()
self.create_graphics_view()
def create_graphics_view(self) -> None:
self.graphic_view = QGraphicsView(self.scene, self)
self.graphic_view.setRenderHint(QPainter.Antialiasing)
self.graphic_view.setGeometry(
self.ui.operation_box.width(), 20, self.width(), self.height()
)
self.graphic_view.setDragMode(QGraphicsView.RubberBandDrag)
def create_toolbar_view(self) -> None:
self.toolbar = self.addToolBar("Toolbar")
self.toolbar.addAction("Create SFG", self.create_sfg_from_toolbar)
self.toolbar.addAction("Clear workspace", self.clear_workspace)
def resizeEvent(self, event) -> None:
self.ui.operation_box.setGeometry(
10, 10, self.ui.operation_box.width(), self.height()
)
ui_width = self.ui.operation_box.width()
self.ui.operation_box.setGeometry(10, 10, ui_width, self.height())
self.graphic_view.setGeometry(
self.ui.operation_box.width() + 20,
ui_width + 20,
60,
self.width() - self.ui.operation_box.width() - 20,
self.width() - ui_width - 20,
self.height() - 30,
)
super().resizeEvent(event)
......@@ -182,7 +182,7 @@ class MainWindow(QMainWindow):
else:
self.is_show_names = False
for operation in self.dragOperationSceneDict.keys():
for operation in self.dragOperationSceneDict:
operation.label.setOpacity(self.is_show_names)
operation.is_show_name = self.is_show_names
......@@ -214,20 +214,20 @@ class MainWindow(QMainWindow):
self.logger.info("Saved SFG to path: " + str(module))
def save_work(self, event=None):
def save_work(self, event=None) -> None:
self.sfg_widget = SelectSFGWindow(self)
self.sfg_widget.show()
# Wait for input to dialog.
self.sfg_widget.ok.connect(self._save_work)
def load_work(self, event=None):
def load_work(self, event=None) -> None:
module, accepted = QFileDialog().getOpenFileName()
if not accepted:
return
self._load_from_file(module)
def _load_from_file(self, module):
def _load_from_file(self, module) -> None:
self.logger.info("Loading SFG from path: " + str(module))
try:
sfg, positions = python_to_sfg(module)
......@@ -252,7 +252,7 @@ class MainWindow(QMainWindow):
self._load_sfg(sfg, positions)
self.logger.info("Loaded SFG from path: " + str(module))
def _load_sfg(self, sfg, positions=None):
def _load_sfg(self, sfg, positions=None) -> None:
if positions is None:
positions = {}
......@@ -271,14 +271,14 @@ class MainWindow(QMainWindow):
source = [
source
for source in self.portDict[
self.operationDragDict[signal.source.operation]
self._operation_drag_buttons[signal.source.operation]
]
if source.port is signal.source
]
destination = [
destination
for destination in self.portDict[
self.operationDragDict[signal.destination.operation]
self._operation_drag_buttons[signal.destination.operation]
]
if destination.port is signal.destination
]
......@@ -293,24 +293,23 @@ class MainWindow(QMainWindow):
connect_ports(op.inputs)
for op in sfg.split():
self.operationDragDict[op].setToolTip(sfg.name)
self.opToSFG[self.operationDragDict[op]] = sfg
self._operation_drag_buttons[op].setToolTip(sfg.name)
self._operation_to_sfg[self._operation_drag_buttons[op]] = sfg
self.sfg_dict[sfg.name] = sfg
self.update()
def exit_app(self):
def exit_app(self) -> None:
self.logger.info("Exiting the application.")
QApplication.quit()
def clear_workspace(self):
def clear_workspace(self) -> None:
self.logger.info("Clearing workspace from operations and SFGs.")
self.pressed_operations.clear()
self.pressed_ports.clear()
self.operationItemSceneList.clear()
self.operationDragDict.clear()
self._operation_drag_buttons.clear()
self.dragOperationSceneDict.clear()
self.signalList.clear()
self._arrow_list.clear()
self.portDict.clear()
self.signalPortDict.clear()
self.sfg_dict.clear()
......@@ -341,7 +340,7 @@ class MainWindow(QMainWindow):
sfg = SFG(inputs=inputs, outputs=outputs, name=name)
self.logger.info("Created SFG with name: %s from selected operations." % name)
def check_equality(signal, signal_2):
def check_equality(signal: Signal, signal_2: Signal) -> bool:
if not (
signal.source.operation.type_name()
== signal_2.source.operation.type_name()
......@@ -435,16 +434,16 @@ class MainWindow(QMainWindow):
for op in self.pressed_operations:
op.setToolTip(sfg.name)
self.opToSFG[op] = sfg
self._operation_to_sfg[op] = sfg
self.sfg_dict[sfg.name] = sfg
def _show_precedence_graph(self, event=None) -> None:
self.dialog = ShowPCWindow(self)
self.dialog.add_sfg_to_dialog()
self.dialog.show()
self._precedence_graph_dialog = ShowPCWindow(self)
self._precedence_graph_dialog.add_sfg_to_dialog()
self._precedence_graph_dialog.show()
def get_operations_from_namespace(self, namespace) -> None:
def get_operations_from_namespace(self, namespace) -> List[str]:
self.logger.info(
"Fetching operations from namespace: " + str(namespace.__name__)
)
......@@ -487,7 +486,6 @@ class MainWindow(QMainWindow):
is_flipped: bool = False,
) -> None:
"""
Parameters
----------
op : Operation
......@@ -495,11 +493,11 @@ class MainWindow(QMainWindow):
is_flipped : bool, default: False
"""
try:
if op in self.operationDragDict:
if op in self._operation_drag_buttons:
self.logger.warning("Multiple instances of operation with same name")
return
attr_button = DragButton(op.graph_id, op, True, window=self)
attr_button = DragButton(op, True, window=self)
if position is None:
attr_button.move(GRID * 3, GRID * 2)
else:
......@@ -555,7 +553,7 @@ class MainWindow(QMainWindow):
if is_flipped:
attr_button._flip()
self.operationDragDict[op] = attr_button
self._operation_drag_buttons[op] = attr_button
self.dragOperationSceneDict[attr_button] = attr_button_scene
except Exception as e:
......@@ -566,8 +564,8 @@ class MainWindow(QMainWindow):
def _create_operation_item(self, item) -> None:
self.logger.info("Creating operation of type: %s" % str(item.text()))
try:
attr_oper = self._operations_from_name[item.text()]()
self.create_operation(attr_oper)
attr_operation = self._operations_from_name[item.text()]()
self.create_operation(attr_operation)
except Exception as e:
self.logger.error(
"Unexpected error occurred while creating operation: " + str(e)
......@@ -654,24 +652,24 @@ class MainWindow(QMainWindow):
)
)
try:
line = Arrow(source, destination, self, signal=next(signal_exists))
arrow = Arrow(source, destination, self, signal=next(signal_exists))
except StopIteration:
line = Arrow(source, destination, self)
arrow = Arrow(source, destination, self)
if line not in self.signalPortDict:
self.signalPortDict[line] = []
if arrow not in self.signalPortDict:
self.signalPortDict[arrow] = []
self.signalPortDict[line].append((source, destination))
self.scene.addItem(line)
self.signalList.append(line)
self.signalPortDict[arrow].append((source, destination))
self.scene.addItem(arrow)
self._arrow_list.append(arrow)
self.update()
def paintEvent(self, event):
def paintEvent(self, event) -> None:
for signal in self.signalPortDict.keys():
signal.moveLine()
def _select_operations(self):
def _select_operations(self) -> None:
selected = [button.widget() for button in self.scene.selectedItems()]
for button in selected:
button._toggle_button(pressed=False)
......@@ -682,8 +680,8 @@ class MainWindow(QMainWindow):
self.pressed_operations = selected
def _simulate_sfg(self):
for sfg, properties in self.dialog.properties.items():
def _simulate_sfg(self) -> None:
for sfg, properties in self._simulation_dialog.properties.items():
self.logger.info("Simulating SFG with name: %s" % str(sfg.name))
simulation = FastSimulation(sfg, input_providers=properties["input_values"])
l_result = simulation.run_for(
......@@ -697,36 +695,35 @@ class MainWindow(QMainWindow):
if properties["show_plot"]:
self.logger.info("Opening plot for SFG with name: " + str(sfg.name))
self.logger.info(
"To save the plot press 'Ctrl+S' when the plot is focused."
)
# self.plot = Plot(simulation, sfg, self)
self.plot = PlotWindow(simulation.results)
self.plot.show()
self._plot = PlotWindow(simulation.results, sfg_name=sfg.name)
self._plot.show()
def simulate_sfg(self, event=None):
self.dialog = SimulateSFGWindow(self)
def simulate_sfg(self, event=None) -> None:
self._simulation_dialog = SimulateSFGWindow(self)
for _, sfg in self.sfg_dict.items():
self.dialog.add_sfg_to_dialog(sfg)
self._simulation_dialog.add_sfg_to_dialog(sfg)
self.dialog.show()
self._simulation_dialog.show()
# Wait for input to dialog.
# Kinda buggy because of the separate window in the same thread.
self.dialog.simulate.connect(self._simulate_sfg)
def display_faq_page(self, event=None):
self.faq_page = FaqWindow(self)
self.faq_page.scroll_area.show()
def display_about_page(self, event=None):
self.about_page = AboutWindow(self)
self.about_page.show()
def display_keybinds_page(self, event=None):
self.keybinds_page = KeybindsWindow(self)
self.keybinds_page.show()
self._simulation_dialog.simulate.connect(self._simulate_sfg)
def display_faq_page(self, event=None) -> None:
if self._faq_page is None:
self._faq_page = FaqWindow(self)
self._faq_page.scroll_area.show()
def display_about_page(self, event=None) -> None:
if self._about_page is None:
self._about_page = AboutWindow(self)
self._about_page.show()
def display_keybindings_page(self, event=None) -> None:
if self._keybindings_page is None:
self._keybindings_page = KeybindingsWindow(self)
self._keybindings_page.show()
def start_gui():
......
"""
B-ASIC port button module.
"""
from typing import TYPE_CHECKING
from qtpy.QtCore import QMimeData, Qt, Signal
from qtpy.QtGui import QDrag
from qtpy.QtWidgets import QMenu, QPushButton
if TYPE_CHECKING:
from b_asic.GUI.drag_button import DragButton
from b_asic.port import Port
class PortButton(QPushButton):
"""
......@@ -12,20 +18,18 @@ class PortButton(QPushButton):
Parameters
----------
name
operation
port
window
parent
name : str
operation : :class:`~b_asic.GUI.drag_button.DragButton`
port : :class:`~b_asic.port.Port`
"""
connectionRequested = Signal(QPushButton)
moved = Signal()
def __init__(self, name, operation, port, window, parent=None):
def __init__(self, name: str, operation: "DragButton", port: "Port"):
super().__init__(name, parent=operation)
self.pressed = False
self._window = window
self._window = operation._window
self.port = port
self.operation = operation
self.clicked = 0
......@@ -49,10 +53,7 @@ class PortButton(QPushButton):
super().mousePressEvent(event)
def mouseReleaseEvent(self, event):
if (
event.button() == Qt.MouseButton.LeftButton
and self._window.mouse_pressed
):
if event.button() == Qt.MouseButton.LeftButton and self._window.mouse_pressed:
self._window.mouse_pressed = False
if self._window.mouse_dragging:
self._window.mouse_dragging = False
......@@ -89,7 +90,7 @@ class PortButton(QPushButton):
self.update()
super().dropEvent(event)
def _toggle_port(self, pressed=False):
def _toggle_port(self, pressed: bool = False):
self.pressed = not pressed
self.setStyleSheet(
f"background-color: {'white' if not self.pressed else 'grey'}"
......@@ -110,5 +111,5 @@ class PortButton(QPushButton):
else:
self._window.pressed_ports.append(self)
for signal in self._window.signalList:
for signal in self._window._arrow_list:
signal.update()
# -*- coding: utf-8 -*-
from qtpy.QtWidgets import QGridLayout, QLabel, QLineEdit, QSpinBox
from qtpy.QtWidgets import (
QFileDialog,
QGridLayout,
QLabel,
QLineEdit,
QPushButton,
QSpinBox,
)
from b_asic.signal_generator import (
Constant,
FromFile,
Gaussian,
Impulse,
SignalGenerator,
......@@ -24,6 +32,19 @@ class SignalGeneratorInput(QGridLayout):
"""Return the SignalGenerator based on the graphical input."""
raise NotImplementedError
def _parse_number(self, string, _type, name, default):
string = string.strip()
try:
if not string:
return default
return _type(string)
except ValueError:
self._logger.warning(
f"Cannot parse {name}: {string} not a {_type.__name__}, setting to"
f" {default}"
)
return default
class DelayInput(SignalGeneratorInput):
"""
......@@ -74,6 +95,7 @@ class ZeroPadInput(SignalGeneratorInput):
self.input_label = QLabel("Input")
self.addWidget(self.input_label, 0, 0)
self.input_sequence = QLineEdit()
self.input_sequence.setPlaceholderText("0.1, -0.2, 0.7")
self.addWidget(self.input_sequence, 0, 1)
def get_generator(self) -> SignalGenerator:
......@@ -83,17 +105,39 @@ class ZeroPadInput(SignalGeneratorInput):
try:
if not val:
val = 0
val = complex(val)
except ValueError:
self._logger.warning(f"Skipping value: {val}, not a digit.")
continue
input_values.append(val)
return ZeroPad(input_values)
class FromFileInput(SignalGeneratorInput):
"""
Class for graphically configuring and generating a
:class:`~b_asic.signal_generators.FromFile` signal generator.
"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.file_label = QLabel("Browse")
self.addWidget(self.file_label, 0, 0)
self.file_browser = QPushButton("No file selected")
self.file_browser.clicked.connect(
lambda i: self.get_input_file(i, self.file_browser)
)
self.addWidget(self.file_browser, 0, 1)
def get_generator(self) -> SignalGenerator:
return FromFile(self.file_browser.text())
def get_input_file(self, i, file_browser):
module, accepted = QFileDialog().getOpenFileName()
file_browser.setText(module)
return
class SinusoidInput(SignalGeneratorInput):
"""
Class for graphically configuring and generating a
......@@ -105,34 +149,20 @@ class SinusoidInput(SignalGeneratorInput):
self.frequency_label = QLabel("Frequency")
self.addWidget(self.frequency_label, 0, 0)
self.frequency_input = QLineEdit()
self.frequency_input.setText("0.1")
self.addWidget(self.frequency_input, 0, 1)
self.phase_label = QLabel("Phase")
self.addWidget(self.phase_label, 1, 0)
self.phase_input = QLineEdit()
self.phase_input.setText("0.0")
self.addWidget(self.phase_input, 1, 1)
def get_generator(self) -> SignalGenerator:
frequency = self.frequency_input.text().strip()
try:
if not frequency:
frequency = 0.1
frequency = float(frequency)
except ValueError:
self._logger.warning(f"Cannot parse frequency: {frequency} not a number.")
frequency = 0.1
phase = self.phase_input.text().strip()
try:
if not phase:
phase = 0
phase = float(phase)
except ValueError:
self._logger.warning(f"Cannot parse phase: {phase} not a number.")
phase = 0
frequency = self._parse_number(
self.frequency_input.text(), float, "Frequency", 0.1
)
phase = self._parse_number(self.phase_input.text(), float, "Phase", 0.0)
return Sinusoid(frequency, phase)
......@@ -163,26 +193,10 @@ class GaussianInput(SignalGeneratorInput):
self.addWidget(self.seed_spin_box, 2, 1)
def get_generator(self) -> SignalGenerator:
scale = self.scale_input.text().strip()
try:
if not scale:
scale = 1
scale = float(scale)
except ValueError:
self._logger.warning(f"Cannot parse scale: {scale} not a number.")
scale = 1
loc = self.loc_input.text().strip()
try:
if not loc:
loc = 0
loc = float(loc)
except ValueError:
self._logger.warning(f"Cannot parse loc: {loc} not a number.")
loc = 0
scale = self._parse_number(
self.scale_input.text(), float, "Standard deviation", 1.0
)
loc = self._parse_number(self.loc_input.text(), float, "Average value", 0.0)
return Gaussian(self.seed_spin_box.value(), loc, scale)
......@@ -213,26 +227,8 @@ class UniformInput(SignalGeneratorInput):
self.addWidget(self.seed_spin_box, 2, 1)
def get_generator(self) -> SignalGenerator:
low = self.low_input.text().strip()
try:
if not low:
low = -1.0
low = float(low)
except ValueError:
self._logger.warning(f"Cannot parse low: {low} not a number.")
low = -1.0
high = self.high_input.text().strip()
try:
if not high:
high = 1.0
high = float(high)
except ValueError:
self._logger.warning(f"Cannot parse high: {high} not a number.")
high = 1.0
low = self._parse_number(self.low_input.text(), float, "Lower bound", -1.0)
high = self._parse_number(self.high_input.text(), float, "Upper bound", 1.0)
return Uniform(self.seed_spin_box.value(), low, high)
......@@ -251,16 +247,9 @@ class ConstantInput(SignalGeneratorInput):
self.addWidget(self.constant_input, 0, 1)
def get_generator(self) -> SignalGenerator:
constant = self.constant_input.text().strip()
try:
if not constant:
constant = 1.0
constant = complex(constant)
except ValueError:
self._logger.warning(f"Cannot parse constant: {constant} not a number.")
constant = 0.0
constant = self._parse_number(
self.constant_input.text(), complex, "Constant", 1.0
)
return Constant(constant)
......@@ -272,4 +261,5 @@ _GENERATOR_MAPPING = {
"Step": StepInput,
"Uniform": UniformInput,
"ZeroPad": ZeroPadInput,
"FromFile": FromFileInput,
}
"""
B-ASIC window to simulate an SFG.
"""
import numpy as np
from matplotlib.backends.backend_qt5agg import FigureCanvasQTAgg as FigureCanvas
from matplotlib.figure import Figure
from qtpy.QtCore import Qt, Signal
from qtpy.QtGui import QKeySequence
from qtpy.QtWidgets import (
QCheckBox,
QComboBox,
QDialog,
QFileDialog,
QFormLayout,
QFrame,
QGridLayout,
QHBoxLayout,
QLabel,
QLineEdit,
QLayout,
QPushButton,
QShortcut,
QSizePolicy,
QSpinBox,
QVBoxLayout,
)
......@@ -40,6 +32,7 @@ class SimulateSFGWindow(QDialog):
self.setWindowTitle("Simulate SFG")
self.dialog_layout = QVBoxLayout()
self.dialog_layout.setSizeConstraint(QLayout.SetFixedSize)
self.simulate_btn = QPushButton("Simulate")
self.simulate_btn.clicked.connect(self.save_properties)
self.dialog_layout.addWidget(self.simulate_btn)
......@@ -47,7 +40,7 @@ class SimulateSFGWindow(QDialog):
self.input_grid = QGridLayout()
self.input_files = {}
def add_sfg_to_dialog(self, sfg):
def add_sfg_to_dialog(self, sfg) -> None:
sfg_layout = QVBoxLayout()
options_layout = QFormLayout()
......@@ -89,15 +82,13 @@ class SimulateSFGWindow(QDialog):
self.input_grid.addWidget(input_label, i, 0)
input_dropdown = QComboBox()
input_dropdown.insertItems(
0, list(_GENERATOR_MAPPING.keys()) + ["File"]
)
input_dropdown.insertItems(0, list(_GENERATOR_MAPPING.keys()))
input_dropdown.currentTextChanged.connect(
lambda text, i=i: self.change_input_format(i, text)
)
self.input_grid.addWidget(input_dropdown, i, 1, alignment=Qt.AlignLeft)
self.change_input_format(i, "Impulse")
self.change_input_format(i, "Constant")
y += 1
......@@ -111,7 +102,7 @@ class SimulateSFGWindow(QDialog):
self.sfg_to_layout[sfg] = sfg_layout
self.dialog_layout.addLayout(sfg_layout)
def change_input_format(self, i, text):
def change_input_format(self, i: int, text: str) -> None:
grid = self.input_grid.itemAtPosition(i, 2)
if grid:
for j in reversed(range(grid.count())):
......@@ -125,46 +116,12 @@ class SimulateSFGWindow(QDialog):
if text in _GENERATOR_MAPPING:
param_grid = _GENERATOR_MAPPING[text](self._window.logger)
elif text == "File":
file_label = QLabel("Browse")
param_grid.addWidget(file_label, 0, 0)
file_browser = QPushButton("No file selected")
file_browser.clicked.connect(
lambda i=i: self.get_input_file(i, file_browser)
)
param_grid.addWidget(file_browser, 0, 1)
else:
raise Exception("Input selection is not implemented")
raise ValueError("Input selection is not implemented")
self.input_grid.addLayout(param_grid, i, 2)
return
def get_input_file(self, i, file_browser):
module, accepted = QFileDialog().getOpenFileName()
file_browser.setText(module)
return
def parse_input_values(self, input_values):
_input_values = []
for _list in input_values:
_list_values = []
for val in _list:
val = val.strip()
try:
if not val:
val = 0
_list_values.append(complex(val))
except ValueError:
self._window.logger.warning(f"Skipping value: {val}, not a digit.")
continue
_input_values.append(_list_values)
return _input_values
def save_properties(self):
def save_properties(self) -> None:
for sfg, _properties in self.input_fields.items():
ic_value = self.input_fields[sfg]["iteration_count"].value()
if ic_value == 0:
......@@ -178,18 +135,8 @@ class SimulateSFGWindow(QDialog):
if in_format in _GENERATOR_MAPPING:
tmp2 = in_param.get_generator()
elif in_format == "File":
widget = in_param.itemAtPosition(0, 1).widget()
path = widget.text()
try:
tmp2 = self.parse_input_values(
np.loadtxt(path, dtype=str).tolist()
)
except FileNotFoundError:
self._window.logger.error(f"Selected input file not found.")
continue
else:
raise Exception("Input selection is not implemented")
raise ValueError("Input selection is not implemented")
input_values.append(tmp2)
......@@ -207,45 +154,3 @@ class SimulateSFGWindow(QDialog):
self.accept()
self.simulate.emit()
class Plot(FigureCanvas):
def __init__(
self, simulation, sfg, window, parent=None, width=5, height=4, dpi=100
):
self.simulation = simulation
self.sfg = sfg
self.dpi = dpi
self._window = window
fig = Figure(figsize=(width, height), dpi=dpi)
fig.suptitle(sfg.name, fontsize=20)
self.axes = fig.add_subplot(111)
FigureCanvas.__init__(self, fig)
self.setParent(parent)
FigureCanvas.setSizePolicy(self, QSizePolicy.Expanding, QSizePolicy.Expanding)
FigureCanvas.updateGeometry(self)
self.save_figure = QShortcut(QKeySequence("Ctrl+S"), self)
self.save_figure.activated.connect(self._save_plot_figure)
self._plot_values_sfg()
def _save_plot_figure(self):
self._window.logger.info(f"Saving plot of figure: {self.sfg.name}.")
file_choices = "PNG (*.png)|*.png"
path, ext = QFileDialog.getSaveFileName(self, "Save file", "", file_choices)
path = path.encode("utf-8")
if not path[-4:] == file_choices[-4:].encode("utf-8"):
path += file_choices[-4:].encode("utf-8")
if path:
self.print_figure(path.decode(), dpi=self.dpi)
self._window.logger.info(f"Saved plot: {self.sfg.name} to path: {path}.")
def _plot_values_sfg(self):
x_axis = list(range(len(self.simulation.results["0"])))
for _output in range(self.sfg.output_count):
y_axis = self.simulation.results[str(_output)]
self.axes.plot(x_axis, y_axis)
......@@ -65,7 +65,7 @@ _QUESTIONS = {
}
class KeybindsWindow(QDialog):
class KeybindingsWindow(QDialog):
def __init__(self, window):
super().__init__()
self._window = window
......@@ -88,7 +88,7 @@ class KeybindsWindow(QDialog):
frame.setFrameShadow(QFrame.Sunken)
self.dialog_layout.addWidget(frame)
keybinds_label = QLabel(
keybindings_label = QLabel(
"'Ctrl+R' - Reload the operation list to add any new operations "
"created.\n"
"'Ctrl+Q' - Quit the application.\n"
......@@ -104,7 +104,7 @@ class KeybindsWindow(QDialog):
self.dialog_layout.addLayout(information_layout)
self.dialog_layout.addWidget(frame)
self.dialog_layout.addWidget(keybinds_label)
self.dialog_layout.addWidget(keybindings_label)
class FaqWindow(QDialog):
......
vunit_out
streaming_matrix_transposition_memory_*x*.vhdl
streaming_matrix_transposition_register_*x*.vhdl
work
1076.1-2017
--
-- Generic streaming transposition testbench using VUnit
-- Author: Mikael Henriksson (2023)
--
library ieee, vunit_lib;
context vunit_lib.vunit_context;
use ieee.std_logic_1164.all;
use ieee.numeric_std.all;
entity streaming_matrix_transposition_tester is
generic(
WL : integer;
ROWS : integer;
COLS : integer
);
port(
clk, rst, en : out std_logic;
input : out std_logic_vector(WL-1 downto 0);
output : in std_logic_vector(WL-1 downto 0);
done : out boolean
);
end entity streaming_matrix_transposition_tester;
architecture behav of streaming_matrix_transposition_tester is
signal clk_sig : std_logic;
begin
-- Clock (100 MHz), enable and reset generation.
clk <= clk_sig;
rst <= '1', '0' after 40 ns;
en <= '0', '1' after 100 ns;
process begin
clk_sig <= '0';
loop
wait for 5 ns; clk_sig <= not(clk_sig);
end loop;
end process;
-- Input generation
input_gen_proc: process begin
wait until en = '1';
for i in 0 to ROWS*COLS-1 loop
wait until clk = '0';
input <= std_logic_vector(to_unsigned(i, input'length));
end loop;
wait;
end process;
-- Output testing
output_test_proc: process begin
wait until en = '1';
wait until output = std_logic_vector(to_unsigned(0, output'length));
for col in 0 to COLS-1 loop
for row in 0 to ROWS-1 loop
wait until clk = '0';
check(output = std_logic_vector(to_unsigned(row*COLS + col, output'length)));
end loop;
end loop;
done <= true;
wait;
end process;
end architecture behav;
--
-- 3x3 memory based matrix transposition
--
library ieee, vunit_lib;
context vunit_lib.vunit_context;
use ieee.std_logic_1164.all;
use ieee.numeric_std.all;
entity streaming_matrix_transposition_memory_3x3_tb is
generic (
runner_cfg : string; -- VUnit python pipe
tb_path : string -- Absolute path to this testbench
);
end entity streaming_matrix_transposition_memory_3x3_tb;
architecture behav of streaming_matrix_transposition_memory_3x3_tb is
constant WL : integer := 16;
signal done : boolean;
signal input, output : std_logic_vector(WL-1 downto 0);
signal clk, rst, en : std_logic;
begin
-- VUnit test runner
process begin
test_runner_setup(runner, runner_cfg);
wait until done = true;
test_runner_cleanup(runner);
end process;
-- Run the test baby!
dut : entity work.streaming_matrix_transposition_memory_3x3
generic map(WL=>WL) port map(clk, rst, en, input, output);
tb : entity work.streaming_matrix_transposition_tester
generic map (WL=>WL, ROWS=>3, COLS=>3) port map(clk, rst, en, input, output, done);
end architecture behav;
--
-- 4x8 memory based matrix transposition
--
library ieee, vunit_lib;
context vunit_lib.vunit_context;
use ieee.std_logic_1164.all;
use ieee.numeric_std.all;
entity streaming_matrix_transposition_memory_4x8_tb is
generic (
runner_cfg : string; -- VUnit python pipe
tb_path : string -- Absolute path to this testbench
);
end entity streaming_matrix_transposition_memory_4x8_tb;
architecture behav of streaming_matrix_transposition_memory_4x8_tb is
constant WL : integer := 16;
signal done : boolean;
signal input, output : std_logic_vector(WL-1 downto 0);
signal clk, rst, en : std_logic;
begin
-- VUnit test runner
process begin
test_runner_setup(runner, runner_cfg);
wait until done = true;
test_runner_cleanup(runner);
end process;
-- Run the test baby!
dut : entity work.streaming_matrix_transposition_memory_4x8
generic map(WL=>WL) port map(clk, rst, en, input, output);
tb : entity work.streaming_matrix_transposition_tester
generic map (WL=>WL, ROWS=>4, COLS=>8) port map(clk, rst, en, input, output, done);
end architecture behav;
--
-- 7x7 memory based matrix transposition
--
library ieee, vunit_lib;
context vunit_lib.vunit_context;
use ieee.std_logic_1164.all;
use ieee.numeric_std.all;
entity streaming_matrix_transposition_memory_7x7_tb is
generic (
runner_cfg : string; -- VUnit python pipe
tb_path : string -- Absolute path to this testbench
);
end entity streaming_matrix_transposition_memory_7x7_tb;
architecture behav of streaming_matrix_transposition_memory_7x7_tb is
constant WL : integer := 16;
signal done : boolean;
signal input, output : std_logic_vector(WL-1 downto 0);
signal clk, rst, en : std_logic;
begin
-- VUnit test runner
process begin
test_runner_setup(runner, runner_cfg);
wait until done = true;
test_runner_cleanup(runner);
end process;
-- Run the test baby!
dut : entity work.streaming_matrix_transposition_memory_7x7
generic map(WL=>WL) port map(clk, rst, en, input, output);
tb : entity work.streaming_matrix_transposition_tester
generic map (WL=>WL, ROWS=>7, COLS=>7) port map(clk, rst, en, input, output, done);
end architecture behav;
--
-- 7x7 register based matrix transposition
--
library ieee, vunit_lib;
context vunit_lib.vunit_context;
use ieee.std_logic_1164.all;
use ieee.numeric_std.all;
entity streaming_matrix_transposition_register_7x7_tb is
generic (
runner_cfg : string; -- VUnit python pipe
tb_path : string -- Absolute path to this testbench
);
end entity streaming_matrix_transposition_register_7x7_tb;
architecture behav of streaming_matrix_transposition_register_7x7_tb is
constant WL : integer := 16;
signal done : boolean;
signal input, output : std_logic_vector(WL-1 downto 0);
signal clk, rst, en : std_logic;
begin
-- VUnit test runner
process begin
test_runner_setup(runner, runner_cfg);
wait until done = true;
test_runner_cleanup(runner);
end process;
-- Run the test baby!
dut : entity work.streaming_matrix_transposition_register_7x7
generic map(WL=>WL) port map(clk, rst, en, input, output);
tb : entity work.streaming_matrix_transposition_tester
generic map (WL=>WL, ROWS=>7, COLS=>7) port map(clk, rst, en, input, output, done);
end architecture behav;
--
-- 5x5 register based matrix transposition
--
library ieee, vunit_lib;
context vunit_lib.vunit_context;
use ieee.std_logic_1164.all;
use ieee.numeric_std.all;
entity streaming_matrix_transposition_register_5x5_tb is
generic (
runner_cfg : string; -- VUnit python pipe
tb_path : string -- Absolute path to this testbench
);
end entity streaming_matrix_transposition_register_5x5_tb;
architecture behav of streaming_matrix_transposition_register_5x5_tb is
constant WL : integer := 16;
signal done : boolean;
signal input, output : std_logic_vector(WL-1 downto 0);
signal clk, rst, en : std_logic;
begin
-- VUnit test runner
process begin
test_runner_setup(runner, runner_cfg);
wait until done = true;
test_runner_cleanup(runner);
end process;
-- Run the test baby!
dut : entity work.streaming_matrix_transposition_register_5x5
generic map(WL=>WL) port map(clk, rst, en, input, output);
tb : entity work.streaming_matrix_transposition_tester
generic map (WL=>WL, ROWS=>5, COLS=>5) port map(clk, rst, en, input, output, done);
end architecture behav;
--
-- 4x4 register based matrix transposition
--
library ieee, vunit_lib;
context vunit_lib.vunit_context;
use ieee.std_logic_1164.all;
use ieee.numeric_std.all;
entity streaming_matrix_transposition_register_4x4_tb is
generic (
runner_cfg : string; -- VUnit python pipe
tb_path : string -- Absolute path to this testbench
);
end entity streaming_matrix_transposition_register_4x4_tb;
architecture behav of streaming_matrix_transposition_register_4x4_tb is
constant WL : integer := 16;
signal done : boolean;
signal input, output : std_logic_vector(WL-1 downto 0);
signal clk, rst, en : std_logic;
begin
-- VUnit test runner
process begin
test_runner_setup(runner, runner_cfg);
wait until done = true;
test_runner_cleanup(runner);
end process;
-- Run the test baby!
dut : entity work.streaming_matrix_transposition_register_4x4
generic map(WL=>WL) port map(clk, rst, en, input, output);
tb : entity work.streaming_matrix_transposition_tester
generic map (WL=>WL, ROWS=>4, COLS=>4) port map(clk, rst, en, input, output, done);
end architecture behav;
--
-- 3x3 register based matrix transposition
--
library ieee, vunit_lib;
context vunit_lib.vunit_context;
use ieee.std_logic_1164.all;
use ieee.numeric_std.all;
entity streaming_matrix_transposition_register_3x3_tb is
generic (
runner_cfg : string; -- VUnit python pipe
tb_path : string -- Absolute path to this testbench
);
end entity streaming_matrix_transposition_register_3x3_tb;
architecture behav of streaming_matrix_transposition_register_3x3_tb is
constant WL : integer := 16;
signal done : boolean;
signal input, output : std_logic_vector(WL-1 downto 0);
signal clk, rst, en : std_logic;
begin
-- VUnit test runner
process begin
test_runner_setup(runner, runner_cfg);
wait until done = true;
test_runner_cleanup(runner);
end process;
-- Run the test baby!
dut : entity work.streaming_matrix_transposition_register_3x3
generic map(WL=>WL) port map(clk, rst, en, input, output);
tb : entity work.streaming_matrix_transposition_tester
generic map (WL=>WL, ROWS=>3, COLS=>3) port map(clk, rst, en, input, output, done);
end architecture behav;
--
-- 2x2 register based matrix transposition
--
library ieee, vunit_lib;
context vunit_lib.vunit_context;
use ieee.std_logic_1164.all;
use ieee.numeric_std.all;
entity streaming_matrix_transposition_register_2x2_tb is
generic (
runner_cfg : string; -- VUnit python pipe
tb_path : string -- Absolute path to this testbench
);
end entity streaming_matrix_transposition_register_2x2_tb;
architecture behav of streaming_matrix_transposition_register_2x2_tb is
constant WL : integer := 16;
signal done : boolean;
signal input, output : std_logic_vector(WL-1 downto 0);
signal clk, rst, en : std_logic;
begin
-- VUnit test runner
process begin
test_runner_setup(runner, runner_cfg);
wait until done = true;
test_runner_cleanup(runner);
end process;
-- Run the test baby!
dut : entity work.streaming_matrix_transposition_register_2x2
generic map(WL=>WL) port map(clk, rst, en, input, output);
tb : entity work.streaming_matrix_transposition_tester
generic map (WL=>WL, ROWS=>2, COLS=>2) port map(clk, rst, en, input, output, done);
end architecture behav;
--
-- 4x8 register based matrix transposition
--
library ieee, vunit_lib;
context vunit_lib.vunit_context;
use ieee.std_logic_1164.all;
use ieee.numeric_std.all;
entity streaming_matrix_transposition_register_4x8_tb is
generic (
runner_cfg : string; -- VUnit python pipe
tb_path : string -- Absolute path to this testbench
);
end entity streaming_matrix_transposition_register_4x8_tb;
architecture behav of streaming_matrix_transposition_register_4x8_tb is
constant WL : integer := 16;
signal done : boolean;
signal input, output : std_logic_vector(WL-1 downto 0);
signal clk, rst, en : std_logic;
begin
-- VUnit test runner
process begin
test_runner_setup(runner, runner_cfg);
wait until done = true;
test_runner_cleanup(runner);
end process;
-- Run the test baby!
dut : entity work.streaming_matrix_transposition_register_4x8
generic map(WL=>WL) port map(clk, rst, en, input, output);
tb : entity work.streaming_matrix_transposition_tester
generic map (WL=>WL, ROWS=>4, COLS=>8) port map(clk, rst, en, input, output, done);
end architecture behav;
#!/usr/bin/env python3
from vunit import VUnit
vu = VUnit.from_argv()
lib = vu.add_library("lib")
lib.add_source_files(
[
"*.vhdl",
]
)
lib.set_compile_option("modelsim.vcom_flags", ["-2008"])
vu.main()
"""
Module for basic VHDL code generation.
"""
from io import TextIOWrapper
from typing import List, Optional, Tuple, Union
# VHDL code generation tab length
VHDL_TAB = r" "
def write(
f: TextIOWrapper,
indent_level: int,
text: str,
*,
end: str = '\n',
start: Optional[str] = None,
):
"""
Base VHDL code generation utility. `f'{VHDL_TAB*indent_level}'` is first written to the :class:`io.TextIOWrapper`
object `f`. Immediatly after the indentation, `text` is written to `f`. Finally, `text` is also written to `f`.
Parameters
----------
f : :class:`io.TextIOWrapper`
The file object to emit VHDL code to.
indent_level : int
Indentation level to use. Exactly ``f'{VHDL_TAB*indent_level}`` is written before the text is written.
text : str
The text to write to.
end : str, default: '\n'
Text to write exactly after *text* is written to *f*.
start : str, optional
Text to write before both indentation and *text*.
"""
if start is not None:
f.write(start)
f.write(f'{VHDL_TAB*indent_level}{text}{end}')
def write_lines(
f: TextIOWrapper, lines: List[Union[Tuple[int, str], Tuple[int, str, str]]]
):
"""
Multiline VHDL code generation utility. Each tuple (int, str, [int]) in the list `lines` is written to the
:class:`io.TextIOWrapper` object `f` using the :function:`vhdl.write` function.
Parameters
----------
f : :class:`io.TextIOWrapper`
The file object to emit VHDL code to.
lines : list of tuple (int,str) [1], or list of tuple (int,str,str) [2]
[1]: The first `int` of the tuple is used as indentation level for the line and
the second `str` of the tuple is the content of the line.
[2]: Same as [1], but the third `str` of the tuple is passed to parameter `end` when calling
:function:`vhdl.write`.
"""
for tpl in lines:
if len(tpl) == 2:
write(f, indent_level=tpl[0], text=tpl[1])
elif len(tpl) == 3:
write(f, indent_level=tpl[0], text=tpl[1], end=tpl[2])
else:
raise ValueError('All tuples in list `lines` must have length 2 or 3')
"""
Module for code generation of VHDL architectures.
"""
from io import TextIOWrapper
from typing import TYPE_CHECKING, Dict, Set, Tuple, cast
from b_asic.codegen import vhdl
from b_asic.process import MemoryVariable, PlainMemoryVariable
if TYPE_CHECKING:
from b_asic.resources import ProcessCollection, _ForwardBackwardTable
def write_memory_based_storage(
f: TextIOWrapper,
assignment: Set["ProcessCollection"],
entity_name: str,
word_length: int,
read_ports: int,
write_ports: int,
total_ports: int,
input_sync: bool = True,
):
"""
Generate the VHDL architecture for a memory based architecture from a process collection of memory variables.
Parameters
----------
assignment : dict
A possible cell assignment to use when generating the memory based storage.
The cell assignment is a dictionary int to ProcessCollection where the integer
corresponds to the cell to assign all MemoryVariables in corresponding process
collection.
If unset, each MemoryVariable will be assigned to a unique cell.
f : TextIOWrapper
File object (or other TextIOWrapper object) to write the architecture onto.
word_length : int
Word length of the memory variable objects.
read_ports : int
Number of read ports.
write_ports : int
Number of write ports.
total_ports : int
Total concurrent memory accesses possible.
input_sync : bool, default: True
Add registers to the input signals (enable signal and data input signals).
Adding registers to the inputs allow pipelining of address generation (which is added automatically).
For large interleavers, this can improve timing significantly.
"""
# Code settings
mem_depth = len(assignment)
architecture_name = "rtl"
schedule_time = next(iter(assignment))._schedule_time
# Write architecture header
vhdl.write(
f, 0, f'architecture {architecture_name} of {entity_name} is', end='\n\n'
)
#
# Architecture declerative region begin
#
vhdl.write(f, 1, '-- HDL memory description')
vhdl.common.write_constant_decl(
f, name='MEM_WL', type='integer', value=word_length, name_pad=12
)
vhdl.common.write_constant_decl(
f, name='MEM_DEPTH', type='integer', value=mem_depth, name_pad=12
)
vhdl.common.write_type_decl(
f, 'mem_type', 'array(0 to MEM_DEPTH-1) of std_logic_vector(MEM_WL-1 downto 0)'
)
vhdl.common.write_signal_decl(
f, name='memory', type='mem_type', name_pad=14, vivado_ram_style='distributed'
)
for i in range(read_ports):
vhdl.common.write_signal_decl(
f, f'read_port_{i}', 'std_logic_vector(MEM_WL-1 downto 0)', name_pad=14
)
vhdl.common.write_signal_decl(
f, f'read_adr_{i}', f'integer range 0 to {schedule_time}-1', name_pad=14
)
vhdl.common.write_signal_decl(f, f'read_en_{i}', 'std_logic', name_pad=14)
for i in range(write_ports):
vhdl.common.write_signal_decl(
f, f'write_port_{i}', 'std_logic_vector(MEM_WL-1 downto 0)', name_pad=14
)
vhdl.common.write_signal_decl(
f, f'write_adr_{i}', f'integer range 0 to {schedule_time}-1', name_pad=14
)
vhdl.common.write_signal_decl(f, f'write_en_{i}', 'std_logic', name_pad=14)
# Schedule time counter
vhdl.write(f, 1, '-- Schedule counter', start='\n')
vhdl.common.write_signal_decl(
f,
name='schedule_cnt',
type=f'integer range 0 to {schedule_time}-1',
name_pad=14,
)
# Input sync signals
if input_sync:
vhdl.write(f, 1, '-- Input synchronization', start='\n')
for i in range(read_ports):
vhdl.common.write_signal_decl(
f, f'p_{i}_in_sync', 'std_logic_vector(WL-1 downto 0)', name_pad=14
)
#
# Architecture body begin
#
vhdl.write(f, 0, 'begin', start='\n', end='\n\n')
vhdl.write(f, 1, '-- Schedule counter')
vhdl.common.write_synchronous_process_prologue(
f=f,
name='schedule_cnt_proc',
clk='clk',
)
vhdl.write_lines(
f,
[
(3, 'if rst = \'1\' then'),
(4, 'schedule_cnt <= 0;'),
(3, 'else'),
(4, 'if en = \'1\' then'),
(5, f'if schedule_cnt = {schedule_time-1} then'),
(6, 'schedule_cnt <= 0;'),
(5, 'else'),
(6, 'schedule_cnt <= schedule_cnt + 1;'),
(5, 'end if;'),
(4, 'end if;'),
(3, 'end if;'),
],
)
vhdl.common.write_synchronous_process_epilogue(
f=f,
name='schedule_cnt_proc',
clk='clk',
)
if input_sync:
vhdl.write(f, 1, '-- Input synchronization', start='\n')
vhdl.common.write_synchronous_process_prologue(
f=f,
name='input_sync_proc',
clk='clk',
)
for i in range(read_ports):
vhdl.write(f, 3, f'p_{i}_in_sync <= p_{i}_in;')
vhdl.common.write_synchronous_process_epilogue(
f=f,
name='input_sync_proc',
clk='clk',
)
# Infer memory
vhdl.write(f, 1, '-- Memory', start='\n')
vhdl.common.write_asynchronous_read_memory(
f=f,
clk='clk',
name=f'mem_{0}_proc',
read_ports={
(f'read_port_{i}', f'read_adr_{i}', f'read_en_{i}')
for i in range(read_ports)
},
write_ports={
(f'write_port_{i}', f'write_adr_{i}', f'write_en_{i}')
for i in range(write_ports)
},
)
# Write address generation
vhdl.write(f, 1, '-- Memory write address generation', start='\n')
if input_sync:
vhdl.common.write_synchronous_process_prologue(
f, clk="clk", name="mem_write_address_proc"
)
else:
vhdl.common.write_process_prologue(
f, sensitivity_list="schedule_cnt", name="mem_write_address_proc"
)
vhdl.write(f, 3, 'case schedule_cnt is')
for i, collection in enumerate(assignment):
for mv in collection:
mv = cast(MemoryVariable, mv)
if mv.execution_time:
vhdl.write_lines(
f,
[
(4, f'-- {mv!r}'),
(4, f'when {(mv.start_time) % schedule_time} =>'),
(5, f'write_adr_0 <= {i};'),
(5, 'write_en_0 <= \'1\';'),
],
)
vhdl.write_lines(
f,
[
(4, 'when others =>'),
(5, 'write_adr_0 <= 0;'),
(5, 'write_en_0 <= \'0\';'),
(3, 'end case;'),
],
)
if input_sync:
vhdl.common.write_synchronous_process_epilogue(
f, clk="clk", name="mem_write_address_proc"
)
else:
vhdl.common.write_process_epilogue(
f, sensitivity_list="clk", name="mem_write_address_proc"
)
# Read address generation
vhdl.write(f, 1, '-- Memory read address generation', start='\n')
vhdl.common.write_synchronous_process_prologue(
f, clk="clk", name="mem_read_address_proc"
)
vhdl.write(f, 3, 'case schedule_cnt is')
for i, collection in enumerate(assignment):
for mv in collection:
mv = cast(PlainMemoryVariable, mv)
vhdl.write(f, 4, f'-- {mv!r}')
for read_time in mv.reads.values():
vhdl.write(
f,
4,
'when'
f' {(mv.start_time+read_time-int(not(input_sync))) % schedule_time} =>',
)
vhdl.write_lines(
f,
[
(5, f'read_adr_0 <= {i};'),
(5, 'read_en_0 <= \'1\';'),
],
)
vhdl.write_lines(
f,
[
(4, 'when others =>'),
(5, 'read_adr_0 <= 0;'),
(5, 'read_en_0 <= \'0\';'),
(3, 'end case;'),
],
)
vhdl.common.write_synchronous_process_epilogue(
f, clk="clk", name="mem_read_address_proc"
)
vhdl.write(f, 1, '-- Input and output assignmentn', start='\n')
if input_sync:
vhdl.write(f, 1, 'write_port_0 <= p_0_in_sync;')
else:
vhdl.write(f, 1, 'write_port_0 <= p_0_in;')
p_zero_exec = filter(
lambda p: p.execution_time == 0, (p for pc in assignment for p in pc)
)
vhdl.common.write_synchronous_process_prologue(
f,
clk='clk',
name='output_reg_proc',
)
vhdl.write(f, 3, 'case schedule_cnt is')
for p in p_zero_exec:
if input_sync:
write_time = (p.start_time + 1) % schedule_time
vhdl.write(f, 4, f'when {write_time} => p_0_out <= p_0_in_sync;')
else:
write_time = (p.start_time) % schedule_time
vhdl.write(f, 4, f'when {write_time} => p_0_out <= p_0_in;')
vhdl.write_lines(
f,
[
(4, 'when others => p_0_out <= read_port_0;'),
(3, 'end case;'),
],
)
vhdl.common.write_synchronous_process_epilogue(
f,
clk='clk',
name='output_reg_proc',
)
vhdl.write(f, 0, f'end architecture {architecture_name};', start='\n')
def write_register_based_storage(
f: TextIOWrapper,
forward_backward_table: "_ForwardBackwardTable",
entity_name: str,
word_length: int,
read_ports: int,
write_ports: int,
total_ports: int,
sync_rst: bool = False,
async_rst: bool = False,
):
architecture_name = "rtl"
schedule_time = len(forward_backward_table)
# Number of registers in this design
reg_cnt = len(forward_backward_table[0].regs)
# Set of the register indices to output from
output_regs = {entry.outputs_from for entry in forward_backward_table.table}
if None in output_regs:
output_regs.remove(None)
output_regs = cast(Set[int], output_regs)
# Table with mapping: register to output multiplexer index
output_mux_table = {reg: i for i, reg in enumerate(output_regs)}
# Back-edge register indices
back_edges: Set[Tuple[int, int]] = {
(frm, to)
for entry in forward_backward_table
for frm, to in entry.back_edge_to.items()
}
back_edge_table: Dict[Tuple[int, int], int] = {
edge: i + 1 for i, edge in enumerate(back_edges)
}
#
# Architecture declerative region begin
#
# Write architecture header
vhdl.write(
f, 0, f'architecture {architecture_name} of {entity_name} is', end='\n\n'
)
# Schedule time counter
vhdl.write(f, 1, '-- Schedule counter')
vhdl.common.write_signal_decl(
f,
name='schedule_cnt',
type=f'integer range 0 to {schedule_time}-1',
name_pad=18,
default_value='0',
)
# Shift register
vhdl.write(f, 1, '-- Shift register', start='\n')
vhdl.common.write_type_decl(
f,
name='shift_reg_type',
alias=f'array(0 to {reg_cnt}-1) of std_logic_vector(WL-1 downto 0)',
)
vhdl.common.write_signal_decl(
f,
name='shift_reg',
type='shift_reg_type',
name_pad=18,
)
# Back edge mux decoder
vhdl.write(f, 1, '-- Back-edge mux select signal', start='\n')
vhdl.common.write_signal_decl(
f,
name='back_edge_mux_sel',
type=f'integer range 0 to {len(back_edges)}',
name_pad=18,
)
# Output mux selector
vhdl.write(f, 1, '-- Output mux select signal', start='\n')
vhdl.common.write_signal_decl(
f,
name='out_mux_sel',
type=f'integer range 0 to {len(output_regs)-1}',
name_pad=18,
)
#
# Architecture body begin
#
vhdl.write(f, 0, 'begin', start='\n', end='\n\n')
vhdl.write(f, 1, '-- Schedule counter')
vhdl.common.write_synchronous_process_prologue(
f=f,
name='schedule_cnt_proc',
clk='clk',
)
vhdl.write_lines(
f,
[
(4, 'if en = \'1\' then'),
(5, f'if schedule_cnt = {schedule_time}-1 then'),
(6, 'schedule_cnt <= 0;'),
(5, 'else'),
(6, 'schedule_cnt <= schedule_cnt + 1;'),
(5, 'end if;'),
(4, 'end if;'),
],
)
vhdl.common.write_synchronous_process_epilogue(
f=f,
name='schedule_cnt_proc',
clk='clk',
)
# Shift register back-edge decoding
vhdl.write(f, 1, '-- Shift register back-edge decoding', start='\n')
vhdl.common.write_synchronous_process_prologue(
f,
clk='clk',
name='shift_reg_back_edge_decode_proc',
)
vhdl.write(f, 3, 'case schedule_cnt is')
for time, entry in enumerate(forward_backward_table):
if entry.back_edge_to:
assert len(entry.back_edge_to) == 1
for src, dst in entry.back_edge_to.items():
mux_idx = back_edge_table[(src, dst)]
vhdl.write_lines(
f,
[
(4, f'when {(time-1)%schedule_time} =>'),
(5, f'-- ({src} -> {dst})'),
(5, f'back_edge_mux_sel <= {mux_idx};'),
],
)
vhdl.write_lines(
f,
[
(4, 'when others =>'),
(5, 'back_edge_mux_sel <= 0;'),
(3, 'end case;'),
],
)
vhdl.common.write_synchronous_process_epilogue(
f,
clk='clk',
name='shift_reg_back_edge_decode_proc',
)
# Shift register multiplexer logic
vhdl.write(f, 1, '-- Multiplexers for shift register', start='\n')
vhdl.common.write_synchronous_process_prologue(
f,
clk='clk',
name='shift_reg_proc',
)
if sync_rst:
vhdl.write(f, 3, 'if rst = \'1\' then')
for reg_idx in range(reg_cnt):
vhdl.write(f, 4, f'shift_reg({reg_idx}) <= (others => \'0\');')
vhdl.write(f, 3, 'else')
vhdl.write_lines(
f,
[
(3, '-- Default case'),
(3, 'shift_reg(0) <= p_0_in;'),
],
)
for reg_idx in range(1, reg_cnt):
vhdl.write(f, 3, f'shift_reg({reg_idx}) <= shift_reg({reg_idx-1});')
vhdl.write(f, 3, 'case back_edge_mux_sel is')
for edge, mux_sel in back_edge_table.items():
vhdl.write_lines(
f,
[
(4, f'when {mux_sel} =>'),
(5, f'shift_reg({edge[1]}) <= shift_reg({edge[0]});'),
],
)
vhdl.write_lines(
f,
[
(4, 'when others => null;'),
(3, 'end case;'),
],
)
if sync_rst:
vhdl.write(f, 3, 'end if;')
vhdl.common.write_synchronous_process_epilogue(
f,
clk='clk',
name='shift_reg_proc',
)
# Output multiplexer decoding logic
vhdl.write(f, 1, '-- Output muliplexer decoding logic', start='\n')
vhdl.common.write_synchronous_process_prologue(
f, clk='clk', name='out_mux_decode_proc'
)
vhdl.write(f, 3, 'case schedule_cnt is')
for i, entry in enumerate(forward_backward_table):
if entry.outputs_from is not None:
sel = output_mux_table[entry.outputs_from]
vhdl.write(f, 4, f'when {(i-1)%schedule_time} =>')
vhdl.write(f, 5, f'out_mux_sel <= {sel};')
vhdl.write(f, 3, 'end case;')
vhdl.common.write_synchronous_process_epilogue(
f, clk='clk', name='out_mux_decode_proc'
)
# Output multiplexer logic
vhdl.write(f, 1, '-- Output muliplexer', start='\n')
vhdl.common.write_synchronous_process_prologue(
f,
clk='clk',
name='out_mux_proc',
)
vhdl.write(f, 3, 'case out_mux_sel is')
for reg_i, mux_i in output_mux_table.items():
vhdl.write(f, 4, f'when {mux_i} =>')
if reg_i < 0:
vhdl.write(f, 5, f'p_0_out <= p_{-1-reg_i}_in;')
else:
vhdl.write(f, 5, f'p_0_out <= shift_reg({reg_i});')
vhdl.write(f, 3, 'end case;')
vhdl.common.write_synchronous_process_epilogue(
f,
clk='clk',
name='out_mux_proc',
)
vhdl.write(f, 0, f'end architecture {architecture_name};', start='\n')
"""
Generation of common VHDL constructs
"""
from datetime import datetime
from io import TextIOWrapper
from subprocess import PIPE, Popen
from typing import Any, Optional, Set, Tuple
from b_asic.codegen import vhdl
def write_b_asic_vhdl_preamble(f: TextIOWrapper):
"""
Write a standard BASIC VHDL preamble comment.
Parameters
----------
f : :class:`io.TextIOWrapper`
The file object to write the header to.
"""
# Try to acquire the current git commit hash
git_commit_id = None
try:
process = Popen(['git', 'rev-parse', '--short', 'HEAD'], stdout=PIPE)
git_commit_id = process.communicate()[0].decode('utf-8').strip()
except:
pass
vhdl.write_lines(
f,
[
(0, '--'),
(0, '-- This code was automatically generated by the B-ASIC toolbox.'),
(0, f'-- Code generation timestamp: ({datetime.now()})'),
],
)
if git_commit_id:
vhdl.write(f, 0, f'-- B-ASIC short commit hash: {git_commit_id}')
vhdl.write_lines(
f,
[
(0, '-- URL: https://gitlab.liu.se/da/B-ASIC'),
(0, '--', '\n\n'),
],
)
def write_ieee_header(
f: TextIOWrapper,
std_logic_1164: bool = True,
numeric_std: bool = True,
):
"""
Write the standard IEEE VHDL use header with includes of std_logic_1164 and numeric_std.
Parameters
----------
f : :class:`io.TextIOWrapper`
The TextIOWrapper object to write the IEEE header to.
std_logic_1164 : bool, default: True
Include the std_logic_1164 header.
numeric_std : bool, default: True
Include the numeric_std header.
"""
vhdl.write(f, 0, 'library ieee;')
if std_logic_1164:
vhdl.write(f, 0, 'use ieee.std_logic_1164.all;')
if numeric_std:
vhdl.write(f, 0, 'use ieee.numeric_std.all;')
vhdl.write(f, 0, '')
def write_signal_decl(
f: TextIOWrapper,
name: str,
type: str,
default_value: Optional[str] = None,
name_pad: Optional[int] = None,
vivado_ram_style: Optional[str] = None,
quartus_ram_style: Optional[str] = None,
):
"""
Create a VHDL signal declaration: ::
signal {name} : {type} [:= {default_value}];
Parameters
----------
f : :class:`io.TextIOWrapper`
The TextIOWrapper object to write the IEEE header to.
name : str
Signal name.
type : str
Signal type.
default_value : string, optional
An optional default value to the signal.
name_pad : int, optional
An optional left padding value applied to the name.
vivado_ram_style : string, optional
An optional Xilinx Vivado RAM style attribute to apply to this signal delcaration.
If set, exactly one of: "block", "distributed", "registers", "ultra", "mixed" or "auto".
quartus_ram_style : string, optional
An optional Quartus Prime RAM style attribute to apply to this signal delcaration.
If set, exactly one of: "M4K", "M9K", "M10K", "M20K", "M144K", "MLAB" or "logic".
"""
# Spacing of VHDL signals declaration always with a single tab
name_pad = name_pad or 0
vhdl.write(f, 1, f'signal {name:<{name_pad}} : {type}', end='')
if default_value is not None:
vhdl.write(f, 0, f' := {default_value}', end='')
vhdl.write(f, 0, ';')
if vivado_ram_style is not None:
vhdl.write_lines(
f,
[
(1, 'attribute ram_style : string;'),
(1, f'attribute ram_style of {name} : signal is "{vivado_ram_style}";'),
],
)
if quartus_ram_style is not None:
vhdl.write_lines(
f,
[
(1, 'attribute ramstyle : string;'),
(1, f'attribute ramstyle of {name} : signal is "{quartus_ram_style}";'),
],
)
def write_constant_decl(
f: TextIOWrapper,
name: str,
type: str,
value: Any,
name_pad: Optional[int] = None,
type_pad: Optional[int] = None,
):
"""
Write a VHDL constant declaration with a name, a type and a value.
Parameters
----------
f : :class:`io.TextIOWrapper`
The TextIOWrapper object to write the constant declaration to.
name : str
Signal name.
type : str
Signal type.
value : anything convertable to str
Default value to the signal.
name_pad : int, optional
An optional left padding value applied to the name.
"""
name_pad = 0 if name_pad is None else name_pad
vhdl.write(f, 1, f'constant {name:<{name_pad}} : {type} := {str(value)};')
def write_type_decl(
f: TextIOWrapper,
name: str,
alias: str,
):
"""
Write a VHDL type declaration with a name tied to an alias.
Parameters
----------
f : :class:`io.TextIOWrapper`
The TextIOWrapper object to write the type declaration to.
name : str
Type name alias.
alias : str
The type to tie the new name to.
"""
vhdl.write(f, 1, f'type {name} is {alias};')
def write_process_prologue(
f: TextIOWrapper,
sensitivity_list: str,
indent: int = 1,
name: Optional[str] = None,
):
"""
Write only the prologue of a regular VHDL process with a user provided sensitivity list.
This method should almost always guarantely be followed by a write_process_epilogue.
Parameters
----------
f : :class:`io.TextIOWrapper`
The TextIOWrapper object to write the type declaration to.
sensitivity_list : str
Content of the process sensitivity list.
indent : int, default: 1
Indentation level to use for this process.
name : Optional[str]
An optional name for the process.
"""
if name is not None:
vhdl.write(f, indent, f'{name}: process({sensitivity_list})')
else:
vhdl.write(f, indent, f'process({sensitivity_list})')
vhdl.write(f, indent, 'begin')
def write_process_epilogue(
f: TextIOWrapper,
sensitivity_list: Optional[str] = None,
indent: int = 1,
name: Optional[str] = None,
):
"""
Parameters
----------
f : :class:`io.TextIOWrapper`
The TextIOWrapper object to write the type declaration to.
sensitivity_list : str
Content of the process sensitivity list. Not needed when writing the epligoue.
indent : int, default: 1
Indentation level to use for this process.
indent : int, default: 1
Indentation level to use for this process.
name : Optional[str]
An optional name of the ending process.
"""
_ = sensitivity_list
vhdl.write(f, indent, 'end process', end="")
if name is not None:
vhdl.write(f, 0, ' ' + name, end="")
vhdl.write(f, 0, ';')
def write_synchronous_process_prologue(
f: TextIOWrapper,
clk: str,
indent: int = 1,
name: Optional[str] = None,
):
"""
Write only the prologue of a regular VHDL synchronous process with a single clock object in the sensitivity list
triggering a rising edge block by some body of VHDL code.
This method should almost always guarantely be followed by a write_synchronous_process_epilogue.
Parameters
----------
f : :class:`io.TextIOWrapper`
The TextIOWrapper to write the VHDL code onto.
clk : str
Name of the clock.
indent : int, default: 1
Indentation level to use for this process.
name : Optional[str]
An optional name for the process.
"""
write_process_prologue(f, sensitivity_list=clk, indent=indent, name=name)
vhdl.write(f, indent + 1, 'if rising_edge(clk) then')
def write_synchronous_process_epilogue(
f: TextIOWrapper,
clk: Optional[str],
indent: int = 1,
name: Optional[str] = None,
):
"""
Write only the epilogue of a regular VHDL synchronous process with a single clock object in the sensitivity list
triggering a rising edge block by some body of VHDL code.
This method should almost always guarantely be followed by a write_synchronous_process_epilogue.
Parameters
----------
f : :class:`io.TextIOWrapper`
The TextIOWrapper to write the VHDL code onto.
clk : str
Name of the clock.
indent : int, default: 1
Indentation level to use for this process.
name : Optional[str]
An optional name for the process
"""
_ = clk
vhdl.write(f, indent + 1, 'end if;')
write_process_epilogue(f, sensitivity_list=clk, indent=indent, name=name)
def write_synchronous_process(
f: TextIOWrapper,
clk: str,
body: str,
indent: int = 1,
name: Optional[str] = None,
):
"""
Write a regular VHDL synchronous process with a single clock object in the sensitivity list triggering
a rising edge block by some body of VHDL code.
Parameters
----------
f : :class:`io.TextIOWrapper`
The TextIOWrapper to write the VHDL code onto.
clk : str
Name of the clock.
body : str
Body of the `if rising_edge(clk) then` block.
indent : int, default: 1
Indentation level to use for this process.
name : Optional[str]
An optional name for the process
"""
write_synchronous_process_prologue(f, clk, indent, name)
for line in body.split('\n'):
if len(line):
vhdl.write(f, indent + 2, f'{line}')
write_synchronous_process_epilogue(f, clk, indent, name)
def write_synchronous_memory(
f: TextIOWrapper,
clk: str,
read_ports: Set[Tuple[str, str, str]],
write_ports: Set[Tuple[str, str, str]],
name: Optional[str] = None,
):
"""
Infer a VHDL synchronous reads and writes.
Parameters
----------
f : :class:`io.TextIOWrapper`
The TextIOWrapper to write the VHDL code onto.
clk : str
Name of clock identifier to the synchronous memory.
read_ports : Set[Tuple[str,str]]
A set of strings used as identifiers for the read ports of the memory.
write_ports : Set[Tuple[str,str,str]]
A set of strings used as identifiers for the write ports of the memory.
name : Optional[str]
An optional name for the memory process.
"""
assert len(read_ports) >= 1
assert len(write_ports) >= 1
write_synchronous_process_prologue(f, clk=clk, name=name)
for read_name, address, re in read_ports:
vhdl.write_lines(
f,
[
(3, f'if {re} = \'1\' then'),
(4, f'{read_name} <= memory({address});'),
(3, 'end if;'),
],
)
for write_name, address, we in write_ports:
vhdl.write_lines(
f,
[
(3, f'if {we} = \'1\' then'),
(4, f'memory({address}) <= {write_name};'),
(3, 'end if;'),
],
)
write_synchronous_process_epilogue(f, clk=clk, name=name)
def write_asynchronous_read_memory(
f: TextIOWrapper,
clk: str,
read_ports: Set[Tuple[str, str, str]],
write_ports: Set[Tuple[str, str, str]],
name: Optional[str] = None,
):
"""
Infer a VHDL memory with synchronous writes and asynchronous reads.
Parameters
----------
f : :class:`io.TextIOWrapper`
The TextIOWrapper to write the VHDL code onto.
clk : str
Name of clock identifier to the synchronous memory.
read_ports : Set[Tuple[str,str]]
A set of strings used as identifiers for the read ports of the memory.
write_ports : Set[Tuple[str,str,str]]
A set of strings used as identifiers for the write ports of the memory.
name : Optional[str]
An optional name for the memory process.
"""
assert len(read_ports) >= 1
assert len(write_ports) >= 1
write_synchronous_process_prologue(f, clk=clk, name=name)
for write_name, address, we in write_ports:
vhdl.write_lines(
f,
[
(3, f'if {we} = \'1\' then'),
(4, f'memory({address}) <= {write_name};'),
(3, 'end if;'),
],
)
write_synchronous_process_epilogue(f, clk=clk, name=name)
for read_name, address, _ in read_ports:
vhdl.write(f, 1, f'{read_name} <= memory({address});')
"""
Module for code generation of VHDL entity declarations
"""
from io import TextIOWrapper
from typing import Set
from b_asic.codegen import vhdl
from b_asic.codegen.vhdl import VHDL_TAB
from b_asic.port import Port
from b_asic.process import MemoryVariable, PlainMemoryVariable
from b_asic.resources import ProcessCollection
def write_memory_based_storage(
f: TextIOWrapper, entity_name: str, collection: ProcessCollection, word_length: int
):
# Check that this is a ProcessCollection of (Plain)MemoryVariables
is_memory_variable = all(
isinstance(process, MemoryVariable) for process in collection
)
is_plain_memory_variable = all(
isinstance(process, PlainMemoryVariable) for process in collection
)
if not (is_memory_variable or is_plain_memory_variable):
raise ValueError(
"HDL can only be generated for ProcessCollection of (Plain)MemoryVariables"
)
entity_name = entity_name
# Write the entity header
vhdl.write_lines(
f,
[
(0, f'entity {entity_name} is'),
(1, 'generic('),
(2, '-- Data word length'),
(2, f'WL : integer := {word_length}'),
(1, ');'),
(1, 'port('),
],
)
# Write the clock and reset signal
vhdl.write_lines(
f,
[
(0, '-- Clock, synchronous reset and enable signals'),
(2, 'clk : in std_logic;'),
(2, 'rst : in std_logic;'),
(2, 'en : in std_logic;'),
(0, ''),
],
)
# Write the input port specification
f.write(f'{2*VHDL_TAB}-- Memory port I/O\n')
read_ports: set[Port] = set(sum((mv.read_ports for mv in collection), ())) # type: ignore
for idx, read_port in enumerate(read_ports):
port_name = read_port if isinstance(read_port, int) else read_port.name
port_name = 'p_' + str(port_name) + '_in'
f.write(f'{2*VHDL_TAB}{port_name} : in std_logic_vector(WL-1 downto 0);\n')
# Write the output port specification
write_ports: Set[Port] = {mv.write_port for mv in collection} # type: ignore
for idx, write_port in enumerate(write_ports):
port_name = write_port if isinstance(write_port, int) else write_port.name
port_name = 'p_' + str(port_name) + '_out'
f.write(f'{2*VHDL_TAB}{port_name} : out std_logic_vector(WL-1 downto 0)')
if idx == len(write_ports) - 1:
f.write('\n')
else:
f.write(';\n')
# Write ending of the port header
f.write(f'{VHDL_TAB});\n')
f.write(f'end entity {entity_name};\n\n')
def write_register_based_storage(
f: TextIOWrapper, entity_name: str, collection: ProcessCollection, word_length: int
):
write_memory_based_storage(f, entity_name, collection, word_length)
......@@ -360,7 +360,7 @@ class Multiplication(AbstractOperation):
@property
def is_linear(self) -> bool:
return any(
input.connected_source.operation.is_constant for input in self.inputs
input_.connected_source.operation.is_constant for input_ in self.inputs
)
......
......@@ -130,10 +130,7 @@ class AbstractGraphComponent(GraphComponent):
f"id: {self.graph_id if self.graph_id else 'no_id'}, \tname:"
f" {self.name if self.name else 'no_name'}"
+ "".join(
(
f", \t{key}: {str(param)}"
for key, param in self._parameters.items()
)
(f", \t{key}: {str(param)}" for key, param in self._parameters.items())
)
)
......@@ -176,12 +173,12 @@ class AbstractGraphComponent(GraphComponent):
def traverse(self) -> Generator[GraphComponent, None, None]:
# Breadth first search.
visited = {self}
fontier = deque([self])
while fontier:
component = fontier.popleft()
frontier = deque([self])
while frontier:
component = frontier.popleft()
yield component
for neighbor in component.neighbors:
neighbor = cast(AbstractGraphComponent, neighbor)
if neighbor not in visited:
visited.add(neighbor)
fontier.append(neighbor)
frontier.append(neighbor)
"""
B-ASIC GUI utilities.
This module contains GUI classes that have a general use and are not
strictly related to either the Block Diagram GUI or Scheduler GUI.
"""