import io
import re
from typing import Dict, Iterable, List, Optional, Set, Tuple, TypeVar, Union

import matplotlib.pyplot as plt
import networkx as nx
from matplotlib.axes import Axes
from matplotlib.ticker import MaxNLocator

from b_asic._preferences import LATENCY_COLOR
from b_asic.process import Process

# Default latency coloring RGB tuple
_LATENCY_COLOR = tuple(c / 255 for c in LATENCY_COLOR)

#
# Human-intuitive sorting:
# https://stackoverflow.com/questions/2669059/how-to-sort-alpha-numeric-set-in-python
#
# Typing '_T' to help Pyright propagate type-information
#
_T = TypeVar('_T')


def _sorted_nicely(to_be_sorted: Iterable[_T]) -> List[_T]:
    """Sort the given iterable in the way that humans expect."""

    def convert(text):
        return int(text) if text.isdigit() else text

    def alphanum_key(key):
        return [convert(c) for c in re.split('([0-9]+)', str(key))]

    return sorted(to_be_sorted, key=alphanum_key)


def draw_exclusion_graph_coloring(
    exclusion_graph: nx.Graph,
    color_dict: Dict[Process, int],
    ax: Optional[Axes] = None,
    color_list: Optional[Union[List[str], List[Tuple[float, float, float]]]] = None,
) -> None:
    """
    Draw a colored exclusion graph from the memory assignment.

    .. code-block:: python

        _, ax = plt.subplots(1, 1)
        collection = ProcessCollection(...)
        exclusion_graph = collection.create_exclusion_graph_from_execution_time()
        color_dict = nx.coloring.greedy_color(exclusion_graph)
        draw_exclusion_graph_coloring(exclusion_graph, color_dict, ax=ax)
        plt.show()

    Parameters
    ----------
    exclusion_graph : nx.Graph
        The exclusion graph to draw.
    color_dict : dict
        A dict where keys are :class:`~b_asic.process.Process` objects and
        values are integers representing colors. Such dictionaries are
        generated by :func:`networkx.algorithms.coloring.greedy_color`.
    ax : :class:`matplotlib.axes.Axes`, optional
        A Matplotlib :class:`~matplotlib.axes.Axes` object to draw the
        exclusion graph onto.
    color_list : iterable of color, optional
        A list of colors in Matplotlib format.

    Returns
    -------
    None
    """
    COLOR_LIST = [
        '#aa0000', '#00aa00', '#0000ff', '#ff00aa', '#ffaa00',
        '#00ffaa', '#aaff00', '#aa00ff', '#00aaff', '#ff0000',
        '#00ff00', '#0000aa', '#aaaa00', '#aa00aa', '#00aaaa',
    ]
    if color_list is None:
        node_color_dict = {k: COLOR_LIST[v] for k, v in color_dict.items()}
    else:
        node_color_dict = {k: color_list[v] for k, v in color_dict.items()}
    node_color_list = [node_color_dict[node] for node in exclusion_graph]
    nx.draw_networkx(
        exclusion_graph,
        node_color=node_color_list,
        ax=ax,
        pos=nx.spring_layout(exclusion_graph, seed=1),
    )


class ProcessCollection:
    """
    Collection of one or more processes.

    Parameters
    ----------
    collection : set of :class:`~b_asic.process.Process` objects
        The Process objects forming this ProcessCollection.
    schedule_time : int
        Length of the time-axis in the generated graph.
    cyclic : bool, default: False
        If the processes operate cyclically, i.e., if time 0 == time
        *schedule_time*.
    """

    def __init__(
        self,
        collection: Set[Process],
        schedule_time: int,
        cyclic: bool = False,
    ):
        self._collection = collection
        self._schedule_time = schedule_time
        self._cyclic = cyclic

    @property
    def collection(self):
        """The set of processes in this collection."""
        return self._collection

    def __len__(self):
        return len(self._collection)

    def add_process(self, process: Process):
        """
        Add a new process to this process collection.

        Parameters
        ----------
        process : Process
            The process object to be added to the collection.
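
        Examples
        --------
        A minimal sketch; it assumes that
        :class:`~b_asic.process.PlainMemoryVariable` can be constructed from a
        write time, a write port, a dict mapping read ports to read times, and
        a name:

        .. code-block:: python

            from b_asic.process import PlainMemoryVariable

            collection = ProcessCollection(set(), schedule_time=4)
            # Variable written at time 0 on port 0 and read at time 2 on port 0
            collection.add_process(PlainMemoryVariable(0, 0, {0: 2}, "v0"))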
""" self._collection.add(process) def plot( self, ax: Optional[Axes] = None, show_name: bool = True, bar_color: Union[str, Tuple[float, ...]] = _LATENCY_COLOR, marker_color: Union[str, Tuple[float, ...]] = "black", marker_read: str = "X", marker_write: str = "o", show_markers: bool = True, row: Optional[int] = None, ): """ Plot a process variable lifetime chart. Parameters ---------- ax : :class:`matplotlib.axes.Axes`, optional Matplotlib :class:`~matplotlib.axes.Axes` object to draw this lifetime chart onto. If not provided (i.e., set to None), this method will return a new Axes object. show_name : bool, default: True Show name of all processes in the lifetime chart. bar_color : color, optional Bar color in lifetime chart. marker_color : color, default 'black' Color for read and write marker. marker_write : str, default 'x' Marker at write time in the lifetime chart. marker_read : str, default 'o' Marker at read time in the lifetime chart. show_markers : bool, default True Show markers at read and write times. row : int, optional Render all processes in this collection on a specified row in the matplotlib axes object. Defaults to None, which renders all processes on separate rows. This option is useful when drawing cell assignments. Returns ------- ax: Associated Matplotlib Axes (or array of Axes) object """ # Set up the Axes object if ax is None: _, _ax = plt.subplots() else: _ax = ax # Lifetime chart left and right padding PAD_L, PAD_R = 0.05, 0.05 max_execution_time = max(process.execution_time for process in self._collection) if max_execution_time > self._schedule_time: # Schedule time needs to be greater than or equal to the maximum process # lifetime raise KeyError( f'Error: Schedule time: {self._schedule_time} < Max execution' f' time: {max_execution_time}' ) # Generate the life-time chart for i, process in enumerate(_sorted_nicely(self._collection)): bar_row = i if row == None else row bar_start = process.start_time % self._schedule_time bar_end = process.start_time + process.execution_time bar_end = ( bar_end if bar_end == self._schedule_time else bar_end % self._schedule_time ) if show_markers: _ax.scatter( # type: ignore x=bar_start, y=bar_row + 1, marker=marker_write, color=marker_color, zorder=10, ) _ax.scatter( # type: ignore x=bar_end, y=bar_row + 1, marker=marker_read, color=marker_color, zorder=10, ) if bar_end >= bar_start: _ax.broken_barh( # type: ignore [(PAD_L + bar_start, bar_end - bar_start - PAD_L - PAD_R)], (bar_row + 0.55, 0.9), color=bar_color, ) else: # bar_end < bar_start _ax.broken_barh( # type: ignore [ ( PAD_L + bar_start, self._schedule_time - bar_start - PAD_L, ) ], (bar_row + 0.55, 0.9), color=bar_color, ) _ax.broken_barh( # type: ignore [(0, bar_end - PAD_R)], (bar_row + 0.55, 0.9), color=bar_color ) if show_name: _ax.annotate( # type: ignore str(process), (bar_start + PAD_L + 0.025, bar_row + 1.00), va="center", ) _ax.grid(True) # type: ignore _ax.xaxis.set_major_locator(MaxNLocator(integer=True)) # type: ignore _ax.yaxis.set_major_locator(MaxNLocator(integer=True)) # type: ignore _ax.set_xlim(0, self._schedule_time) # type: ignore if row == None: _ax.set_ylim(0.25, len(self._collection) + 0.75) # type: ignore else: pass return _ax def create_exclusion_graph_from_ports( self, read_ports: Optional[int] = None, write_ports: Optional[int] = None, total_ports: Optional[int] = None, ) -> nx.Graph: """ Create an exclusion graph based on a number of read/write ports. 

        Parameters
        ----------
        read_ports : int
            The number of read ports used when splitting process collection
            based on memory variable access.
        write_ports : int
            The number of write ports used when splitting process collection
            based on memory variable access.
        total_ports : int
            The total number of ports used when splitting process collection
            based on memory variable access.

        Returns
        -------
        nx.Graph
        """
        if total_ports is None:
            if read_ports is None or write_ports is None:
                raise ValueError(
                    "If total_ports is unset, both read_ports and write_ports"
                    " must be provided."
                )
            else:
                total_ports = read_ports + write_ports
        else:
            read_ports = total_ports if read_ports is None else read_ports
            write_ports = total_ports if write_ports is None else write_ports

        # Guard for proper read/write port settings
        if read_ports != 1 or write_ports != 1:
            raise ValueError(
                "Splitting with read and write ports not equal to one with the"
                " graph coloring heuristic does not make sense."
            )
        if total_ports not in (1, 2):
            raise ValueError(
                "Total ports should be either 1 (non-concurrent reads/writes)"
                " or 2 (concurrent read/writes) for graph coloring heuristic."
            )

        # Create new exclusion graph. Nodes are Processes
        exclusion_graph = nx.Graph()
        exclusion_graph.add_nodes_from(self._collection)
        for node1 in exclusion_graph:
            for node2 in exclusion_graph:
                if node1 == node2:
                    continue
                node1_stop_time = node1.start_time + node1.execution_time
                node2_stop_time = node2.start_time + node2.execution_time
                if total_ports == 1:
                    # Single-port assignment: any coinciding access conflicts
                    if node1.start_time == node2.start_time:
                        exclusion_graph.add_edge(node1, node2)
                    elif node1_stop_time == node2_stop_time:
                        exclusion_graph.add_edge(node1, node2)
                    elif node1.start_time == node2_stop_time:
                        exclusion_graph.add_edge(node1, node2)
                    elif node1_stop_time == node2.start_time:
                        exclusion_graph.add_edge(node1, node2)
                else:
                    # Dual-port assignment: simultaneous writes or reads conflict
                    if node1.start_time == node2.start_time:
                        exclusion_graph.add_edge(node1, node2)
                    elif node1_stop_time == node2_stop_time:
                        exclusion_graph.add_edge(node1, node2)
        return exclusion_graph

    def create_exclusion_graph_from_execution_time(self) -> nx.Graph:
        """
        Generate an exclusion graph based on processes overlapping in time.

        Returns
        -------
        nx.Graph
            An exclusion graph where nodes are processes and edges between two
            processes indicate overlap in time.
        """
        exclusion_graph = nx.Graph()
        exclusion_graph.add_nodes_from(self._collection)
        for process1 in self._collection:
            for process2 in self._collection:
                if process1 == process2:
                    continue
                t1 = set(
                    range(
                        process1.start_time,
                        process1.start_time + process1.execution_time,
                    )
                )
                t2 = set(
                    range(
                        process2.start_time,
                        process2.start_time + process2.execution_time,
                    )
                )
                if t1.intersection(t2):
                    exclusion_graph.add_edge(process1, process2)
        return exclusion_graph

    def split_execution_time(
        self,
        heuristic: str = "graph_color",
        coloring_strategy: str = "saturation_largest_first",
    ) -> Set["ProcessCollection"]:
        """
        Split a ProcessCollection based on overlapping execution time.

        Parameters
        ----------
        heuristic : {'graph_color', 'left_edge'}, default: 'graph_color'
            The heuristic used when splitting based on execution times.
        coloring_strategy : str, default: 'saturation_largest_first'
            Node ordering strategy passed to
            :func:`networkx.coloring.greedy_color`. This parameter is only
            considered if *heuristic* is set to 'graph_color'.
            One of

            * 'largest_first'
            * 'random_sequential'
            * 'smallest_last'
            * 'independent_set'
            * 'connected_sequential_bfs'
            * 'connected_sequential_dfs' or 'connected_sequential'
            * 'saturation_largest_first' or 'DSATUR'

        Returns
        -------
        set of ProcessCollection
            A set of new ProcessCollection objects with the process splitting.
        """
        if heuristic == "graph_color":
            exclusion_graph = self.create_exclusion_graph_from_execution_time()
            coloring = nx.coloring.greedy_color(
                exclusion_graph, strategy=coloring_strategy
            )
            return self._split_from_graph_coloring(coloring)
        elif heuristic == "left_edge":
            raise NotImplementedError()
        else:
            raise ValueError(f"Invalid heuristic '{heuristic}'")

    def split_ports(
        self,
        heuristic: str = "graph_color",
        read_ports: Optional[int] = None,
        write_ports: Optional[int] = None,
        total_ports: Optional[int] = None,
    ) -> Set["ProcessCollection"]:
        """
        Split this process storage based on concurrent read/write times
        according to some heuristic.

        Parameters
        ----------
        heuristic : str, default: "graph_color"
            The heuristic used when splitting this ProcessCollection.
            Valid options are:

            * "graph_color"
            * "..."

        read_ports : int, optional
            The number of read ports used when splitting process collection
            based on memory variable access.
        write_ports : int, optional
            The number of write ports used when splitting process collection
            based on memory variable access.
        total_ports : int, optional
            The total number of ports used when splitting process collection
            based on memory variable access.

        Returns
        -------
        set of ProcessCollection
            A set of new ProcessCollection objects with the process splitting.
        """
        if total_ports is None:
            if read_ports is None or write_ports is None:
                raise ValueError(
                    "If total_ports is unset, both read_ports and write_ports"
                    " must be provided."
                )
            else:
                total_ports = read_ports + write_ports
        else:
            read_ports = total_ports if read_ports is None else read_ports
            write_ports = total_ports if write_ports is None else write_ports
        if heuristic == "graph_color":
            return self._split_ports_graph_color(
                read_ports, write_ports, total_ports
            )
        else:
            raise ValueError("Invalid heuristic provided.")

    def _split_ports_graph_color(
        self,
        read_ports: int,
        write_ports: int,
        total_ports: int,
        coloring_strategy: str = "saturation_largest_first",
    ) -> Set["ProcessCollection"]:
        """
        Split this process storage based on concurrent read/write times using
        graph coloring.

        Parameters
        ----------
        read_ports : int
            The number of read ports used when splitting process collection
            based on memory variable access.
        write_ports : int
            The number of write ports used when splitting process collection
            based on memory variable access.
        total_ports : int
            The total number of ports used when splitting process collection
            based on memory variable access.
        coloring_strategy : str, default: 'saturation_largest_first'
            Node ordering strategy passed to
            :func:`networkx.coloring.greedy_color`.

            One of

            * 'largest_first'
            * 'random_sequential'
            * 'smallest_last'
            * 'independent_set'
            * 'connected_sequential_bfs'
            * 'connected_sequential_dfs' or 'connected_sequential'
            * 'saturation_largest_first' or 'DSATUR'
        """
        # Create new exclusion graph. Nodes are Processes
        exclusion_graph = self.create_exclusion_graph_from_ports(
            read_ports, write_ports, total_ports
        )

        # Perform assignment from coloring and return result
        coloring = nx.coloring.greedy_color(
            exclusion_graph, strategy=coloring_strategy
        )
        return self._split_from_graph_coloring(coloring)

    def _split_from_graph_coloring(
        self,
        coloring: Dict[Process, int],
    ) -> Set["ProcessCollection"]:
        """
        Split :class:`~b_asic.process.Process` objects into a set of
        :class:`ProcessCollection` objects based on a provided graph coloring.
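
        Each distinct color becomes one new collection: all processes mapped
        to color *k* in *coloring* end up in the same
        :class:`ProcessCollection`. A hedged sketch of the expected shape,
        where ``p0``, ``p1`` and ``p2`` stand in for
        :class:`~b_asic.process.Process` objects and ``collection`` for an
        existing ProcessCollection:

        .. code-block:: python

            coloring = {p0: 0, p1: 1, p2: 0}
            collections = collection._split_from_graph_coloring(coloring)
            assert len(collections) == 2  # one collection per color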

        The resulting :class:`ProcessCollection` objects have the same
        schedule time and cyclic property as ``self``.

        Parameters
        ----------
        coloring : dict
            Process -> color (int) mappings.

        Returns
        -------
        set of ProcessCollection
            A set of new ProcessCollections.
        """
        process_collection_set_list = [
            set() for _ in range(max(coloring.values()) + 1)
        ]
        for process, color in coloring.items():
            process_collection_set_list[color].add(process)
        return {
            ProcessCollection(
                process_collection_set, self._schedule_time, self._cyclic
            )
            for process_collection_set in process_collection_set_list
        }

    def _repr_svg_(self) -> str:
        """
        Generate an SVG representation of the resource collection.

        This is automatically displayed in e.g. Jupyter Qt console.
        """
        fig, ax = plt.subplots()
        self.plot(ax=ax, show_markers=False)
        f = io.StringIO()
        fig.savefig(f, format="svg")  # type: ignore
        return f.getvalue()

    def __repr__(self):
        return (
            f"ProcessCollection({self._collection}, {self._schedule_time},"
            f" {self._cyclic})"
        )

    def __iter__(self):
        return iter(self._collection)

    def graph_color_cell_assignment(
        self,
        coloring_strategy: str = "saturation_largest_first",
    ) -> Dict[int, "ProcessCollection"]:
        """
        Perform cell assignment of this process collection using graph coloring.

        Parameters
        ----------
        coloring_strategy : str, default: 'saturation_largest_first'
            Node ordering strategy passed to
            :func:`networkx.coloring.greedy_color`.

        Returns
        -------
        Dict[int, ProcessCollection]
        """
        cell_assignment: Dict[int, ProcessCollection] = dict()
        exclusion_graph = self.create_exclusion_graph_from_execution_time()
        coloring: Dict[Process, int] = nx.coloring.greedy_color(
            exclusion_graph, strategy=coloring_strategy
        )
        # Assign each process to the cell given by its color
        for process, cell in coloring.items():
            if cell not in cell_assignment:
                cell_assignment[cell] = ProcessCollection(
                    collection=set(), schedule_time=self._schedule_time
                )
            cell_assignment[cell].add_process(process)
        return cell_assignment

    def left_edge_cell_assignment(self) -> Dict[int, "ProcessCollection"]:
        """
        Perform left-edge cell assignment of this process collection.

        Returns
        -------
        Dict[int, ProcessCollection]
        """
        next_empty_cell = 0
        cell_assignment: Dict[int, ProcessCollection] = dict()
        for next_process in sorted(self):
            insert_to_new_cell = True
            for cell in cell_assignment:
                insert_to_this_cell = True
                for process in cell_assignment[cell]:
                    next_process_stop_time = (
                        next_process.start_time + next_process.execution_time
                    ) % self._schedule_time
                    if (
                        next_process.start_time
                        < process.start_time + process.execution_time
                        or (
                            next_process_stop_time < next_process.start_time
                            and next_process_stop_time > process.start_time
                        )
                    ):
                        insert_to_this_cell = False
                        break
                if insert_to_this_cell:
                    cell_assignment[cell].add_process(next_process)
                    insert_to_new_cell = False
                    break
            if insert_to_new_cell:
                cell_assignment[next_empty_cell] = ProcessCollection(
                    collection=set(), schedule_time=self._schedule_time
                )
                cell_assignment[next_empty_cell].add_process(next_process)
                next_empty_cell += 1
        return cell_assignment

    def generate_memory_based_storage_vhdl(
        self,
        filename: str,
    ):
        """
        Generate VHDL code for memory-based storage of processes
        (MemoryVariables).

        Parameters
        ----------
        filename : str
            Filename of output file.
        """
        # Check that hardware can be generated for the ProcessCollection...
        raise NotImplementedError("Not implemented yet!")
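

# A small, self-contained usage sketch (guarded so that importing this module
# has no side effects). It assumes that
# b_asic.process.PlainMemoryVariable(write_time, write_port, reads, name) is
# available and that *reads* maps read ports to read times; adjust the
# construction below if that signature differs.
if __name__ == "__main__":
    from b_asic.process import PlainMemoryVariable

    # Three memory variables on a schedule of length 4
    demo_collection = ProcessCollection(
        {
            PlainMemoryVariable(0, 0, {0: 2}, "v0"),
            PlainMemoryVariable(1, 0, {0: 3}, "v1"),
            PlainMemoryVariable(2, 0, {0: 4}, "v2"),
        },
        schedule_time=4,
    )

    # Draw the variable lifetime chart
    _, demo_ax = plt.subplots()
    demo_collection.plot(ax=demo_ax)

    # Split the collection over memories with one read and one write port
    split = demo_collection.split_ports(read_ports=1, write_ports=1)
    print(f"Number of memories required: {len(split)}")
    plt.show()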