diff --git a/b_asic/resources.py b/b_asic/resources.py index 1245138993bd3d90a7c05c632f1c4ba87839e8fd..8977b3b31e1610def27c42781ee0d1ba59f17fae 100644 --- a/b_asic/resources.py +++ b/b_asic/resources.py @@ -68,7 +68,6 @@ def draw_exclusion_graph_coloring( Returns ------- None - """ COLOR_LIST = [ '#aa0000', @@ -151,6 +150,7 @@ class ProcessCollection: marker_read: str = "X", marker_write: str = "o", show_markers: bool = True, + row: Optional[int] = None, ): """ Plot a process variable lifetime chart. @@ -173,6 +173,10 @@ class ProcessCollection: Marker at read time in the lifetime chart. show_markers : bool, default True Show markers at read and write times. + row : int, optional + Render all processes in this collection on a specified row in the matplotlib axes object. + Defaults to None, which renders all processes on separate rows. This option is useful when + drawing cell assignments. Returns ------- @@ -198,6 +202,7 @@ class ProcessCollection: # Generate the life-time chart for i, process in enumerate(_sorted_nicely(self._collection)): + bar_row = i if row == None else row bar_start = process.start_time % self._schedule_time bar_end = process.start_time + process.execution_time bar_end = ( @@ -206,52 +211,55 @@ class ProcessCollection: else bar_end % self._schedule_time ) if show_markers: - _ax.scatter( + _ax.scatter( # type: ignore x=bar_start, - y=i + 1, + y=bar_row + 1, marker=marker_write, color=marker_color, zorder=10, ) - _ax.scatter( + _ax.scatter( # type: ignore x=bar_end, - y=i + 1, + y=bar_row + 1, marker=marker_read, color=marker_color, zorder=10, ) if bar_end >= bar_start: - _ax.broken_barh( + _ax.broken_barh( # type: ignore [(PAD_L + bar_start, bar_end - bar_start - PAD_L - PAD_R)], - (i + 0.55, 0.9), + (bar_row + 0.55, 0.9), color=bar_color, ) else: # bar_end < bar_start - _ax.broken_barh( + _ax.broken_barh( # type: ignore [ ( PAD_L + bar_start, self._schedule_time - bar_start - PAD_L, ) ], - (i + 0.55, 0.9), + (bar_row + 0.55, 0.9), color=bar_color, ) - _ax.broken_barh( - [(0, bar_end - PAD_R)], (i + 0.55, 0.9), color=bar_color + _ax.broken_barh( # type: ignore + [(0, bar_end - PAD_R)], (bar_row + 0.55, 0.9), color=bar_color ) if show_name: - _ax.annotate( + _ax.annotate( # type: ignore str(process), - (bar_start + PAD_L + 0.025, i + 1.00), + (bar_start + PAD_L + 0.025, bar_row + 1.00), va="center", ) - _ax.grid(True) + _ax.grid(True) # type: ignore - _ax.xaxis.set_major_locator(MaxNLocator(integer=True)) - _ax.yaxis.set_major_locator(MaxNLocator(integer=True)) - _ax.set_xlim(0, self._schedule_time) - _ax.set_ylim(0.25, len(self._collection) + 0.75) + _ax.xaxis.set_major_locator(MaxNLocator(integer=True)) # type: ignore + _ax.yaxis.set_major_locator(MaxNLocator(integer=True)) # type: ignore + _ax.set_xlim(0, self._schedule_time) # type: ignore + if row == None: + _ax.set_ylim(0.25, len(self._collection) + 0.75) # type: ignore + else: + pass return _ax def create_exclusion_graph_from_ports( @@ -413,7 +421,7 @@ class ProcessCollection: total_ports: Optional[int] = None, ) -> Set["ProcessCollection"]: """ - Split this process storage based on some heuristic. + Split this process storage based on concurrent read/write times according to some heuristic. Parameters ---------- @@ -528,10 +536,9 @@ class ProcessCollection: e.g. Jupyter Qt console. """ fig, ax = plt.subplots() - self.plot(ax, show_markers=False) + self.plot(ax=ax, show_markers=False) f = io.StringIO() - fig.savefig(f, format="svg") - + fig.savefig(f, format="svg") # type: ignore return f.getvalue() def __repr__(self): @@ -539,3 +546,82 @@ class ProcessCollection: f"ProcessCollection({self._collection}, {self._schedule_time}," f" {self._cyclic})" ) + + def __iter__(self): + return iter(self._collection) + + def graph_color_cell_assignment( + self, + coloring_strategy: str = "saturation_largest_first", + ) -> Dict[int, "ProcessCollection"]: + """graph_color_cell_assignment. + + Parameters + ---------- + + + Returns + ------- + Dict[int, "ProcessCollection"] + + """ + cell_assignment: Dict[int, ProcessCollection] = dict() + exclusion_graph = self.create_exclusion_graph_from_execution_time() + coloring: Dict[Process, int] = nx.coloring.greedy_color( + exclusion_graph, strategy=coloring_strategy + ) + return cell_assignment + + def left_edge_cell_assignment(self) -> Dict[int, "ProcessCollection"]: + """ + Perform left edge cell assignment of this process collection. + + Returns + ------- + Dict[Process, int] + """ + next_empty_cell = 0 + cell_assignment: Dict[int, ProcessCollection] = dict() + for next_process in sorted(self): + insert_to_new_cell = True + for cell in cell_assignment: + insert_to_this_cell = True + for process in cell_assignment[cell]: + next_process_stop_time = ( + next_process.start_time + next_process.execution_time + ) % self._schedule_time + if ( + next_process.start_time + < process.start_time + process.execution_time + or next_process_stop_time < next_process.start_time + and next_process_stop_time > process.start_time + ): + insert_to_this_cell = False + break + if insert_to_this_cell: + cell_assignment[cell].add_process(next_process) + insert_to_new_cell = False + break + if insert_to_new_cell: + cell_assignment[next_empty_cell] = ProcessCollection( + collection=set(), schedule_time=self._schedule_time + ) + cell_assignment[next_empty_cell].add_process(next_process) + next_empty_cell += 1 + return cell_assignment + + def generate_memory_based_storage_vhdl( + self, + filename: str, + ): + """ + Generate VHDL code for memory based storage of processes (MemoryVariables). + + Parameters + ---------- + filename : str + Filename of output file. + """ + + # Check that hardware can be generated for the ProcessCollection... + raise NotImplementedError("Not implemented yet!") diff --git a/test/baseline/test_left_edge_cell_assignment.png b/test/baseline/test_left_edge_cell_assignment.png new file mode 100644 index 0000000000000000000000000000000000000000..45420a64ac885227c686564337c2be329830e71e Binary files /dev/null and b/test/baseline/test_left_edge_cell_assignment.png differ diff --git a/test/test_resources.py b/test/test_resources.py index 5d00ccf248b9db8bbd5e6a0632619fa7988be869..48a5589846d53836ddc73f58142f9e5e30ec1d3f 100644 --- a/test/test_resources.py +++ b/test/test_resources.py @@ -29,6 +29,15 @@ class TestProcessCollectionPlainMemoryVariable: ) assert len(collection_split) == 3 + @pytest.mark.mpl_image_compare(style='mpl20') + def test_left_edge_cell_assignment(self, simple_collection: ProcessCollection): + fig, ax = plt.subplots(1, 2) + assignment = simple_collection.left_edge_cell_assignment() + for cell in assignment.keys(): + assignment[cell].plot(ax=ax[1], row=cell) + simple_collection.plot(ax[0]) + return fig + # Issue: #175 def test_interleaver_issue175(self): with open('test/fixtures/interleaver-two-port-issue175.p', 'rb') as f: