Newer
Older

Mikael Henriksson
committed
import re
from collections import Counter, defaultdict
from functools import reduce
from itertools import combinations
from math import log2
from typing import Dict, Iterable, List, Optional, Tuple, TypeVar, Union

import matplotlib.pyplot as plt
import networkx as nx
from matplotlib.axes import Axes
from matplotlib.ticker import MaxNLocator

from b_asic._preferences import LATENCY_COLOR, WARNING_COLOR
from b_asic.codegen.vhdl.common import is_valid_vhdl_identifier
from b_asic.process import (
    MemoryProcess,
    MemoryVariable,
    OperatorProcess,
    PlainMemoryVariable,
    Process,
)
from b_asic.types import TypeName
# Default latency coloring as a Matplotlib RGB tuple (LATENCY_COLOR is 0-255,
# Matplotlib wants components in [0, 1]).
_LATENCY_COLOR = tuple(c / 255 for c in LATENCY_COLOR)

# Warning coloring (used e.g. for excessive lifetimes) as a Matplotlib RGB tuple.
_WARNING_COLOR = tuple(c / 255 for c in WARNING_COLOR)

Mikael Henriksson
committed
#
# Human-intuitive sorting:
# https://stackoverflow.com/questions/2669059/how-to-sort-alpha-numeric-set-in-python
#
# Typing '_T' to help Pyright propagate type-information
#
_T = TypeVar('_T')

Mikael Henriksson
committed
def _sorted_nicely(to_be_sorted: Iterable[_T]) -> List[_T]:
"""Sort the given iterable in the way that humans expect."""
def convert(text):
return int(text) if text.isdigit() else text
def alphanum_key(key):
return [convert(c) for c in re.split('([0-9]+)', str(key))]
return sorted(to_be_sorted, key=alphanum_key)
def _sanitize_port_option(
read_ports: Optional[int] = None,
write_ports: Optional[int] = None,
total_ports: Optional[int] = None,
) -> Tuple[int, int, int]:
"""
General port sanitization function used to test if a port specification makes sense.
Raises ValueError if the port specification is in-proper.
Parameters
----------
read_ports : int, optional
The number of read ports.
write_ports : int, optional
The number of write ports.
total_ports : int, optional
Returns
-------
Returns a triple int tuple (read_ports, write_ports, total_ports) equal to the
input, or sanitized if one of the input equals None. If total_ports is set to None
at the input, it is set to read_ports+write_ports at the output. If read_ports or
write_ports is set to None at the input, it is set to total_ports at the output.
"""
if total_ports is None:
if read_ports is None or write_ports is None:
raise ValueError(
"If total_ports is unset, both read_ports and write_ports"
" must be provided."
)
else:
total_ports = read_ports + write_ports
else:
read_ports = total_ports if read_ports is None else read_ports
write_ports = total_ports if write_ports is None else write_ports
if total_ports < read_ports:
raise ValueError(
f'Total ports ({total_ports}) less then read ports ({read_ports})'
)
if total_ports < write_ports:
raise ValueError(
f'Total ports ({total_ports}) less then write ports ({write_ports})'
)
return read_ports, write_ports, total_ports
def draw_exclusion_graph_coloring(
    exclusion_graph: nx.Graph,
    color_dict: Dict[Process, int],
    ax: Optional[Axes] = None,
    color_list: Optional[Union[List[str], List[Tuple[float, float, float]]]] = None,
    **kwargs,
) -> None:
    """
    Helper function for drawing colored exclusion graphs.

    Example usage:

    .. code-block:: python

        import networkx as nx
        import matplotlib.pyplot as plt

        _, ax = plt.subplots()
        exclusion_graph = collection.create_exclusion_graph_from_ports(
            read_ports=1,
            write_ports=1,
            total_ports=2,
        )
        coloring = nx.greedy_color(exclusion_graph)
        draw_exclusion_graph_coloring(exclusion_graph, coloring, ax=ax)

    Parameters
    ----------
    exclusion_graph : :class:`networkx.Graph`
        The :class:`networkx.Graph` exclusion graph object that is to be drawn.
    color_dict : dict
        A dict where keys are :class:`~b_asic.process.Process` objects and values are
        integers representing colors. These dictionaries are automatically generated by
        :func:`networkx.algorithms.coloring.greedy_color`.
    ax : :class:`matplotlib.axes.Axes`, optional
        A Matplotlib :class:`~matplotlib.axes.Axes` object to draw the exclusion graph.
    color_list : iterable of color, optional
        A list of colors in Matplotlib format.
    **kwargs : Any
        Named arguments passed on to :func:`networkx.draw_networkx`
    """
    # Fallback palette used when the caller does not supply one.
    COLOR_LIST = [
        '#aa0000',
        '#00aa00',
        '#0000ff',
        '#ff00aa',
        '#ffaa00',
        '#ffffff',
        '#00ffaa',
        '#aaff00',
        '#aa00ff',
        '#00aaff',
        '#ff0000',
        '#00ff00',
        '#0000aa',
        '#aaaa00',
        '#aa00aa',
        '#00aaaa',
        '#666666',
    ]
    palette = COLOR_LIST if color_list is None else color_list
    # Map each process to the concrete color of its assigned color index, then
    # order the colors as networkx iterates the nodes.
    node_color_dict = {process: palette[idx] for process, idx in color_dict.items()}
    node_color_list = [node_color_dict[node] for node in exclusion_graph]
    nx.draw_networkx(
        exclusion_graph,
        node_color=node_color_list,
        ax=ax,
        # Fixed seed so repeated draws produce the same layout.
        pos=nx.spring_layout(exclusion_graph, seed=1),
        **kwargs,
    )
class _ForwardBackwardEntry:
    def __init__(
        self,
        inputs: Optional[List[Process]] = None,
        outputs: Optional[List[Process]] = None,
        regs: Optional[List[Optional[Process]]] = None,
        back_edge_to: Optional[Dict[int, int]] = None,
        back_edge_from: Optional[Dict[int, int]] = None,
        outputs_from: Optional[int] = None,
    ):
        """
        Single entry in a _ForwardBackwardTable.

        Aggregate type of input, output and list of registers.

        Parameters
        ----------
        inputs : list of Process, optional
            Processes input to the register chain at this time slot.
        outputs : list of Process, optional
            Processes output from the register chain at this time slot.
        regs : list of Process or None, optional
            Register contents during this time slot (None means an empty register).
        back_edge_to : dict, optional
            Dictionary containing back edges of this entry to registers in the next
            entry.
        back_edge_from : dict, optional
            Dictionary containing the back edge of the previous entry to registers in
            this entry.
        outputs_from : int, optional
            Register index the outputs are taken from, if any.
        """
        # `x if x is not None else <fresh container>` guards against the shared
        # mutable-default-argument pitfall.
        self.inputs: List[Process] = inputs if inputs is not None else []
        self.outputs: List[Process] = outputs if outputs is not None else []
        self.regs: List[Optional[Process]] = regs if regs is not None else []
        self.back_edge_to: Dict[int, int] = (
            back_edge_to if back_edge_to is not None else {}
        )
        self.back_edge_from: Dict[int, int] = (
            back_edge_from if back_edge_from is not None else {}
        )
        self.outputs_from = outputs_from
class _ForwardBackwardTable:
    def __init__(self, collection: 'ProcessCollection'):
        """
        Forward-Backward allocation table for ProcessCollections.

        This structure implements the forward-backward register allocation algorithm,
        which is used to generate hardware from MemoryVariables in a ProcessCollection.

        Parameters
        ----------
        collection : ProcessCollection
            ProcessCollection to apply forward-backward allocation on
        """
        # Generate an alive variable list: how many variables are live in each
        # time slot of the (cyclic) schedule.
        self._collection = set(collection.collection)
        self._live_variables: List[int] = [0] * collection.schedule_time
        for mv in self._collection:
            stop_time = mv.start_time + mv.execution_time
            for alive_time in range(mv.start_time, stop_time):
                # Modulo: lifetimes may wrap around the schedule boundary.
                self._live_variables[alive_time % collection.schedule_time] += 1

        # First, create an empty forward-backward table with the right dimensions:
        # one row per time slot, one register column per simultaneously live variable.
        self.table: List[_ForwardBackwardEntry] = []
        for _ in range(collection.schedule_time):
            entry = _ForwardBackwardEntry()
            # https://github.com/microsoft/pyright/issues/1073
            for _ in range(max(self._live_variables)):
                entry.regs.append(None)
            self.table.append(entry)

        # Insert all processes (one per time-slot) to the table input
        # TODO: "Input each variable at the time step corresponding to the beginning of
        #       its lifetime. If multiple variables are input in a given cycle, these
        #       are allocated to multiple registers such that the variable with the
        #       longest lifetime is allocated to the initial register and the other
        #       variables are allocated to consecutive registers in decreasing order
        #       of lifetime." -- K. Parhi
        for mv in collection:
            self.table[mv.start_time].inputs.append(mv)
            if mv.execution_time:
                # Non-zero lifetime: the variable enters register 0 in the next slot.
                self.table[(mv.start_time + 1) % collection.schedule_time].regs[0] = mv
            else:
                # Zero lifetime: output in the same cycle; -1 marks "directly from
                # input" rather than from a register.
                self.table[mv.start_time].outputs.append(mv)
                self.table[mv.start_time].outputs_from = -1

        # Forward-backward allocation: alternate forward and backward passes until
        # every variable has been routed through to an output.
        forward = True
        while not self._forward_backward_is_complete():
            if forward:
                self._do_forward_allocation()
            else:
                self._do_single_backward_allocation()
            forward = not forward

    def _forward_backward_is_complete(self) -> bool:
        """Return True when every process in the collection has reached an output."""
        s = {proc for e in self.table for proc in e.outputs}
        return len(self._collection - s) == 0

    def _do_forward_allocation(self):
        """
        Forward all Processes as far as possible in the register chain.

        Processes are forwarded until they reach their end time (at which they are
        added to the output list), or until they reach the end of the register chain.
        """
        rows = len(self.table)
        cols = len(self.table[0].regs)
        # Note that two passes of the forward allocation need to be done, since
        # variables may loop around the schedule cycle boundary.
        for _ in range(2):
            for time, entry in enumerate(self.table):
                for reg_idx, reg in enumerate(entry.regs):
                    if reg is not None:
                        reg_end_time = (reg.start_time + reg.execution_time) % rows
                        if reg_end_time == time:
                            # Lifetime ends in this slot: emit to the output column.
                            if reg not in self.table[time].outputs:
                                self.table[time].outputs.append(reg)
                                self.table[time].outputs_from = reg_idx
                        elif reg_idx != cols - 1:
                            # Still alive and not in the last register: shift one
                            # register to the right in the next time slot.
                            next_row = (time + 1) % rows
                            next_col = reg_idx + 1
                            if self.table[next_row].regs[next_col] not in (None, reg):
                                cell = self.table[next_row].regs[next_col]
                                raise ValueError(
                                    f'Can\'t forward allocate {reg} in row={time},'
                                    f' col={reg_idx} to next_row={next_row},'
                                    f' next_col={next_col} (cell contains: {cell})'
                                )
                            else:
                                self.table[(time + 1) % rows].regs[reg_idx + 1] = reg

    def _do_single_backward_allocation(self):
        """
        Perform backward allocation of Processes in the allocation table.
        """
        rows = len(self.table)
        cols = len(self.table[0].regs)
        # All processes that have already reached an output.
        outputs = {out for e in self.table for out in e.outputs}
        #
        # Pass #1: Find any (one) non-dead variable from the last register and try to
        # backward allocate it to a previous register where it is not blocking an open
        # path. This heuristic helps minimize forward allocation moves later.
        #
        for time, entry in enumerate(self.table):
            reg = entry.regs[-1]
            if reg is not None and reg not in outputs:
                next_entry = self.table[(time + 1) % rows]
                for nreg_idx, nreg in enumerate(next_entry.regs):
                    # Only take a slot whose left neighbour is occupied, so that an
                    # open forwarding path is not blocked.
                    if nreg is None and (
                        nreg_idx == 0 or entry.regs[nreg_idx - 1] is not None
                    ):
                        next_entry.regs[nreg_idx] = reg
                        entry.back_edge_to[cols - 1] = nreg_idx
                        next_entry.back_edge_from[nreg_idx] = cols - 1
                        return
        #
        # Pass #2: Backward allocate the first non-dead variable from the last
        # registers to an empty register.
        #
        for time, entry in enumerate(self.table):
            reg = entry.regs[-1]
            if reg is not None and reg not in outputs:
                next_entry = self.table[(time + 1) % rows]
                for nreg_idx, nreg in enumerate(next_entry.regs):
                    if nreg is None:
                        next_entry.regs[nreg_idx] = reg
                        entry.back_edge_to[cols - 1] = nreg_idx
                        next_entry.back_edge_from[nreg_idx] = cols - 1
                        return
        # All passes failed, raise exception...
        raise ValueError(
            "Can't backward allocate any variable. This should not happen."
        )

    def __getitem__(self, key):
        """Index into the table rows (one _ForwardBackwardEntry per time slot)."""
        return self.table[key]

    def __iter__(self):
        """Iterate over the table rows in time order."""
        yield from self.table

    def __len__(self):
        """Number of time slots (rows) in the table."""
        return len(self.table)

    def __str__(self):
        """Render the table as a fixed-width text grid with ANSI-colored back edges."""
        # ANSI escape codes for coloring in the forward-backward table string
        GREEN_BACKGROUND_ANSI = "\u001b[42m"
        BROWN_BACKGROUND_ANSI = "\u001b[43m"
        RESET_BACKGROUND_ANSI = "\033[0m"

        # Text width of input and output column
        def lst_w(proc_lst):
            # Total width of the comma-separated names in proc_lst.
            return reduce(lambda n, p: n + len(str(p)) + 1, proc_lst, 0)

        input_col_w = max(5, max(lst_w(pl.inputs) for pl in self.table) + 1)
        output_col_w = max(5, max(lst_w(pl.outputs) for pl in self.table) + 1)

        # Text width of register columns
        reg_col_w = 0
        for entry in self.table:
            for reg in entry.regs:
                reg_col_w = max(len(str(reg)), reg_col_w)
        reg_col_w = max(4, reg_col_w + 2)

        # Header row of the string
        res = f' T |{"In":^{input_col_w}}|'
        for i in range(max(self._live_variables)):
            reg = f'R{i}'
            res += f'{reg:^{reg_col_w}}|'
        res += f'{"Out":^{output_col_w}}|'
        res += '\n'
        res += (
            6 + input_col_w + (reg_col_w + 1) * max(self._live_variables) + output_col_w
        ) * '-' + '\n'
        for time, entry in enumerate(self.table):
            # Time
            res += f'{time:^3}| '
            # Input column
            inputs_str = ''
            for input_ in entry.inputs:
                inputs_str += input_.name + ','
            if inputs_str:
                # Strip the trailing comma.
                inputs_str = inputs_str[:-1]
            res += f'{inputs_str:^{input_col_w-1}}|'
            # Register columns
            for reg_idx, reg in enumerate(entry.regs):
                if reg is None:
                    res += " " * reg_col_w + "|"
                else:
                    if reg_idx in entry.back_edge_to:
                        # Source of a back edge: green background.
                        res += f'{GREEN_BACKGROUND_ANSI}'
                        res += f'{reg.name:^{reg_col_w}}'
                        res += f'{RESET_BACKGROUND_ANSI}|'
                    elif reg_idx in entry.back_edge_from:
                        # Target of a back edge: brown background.
                        res += f'{BROWN_BACKGROUND_ANSI}'
                        res += f'{reg.name:^{reg_col_w}}'
                        res += f'{RESET_BACKGROUND_ANSI}|'
                    else:
                        res += f'{reg.name:^{reg_col_w}}' + "|"
            # Output column
            outputs_str = ''
            for output in entry.outputs:
                outputs_str += output.name + ','
            if outputs_str:
                outputs_str = outputs_str[:-1]
                if entry.outputs_from is not None:
                    outputs_str += f"({entry.outputs_from})"
            res += f'{outputs_str:^{output_col_w}}|'
            res += '\n'
        return res
r"""
Collection of :class:`~b_asic.process.Process` objects.
collection : Iterable of :class:`~b_asic.process.Process` objects
The :class:`~b_asic.process.Process` objects forming this
:class:`~b_asic.resources.ProcessCollection`.

Mikael Henriksson
committed
schedule_time : int
The scheduling time associated with this
:class:`~b_asic.resources.ProcessCollection`.
Whether the processes operate cyclically, i.e., if time
.. math:: t = t \bmod T_{\textrm{schedule}}.
schedule_time: int,
cyclic: bool = False,
):
self._collection = list(collection)
self._schedule_time = schedule_time
self._cyclic = cyclic
def collection(self) -> List[Process]:
    @property
    def schedule_time(self) -> int:
        """The schedule time associated with this ProcessCollection."""
        return self._schedule_time

Mikael Henriksson
committed
return len(self.collection)
Add a :class:`~b_asic.process.Process`.
process : :class:`~b_asic.process.Process`
The :class:`~b_asic.process.Process` object to add.
if process in self.collection:
raise ValueError("Process already in ProcessCollection")
self.collection.append(process)

Mikael Henriksson
committed
def remove_process(self, process: Process):
"""
Remove a :class:`~b_asic.process.Process`.

Mikael Henriksson
committed
Raises :class:`KeyError` if the specified :class:`~b_asic.process.Process` is
not in this collection.

Mikael Henriksson
committed
Parameters
----------
process : :class:`~b_asic.process.Process`
The :class:`~b_asic.process.Process` object to remove from this collection.

Mikael Henriksson
committed
"""
if process not in self.collection:
raise KeyError(
f"Can't remove process: '{process}', as it is not in collection."
)
else:
self.collection.remove(process)
def __contains__(self, process: Process) -> bool:
"""
Test if a process is part of this ProcessCollection
Parameters
----------
process : :class:`~b_asic.process.Process`
The process to test.
"""
return process in self.collection
bar_color: Union[str, Tuple[float, ...]] = _LATENCY_COLOR,
marker_color: Union[str, Tuple[float, ...]] = "black",
marker_read: str = "X",
marker_write: str = "o",
show_markers: bool = True,
row: Optional[int] = None,

Mikael Henriksson
committed
allow_excessive_lifetimes: bool = False,
Plot all :class:`~b_asic.process.Process` objects of this
:class:`~b_asic.resources.ProcessCollection` in a lifetime diagram.
If the ``ax`` parameter is not specified, a new Matplotlib figure is created.
Raises :class:`KeyError` if any :class:`~b_asic.process.Process` lifetime
exceeds this :class:`~b_asic.resources.ProcessCollection` schedule time,
unless ``allow_excessive_lifetimes`` is specified. In that case,
:class:`~b_asic.process.Process` objects whose lifetime exceed the schedule
time are drawn using the B-ASIC warning color.
Parameters
----------
ax : :class:`matplotlib.axes.Axes`, optional
Matplotlib :class:`~matplotlib.axes.Axes` object to draw this lifetime chart
onto. If not provided (i.e., set to None), this method will return a new
Axes object.
show_name : bool, default: True
Show name of all processes in the lifetime chart.
bar_color : color, optional
Bar color in lifetime chart.
marker_color : color, default 'black'
Color for read and write marker.
marker_read : str, default 'o'
Marker at read time in the lifetime chart.
marker_write : str, default 'x'
Marker at write time in the lifetime chart.
show_markers : bool, default True
Show markers at read and write times.
row : int, optional
Render all processes in this collection on a specified row in the Matplotlib
axes object. Defaults to None, which renders all processes on separate rows.
This option is useful when drawing cell assignments.

Mikael Henriksson
committed
allow_excessive_lifetimes : bool, default False
If set to true, the plot method allows plotting collections of variables
with a longer lifetime than the schedule time.
ax : :class:`matplotlib.axes.Axes`
Associated Matplotlib Axes (or array of Axes) object.
if ax is None:
_, _ax = plt.subplots()
else:
_ax = ax

Mikael Henriksson
committed
# Lifetime chart left and right padding
max_execution_time = max(process.execution_time for process in self._collection)

Mikael Henriksson
committed
if not allow_excessive_lifetimes and max_execution_time > self._schedule_time:
# Schedule time needs to be greater than or equal to the maximum process
# lifetime
raise ValueError(
f'Schedule time: {self._schedule_time} < Max execution'

Mikael Henriksson
committed
# Generate the life-time chart
for i, process in enumerate(_sorted_nicely(self._collection)):

Mikael Henriksson
committed
bar_start = process.start_time
bar_end = bar_start + process.execution_time
bar_start = (
bar_start
if process.execution_time == 0
else bar_start % self._schedule_time
)

Mikael Henriksson
committed
self.schedule_time
if bar_end and bar_end % self._schedule_time == 0

Mikael Henriksson
committed
else bar_end % self._schedule_time
)
if show_markers:
_ax.scatter( # type: ignore
y=bar_row + 1,
marker=marker_write,
color=marker_color,
zorder=10,
)
for end_time in process.read_times:
end_time = (

Mikael Henriksson
committed
self.schedule_time
if end_time and end_time % self.schedule_time == 0
else end_time % self._schedule_time
)
_ax.scatter( # type: ignore
x=end_time,
y=bar_row + 1,
marker=marker_read,
color=marker_color,
zorder=10,
)

Mikael Henriksson
committed
if process.execution_time > self.schedule_time:
# Execution time longer than schedule time, draw with warning color

Mikael Henriksson
committed
_ax.broken_barh( # type: ignore
[(0, self.schedule_time)],
(bar_row + 0.55, 0.9),
color=_WARNING_COLOR,
)
elif process.execution_time == 0:

Mikael Henriksson
committed
# Execution time zero, draw a slim bar
_ax.broken_barh( # type: ignore
[(PAD_L + bar_start, bar_end - bar_start - PAD_L - PAD_R)],
(bar_row + 0.55, 0.9),
color=bar_color,
)
elif bar_end > bar_start:
_ax.broken_barh( # type: ignore
(bar_row + 0.55, 0.9),
color=bar_color,

Mikael Henriksson
committed
else: # bar_end <= bar_start
_ax.broken_barh( # type: ignore
[
(
PAD_L + bar_start,
self._schedule_time - bar_start - PAD_L,
)
],
(bar_row + 0.55, 0.9),
color=bar_color,
)
_ax.broken_barh( # type: ignore
[(0, bar_end - PAD_R)], (bar_row + 0.55, 0.9), color=bar_color
_ax.annotate( # type: ignore
(bar_start + PAD_L + 0.025, bar_row + 1.00),
_ax.grid(True) # type: ignore
_ax.xaxis.set_major_locator(
MaxNLocator(integer=True, min_n_ticks=1)
) # type: ignore
_ax.yaxis.set_major_locator(
MaxNLocator(integer=True, min_n_ticks=1)
) # type: ignore
_ax.set_xlim(0, self._schedule_time) # type: ignore
_ax.set_ylim(0.25, len(self._collection) + 0.75) # type: ignore
else:
pass
def show(
self,
*,
show_name: bool = True,
bar_color: Union[str, Tuple[float, ...]] = _LATENCY_COLOR,
marker_color: Union[str, Tuple[float, ...]] = "black",
marker_read: str = "X",
marker_write: str = "o",
show_markers: bool = True,
allow_excessive_lifetimes: bool = False,
Display lifetime diagram using the current Matplotlib backend.
Equivalent to creating a Matplotlib figure, passing it and arguments to
:meth:`plot` and invoking :py:meth:`matplotlib.figure.Figure.show`.
Parameters
----------
show_name : bool, default: True
Show name of all processes in the lifetime chart.
bar_color : color, optional
Bar color in lifetime chart.
marker_color : color, default 'black'
Color for read and write marker.
marker_read : str, default 'o'
Marker at read time in the lifetime chart.
marker_write : str, default 'x'
Marker at write time in the lifetime chart.
show_markers : bool, default True
Show markers at read and write times.
allow_excessive_lifetimes : bool, default False
If True, the plot method allows plotting collections of variables with
Figure title.
"""
fig, ax = plt.subplots()
self.plot(
ax=ax,
show_name=show_name,
bar_color=bar_color,
marker_color=marker_color,
marker_read=marker_read,
marker_write=marker_write,
show_markers=show_markers,
allow_excessive_lifetimes=allow_excessive_lifetimes,
)
fig.show() # type: ignore

Mikael Henriksson
committed
def create_exclusion_graph_from_ports(
self,
read_ports: Optional[int] = None,
write_ports: Optional[int] = None,
total_ports: Optional[int] = None,
Create an exclusion graph based on concurrent read and write accesses.

Mikael Henriksson
committed
read_ports : int
The number of read ports used when splitting process collection based on
memory variable access.

Mikael Henriksson
committed
write_ports : int
The number of write ports used when splitting process collection based on
memory variable access.

Mikael Henriksson
committed
total_ports : int
The total number of ports used when splitting process collection based on
memory variable access.

Mikael Henriksson
committed
Returns
-------
:class:`networkx.Graph`
An undirected exclusion graph.

Mikael Henriksson
committed
"""
read_ports, write_ports, total_ports = _sanitize_port_option(
read_ports, write_ports, total_ports
)

Mikael Henriksson
committed
# Guard for proper read/write port settings
if read_ports != 1 or write_ports != 1:
raise ValueError(
"Splitting with read and write ports not equal to one with the"
" graph coloring heuristic does not make sense."
)
if total_ports not in (1, 2):
raise ValueError(
"Total ports should be either 1 (non-concurrent reads/writes)"
" or 2 (concurrent read/writes) for graph coloring heuristic."
)
# Create new exclusion graph. Nodes are Processes
exclusion_graph = nx.Graph()
exclusion_graph.add_nodes_from(self._collection)
for node1 in exclusion_graph:
read_time % self.schedule_time for read_time in node1.read_times
node1_start_time = node1.start_time % self.schedule_time
if total_ports == 1 and node1.start_time in node1_stop_times:
print(node1.start_time, node1_stop_times)
raise ValueError("Cannot read and write in same cycle.")

Mikael Henriksson
committed
for node2 in exclusion_graph:
if node1 == node2:
continue
else:
node2_stop_times = tuple(
read_time % self.schedule_time for read_time in node2.read_times
)
node2_start_time = node2.start_time % self.schedule_time
if write_ports == 1 and node1_start_time == node2_start_time:
exclusion_graph.add_edge(node1, node2)
if read_ports == 1 and node1_stop_times.intersection(
node2_stop_times
):
exclusion_graph.add_edge(node1, node2)
if total_ports == 1 and (
node1_start_time in node2_stop_times
or node2_start_time in node1_stop_times
):
exclusion_graph.add_edge(node1, node2)

Mikael Henriksson
committed
return exclusion_graph
def create_exclusion_graph_from_execution_time(self) -> nx.Graph:
"""
Create an exclusion graph from processes overlapping in execution time.
"""
exclusion_graph = nx.Graph()
exclusion_graph.add_nodes_from(self._collection)
for process1 in self._collection:
for process2 in self._collection:
if process1 == process2:
continue
else:
t1 = set(
range(
process1.start_time,
min(
process1.start_time + process1.execution_time,
self._schedule_time,
),
)
).union(
set(
range(
0,
process1.start_time
+ process1.execution_time
- self._schedule_time,
)
min(
process2.start_time + process2.execution_time,
self._schedule_time,
),
)
).union(
set(
range(
0,
process2.start_time
+ process2.execution_time
- self._schedule_time,
)
)
)
if t1.intersection(t2):
exclusion_graph.add_edge(process1, process2)
return exclusion_graph
heuristic: str = "left_edge",
coloring_strategy: str = "saturation_largest_first",
) -> List["ProcessCollection"]:

Mikael Henriksson
committed
"""
Split based on overlapping execution time.

Mikael Henriksson
committed
Parameters
----------
heuristic : {'graph_color', 'left_edge'}, default: 'graph_color'

Mikael Henriksson
committed
The heuristic used when splitting based on execution times.
coloring_strategy : str, default: 'saturation_largest_first'
Node ordering strategy passed to
:func:`networkx.algorithms.coloring.greedy_color`.
This parameter is only considered if *heuristic* is set to 'graph_color'.

Mikael Henriksson
committed
One of
* 'largest_first'
* 'random_sequential'
* 'smallest_last'
* 'independent_set'
* 'connected_sequential_bfs'
* 'connected_sequential_dfs' or 'connected_sequential'
* 'saturation_largest_first' or 'DSATUR'

Mikael Henriksson
committed
Returns
-------
A list of new ProcessCollection objects with the process splitting.

Mikael Henriksson
committed
"""
if heuristic == "graph_color":

Mikael Henriksson
committed
return self._graph_color_assignment(coloring_strategy)

Mikael Henriksson
committed
elif heuristic == "left_edge":
return self._left_edge_assignment()

Mikael Henriksson
committed
else:
raise ValueError(f"Invalid heuristic '{heuristic}'")

Mikael Henriksson
committed
heuristic: str = "left_edge",
read_ports: Optional[int] = None,
write_ports: Optional[int] = None,
total_ports: Optional[int] = None,
) -> List["ProcessCollection"]:
Split based on concurrent read and write accesses.
Parameters
----------
heuristic : str, default: "graph_color"
The heuristic used when splitting this :class:`ProcessCollection`.

Mikael Henriksson
committed
* "left_edge"
The number of read ports used when splitting process collection based on
memory variable access.
The number of write ports used when splitting process collection based on
memory variable access.
The total number of ports used when splitting process collection based on
memory variable access.
A set of new ProcessCollection objects with the process splitting.
read_ports, write_ports, total_ports = _sanitize_port_option(
read_ports, write_ports, total_ports
)
return self._split_ports_graph_color(read_ports, write_ports, total_ports)

Mikael Henriksson
committed
elif heuristic == "left_edge":
return self.split_ports_sequentially(
read_ports,
write_ports,
total_ports,
sequence=sorted(self),
)

Mikael Henriksson
committed
raise ValueError("Invalid heuristic provided.")

Mikael Henriksson
committed
def split_ports_sequentially(
self,
read_ports: int,
write_ports: int,
total_ports: int,
sequence: List[Process],
) -> List["ProcessCollection"]:
"""
Split this collection into multiple new collections by sequentially assigning
processes in the order of `sequence`.
This method takes the processes from `sequence`, in order, and assigns them to

Mikael Henriksson
committed
to multiple new `ProcessCollection` based on port collisions in a first-come
first-served manner. The first `Process` in `sequence` is assigned first, and
the last `Process` in `sequence is assigned last.

Mikael Henriksson
committed
Parameters
----------
read_ports : int
The number of read ports used when splitting process collection based on
memory variable access.
write_ports : int
The number of write ports used when splitting process collection based on
memory variable access.
total_ports : int
The total number of ports used when splitting process collection based on
memory variable access.

Mikael Henriksson
committed
A list of the processes used to determine the order in which processes are
assigned.
Returns
-------
list of `ProcessCollection`
A set of new ProcessCollection objects with the process splitting.

Mikael Henriksson
committed
"""
def ports_collide(proc: Process, collection: ProcessCollection):
"""
Predicate test if insertion of a process `proc` results in colliding ports
when inserted to `collection` based on the `read_ports`, `write_ports`, and
`total_ports`.
"""
# Test the number of concurrent write accesses
collection_writes = defaultdict(int, collection.write_port_accesses())