Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • da/B-ASIC
  • lukja239/B-ASIC
  • robal695/B-ASIC
3 results
Show changes
Commits on Source (5)
Showing
with 609 additions and 20 deletions
......@@ -115,3 +115,4 @@ TODO.txt
*.log
b_asic/_version.py
docs_sphinx/_build/
docs_sphinx/examples
<img src="logo.png" width="278" height="100">
<img src="logos/logo.png" width="278" height="100">
# B-ASIC - Better ASIC Toolbox
......@@ -24,6 +24,7 @@ The following packages are required in order to build the library:
- [NumPy](https://numpy.org/)
- [QtPy](https://github.com/spyder-ide/qtpy)
- [setuptools_scm](https://github.com/pypa/setuptools_scm/)
- [NetworkX](https://networkx.org/)
- Qt 5 or 6, with Python bindings, one of:
- pyside2
- pyqt5
......
import pathlib
import sys # ONLY FOR DEBUG
from qtpy.QtCore import Qt
......@@ -43,13 +44,18 @@ class AboutWindow(QDialog):
# |3 links |4 OK | <- layout34
label1 = QLabel(
"# B-ASIC / Better ASIC Toolbox\n*Construct, simulate and analyze"
" components of an ASIC.*\n\nB-ASIC is an open source tool using"
" the B-ASIC library to construct, simulate and analyze"
" ASICs.\n\nB-ASIC is developed under the MIT-license and any"
" extension to the program should follow that same license.\n\nTo"
" read more about how the GUI works please refer to the FAQ under"
f" 'Help'.\n\n*Version: {__version__}*"
"# B-ASIC\n__Better ASIC and FPGA Signal Processing"
" Toolbox__\n\n*Construct, simulate and analyze signal processing"
" algorithms aimed at implementation on an ASIC or"
" FPGA.*\n\nB-ASIC is developed by the <a"
" href=\"https://liu.se/en/organisation/liu/isy/da\">Division of"
" Computer Engineering</a> at <a"
" href=\"https://liu.se/?l=en\">Linköping University</a>,"
" Sweden.\n\nB-ASIC is released under the <a"
" href=\"https://gitlab.liu.se/da/B-ASIC/-/blob/master/LICENSE\">MIT-license</a>"
" and any extension to the program should follow that same"
f" license.\n\n*Version: {__version__}*\n\nCopyright 2020-2023,"
" Oscar Gustafsson et al."
)
label1.setTextFormat(Qt.MarkdownText)
label1.setWordWrap(True)
......@@ -58,15 +64,17 @@ class AboutWindow(QDialog):
self.logo2 = QLabel(self)
self.logo2.setPixmap(
QPixmap("../../small_logo.png").scaledToWidth(100)
QPixmap(
str(pathlib.Path(__file__).parent.resolve())
+ "/../../logos/small_logo.png"
).scaledToWidth(100)
)
self.logo2.setFixedWidth(100)
label3 = QLabel(
"""See: <a href="https://da.gitlab-pages.liu.se/B-ASIC/">documentation</a>,"""
""" <a href="https://gitlab.liu.se/da/B-ASIC/">git</a>,"""
""" <a href="https://www.liu.se/?l=en">liu.se</a>,"""
""" <a href="https://liu.se/organisation/liu/isy/da">Computer Engineering</a>."""
"""Additional resources: <a href="https://da.gitlab-pages.liu.se/B-ASIC/">documentation</a>,"""
""" <a href="https://gitlab.liu.se/da/B-ASIC/">git repository</a>,"""
""" <a href="https://gitlab.liu.se/da/B-ASIC/-/issues">report issues and suggestions</a>."""
)
label3.setOpenExternalLinks(True)
label3.linkHovered.connect(self.hoverText)
......
......@@ -2,7 +2,7 @@
B-ASIC classes representing resource usage.
"""
from typing import Dict, Tuple
from typing import Dict, Optional, Tuple
from b_asic.operation import Operation
from b_asic.port import InputPort, OutputPort
......@@ -20,11 +20,20 @@ class Process:
Start time of process.
execution_time : int
Execution time (lifetime) of process.
name : str, optional
The name of the process. If not provided, generate a name.
"""
def __init__(self, start_time: int, execution_time: int):
def __init__(
self, start_time: int, execution_time: int, name: Optional[str] = None
):
self._start_time = start_time
self._execution_time = execution_time
if name is None:
self._name = f"Proc. {PlainMemoryVariable._name_cnt}"
PlainMemoryVariable._name_cnt += 1
else:
self._name = name
def __lt__(self, other):
return self._start_time < other.start_time or (
......@@ -42,6 +51,16 @@ class Process:
"""Return the execution time."""
return self._execution_time
@property
def name(self) -> str:
return self._name
def __str__(self) -> str:
return self._name
# Static counter for default names
_name_cnt = 0
class OperatorProcess(Process):
"""
......@@ -53,16 +72,27 @@ class OperatorProcess(Process):
Start time of process.
operation : Operation
Operation that the process corresponds to.
name : str, optional
The name of the process.
"""
def __init__(self, start_time: int, operation: Operation):
def __init__(
self,
start_time: int,
operation: Operation,
name: Optional[str] = None,
):
execution_time = operation.execution_time
if execution_time is None:
raise ValueError(
"Operation {operation!r} does not have an execution time"
" specified!"
)
super().__init__(start_time, execution_time)
super().__init__(
start_time,
execution_time,
name=name,
)
self._operation = operation
......@@ -80,6 +110,8 @@ class MemoryVariable(Process):
reads : {InputPort: int, ...}
Dictionary with the InputPorts that reads the memory variable and
for how long after the *write_time* they will read.
name : str, optional
The name of the process.
"""
def __init__(
......@@ -87,12 +119,15 @@ class MemoryVariable(Process):
write_time: int,
write_port: OutputPort,
reads: Dict[InputPort, int],
name: Optional[str] = None,
):
self._read_ports = tuple(reads.keys())
self._life_times = tuple(reads.values())
self._write_port = write_port
super().__init__(
start_time=write_time, execution_time=max(self._life_times)
start_time=write_time,
execution_time=max(self._life_times),
name=name,
)
@property
......@@ -123,6 +158,8 @@ class PlainMemoryVariable(Process):
reads : {int: int, ...}
Dictionary where the key is the destination identifier and the value
is the time after *write_time* that the memory variable is read.
name : str, optional
The name of the process.
"""
def __init__(
......@@ -130,12 +167,15 @@ class PlainMemoryVariable(Process):
write_time: int,
write_port: int,
reads: Dict[int, int],
name: Optional[str] = None,
):
self._read_ports = tuple(reads.keys())
self._life_times = tuple(reads.values())
self._write_port = write_port
super().__init__(
start_time=write_time, execution_time=max(self._life_times)
start_time=write_time,
execution_time=max(self._life_times),
name=name,
)
@property
......
"""
Functions to generate memory-variable test data that are used for research.
"""
import random
from typing import Optional, Set
from b_asic.process import PlainMemoryVariable
from b_asic.resources import ProcessCollection
def _insert_delays(inputorder, outputorder, min_lifetime, cyclic):
size = len(inputorder)
maxdiff = min(outputorder[i] - inputorder[i] for i in range(size))
outputorder = [o - maxdiff + min_lifetime for o in outputorder]
maxdelay = max(outputorder[i] - inputorder[i] for i in range(size))
if cyclic:
if maxdelay >= size:
inputorder = inputorder + [i + size for i in inputorder]
outputorder = outputorder + [o + size for o in outputorder]
return inputorder, outputorder
def generate_random_interleaver(
size: int, min_lifetime: int = 0, cyclic: bool = True
) -> ProcessCollection:
"""
Generate a ProcessCollection with memory variable corresponding to a random
interleaver with length *size*.
Parameters
----------
size : int
The size of the random interleaver sequence.
min_lifetime : int, default: 0
The minimum lifetime for a memory variable. Default is 0 meaning that at least
one variable is passed from the input to the output directly,
cyclic : bool, default: True
If the interleaver should operate continuously in a cyclic manner. That is,
start a new interleaving operation directly after the previous.
Returns
-------
ProcessCollection
"""
inputorder = list(range(size))
outputorder = inputorder[:]
random.shuffle(outputorder)
inputorder, outputorder = _insert_delays(
inputorder, outputorder, min_lifetime, cyclic
)
return ProcessCollection(
{
PlainMemoryVariable(
inputorder[i], 0, {0: outputorder[i] - inputorder[i]}
)
for i in range(len(inputorder))
},
len(inputorder),
cyclic,
)
def generate_matrix_transposer(
height: int,
width: Optional[int] = None,
min_lifetime: int = 0,
cyclic: bool = True,
) -> ProcessCollection:
r"""
Generate a ProcessCollection with memory variable corresponding to transposing a
matrix of size *height* :math:`\times` *width*. If *width* is not provided, a
square matrix of size *height* :math:`\times` *height* is used.
Parameters
----------
height : int
Matrix height.
width : int, optional
Matrix width. If not provided assumed to be equal to height, i.e., a square
matrix.
min_lifetime : int, default: 0
The minimum lifetime for a memory variable. Default is 0 meaning that at
least one variable is passed from the input to the output directly,
cyclic : bool, default: True
If the interleaver should operate continuously in a cyclic manner. That is,
start a new interleaving operation directly after the previous.
Returns
-------
ProcessCollection
"""
if width is None:
width = height
inputorder = []
for row in range(height):
for col in range(width):
inputorder.append(col + width * row)
outputorder = []
for row in range(width):
for col in range(height):
outputorder.append(col * width + row)
inputorder, outputorder = _insert_delays(
inputorder, outputorder, min_lifetime, cyclic
)
return ProcessCollection(
{
PlainMemoryVariable(
inputorder[i],
0,
{0: outputorder[i] - inputorder[i]},
name=f"{inputorder[i]}",
)
for i in range(len(inputorder))
},
len(inputorder),
cyclic,
)
import re
from typing import Dict, List, Optional, Set, Tuple, Union
import matplotlib.pyplot as plt
import networkx as nx
from matplotlib.axes import Axes
from matplotlib.ticker import MaxNLocator
from b_asic.process import Process
# From https://stackoverflow.com/questions/2669059/how-to-sort-alpha-numeric-set-in-python
def _sorted_nicely(to_be_sorted):
"""Sort the given iterable in the way that humans expect."""
convert = lambda text: int(text) if text.isdigit() else text
alphanum_key = lambda key: [
convert(c) for c in re.split('([0-9]+)', str(key))
]
return sorted(to_be_sorted, key=alphanum_key)
def draw_exclusion_graph_coloring(
exclusion_graph: nx.Graph,
color_dict: Dict[Process, int],
ax: Optional[Axes] = None,
color_list: Optional[
Union[List[str], List[Tuple[float, float, float]]]
] = None,
):
"""
Use matplotlib.pyplot and networkx to draw a colored exclusion graph from the memory assigment
.. code-block:: python
_, ax = plt.subplots(1, 1)
collection = ProcessCollection(...)
exclusion_graph = collection.create_exclusion_graph_from_overlap()
color_dict = nx.greedy_color(exclusion_graph)
draw_exclusion_graph_coloring(exclusion_graph, color_dict, ax=ax[0])
plt.show()
Parameters
----------
exclusion_graph : nx.Graph
A nx.Graph exclusion graph object that is to be drawn.
color_dict : dictionary
A color dictionary where keys are Process objects and where values are integers representing colors. These
dictionaries are automatically generated by :func:`networkx.algorithms.coloring.greedy_color`.
ax : :class:`matplotlib.axes.Axes`, optional
A Matplotlib Axes object to draw the exclusion graph
color_list : Optional[Union[List[str], List[Tuple[float,float,float]]]]
"""
COLOR_LIST = [
'#aa0000',
'#00aa00',
'#0000ff',
'#ff00aa',
'#ffaa00',
'#00ffaa',
'#aaff00',
'#aa00ff',
'#00aaff',
'#ff0000',
'#00ff00',
'#0000aa',
'#aaaa00',
'#aa00aa',
'#00aaaa',
]
node_color_dict = {}
if color_list is None:
node_color_dict = {k: COLOR_LIST[v] for k, v in color_dict.items()}
else:
node_color_dict = {k: color_list[v] for k, v in color_dict.items()}
node_color_list = [node_color_dict[node] for node in exclusion_graph]
nx.draw_networkx(
exclusion_graph,
node_color=node_color_list,
ax=ax,
pos=nx.spring_layout(exclusion_graph, seed=1),
)
class ProcessCollection:
"""
Collection of one or more processes
Parameters
----------
collection : set of :class:`~b_asic.process.Process` objects
The Process objects forming this ProcessCollection.
schedule_time : int, default: 0
Length of the time-axis in the generated graph.
cyclic : bool, default: False
If the processes operates cyclically, i.e., if time 0 == time *schedule_time*.
"""
def __init__(
self,
collection: Set[Process],
schedule_time: int,
cyclic: bool = False,
):
self._collection = collection
self._schedule_time = schedule_time
self._cyclic = cyclic
def add_process(self, process: Process):
"""
Add a new process to this process collection.
Parameters
----------
process : Process
The process object to be added to the collection
"""
self._collection.add(process)
def draw_lifetime_chart(
self,
ax: Optional[Axes] = None,
show_name: bool = True,
):
"""
Use matplotlib.pyplot to generate a process variable lifetime chart from this process collection.
Parameters
----------
ax : :class:`matplotlib.axes.Axes`, optional
Matplotlib Axes object to draw this lifetime chart onto. If not provided (i.e., set to None), this will
return a new axes object on return.
show_name : bool, default: True
Show name of all processes in the lifetime chart.
Returns
-------
ax: Associated Matplotlib Axes (or array of Axes) object
"""
# Setup the Axes object
if ax is None:
_, _ax = plt.subplots()
else:
_ax = ax
# Draw the lifetime chart
PAD_L, PAD_R = 0.05, 0.05
max_execution_time = max(
process.execution_time for process in self._collection
)
if max_execution_time > self._schedule_time:
# Schedule time needs to be greater than or equal to the maximum process life time
raise KeyError(
f'Error: Schedule time: {self._schedule_time} < Max execution'
f' time: {max_execution_time}'
)
for i, process in enumerate(_sorted_nicely(self._collection)):
bar_start = process.start_time % self._schedule_time
bar_end = (
process.start_time + process.execution_time
) % self._schedule_time
if bar_end > bar_start:
_ax.broken_barh(
[(PAD_L + bar_start, bar_end - bar_start - PAD_L - PAD_R)],
(i + 0.55, 0.9),
)
else: # bar_end < bar_start
if bar_end != 0:
_ax.broken_barh(
[
(
PAD_L + bar_start,
self._schedule_time - bar_start - PAD_L,
)
],
(i + 0.55, 0.9),
)
_ax.broken_barh([(0, bar_end - PAD_R)], (i + 0.55, 0.9))
else:
_ax.broken_barh(
[
(
PAD_L + bar_start,
self._schedule_time
- bar_start
- PAD_L
- PAD_R,
)
],
(i + 0.55, 0.9),
)
if show_name:
_ax.annotate(
str(process),
(bar_start + PAD_L + 0.025, i + 1.00),
va="center",
)
_ax.grid(True)
_ax.xaxis.set_major_locator(MaxNLocator(integer=True))
_ax.yaxis.set_major_locator(MaxNLocator(integer=True))
_ax.set_xlim(0, self._schedule_time)
_ax.set_ylim(0.25, len(self._collection) + 0.75)
return _ax
def create_exclusion_graph_from_overlap(
self, add_name: bool = True
) -> nx.Graph:
"""
Generate exclusion graph based on processes overlaping in time
Parameters
----------
add_name : bool, default: True
Add name of all processes as a node attribute in the exclusion graph.
Returns
-------
An nx.Graph exclusion graph where nodes are processes and arcs
between two processes indicated overlap in time
"""
exclusion_graph = nx.Graph()
exclusion_graph.add_nodes_from(self._collection)
for process1 in self._collection:
for process2 in self._collection:
if process1 == process2:
continue
else:
t1 = set(
range(
process1.start_time,
process1.start_time + process1.execution_time,
)
)
t2 = set(
range(
process2.start_time,
process2.start_time + process2.execution_time,
)
)
if t1.intersection(t2):
exclusion_graph.add_edge(process1, process2)
return exclusion_graph
def split(
self,
heuristic: str = "graph_color",
read_ports: Optional[int] = None,
write_ports: Optional[int] = None,
total_ports: Optional[int] = None,
) -> Set["ProcessCollection"]:
"""
Split this process storage based on some heuristic.
Parameters
----------
heuristic : str, default: "graph_color"
The heuristic used when spliting this ProcessCollection.
Valid options are:
* "graph_color"
* "..."
read_ports : int, optional
The number of read ports used when spliting process collection based on memory variable access.
write_ports : int, optional
The number of write ports used when spliting process collection based on memory variable access.
total_ports : int, optional
The total number of ports used when spliting process collection based on memory variable access.
Returns
-------
A set of new ProcessColleciton objects with the process spliting.
"""
if total_ports is None:
if read_ports is None or write_ports is None:
raise ValueError("inteligent quote")
else:
total_ports = read_ports + write_ports
else:
read_ports = total_ports if read_ports is None else read_ports
write_ports = total_ports if write_ports is None else write_ports
if heuristic == "graph_color":
return self._split_graph_color(
read_ports, write_ports, total_ports
)
else:
raise ValueError("Invalid heuristic provided")
def _split_graph_color(
self, read_ports: int, write_ports: int, total_ports: int
) -> Set["ProcessCollection"]:
"""
Parameters
----------
read_ports : int, optional
The number of read ports used when spliting process collection based on memory variable access.
write_ports : int, optional
The number of write ports used when spliting process collection based on memory variable access.
total_ports : int, optional
The total number of ports used when spliting process collection based on memory variable access.
"""
if read_ports != 1 or write_ports != 1:
raise ValueError(
"Spliting with read and write ports not equal to one with the"
" graph coloring heuristic does not make sense."
)
if total_ports not in (1, 2):
raise ValueError(
"Total ports should be either 1 (non-concurent reads/writes)"
" or 2 (concurrent read/writes) for graph coloring heuristic."
)
# Create new exclusion graph. Nodes are Processes
exclusion_graph = nx.Graph()
exclusion_graph.add_nodes_from(self._collection)
# Add exclusions (arcs) between processes in the exclusion graph
for node1 in exclusion_graph:
for node2 in exclusion_graph:
if node1 == node2:
continue
else:
node1_stop_time = node1.start_time + node1.execution_time
node2_stop_time = node2.start_time + node2.execution_time
if total_ports == 1:
# Single-port assignment
if node1.start_time == node2.start_time:
exclusion_graph.add_edge(node1, node2)
elif node1_stop_time == node2_stop_time:
exclusion_graph.add_edge(node1, node2)
elif node1.start_time == node2_stop_time:
exclusion_graph.add_edge(node1, node2)
elif node1_stop_time == node2.start_time:
exclusion_graph.add_edge(node1, node2)
else:
# Dual-port assignment
if node1.start_time == node2.start_time:
exclusion_graph.add_edge(node1, node2)
elif node1_stop_time == node2_stop_time:
exclusion_graph.add_edge(node1, node2)
# Perform assignment
coloring = nx.coloring.greedy_color(exclusion_graph)
draw_exclusion_graph_coloring(exclusion_graph, coloring)
# process_collection_list = [ProcessCollection()]*(max(coloring.values()) + 1)
process_collection_set_list = [
set() for _ in range(max(coloring.values()) + 1)
]
for process, color in coloring.items():
process_collection_set_list[color].add(process)
return {
ProcessCollection(
process_collection_set, self._schedule_time, self._cyclic
)
for process_collection_set in process_collection_set_list
}
# Minimal makefile for Sphinx documentation
#
# You can set these variables from the command line.
SPHINXOPTS = -W --keep-going
SPHINXBUILD = python -msphinx
SPHINXPROJ = b_asic
SOURCEDIR = .
BUILDDIR = _build
# Put it first so that "make" without argument is like "make help".
help:
@$(SPHINXBUILD) -M help "$(SOURCEDIR)" "$(BUILDDIR)" $(SPHINXOPTS) $(O)
.PHONY: help Makefile
# workaround because sphinx does not completely clean up (#11139)
clean:
@$(SPHINXBUILD) -M clean "$(SOURCEDIR)" "$(BUILDDIR)" $(SPHINXOPTS) $(O)
rm -rf "$(SOURCEDIR)/examples"
show:
@python -c "import webbrowser; webbrowser.open_new_tab('file://$(shell pwd)/_build/html/index.html')"
# Catch-all target: route all unknown targets to Sphinx using the new
# "make mode" option. $(O) is meant as a shortcut for $(SPHINXOPTS).
%: Makefile
@$(SPHINXBUILD) -M $@ "$(SOURCEDIR)" "$(BUILDDIR)" $(SPHINXOPTS) $(O)
......@@ -10,6 +10,7 @@ API
operation.rst
port.rst
process.rst
resources.rst
schedule.rst
sfg_generators.rst
signal.rst
......
********************
``b_asic.resources``
********************
.. automodule:: b_asic.resources
:members:
:undoc-members:
......@@ -10,7 +10,7 @@ import shutil
project = 'B-ASIC'
copyright = '2020-2023, Oscar Gustafsson et al'
author = 'Oscar Gustafsson et al'
html_logo = "../logo_tiny.png"
html_logo = "../logos/logo_tiny.png"
pygments_style = 'sphinx'
......@@ -39,6 +39,7 @@ intersphinx_mapping = {
'matplotlib': ('https://matplotlib.org/stable/', None),
'numpy': ('https://numpy.org/doc/stable/', None),
'PyQt5': ("https://www.riverbankcomputing.com/static/Docs/PyQt5", None),
'networkx': ('https://networkx.org/documentation/stable', None),
}
numpydoc_show_class_members = False
......
gui\_utils package
==================
gui\_utils.about\_window module
-------------------------------
.. automodule:: b_asic.gui_utils.about_window
:members:
:undoc-members:
......@@ -28,4 +28,6 @@ Table of Contents
api/index
GUI
scheduler_gui
gui_utils
examples/index
research
Research functions
==================
research.interleaver module
---------------------------
.. automodule:: b_asic.research.interleaver
:members:
:undoc-members:
File moved
File moved
File moved
File moved
......@@ -13,6 +13,7 @@ dependencies = [
"graphviz>=0.19",
"matplotlib",
"setuptools_scm[toml]>=6.2",
"networkx",
]
classifiers = [
"Intended Audience :: Education",
......