from typing import Dict, List, Optional, Set, Tuple, Union import matplotlib.pyplot as plt import networkx as nx from matplotlib.axes import Axes from matplotlib.ticker import MaxNLocator from b_asic.process import Process def draw_exclusion_graph_coloring( exclusion_graph: nx.Graph, color_dict: Dict[Process, int], ax: Optional[Axes] = None, color_list: Optional[ Union[List[str], List[Tuple[float, float, float]]] ] = None, ): """ Use matplotlib.pyplot and networkx to draw a colored exclusion graph from the memory assigment .. code-block:: python _, ax = plt.subplots(1, 1) collection = ProcessCollection(...) exclusion_graph = collection.create_exclusion_graph_from_overlap() color_dict = nx.greedy_color(exclusion_graph) draw_exclusion_graph_coloring(exclusion_graph, color_dict, ax=ax[0]) plt.show() Parameters ---------- exclusion_graph : nx.Graph A nx.Graph exclusion graph object that is to be drawn. color_dict : dictionary A color dictionary where keys are Process objects and where values are integers representing colors. These dictionaries are automatically generated by :func:`networkx.algorithms.coloring.greedy_color`. ax : :class:`matplotlib.axes.Axes`, optional A Matplotlib Axes object to draw the exclusion graph color_list : Optional[Union[List[str], List[Tuple[float,float,float]]]] """ COLOR_LIST = [ '#aa0000', '#00aa00', '#0000ff', '#ff00aa', '#ffaa00', '#00ffaa', '#aaff00', '#aa00ff', '#00aaff', '#ff0000', '#00ff00', '#0000aa', '#aaaa00', '#aa00aa', '#00aaaa', ] node_color_dict = {} if color_list is None: node_color_dict = {k: COLOR_LIST[v] for k, v in color_dict.items()} else: node_color_dict = {k: color_list[v] for k, v in color_dict.items()} node_color_list = [node_color_dict[node] for node in exclusion_graph] nx.draw_networkx( exclusion_graph, node_color=node_color_list, ax=ax, pos=nx.spring_layout(exclusion_graph, seed=1), ) class ProcessCollection: """ Collection of one or more processes Parameters ---------- collection : set of :class:`~b_asic.process.Process` objects, optional """ def __init__(self, collection: Optional[Set[Process]] = None): if collection is None: self._collection: Set[Process] = set() else: self._collection = collection def add_process(self, process: Process): """ Add a new process to this process collection. Parameters ---------- process : Process The process object to be added to the collection """ self._collection.add(process) def draw_lifetime_chart( self, schedule_time: int = 0, ax: Optional[Axes] = None, show_name: bool = True, ): """ Use matplotlib.pyplot to generate a process variable lifetime chart from this process collection. Parameters ---------- schedule_time : int, default: 0 Length of the time-axis in the generated graph. The time axis will span [0, schedule_time-1]. If set to zero (which is the default), the ... ax : :class:`matplotlib.axes.Axes`, optional Matplotlib Axes object to draw this lifetime chart onto. If not provided (i.e., set to None), this will return a new axes object on return. show_name : bool, default: True Show name of all processes in the lifetime chart. Returns ------- ax: Associated Matplotlib Axes (or array of Axes) object """ # Setup the Axes object if ax is None: _, _ax = plt.subplots() else: _ax = ax # Draw the lifetime chart PAD_L, PAD_R = 0.05, 0.05 max_execution_time = max( [process.execution_time for process in self._collection] ) schedule_time = ( schedule_time if schedule_time else max(p.start_time + p.execution_time for p in self._collection) ) if max_execution_time > schedule_time: # Schedule time needs to be greater than or equal to the maximum process life time raise KeyError( f'Error: Schedule time: {schedule_time} < Max execution time:' f' {max_execution_time}' ) for i, process in enumerate( sorted(self._collection, key=lambda p: str(p)) ): bar_start = process.start_time % schedule_time bar_end = ( process.start_time + process.execution_time ) % schedule_time if bar_end > bar_start: _ax.broken_barh( [(PAD_L + bar_start, bar_end - bar_start - PAD_L - PAD_R)], (i + 0.55, 0.9), ) else: # bar_end < bar_start if bar_end != 0: _ax.broken_barh( [ ( PAD_L + bar_start, schedule_time - bar_start - PAD_L, ) ], (i + 0.55, 0.9), ) _ax.broken_barh([(0, bar_end - PAD_R)], (i + 0.55, 0.9)) else: _ax.broken_barh( [ ( PAD_L + bar_start, schedule_time - bar_start - PAD_L - PAD_R, ) ], (i + 0.55, 0.9), ) if show_name: _ax.annotate( str(process), (bar_start + PAD_L + 0.025, i + 1.00), va="center", ) _ax.grid(True) _ax.xaxis.set_major_locator(MaxNLocator(integer=True)) _ax.yaxis.set_major_locator(MaxNLocator(integer=True)) return _ax def create_exclusion_graph_from_overlap( self, add_name: bool = True ) -> nx.Graph: """ Generate exclusion graph based on processes overlaping in time Parameters ---------- add_name : bool, default: True Add name of all processes as a node attribute in the exclusion graph. Returns ------- An nx.Graph exclusion graph where nodes are processes and arcs between two processes indicated overlap in time """ exclusion_graph = nx.Graph() exclusion_graph.add_nodes_from(self._collection) for process1 in self._collection: for process2 in self._collection: if process1 == process2: continue else: t1 = set( range( process1.start_time, process1.start_time + process1.execution_time, ) ) t2 = set( range( process2.start_time, process2.start_time + process2.execution_time, ) ) if t1.intersection(t2): exclusion_graph.add_edge(process1, process2) return exclusion_graph def split( self, heuristic: str = "graph_color", read_ports: Optional[int] = None, write_ports: Optional[int] = None, total_ports: Optional[int] = None, ) -> Set["ProcessCollection"]: """ Split this process storage based on some heuristic. Parameters ---------- heuristic : str, default: "graph_color" The heuristic used when spliting this ProcessCollection. Valid options are: * "graph_color" * "..." read_ports : int, optional The number of read ports used when spliting process collection based on memory variable access. write_ports : int, optional The number of write ports used when spliting process collection based on memory variable access. total_ports : int, optional The total number of ports used when spliting process collection based on memory variable access. Returns ------- A set of new ProcessColleciton objects with the process spliting. """ if total_ports is None: if read_ports is None or write_ports is None: raise ValueError("inteligent quote") else: total_ports = read_ports + write_ports else: read_ports = total_ports if read_ports is None else read_ports write_ports = total_ports if write_ports is None else write_ports if heuristic == "graph_color": return self._split_graph_color( read_ports, write_ports, total_ports ) else: raise ValueError("Invalid heuristic provided") def _split_graph_color( self, read_ports: int, write_ports: int, total_ports: int ) -> Set["ProcessCollection"]: """ Parameters ---------- read_ports : int, optional The number of read ports used when spliting process collection based on memory variable access. write_ports : int, optional The number of write ports used when spliting process collection based on memory variable access. total_ports : int, optional The total number of ports used when spliting process collection based on memory variable access. """ if read_ports != 1 or write_ports != 1: raise ValueError( "Spliting with read and write ports not equal to one with the" " graph coloring heuristic does not make sense." ) if total_ports not in (1, 2): raise ValueError( "Total ports should be either 1 (non-concurent reads/writes)" " or 2 (concurrent read/writes) for graph coloring heuristic." ) # Create new exclusion graph. Nodes are Processes exclusion_graph = nx.Graph() exclusion_graph.add_nodes_from(self._collection) # Add exclusions (arcs) between processes in the exclusion graph for node1 in exclusion_graph: for node2 in exclusion_graph: if node1 == node2: continue else: node1_stop_time = node1.start_time + node1.execution_time node2_stop_time = node2.start_time + node2.execution_time if total_ports == 1: # Single-port assignment if node1.start_time == node2.start_time: exclusion_graph.add_edge(node1, node2) elif node1_stop_time == node2_stop_time: exclusion_graph.add_edge(node1, node2) elif node1.start_time == node2_stop_time: exclusion_graph.add_edge(node1, node2) elif node1_stop_time == node2.start_time: exclusion_graph.add_edge(node1, node2) else: # Dual-port assignment if node1.start_time == node2.start_time: exclusion_graph.add_edge(node1, node2) elif node1_stop_time == node2_stop_time: exclusion_graph.add_edge(node1, node2) # Perform assignment coloring = nx.coloring.greedy_color(exclusion_graph) draw_exclusion_graph_coloring(exclusion_graph, coloring) # process_collection_list = [ProcessCollection()]*(max(coloring.values()) + 1) process_collection_list = [ ProcessCollection() for _ in range(max(coloring.values()) + 1) ] for process, color in coloring.items(): process_collection_list[color].add_process(process) return { process_collection for process_collection in process_collection_list }