Newer
Older

Mikael Henriksson
committed
import itertools
import re
from typing import Dict, Iterable, List, Optional, Set, Tuple, TypeVar, Union

import matplotlib.pyplot as plt
import networkx as nx
from matplotlib.axes import Axes
from matplotlib.ticker import MaxNLocator

from b_asic._preferences import LATENCY_COLOR
from b_asic.process import Process
# Default latency coloring RGB tuple
_LATENCY_COLOR = tuple(c / 255 for c in LATENCY_COLOR)

Mikael Henriksson
committed
#
# Human-intuitive sorting:
# https://stackoverflow.com/questions/2669059/how-to-sort-alpha-numeric-set-in-python
#
# Typing '_T' to help Pyright propagate type-information
#
_T = TypeVar('_T')


def _sorted_nicely(to_be_sorted: Iterable[_T]) -> List[_T]:
    """
    Sort the given iterable in the way that humans expect.

    Runs of ASCII digits embedded in ``str(item)`` are compared numerically and
    everything else is compared as text, so ``'x9'`` sorts before ``'x10'``.

    Parameters
    ----------
    to_be_sorted : iterable
        Items to sort. Each item is converted with ``str()`` to build its sort key.

    Returns
    -------
    list
        A new list holding the items in natural order.
    """

    def alphanum_key(key):
        # Splitting on a capturing group always yields text at even indices and
        # the captured digit runs at odd indices. Converting by position (rather
        # than by ``str.isdigit``) keeps every key in the same text/number layout,
        # so keys stay mutually comparable, and never hands a non-ASCII digit
        # character such as a superscript two to ``int``.
        parts = re.split(r'([0-9]+)', str(key))
        return [int(part) if index % 2 else part for index, part in enumerate(parts)]

    return sorted(to_be_sorted, key=alphanum_key)
def draw_exclusion_graph_coloring(
    exclusion_graph: nx.Graph,
    color_dict: Dict[Process, int],
    ax: Optional[Axes] = None,
    color_list: Optional[Union[List[str], List[Tuple[float, float, float]]]] = None,
) -> None:
    """
    Draw a colored exclusion graph from the memory assignment.

    .. code-block:: python

        _, ax = plt.subplots(1, 1)
        collection = ProcessCollection(...)
        exclusion_graph = collection.create_exclusion_graph_from_execution_time()
        color_dict = nx.greedy_color(exclusion_graph)
        draw_exclusion_graph_coloring(exclusion_graph, color_dict, ax=ax)
        plt.show()

    Parameters
    ----------
    exclusion_graph : nx.Graph
        A nx.Graph exclusion graph object that is to be drawn.
    color_dict : dict
        A dict where keys are :class:`~b_asic.process.Process` objects and values are
        integers representing colors. These dictionaries are automatically generated by
        :func:`networkx.algorithms.coloring.greedy_color`.
    ax : :class:`matplotlib.axes.Axes`, optional
        A Matplotlib :class:`~matplotlib.axes.Axes` object to draw the exclusion graph.
    color_list : iterable of color, optional
        A list of colors in Matplotlib format. If not provided, a built-in palette
        of 15 colors is used.

    Returns
    -------
    None

    Raises
    ------
    IndexError
        If *color_dict* refers to a color index that the palette does not contain.
    KeyError
        If a node of *exclusion_graph* has no entry in *color_dict*.
    """
    COLOR_LIST = [
        '#aa0000',
        '#00aa00',
        '#0000ff',
        '#ff00aa',
        '#ffaa00',
        '#00ffaa',
        '#aaff00',
        '#aa00ff',
        '#00aaff',
        '#ff0000',
        '#00ff00',
        '#0000aa',
        '#aaaa00',
        '#aa00aa',
        '#00aaaa',
    ]
    palette = COLOR_LIST if color_list is None else color_list

    # Fail early with an actionable message instead of a bare
    # "list index out of range" from deep inside the lookup below.
    colors_needed = max(color_dict.values(), default=-1) + 1
    if colors_needed > len(palette):
        raise IndexError(
            f"Coloring uses {colors_needed} colors but only {len(palette)} are"
            " available; provide a longer color_list."
        )

    # One color per node, in the node order of the graph (the order
    # draw_networkx expects for a list-valued node_color).
    node_color_list = [palette[color_dict[node]] for node in exclusion_graph]
    nx.draw_networkx(
        exclusion_graph,
        node_color=node_color_list,
        ax=ax,
        # Fixed seed so that repeated calls give the same layout.
        pos=nx.spring_layout(exclusion_graph, seed=1),
    )
class ProcessCollection:
"""
Collection of one or more processes
Parameters
----------
collection : set of :class:`~b_asic.process.Process` objects
The Process objects forming this ProcessCollection.

Mikael Henriksson
committed
schedule_time : int
Length of the time-axis in the generated graph.
cyclic : bool, default: False
If the processes operates cyclically, i.e., if time 0 == time *schedule_time*.
def __init__(
    self,
    collection: Set[Process],
    schedule_time: int,
    cyclic: bool = False,
):
    """
    Store the processes and schedule settings of this collection.

    Parameters
    ----------
    collection : set of :class:`~b_asic.process.Process` objects
        The Process objects forming this ProcessCollection. The set object is
        stored as given; no copy is made.
    schedule_time : int
        Schedule time associated with the processes.
    cyclic : bool, default: False
        Whether the processes operate cyclically.
    """
    self._cyclic = cyclic
    self._schedule_time = schedule_time
    self._collection = collection
@property
def collection(self):
    """The set of processes in this collection (the stored object, not a copy)."""
    processes = self._collection
    return processes
def __len__(self):
def add_process(self, process: Process):
"""
Add a new process to this process collection.
Parameters
----------
process : Process
The process object to be added to the collection.
self,
ax: Optional[Axes] = None,
show_name: bool = True,
bar_color: Union[str, Tuple[float, ...]] = _LATENCY_COLOR,
marker_color: Union[str, Tuple[float, ...]] = "black",
marker_read: str = "X",
marker_write: str = "o",
show_markers: bool = True,
row: Optional[int] = None,
Parameters
----------
ax : :class:`matplotlib.axes.Axes`, optional
Matplotlib :class:`~matplotlib.axes.Axes` object to draw this lifetime chart
onto. If not provided (i.e., set to None), this method will return a new
Axes object.
show_name : bool, default: True
Show name of all processes in the lifetime chart.
bar_color : color, optional
Bar color in lifetime chart.
marker_color : color, default 'black'
Color for read and write marker.
marker_write : str, default 'x'
Marker at write time in the lifetime chart.
marker_read : str, default 'o'
Marker at read time in the lifetime chart.
show_markers : bool, default True
Show markers at read and write times.
row : int, optional
Render all processes in this collection on a specified row in the matplotlib axes object.
Defaults to None, which renders all processes on separate rows. This option is useful when
drawing cell assignments.
ax: Associated Matplotlib Axes (or array of Axes) object
if ax is None:
_, _ax = plt.subplots()
else:
_ax = ax

Mikael Henriksson
committed
# Lifetime chart left and right padding
max_execution_time = max(process.execution_time for process in self._collection)
if max_execution_time > self._schedule_time:
# Schedule time needs to be greater than or equal to the maximum process
# lifetime
f'Error: Schedule time: {self._schedule_time} < Max execution'
f' time: {max_execution_time}'

Mikael Henriksson
committed
# Generate the life-time chart
for i, process in enumerate(_sorted_nicely(self._collection)):
bar_row = i if row == None else row
bar_start = process.start_time % self._schedule_time

Mikael Henriksson
committed
bar_end = process.start_time + process.execution_time

Mikael Henriksson
committed
bar_end
if bar_end == self._schedule_time
else bar_end % self._schedule_time
)
if show_markers:
_ax.scatter( # type: ignore
y=bar_row + 1,
marker=marker_write,
color=marker_color,
zorder=10,
)
_ax.scatter( # type: ignore
y=bar_row + 1,
marker=marker_read,
color=marker_color,
zorder=10,
)
if bar_end >= bar_start:
_ax.broken_barh( # type: ignore
(bar_row + 0.55, 0.9),
color=bar_color,
_ax.broken_barh( # type: ignore
[
(
PAD_L + bar_start,
self._schedule_time - bar_start - PAD_L,
)
],
(bar_row + 0.55, 0.9),
color=bar_color,
)
_ax.broken_barh( # type: ignore
[(0, bar_end - PAD_R)], (bar_row + 0.55, 0.9), color=bar_color
_ax.annotate( # type: ignore
(bar_start + PAD_L + 0.025, bar_row + 1.00),
_ax.grid(True) # type: ignore
_ax.xaxis.set_major_locator(MaxNLocator(integer=True)) # type: ignore
_ax.yaxis.set_major_locator(MaxNLocator(integer=True)) # type: ignore
_ax.set_xlim(0, self._schedule_time) # type: ignore
if row == None:
_ax.set_ylim(0.25, len(self._collection) + 0.75) # type: ignore
else:
pass

Mikael Henriksson
committed
def create_exclusion_graph_from_ports(
    self,
    read_ports: Optional[int] = None,
    write_ports: Optional[int] = None,
    total_ports: Optional[int] = None,
) -> nx.Graph:
    """
    Create an exclusion graph based on a number of read/write ports.

    Two processes are connected by an edge when their start times coincide or
    their stop times (``start_time + execution_time``) coincide. With a single
    total port, an edge is also added when the start time of one process equals
    the stop time of the other.

    Parameters
    ----------
    read_ports : int
        The number of read ports used when splitting process collection based on
        memory variable access. Defaults to *total_ports* when omitted. Must
        evaluate to 1.
    write_ports : int
        The number of write ports used when splitting process collection based on
        memory variable access. Defaults to *total_ports* when omitted. Must
        evaluate to 1.
    total_ports : int
        The total number of ports used when splitting process collection based on
        memory variable access. Defaults to ``read_ports + write_ports`` when
        omitted. Must evaluate to 1 or 2.

    Returns
    -------
    nx.Graph
        Exclusion graph whose nodes are the processes of this collection.

    Raises
    ------
    ValueError
        If the port arguments are incomplete, or describe a configuration other
        than one read port and one write port with one or two total ports.
    """
    if total_ports is None:
        if read_ports is None or write_ports is None:
            raise ValueError(
                "If total_ports is unset, both read_ports and write_ports"
                " must be provided."
            )
        total_ports = read_ports + write_ports
    else:
        read_ports = total_ports if read_ports is None else read_ports
        write_ports = total_ports if write_ports is None else write_ports

    # Guard for proper read/write port settings
    if read_ports != 1 or write_ports != 1:
        raise ValueError(
            "Splitting with read and write ports not equal to one with the"
            " graph coloring heuristic does not make sense."
        )
    if total_ports not in (1, 2):
        raise ValueError(
            "Total ports should be either 1 (non-concurrent reads/writes)"
            " or 2 (concurrent read/writes) for graph coloring heuristic."
        )
    single_port = total_ports == 1

    # Create new exclusion graph. Nodes are Processes
    exclusion_graph = nx.Graph()
    exclusion_graph.add_nodes_from(self._collection)

    # The graph is undirected, so each unordered pair only needs one visit.
    # combinations() snapshots the nodes up front, which makes it safe to add
    # edges while looping.
    for node1, node2 in itertools.combinations(exclusion_graph, 2):
        if node1 == node2:
            # Equal processes never exclude each other.
            continue
        node1_stop_time = node1.start_time + node1.execution_time
        node2_stop_time = node2.start_time + node2.execution_time
        # Both processes start, or both stop, at the same time.
        same_kind_clash = (
            node1.start_time == node2.start_time
            or node1_stop_time == node2_stop_time
        )
        # One process starts exactly when the other stops; this only excludes
        # in the single-port assignment.
        mixed_kind_clash = (
            node1.start_time == node2_stop_time
            or node1_stop_time == node2.start_time
        )
        if same_kind_clash or (single_port and mixed_kind_clash):
            exclusion_graph.add_edge(node1, node2)
    return exclusion_graph
def create_exclusion_graph_from_execution_time(self) -> nx.Graph:
    """
    Generate exclusion graph based on processes overlapping in time.

    Each process occupies the half-open interval
    ``[start_time, start_time + execution_time)``. Times are compared as given;
    no wrap-around at the schedule time is applied here.

    Returns
    -------
    nx.Graph
        An nx.Graph exclusion graph where nodes are processes and arcs
        between two processes indicate overlap in time.
    """
    exclusion_graph = nx.Graph()
    exclusion_graph.add_nodes_from(self._collection)
    # The graph is undirected, so every unordered pair is examined once.
    for process1, process2 in itertools.combinations(self._collection, 2):
        if process1 == process2:
            # Equal processes never exclude each other.
            continue
        latest_start = max(process1.start_time, process2.start_time)
        earliest_stop = min(
            process1.start_time + process1.execution_time,
            process2.start_time + process2.execution_time,
        )
        # Two half-open intervals share a time slot exactly when the later
        # start lies before the earlier stop. An empty interval (execution
        # time <= 0) can never satisfy this, so it never gets an edge.
        if latest_start < earliest_stop:
            exclusion_graph.add_edge(process1, process2)
    return exclusion_graph

Mikael Henriksson
committed
def split_execution_time(
    self,
    heuristic: str = "graph_color",
    coloring_strategy: str = "saturation_largest_first",
) -> Set["ProcessCollection"]:
    """
    Split a ProcessCollection based on overlapping execution time.

    Parameters
    ----------
    heuristic : {'graph_color', 'left_edge'}, default: 'graph_color'
        The heuristic used when splitting based on execution times.
    coloring_strategy : str, default: 'saturation_largest_first'
        Node ordering strategy passed to :func:`networkx.coloring.greedy_color`.
        This parameter is only considered if *heuristic* is set to 'graph_color'.
        One of

        * 'largest_first'
        * 'random_sequential'
        * 'smallest_last'
        * 'independent_set'
        * 'connected_sequential_bfs'
        * 'connected_sequential_dfs' or 'connected_sequential'
        * 'saturation_largest_first' or 'DSATUR'

    Returns
    -------
    A set of new ProcessCollection objects with the process splitting.

    Raises
    ------
    NotImplementedError
        If *heuristic* is 'left_edge'.
    ValueError
        If *heuristic* is not a known heuristic.
    """
    # Reject everything that is not the graph coloring heuristic up front.
    if heuristic == "left_edge":
        raise NotImplementedError()
    if heuristic != "graph_color":
        raise ValueError(f"Invalid heuristic '{heuristic}'")

    overlap_graph = self.create_exclusion_graph_from_execution_time()
    node_colors = nx.coloring.greedy_color(overlap_graph, strategy=coloring_strategy)
    return self._split_from_graph_coloring(node_colors)
def split_ports(
    self,
    heuristic: str = "graph_color",
    read_ports: Optional[int] = None,
    write_ports: Optional[int] = None,
    total_ports: Optional[int] = None,
) -> Set["ProcessCollection"]:
    """
    Split this process storage based on concurrent read/write times according to some heuristic.

    Parameters
    ----------
    heuristic : str, default: "graph_color"
        The heuristic used when splitting this ProcessCollection.
    read_ports : int, optional
        The number of read ports used when splitting process collection based on
        memory variable access. Defaults to *total_ports* when omitted.
    write_ports : int, optional
        The number of write ports used when splitting process collection based on
        memory variable access. Defaults to *total_ports* when omitted.
    total_ports : int, optional
        The total number of ports used when splitting process collection based on
        memory variable access. Defaults to ``read_ports + write_ports`` when
        omitted.

    Returns
    -------
    A set of new ProcessCollection objects with the process splitting.

    Raises
    ------
    ValueError
        If *total_ports* is unset and either *read_ports* or *write_ports* is
        missing, or if *heuristic* is not 'graph_color'.
    """
    # Fill in whichever port counts were left out. This happens before the
    # heuristic is inspected, so incomplete port arguments are reported first.
    if total_ports is not None:
        if read_ports is None:
            read_ports = total_ports
        if write_ports is None:
            write_ports = total_ports
    elif read_ports is None or write_ports is None:
        raise ValueError(
            "If total_ports is unset, both read_ports and write_ports"
            " must be provided."
        )
    else:
        total_ports = read_ports + write_ports

    if heuristic != "graph_color":
        raise ValueError("Invalid heuristic provided.")
    return self._split_ports_graph_color(read_ports, write_ports, total_ports)

Mikael Henriksson
committed
def _split_ports_graph_color(
    self,
    read_ports: int,
    write_ports: int,
    total_ports: int,
    coloring_strategy: str = "saturation_largest_first",
) -> Set["ProcessCollection"]:
    """
    Split this collection by coloring its port-based exclusion graph.

    Parameters
    ----------
    read_ports : int
        The number of read ports used when splitting process collection based on
        memory variable access.
    write_ports : int
        The number of write ports used when splitting process collection based on
        memory variable access.
    total_ports : int
        The total number of ports used when splitting process collection based on
        memory variable access.
    coloring_strategy : str, default: 'saturation_largest_first'
        Node ordering strategy passed to :func:`networkx.coloring.greedy_color`
        One of

        * 'largest_first'
        * 'random_sequential'
        * 'smallest_last'
        * 'independent_set'
        * 'connected_sequential_bfs'
        * 'connected_sequential_dfs' or 'connected_sequential'
        * 'saturation_largest_first' or 'DSATUR'

    Returns
    -------
    A set of new ProcessCollection objects, one per color.
    """
    port_conflicts = self.create_exclusion_graph_from_ports(
        read_ports, write_ports, total_ports
    )
    # Perform assignment from coloring and return result
    return self._split_from_graph_coloring(
        nx.coloring.greedy_color(port_conflicts, strategy=coloring_strategy)
    )
def _split_from_graph_coloring(
    self,
    coloring: Dict[Process, int],
) -> Set["ProcessCollection"]:
    """
    Split :class:`Process` objects into a set of :class:`ProcessCollection` objects based on a provided graph coloring.

    Resulting :class:`ProcessCollection` will have the same schedule time and cyclic
    property as self.

    Parameters
    ----------
    coloring : dict
        Process->int (color) mappings

    Returns
    -------
    A set of new ProcessCollections. Empty if *coloring* is empty.
    """
    # max() has nothing to work with for an empty coloring; there is simply
    # nothing to split in that case.
    if not coloring:
        return set()

    # One bucket per color index from 0 up to the highest color in use.
    process_collection_set_list = [set() for _ in range(max(coloring.values()) + 1)]
    for process, color in coloring.items():
        process_collection_set_list[color].add(process)
    return {
        ProcessCollection(process_collection_set, self._schedule_time, self._cyclic)
        for process_collection_set in process_collection_set_list
    }
Generate an SVG_ of the resource collection. This is automatically displayed in
e.g. Jupyter Qt console.
self.plot(ax=ax, show_markers=False)
fig.savefig(f, format="svg") # type: ignore
def __repr__(self):
    """Return a constructor-style string showing the processes, schedule time and cyclic flag."""
    processes = self._collection
    schedule_time = self._schedule_time
    cyclic = self._cyclic
    return f"ProcessCollection({processes}, {schedule_time}, {cyclic})"
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
def __iter__(self):
    """Return an iterator over the processes stored in this collection."""
    processes = self._collection
    return iter(processes)
def graph_color_cell_assignment(
    self,
    coloring_strategy: str = "saturation_largest_first",
) -> Dict[int, "ProcessCollection"]:
    """
    Perform cell assignment of this process collection using graph coloring.

    The exclusion graph built from overlapping execution times is colored with
    :func:`networkx.coloring.greedy_color`; all processes that receive the same
    color are placed in the same cell.

    Parameters
    ----------
    coloring_strategy : str, default: 'saturation_largest_first'
        Node ordering strategy passed to :func:`networkx.coloring.greedy_color`.

    Returns
    -------
    Dict[int, "ProcessCollection"]
        Mapping from cell (color) index to the ProcessCollection of processes
        assigned to that cell. Each ProcessCollection has the same schedule
        time and cyclic property as self.
    """
    exclusion_graph = self.create_exclusion_graph_from_execution_time()
    coloring: Dict[Process, int] = nx.coloring.greedy_color(
        exclusion_graph, strategy=coloring_strategy
    )

    # Group the processes by color before wrapping each group, so the
    # coloring result is actually reflected in the returned assignment.
    processes_per_cell: Dict[int, set] = {}
    for process, cell in coloring.items():
        processes_per_cell.setdefault(cell, set()).add(process)

    cell_assignment: Dict[int, ProcessCollection] = {
        cell: ProcessCollection(processes, self._schedule_time, self._cyclic)
        for cell, processes in processes_per_cell.items()
    }
    return cell_assignment
def left_edge_cell_assignment(self) -> Dict[int, "ProcessCollection"]:
    """
    Perform left edge cell assignment of this process collection.

    Processes are visited in sorted order and each one is placed in the first
    existing cell in which it conflicts with no resident process; a new cell is
    opened when no such cell exists.

    Returns
    -------
    Dict[int, "ProcessCollection"]
        Mapping from cell index (consecutive integers starting at 0) to the
        ProcessCollection of processes assigned to that cell.
    """
    next_empty_cell = 0
    cell_assignment: Dict[int, ProcessCollection] = {}
    for next_process in sorted(self):
        # The stop time only depends on the process being placed, so compute it
        # once instead of once per comparison.
        next_process_stop_time = (
            next_process.start_time + next_process.execution_time
        ) % self._schedule_time
        # The modulo stop time falls before the start time when the process
        # runs past the end of the schedule.
        wraps_around = next_process_stop_time < next_process.start_time

        for cell_processes in cell_assignment.values():
            # A resident process blocks the cell when the new process starts
            # before the resident has stopped, or when the new process wraps
            # around and its stop time lands after the resident's start.
            conflict = any(
                next_process.start_time < process.start_time + process.execution_time
                or (wraps_around and next_process_stop_time > process.start_time)
                for process in cell_processes
            )
            if not conflict:
                cell_processes.add_process(next_process)
                break
        else:
            # No existing cell could take the process: open a new one.
            cell_assignment[next_empty_cell] = ProcessCollection(
                collection=set(), schedule_time=self._schedule_time
            )
            cell_assignment[next_empty_cell].add_process(next_process)
            next_empty_cell += 1
    return cell_assignment
def generate_memory_based_storage_vhdl(
    self,
    filename: str,
):
    """
    Generate VHDL code for memory based storage of processes (MemoryVariables).

    Currently a placeholder that always raises.

    Parameters
    ----------
    filename : str
        Filename of output file.

    Raises
    ------
    NotImplementedError
        Always.
    """
    # Check that hardware can be generated for the ProcessCollection...
    message = "Not implemented yet!"
    raise NotImplementedError(message)