Skip to content
Snippets Groups Projects
Commit 4b997e02 authored by Mikael Henriksson's avatar Mikael Henriksson :runner:
Browse files

Add graph_color_cell_assignment method to ProcessCollection

parent 078ac59c
No related branches found
No related tags found
1 merge request!244WIP: VHDL code generation for register and memory based storages
......@@ -8,7 +8,7 @@ from matplotlib.axes import Axes
from matplotlib.ticker import MaxNLocator
from b_asic._preferences import LATENCY_COLOR
from b_asic.process import Process
from b_asic.process import MemoryVariable, PlainMemoryVariable, Process
# Default latency coloring RGB tuple
_LATENCY_COLOR = tuple(c / 255 for c in LATENCY_COLOR)
......@@ -34,6 +34,53 @@ def _sorted_nicely(to_be_sorted: Iterable[_T]) -> List[_T]:
return sorted(to_be_sorted, key=alphanum_key)
def _sanitize_port_option(
read_ports: Optional[int] = None,
write_ports: Optional[int] = None,
total_ports: Optional[int] = None,
) -> Tuple[int, int, int]:
"""
General port sanitization function, to test if a port specification makes sense.
Raises ValueError if the port specification is in-proper.
Parameters
----------
read_ports : int, optional
The number of read ports.
write_ports : int, optional
The number of write ports.
total_ports : int, optional
The total number of ports
Returns
-------
Returns a triple int tuple (read_ports, write_ports, total_ports) equal to the input, or sanitized if one of the input equals None.
If total_ports is set to None at the input, it is set to read_ports+write_ports at the output.
If read_ports or write_ports is set to None at the input, it is set to total_ports at the output.
"""
if total_ports is None:
if read_ports is None or write_ports is None:
raise ValueError(
"If total_ports is unset, both read_ports and write_ports"
" must be provided."
)
else:
total_ports = read_ports + write_ports
else:
read_ports = total_ports if read_ports is None else read_ports
write_ports = total_ports if write_ports is None else write_ports
if total_ports < read_ports:
raise ValueError(
f'Total ports ({total_ports}) less then read ports ({read_ports})'
)
if total_ports < write_ports:
raise ValueError(
f'Total ports ({total_ports}) less then write ports ({write_ports})'
)
return (read_ports, write_ports, total_ports)
def draw_exclusion_graph_coloring(
exclusion_graph: nx.Graph,
color_dict: Dict[Process, int],
......@@ -75,6 +122,7 @@ def draw_exclusion_graph_coloring(
'#0000ff',
'#ff00aa',
'#ffaa00',
'#ffffff',
'#00ffaa',
'#aaff00',
'#aa00ff',
......@@ -85,6 +133,7 @@ def draw_exclusion_graph_coloring(
'#aaaa00',
'#aa00aa',
'#00aaaa',
'#666666',
]
if color_list is None:
node_color_dict = {k: COLOR_LIST[v] for k, v in color_dict.items()}
......@@ -288,17 +337,9 @@ class ProcessCollection:
nx.Graph
"""
if total_ports is None:
if read_ports is None or write_ports is None:
raise ValueError(
"If total_ports is unset, both read_ports and write_ports"
" must be provided."
)
else:
total_ports = read_ports + write_ports
else:
read_ports = total_ports if read_ports is None else read_ports
write_ports = total_ports if write_ports is None else write_ports
read_ports, write_ports, total_ports = _sanitize_port_option(
read_ports, write_ports, total_ports
)
# Guard for proper read/write port settings
if read_ports != 1 or write_ports != 1:
......@@ -359,13 +400,37 @@ class ProcessCollection:
t1 = set(
range(
process1.start_time,
process1.start_time + process1.execution_time,
min(
process1.start_time + process1.execution_time,
self._schedule_time,
),
)
).union(
set(
range(
0,
process1.start_time
+ process1.execution_time
- self._schedule_time,
)
)
)
t2 = set(
range(
process2.start_time,
process2.start_time + process2.execution_time,
min(
process2.start_time + process2.execution_time,
self._schedule_time,
),
)
).union(
set(
range(
0,
process2.start_time
+ process2.execution_time
- self._schedule_time,
)
)
)
if t1.intersection(t2):
......@@ -448,17 +513,9 @@ class ProcessCollection:
-------
A set of new ProcessCollection objects with the process splitting.
"""
if total_ports is None:
if read_ports is None or write_ports is None:
raise ValueError(
"If total_ports is unset, both read_ports and write_ports"
" must be provided."
)
else:
total_ports = read_ports + write_ports
else:
read_ports = total_ports if read_ports is None else read_ports
write_ports = total_ports if write_ports is None else write_ports
read_ports, write_ports, total_ports = _sanitize_port_option(
read_ports, write_ports, total_ports
)
if heuristic == "graph_color":
return self._split_ports_graph_color(read_ports, write_ports, total_ports)
else:
......@@ -554,15 +611,18 @@ class ProcessCollection:
self,
coloring_strategy: str = "saturation_largest_first",
) -> Dict[int, "ProcessCollection"]:
"""graph_color_cell_assignment.
"""
Perform cell assignment of the processes in this collection using graph coloring with networkx.coloring.greedy_color.
Two or more processes can share a single cell if, and only if, they have no overlaping time alive.
Parameters
----------
coloring_strategy : str, default: "saturation_largest_first"
Graph coloring strategy passed to networkx.coloring.greedy_color().
Returns
-------
Dict[int, "ProcessCollection"]
Dict[int, ProcessCollection]
"""
cell_assignment: Dict[int, ProcessCollection] = dict()
......@@ -570,15 +630,22 @@ class ProcessCollection:
coloring: Dict[Process, int] = nx.coloring.greedy_color(
exclusion_graph, strategy=coloring_strategy
)
for process, cell in coloring.items():
try:
cell_assignment[cell].add_process(process)
except:
cell_assignment[cell] = ProcessCollection(set(), self._schedule_time)
cell_assignment[cell].add_process(process)
return cell_assignment
def left_edge_cell_assignment(self) -> Dict[int, "ProcessCollection"]:
"""
Perform left edge cell assignment of this process collection.
Perform cell assignment of the processes in this collection using the left-edge algorithm.
Two or more processes can share a single cell if, and only if, they have no overlaping time alive.
Returns
-------
Dict[Process, int]
Dict[int, ProcessCollection]
"""
next_empty_cell = 0
cell_assignment: Dict[int, ProcessCollection] = dict()
......@@ -613,6 +680,9 @@ class ProcessCollection:
def generate_memory_based_storage_vhdl(
self,
filename: str,
read_ports: Optional[int] = None,
write_ports: Optional[int] = None,
total_ports: Optional[int] = None,
):
"""
Generate VHDL code for memory based storage of processes (MemoryVariables).
......@@ -621,7 +691,53 @@ class ProcessCollection:
----------
filename : str
Filename of output file.
read_ports : int, optional
The number of read ports used when splitting process collection based on
memory variable access. If total ports in unset, this parameter has to be set
and total_ports is assumed to be read_ports + write_ports.
write_ports : int, optional
The number of write ports used when splitting process collection based on
memory variable access. If total ports is unset, this parameter has to be set
and total_ports is assumed to be read_ports + write_ports.
total_ports : int, optional
The total number of ports used when splitting process collection based on
memory variable access.
"""
# Check that hardware can be generated for the ProcessCollection...
raise NotImplementedError("Not implemented yet!")
# Check that this is a ProcessCollection of (Plain)MemoryVariables
is_memory_variable = all(
isinstance(process, MemoryVariable) for process in self._collection
)
is_plain_memory_variable = all(
isinstance(process, PlainMemoryVariable) for process in self._collection
)
if not (is_memory_variable or is_plain_memory_variable):
raise ValueError(
"HDL can only be generated for ProcessCollection of"
" (Plain)MemoryVariables"
)
# Sanitize port settings
read_ports, write_ports, total_ports = _sanitize_port_option(
read_ports, write_ports, total_ports
)
# Make sure that concurrent reads/writes do not surpass the port setting
for mv in self:
filter_write = lambda p: p.start_time == mv.start_time
filter_read = (
lambda p: (p.start_time + p.execution_time) & self._schedule_time
== mv.start_time + mv.execution_time % self._schedule_time
)
if len(list(filter(filter_write, self))) > write_ports + 1:
raise ValueError(
f'More than {write_ports} write ports needed to generate HDL for'
' this ProcessCollection'
)
if len(list(filter(filter_read, self))) > read_ports + 1:
raise ValueError(
f'More than {read_ports} read ports needed to generate HDL for this'
' ProcessCollection'
)
# raise NotImplementedError("Not implemented yet!")
......@@ -3,11 +3,12 @@ import pickle
import matplotlib.pyplot as plt
import pytest
from b_asic.process import PlainMemoryVariable
from b_asic.research.interleaver import (
generate_matrix_transposer,
generate_random_interleaver,
)
from b_asic.resources import ProcessCollection
from b_asic.resources import ProcessCollection, draw_exclusion_graph_coloring
class TestProcessCollectionPlainMemoryVariable:
......@@ -33,11 +34,20 @@ class TestProcessCollectionPlainMemoryVariable:
def test_left_edge_cell_assignment(self, simple_collection: ProcessCollection):
fig, ax = plt.subplots(1, 2)
assignment = simple_collection.left_edge_cell_assignment()
for cell in assignment.keys():
for cell in assignment:
assignment[cell].plot(ax=ax[1], row=cell)
simple_collection.plot(ax[0])
return fig
def test_cell_assignment_matrix_transposer(self):
collection = generate_matrix_transposer(4, min_lifetime=5)
assignment_left_edge = collection.left_edge_cell_assignment()
assignment_graph_color = collection.graph_color_cell_assignment(
coloring_strategy='saturation_largest_first'
)
assert len(assignment_left_edge.keys()) == 18
assert len(assignment_graph_color.keys()) == 16
# Issue: #175
def test_interleaver_issue175(self):
with open('test/fixtures/interleaver-two-port-issue175.p', 'rb') as f:
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment