    import io
    
    import re
    
    from collections import Counter
    
    from functools import reduce
    
    from typing import Dict, Iterable, List, Optional, Tuple, TypeVar, Union
    
    
    import matplotlib.pyplot as plt
    import networkx as nx
    from matplotlib.axes import Axes
    from matplotlib.ticker import MaxNLocator
    
    
    from b_asic._preferences import LATENCY_COLOR, WARNING_COLOR
    
    from b_asic.codegen.vhdl.common import is_valid_vhdl_identifier
    
    from b_asic.process import (
        MemoryProcess,
        MemoryVariable,
        OperatorProcess,
        PlainMemoryVariable,
        Process,
    )
    
    from b_asic.types import TypeName
    
    
    
    # Default latency coloring RGB tuple: every channel of the B-ASIC preference
    # color divided by 255 (presumably mapping 8-bit values to Matplotlib's 0-1
    # range -- confirm against b_asic._preferences).
    _LATENCY_COLOR = tuple(channel / 255 for channel in LATENCY_COLOR)

    # Warning coloring RGB tuple, scaled the same way.
    _WARNING_COLOR = tuple(channel / 255 for channel in WARNING_COLOR)

    #
    # Human-intuitive sorting:
    # https://stackoverflow.com/questions/2669059/how-to-sort-alpha-numeric-set-in-python
    #
    # Typing '_T' to help Pyright propagate type-information
    #
    _T = TypeVar('_T')
    
    
    
    
    def _sorted_nicely(to_be_sorted: Iterable[_T]) -> List[_T]:
    
        """Sort the given iterable in the way that humans expect."""
    
    
        def convert(text):
            return int(text) if text.isdigit() else text
    
        def alphanum_key(key):
            return [convert(c) for c in re.split('([0-9]+)', str(key))]
    
    
        return sorted(to_be_sorted, key=alphanum_key)
    
    
    
    def _sanitize_port_option(
        read_ports: Optional[int] = None,
        write_ports: Optional[int] = None,
        total_ports: Optional[int] = None,
    ) -> Tuple[int, int, int]:
        """
    
        General port sanitization function used to test if a port specification makes sense.
    
        Raises ValueError if the port specification is in-proper.
    
        Parameters
        ----------
        read_ports : int, optional
            The number of read ports.
        write_ports : int, optional
            The number of write ports.
        total_ports : int, optional
            The total number of ports
    
        Returns
        -------
    
        Returns a triple int tuple (read_ports, write_ports, total_ports) equal to the
        input, or sanitized if one of the input equals None. If total_ports is set to None
        at the input, it is set to read_ports+write_ports at the output. If read_ports or
        write_ports is set to None at the input, it is set to total_ports at the output.
    
    
        """
        if total_ports is None:
            if read_ports is None or write_ports is None:
                raise ValueError(
                    "If total_ports is unset, both read_ports and write_ports"
                    " must be provided."
                )
            else:
                total_ports = read_ports + write_ports
        else:
            read_ports = total_ports if read_ports is None else read_ports
            write_ports = total_ports if write_ports is None else write_ports
        if total_ports < read_ports:
            raise ValueError(
                f'Total ports ({total_ports}) less then read ports ({read_ports})'
            )
        if total_ports < write_ports:
            raise ValueError(
                f'Total ports ({total_ports}) less then write ports ({write_ports})'
            )
    
        return read_ports, write_ports, total_ports
    
    def draw_exclusion_graph_coloring(
        exclusion_graph: nx.Graph,
        color_dict: Dict[Process, int],
        ax: Optional[Axes] = None,
        color_list: Optional[Union[List[str], List[Tuple[float, float, float]]]] = None,
    ) -> None:
        """
        Helper function for drawing a colored exclusion graph.

        Example usage:

        .. code-block:: python

            import networkx as nx
            import matplotlib.pyplot as plt

            _, ax = plt.subplots()
            collection = ProcessCollection(...)
            exclusion_graph = collection.create_exclusion_graph_from_ports(
                read_ports = 1,
                write_ports = 1,
                total_ports = 2,
            )
            coloring = nx.greedy_color(exclusion_graph)
            draw_exclusion_graph_coloring(exclusion_graph, coloring, ax=ax)
            plt.show()

        Parameters
        ----------
        exclusion_graph : :class:`networkx.Graph`
            The :class:`networkx.Graph` exclusion graph object that is to be drawn.
        color_dict : dict
            A dict where keys are :class:`~b_asic.process.Process` objects and values are
            integers representing colors. These dictionaries are automatically generated by
            :func:`networkx.algorithms.coloring.greedy_color`.
        ax : :class:`matplotlib.axes.Axes`, optional
            A Matplotlib :class:`~matplotlib.axes.Axes` object to draw the exclusion graph.
        color_list : iterable of color, optional
            A list of colors in Matplotlib format. It is indexed with the integer
            values of ``color_dict``. If not given, a built-in list of 15 colors is
            used.

        Returns
        -------
        None

        Raises
        ------
        IndexError
            If a color index of a drawn node is outside the color list in use.
        KeyError
            If a node of ``exclusion_graph`` has no entry in ``color_dict``.
        """
        COLOR_LIST = [
            '#aa0000',
            '#00aa00',
            '#0000ff',
            '#ff00aa',
            '#ffaa00',
            '#00ffaa',
            '#aaff00',
            '#aa00ff',
            '#00aaff',
            '#ff0000',
            '#00ff00',
            '#0000aa',
            '#aaaa00',
            '#aa00aa',
            '#00aaaa',
        ]
        palette = COLOR_LIST if color_list is None else color_list
        # One color per node, in the iteration order of the graph, which is the
        # order draw_networkx expects for ``node_color``.
        node_color_list = [palette[color_dict[node]] for node in exclusion_graph]
        nx.draw_networkx(
            exclusion_graph,
            node_color=node_color_list,
            ax=ax,
            # Fixed seed so that repeated calls produce the same layout.
            pos=nx.spring_layout(exclusion_graph, seed=1),
        )
    
    
    
    class _ForwardBackwardEntry:
        def __init__(
            self,
            inputs: Optional[List[Process]] = None,
            outputs: Optional[List[Process]] = None,
            regs: Optional[List[Optional[Process]]] = None,
            back_edge_to: Optional[Dict[int, int]] = None,
            back_edge_from: Optional[Dict[int, int]] = None,
            outputs_from: Optional[int] = None,
        ):
            """
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
            Single entry in a _ForwardBackwardTable.
    
            Aggregate type of input, output and list of registers.
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
            inputs : list of Process, optional
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
            outputs : list of Process, optional
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
            regs : list of Process, optional
    
                regs
            back_edge_to : dict, optional
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                Dictionary containing back edges of this entry to registers in the next
                entry.
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                Dictionary containing the back edge of the previous entry to registers in
                this entry.
    
            outputs_from : int, optional
            """
            self.inputs: List[Process] = [] if inputs is None else inputs
            self.outputs: List[Process] = [] if outputs is None else outputs
            self.regs: List[Optional[Process]] = [] if regs is None else regs
            self.back_edge_to: Dict[int, int] = {} if back_edge_to is None else back_edge_to
            self.back_edge_from: Dict[int, int] = (
                {} if back_edge_from is None else back_edge_from
            )
            self.outputs_from = outputs_from
    
    
    class _ForwardBackwardTable:
        def __init__(self, collection: 'ProcessCollection'):
            """
            Forward-Backward allocation table for ProcessCollections.

            This structure implements the forward-backward register allocation algorithm,
            which is used to generate hardware from MemoryVariables in a ProcessCollection.

            The table has one :class:`_ForwardBackwardEntry` per time slot of the
            schedule, and every entry has as many register cells as the largest number
            of simultaneously alive variables.

            Parameters
            ----------
            collection : ProcessCollection
                ProcessCollection to apply forward-backward allocation on
            """
            schedule_time = collection.schedule_time

            # Generate an alive variable list
            self._collection = set(collection.collection)
            self._live_variables: List[int] = [0] * schedule_time
            for mv in self._collection:
                stop_time = mv.start_time + mv.execution_time
                for alive_time in range(mv.start_time, stop_time):
                    self._live_variables[alive_time % schedule_time] += 1

            # First, create an empty forward-backward table with the right dimensions
            n_regs = max(self._live_variables, default=0)
            self.table: List[_ForwardBackwardEntry] = []
            for _ in range(schedule_time):
                entry = _ForwardBackwardEntry()
                # https://github.com/microsoft/pyright/issues/1073
                for _ in range(n_regs):
                    entry.regs.append(None)
                self.table.append(entry)

            # Insert all processes (one per time-slot) to the table input
            # TODO: "Input each variable at the time step corresponding to the beginning of
            #        its lifetime. If multiple variables are input in a given cycle, these
            #        are allocated to multiple registers such that the variable with the
            #        longest lifetime is allocated to the initial register and the other
            #        variables are allocated to consecutive registers in decreasing order
            #        of lifetime." -- K. Parhi
            for mv in collection:
                self.table[mv.start_time].inputs.append(mv)
                if mv.execution_time:
                    self.table[(mv.start_time + 1) % schedule_time].regs[0] = mv
                else:
                    # Zero execution time: the variable is output in its input slot.
                    self.table[mv.start_time].outputs.append(mv)
                    self.table[mv.start_time].outputs_from = -1

            # Forward-backward allocation: alternate between forwarding everything as
            # far as possible and one single backward allocation until all variables
            # have reached an output.
            forward = True
            while not self._forward_backward_is_complete():
                if forward:
                    self._do_forward_allocation()
                else:
                    self._do_single_backward_allocation()
                # Without this toggle the loop would repeat the forward step forever
                # whenever one forward pass is not enough.
                forward = not forward

        def _forward_backward_is_complete(self) -> bool:
            """Return True if every process of the collection is in some output list."""
            allocated = {proc for e in self.table for proc in e.outputs}
            return self._collection <= allocated

        def _do_forward_allocation(self):
            """
            Forward all Processes as far as possible in the register chain.

            Processes are forwarded until they reach their end time (at which they are
            added to the output list), or until they reach the end of the register chain.

            Raises
            ------
            ValueError
                If the cell a Process should be forwarded to holds another Process.
            """
            rows = len(self.table)
            cols = len(self.table[0].regs)
            # Note that two passes of the forward allocation need to be done, since
            # variables may loop around the schedule cycle boundary.
            for _ in range(2):
                for time, entry in enumerate(self.table):
                    for reg_idx, reg in enumerate(entry.regs):
                        if reg is None:
                            continue
                        reg_end_time = (reg.start_time + reg.execution_time) % rows
                        if reg_end_time == time:
                            if reg not in entry.outputs:
                                entry.outputs.append(reg)
                                entry.outputs_from = reg_idx
                        elif reg_idx != cols - 1:
                            next_row = (time + 1) % rows
                            next_col = reg_idx + 1
                            cell = self.table[next_row].regs[next_col]
                            if cell not in (None, reg):
                                raise ValueError(
                                    f'Can\'t forward allocate {reg} in row={time},'
                                    f' col={reg_idx} to next_row={next_row},'
                                    f' next_col={next_col} (cell contains: {cell})'
                                )
                            self.table[next_row].regs[next_col] = reg

        def _do_single_backward_allocation(self):
            """
            Perform backward allocation of Processes in the allocation table.

            Exactly one Process is moved from the last register of a row to a free
            register of the following row, and the move is recorded in the
            ``back_edge_to`` / ``back_edge_from`` dictionaries of the two entries.

            Raises
            ------
            ValueError
                If no Process can be backward allocated.
            """
            rows = len(self.table)
            cols = len(self.table[0].regs)
            outputs = {out for e in self.table for out in e.outputs}
            #
            # Pass #1: Find any (one) non-dead variable from the last register and try to
            # backward allocate it to a previous register where it is not blocking an open
            # path. This heuristic helps minimize forward allocation moves later.
            #
            # Pass #2: Backward allocate the first non-dead variable from the last
            # registers to an empty register.
            #
            for avoid_open_path in (True, False):
                for time, entry in enumerate(self.table):
                    reg = entry.regs[-1]
                    if reg is None or reg in outputs:
                        continue
                    next_entry = self.table[(time + 1) % rows]
                    for nreg_idx, nreg in enumerate(next_entry.regs):
                        if nreg is not None:
                            continue
                        if (
                            avoid_open_path
                            and nreg_idx != 0
                            and entry.regs[nreg_idx - 1] is None
                        ):
                            # Only pass #1 skips this cell: the register that feeds
                            # it in the current row is empty.
                            continue
                        next_entry.regs[nreg_idx] = reg
                        entry.back_edge_to[cols - 1] = nreg_idx
                        next_entry.back_edge_from[nreg_idx] = cols - 1
                        return

            # All passes failed, raise exception...
            raise ValueError(
                "Can't backward allocate any variable. This should not happen."
            )

        def __getitem__(self, key):
            return self.table[key]

        def __iter__(self):
            yield from self.table

        def __len__(self):
            return len(self.table)

        def __str__(self):
            """
            Return the table as text, one row per time slot.

            Cells that are the source of a back edge get a green background and cells
            that are the target of a back edge a brown background (ANSI escape codes).
            """
            # ANSI escape codes for coloring in the forward-backward table string
            GREEN_BACKGROUND_ANSI = "\u001b[42m"
            BROWN_BACKGROUND_ANSI = "\u001b[43m"
            RESET_BACKGROUND_ANSI = "\033[0m"

            # Text width of input and output column
            def lst_w(proc_lst):
                return sum(len(str(p)) + 1 for p in proc_lst)

            input_col_w = max(5, max(lst_w(pl.inputs) for pl in self.table) + 1)
            output_col_w = max(5, max(lst_w(pl.outputs) for pl in self.table) + 1)

            # Text width of register columns
            reg_col_w = max(
                (len(str(reg)) for entry in self.table for reg in entry.regs),
                default=0,
            )
            reg_col_w = max(4, reg_col_w + 2)

            n_regs = max(self._live_variables)

            # Header row of the string
            res = f' T |{"In":^{input_col_w}}|'
            for i in range(n_regs):
                reg = f'R{i}'
                res += f'{reg:^{reg_col_w}}|'
            res += f'{"Out":^{output_col_w}}|'
            res += '\n'
            res += (6 + input_col_w + (reg_col_w + 1) * n_regs + output_col_w) * '-'
            res += '\n'

            for time, entry in enumerate(self.table):
                # Time
                res += f'{time:^3}| '

                # Input column
                inputs_str = ','.join(input_.name for input_ in entry.inputs)
                res += f'{inputs_str:^{input_col_w-1}}|'

                # Register columns
                for reg_idx, reg in enumerate(entry.regs):
                    if reg is None:
                        res += " " * reg_col_w + "|"
                    elif reg_idx in entry.back_edge_to:
                        res += f'{GREEN_BACKGROUND_ANSI}'
                        res += f'{reg.name:^{reg_col_w}}'
                        res += f'{RESET_BACKGROUND_ANSI}|'
                    elif reg_idx in entry.back_edge_from:
                        res += f'{BROWN_BACKGROUND_ANSI}'
                        res += f'{reg.name:^{reg_col_w}}'
                        res += f'{RESET_BACKGROUND_ANSI}|'
                    else:
                        res += f'{reg.name:^{reg_col_w}}' + "|"

                # Output column
                outputs_str = ','.join(output.name for output in entry.outputs)
                if entry.outputs_from is not None:
                    outputs_str += f"({entry.outputs_from})"
                res += f'{outputs_str:^{output_col_w}}|'

                res += '\n'
            return res
    
    
    
    class ProcessCollection:
    
        r"""
        Collection of :class:`~b_asic.process.Process` objects.

        Parameters
        ----------
        collection : Iterable of :class:`~b_asic.process.Process` objects
            The :class:`~b_asic.process.Process` objects forming this
            :class:`~b_asic.resources.ProcessCollection`.
        schedule_time : int
            The scheduling time associated with this
            :class:`~b_asic.resources.ProcessCollection`.
        cyclic : bool, default: False
            Whether the processes operate cyclically, i.e., if time

            .. math:: t = t \bmod T_{\textrm{schedule}}.
        """
    
    
        def __init__(
            self,
            collection: Iterable[Process],
            schedule_time: int,
            cyclic: bool = False,
        ):
            """
            Store the processes, the schedule time and the cyclic flag.

            The processes are copied into a new list, so any iterable is accepted and
            later changes to the argument do not affect this collection.
            """
            self._cyclic = cyclic
            self._schedule_time = schedule_time
            self._collection = [*collection]
    
    
    
        @property
        def collection(self) -> List[Process]:
            """
            The list of processes in this collection.

            The internal list itself is returned, not a copy. It is exposed as a
            property because the code in this module reads it as an attribute
            (``collection.collection``, ``self.collection.append(...)``).
            """
            return self._collection
    
    
        @property
        def schedule_time(self) -> int:
            """The schedule time this collection was constructed with."""
            return self._schedule_time
    
    
        def __len__(self):
            """Return the number of processes in this collection."""
            return len(self._collection)
    
        def add_process(self, process: Process):
            """
            Add a new :class:`~b_asic.process.Process` to this
            :class:`~b_asic.resources.ProcessCollection`.

            Raises :class:`ValueError` if the :class:`~b_asic.process.Process` is
            already in this collection.

            Parameters
            ----------
            process : :class:`~b_asic.process.Process`
                The :class:`~b_asic.process.Process` object to add.
            """
            # Work on the backing list directly so that this method does not depend
            # on how the ``collection`` accessor is declared.
            if process in self._collection:
                raise ValueError("Process already in ProcessCollection")
            self._collection.append(process)
    
            Remove a :class:`~b_asic.process.Process` from this
            :class:`~b_asic.resources.ProcessCollection`.
    
            Raises :class:`KeyError` if the specified :class:`~b_asic.process.Process` is
            not in this collection.
    
            process : :class:`~b_asic.process.Process`
                The :class:`~b_asic.process.Process` object to remove from this collection.
    
            """
            if process not in self.collection:
                raise KeyError(
                    f"Can't remove process: '{process}', as it is not in collection."
                )
            else:
                self.collection.remove(process)
    
        def plot(
            self,
            ax: Optional[Axes] = None,
            show_name: bool = True,
            bar_color: Union[str, Tuple[float, ...]] = _LATENCY_COLOR,
            marker_color: Union[str, Tuple[float, ...]] = "black",
            marker_read: str = "X",
            marker_write: str = "o",
            show_markers: bool = True,
            row: Optional[int] = None,
            allow_excessive_lifetimes: bool = False,
        ):
            """
            Plot all :class:`~b_asic.process.Process` objects of this
            :class:`~b_asic.resources.ProcessCollection` in a lifetime diagram.

            If the ``ax`` parameter is not specified, a new Matplotlib figure is created.

            Raises :class:`ValueError` if any :class:`~b_asic.process.Process` lifetime
            exceeds this :class:`~b_asic.resources.ProcessCollection` schedule time,
            unless ``allow_excessive_lifetimes`` is specified. In that case,
            :class:`~b_asic.process.Process` objects whose lifetime exceed the schedule
            time are drawn using the B-ASIC warning color.

            Parameters
            ----------
            ax : :class:`matplotlib.axes.Axes`, optional
                Matplotlib :class:`~matplotlib.axes.Axes` object to draw this lifetime chart
                onto. If not provided (i.e., set to None), this method will return a new
                Axes object.
            show_name : bool, default: True
                Show name of all processes in the lifetime chart.
            bar_color : color, optional
                Bar color in lifetime chart.
            marker_color : color, default 'black'
                Color for read and write marker.
            marker_read : str, default 'X'
                Marker at read time in the lifetime chart.
            marker_write : str, default 'o'
                Marker at write time in the lifetime chart.
            show_markers : bool, default True
                Show markers at read and write times.
            row : int, optional
                Render all processes in this collection on a specified row in the Matplotlib
                axes object. Defaults to None, which renders all processes on separate rows.
                This option is useful when drawing cell assignments.
            allow_excessive_lifetimes : bool, default False
                If set to true, the plot method allows plotting collections of variables
                with a longer lifetime than the schedule time.

            Returns
            -------
            ax : Associated Matplotlib Axes (or array of Axes) object
            """
            # Set up the Axes object
            if ax is None:
                _, _ax = plt.subplots()
            else:
                _ax = ax

            # Lifetime chart left and right padding
            PAD_L, PAD_R = 0.05, 0.05

            # ``default=0`` lets an empty collection produce an empty chart instead
            # of failing inside max().
            max_execution_time = max(
                (process.execution_time for process in self._collection), default=0
            )
            if not allow_excessive_lifetimes and max_execution_time > self._schedule_time:
                # Schedule time needs to be greater than or equal to the maximum process
                # lifetime
                raise ValueError(
                    f'Schedule time: {self._schedule_time} < Max execution'
                    f' time: {max_execution_time}'
                )

            # Generate the lifetime chart, one process at a time
            for i, process in enumerate(_sorted_nicely(self._collection)):
                bar_row = i if row is None else row
                bar_start = process.start_time
                bar_end = bar_start + process.execution_time
                bar_start = (
                    bar_start
                    if process.execution_time == 0
                    else bar_start % self._schedule_time
                )
                # An end exactly on a schedule boundary is drawn at the right edge
                # of the chart rather than wrapped around to 0.
                bar_end = (
                    self.schedule_time
                    if bar_end and bar_end % self._schedule_time == 0
                    else bar_end % self._schedule_time
                )

                if show_markers:
                    # NOTE(review): the ``if show_markers`` guard and the ``x`` / ``y``
                    # arguments of this first scatter call are missing from the
                    # available source and were reconstructed from the read-marker
                    # call below -- confirm against upstream.
                    _ax.scatter(  # type: ignore
                        x=bar_start,
                        y=bar_row + 1,
                        marker=marker_write,
                        color=marker_color,
                        zorder=10,
                    )
                    for end_time in process.read_times:
                        end_time = (
                            self.schedule_time
                            if end_time and end_time % self.schedule_time == 0
                            else end_time % self._schedule_time
                        )
                        _ax.scatter(  # type: ignore
                            x=end_time,
                            y=bar_row + 1,
                            marker=marker_read,
                            color=marker_color,
                            zorder=10,
                        )

                # NOTE(review): the conditions of the last two branches below are
                # missing from the available source and were reconstructed from the
                # drawing calls they guard -- confirm against upstream.
                if process.execution_time > self.schedule_time:
                    # Execution time longer than schedule time, draw with warning color
                    _ax.broken_barh(  # type: ignore
                        [(0, self.schedule_time)],
                        (bar_row + 0.55, 0.9),
                        color=_WARNING_COLOR,
                    )
                elif process.execution_time == 0:
                    # Execution time zero, draw a slim bar
                    _ax.broken_barh(  # type: ignore
                        [(PAD_L + bar_start, bar_end - bar_start - PAD_L - PAD_R)],
                        (bar_row + 0.55, 0.9),
                        color=bar_color,
                    )
                elif bar_end > bar_start:
                    # Lifetime fits inside one schedule period: a single bar
                    _ax.broken_barh(  # type: ignore
                        [(PAD_L + bar_start, bar_end - bar_start - PAD_L - PAD_R)],
                        (bar_row + 0.55, 0.9),
                        color=bar_color,
                    )
                else:
                    # Lifetime wraps around the schedule boundary: one bar from the
                    # start to the right edge and one from the left edge to the end
                    _ax.broken_barh(  # type: ignore
                        [
                            (
                                PAD_L + bar_start,
                                self._schedule_time - bar_start - PAD_L,
                            )
                        ],
                        (bar_row + 0.55, 0.9),
                        color=bar_color,
                    )
                    _ax.broken_barh(  # type: ignore
                        [(0, bar_end - PAD_R)], (bar_row + 0.55, 0.9), color=bar_color
                    )

                if show_name:
                    _ax.annotate(  # type: ignore
                        str(process),
                        (bar_start + PAD_L + 0.025, bar_row + 1.00),
                        va="center",
                    )

            _ax.grid(True)  # type: ignore

            _ax.xaxis.set_major_locator(
                MaxNLocator(integer=True, min_n_ticks=1)
            )  # type: ignore
            _ax.yaxis.set_major_locator(
                MaxNLocator(integer=True, min_n_ticks=1)
            )  # type: ignore

            _ax.set_xlim(0, self._schedule_time)  # type: ignore
            if row is None:
                # Only rescale the y axis when this call owns all the rows.
                _ax.set_ylim(0.25, len(self._collection) + 0.75)  # type: ignore

            return _ax
    
    
        def show(
            self,
            *,
            show_name: bool = True,
            bar_color: Union[str, Tuple[float, ...]] = _LATENCY_COLOR,
            marker_color: Union[str, Tuple[float, ...]] = "black",
            marker_read: str = "X",
            marker_write: str = "o",
            show_markers: bool = True,
            allow_excessive_lifetimes: bool = False,
            title: Optional[str] = None,
        ) -> None:
            """
            Display this :class:`~b_asic.resources.ProcessCollection` using the current
            Matplotlib backend.

            Equivalent to creating a Matplotlib figure, passing it and arguments to
            :meth:`plot` and invoking :py:meth:`matplotlib.figure.Figure.show`.

            Parameters
            ----------
            show_name : bool, default: True
                Show name of all processes in the lifetime chart.
            bar_color : color, optional
                Bar color in lifetime chart.
            marker_color : color, default 'black'
                Color for read and write marker.
            marker_read : str, default 'X'
                Marker at read time in the lifetime chart.
            marker_write : str, default 'o'
                Marker at write time in the lifetime chart.
            show_markers : bool, default True
                Show markers at read and write times.
            allow_excessive_lifetimes : bool, default False
                If True, the plot method allows plotting collections of variables with
                a greater lifetime than the schedule time.
            title : str, optional
                Title of plot.
            """
            fig, ax = plt.subplots()
            self.plot(
                ax=ax,
                show_name=show_name,
                bar_color=bar_color,
                marker_color=marker_color,
                marker_read=marker_read,
                marker_write=marker_write,
                show_markers=show_markers,
                allow_excessive_lifetimes=allow_excessive_lifetimes,
            )
            # An empty or missing title leaves the figure without a suptitle.
            if title:
                fig.suptitle(title)
            fig.show()  # type: ignore
    
    
        def create_exclusion_graph_from_ports(
            self,
            read_ports: Optional[int] = None,
            write_ports: Optional[int] = None,
            total_ports: Optional[int] = None,
        ) -> nx.Graph:
            """
            Create an exclusion graph from a given number of read and write ports based on
            concurrent read and write accesses to this
            :class:`~b_asic.resources.ProcessCollection`.

            All access times are folded into one schedule period (modulo the schedule
            time) before they are compared.

            Parameters
            ----------
            read_ports : int, optional
                The number of read ports used when splitting process collection based on
                memory variable access.
            write_ports : int, optional
                The number of write ports used when splitting process collection based on
                memory variable access.
            total_ports : int, optional
                The total number of ports used when splitting process collection based on
                memory variable access.

            Returns
            -------
            A :class:`networkx.Graph` whose nodes are the processes of this collection
            and whose edges join processes with conflicting port accesses.

            Raises
            ------
            ValueError
                If the sanitized *read_ports* or *write_ports* is not one, if the
                sanitized *total_ports* is neither one nor two, or if *total_ports* is
                one and a process is written and read in the same schedule slot.
            """
            read_ports, write_ports, total_ports = _sanitize_port_option(
                read_ports, write_ports, total_ports
            )

            # Guard for proper read/write port settings
            if read_ports != 1 or write_ports != 1:
                raise ValueError(
                    "Splitting with read and write ports not equal to one with the"
                    " graph coloring heuristic does not make sense."
                )
            if total_ports not in (1, 2):
                raise ValueError(
                    "Total ports should be either 1 (non-concurrent reads/writes)"
                    " or 2 (concurrent read/writes) for graph coloring heuristic."
                )

            schedule_time = self.schedule_time
            single_port = total_ports == 1

            # Create new exclusion graph. Nodes are Processes
            exclusion_graph = nx.Graph()
            exclusion_graph.add_nodes_from(self._collection)

            # Fold the write time and the read times of every process into the
            # schedule period once, instead of once per pair of processes.
            accesses = []
            for node in exclusion_graph:
                write_time = node.start_time % schedule_time
                read_times = {
                    read_time % schedule_time for read_time in node.read_times
                }
                # Compare the folded write time: the read times are folded as well
                if single_port and write_time in read_times:
                    raise ValueError("Cannot read and write in same cycle.")
                accesses.append((node, write_time, read_times))

            # Every conflict below is symmetric, so each unordered pair is visited
            # once. The guard above ensures exactly one read and one write port.
            for index, (node1, write1, reads1) in enumerate(accesses):
                for node2, write2, reads2 in accesses[index + 1 :]:
                    same_write_slot = write1 == write2
                    shared_read_slot = not reads1.isdisjoint(reads2)
                    read_write_clash = single_port and (
                        write1 in reads2 or write2 in reads1
                    )
                    if same_write_slot or shared_read_slot or read_write_clash:
                        exclusion_graph.add_edge(node1, node2)

            return exclusion_graph
    
        def create_exclusion_graph_from_execution_time(self) -> nx.Graph:
            """
            Create an exclusion graph from processes overlapping in execution time.

            A process occupies the time slots from its start time up to, but not
            including, its start time plus its execution time. Slots at or beyond the
            schedule time wrap around to the beginning of the schedule.

            Returns
            -------
            A :class:`networkx.Graph` whose nodes are the processes of this collection
            and whose edges join processes occupying at least one common time slot.
            """
            schedule_time = self._schedule_time

            def occupied_slots(process):
                # Slots inside the schedule period, then the part wrapping past its
                # end (an empty range when the process does not wrap).
                end_time = process.start_time + process.execution_time
                slots = set(range(process.start_time, min(end_time, schedule_time)))
                slots.update(range(0, end_time - schedule_time))
                return slots

            # Compute the slot set once per process rather than once per pair
            occupancy = [(process, occupied_slots(process)) for process in self._collection]

            exclusion_graph = nx.Graph()
            exclusion_graph.add_nodes_from(self._collection)
            # Overlap is symmetric, so each unordered pair is examined once
            for index, (process1, slots1) in enumerate(occupancy):
                for process2, slots2 in occupancy[index + 1 :]:
                    if process1 == process2:
                        continue
                    if not slots1.isdisjoint(slots2):
                        exclusion_graph.add_edge(process1, process2)
            return exclusion_graph
    
    
        def split_on_execution_time(
            self,
            heuristic: str = "graph_color",
            coloring_strategy: str = "saturation_largest_first",
        ) -> List["ProcessCollection"]:
            """
            Split a ProcessCollection based on overlapping execution time.

            Parameters
            ----------
            heuristic : {'graph_color', 'left_edge'}, default: 'graph_color'
                The heuristic used when splitting based on execution times.
            coloring_strategy : str, default: 'saturation_largest_first'
                Node ordering strategy passed to
                :func:`networkx.algorithms.coloring.greedy_color`.
                This parameter is only considered if *heuristic* is set to 'graph_color'.
                One of

                * 'largest_first'
                * 'random_sequential'
                * 'smallest_last'
                * 'independent_set'
                * 'connected_sequential_bfs'
                * 'connected_sequential_dfs' or 'connected_sequential'
                * 'saturation_largest_first' or 'DSATUR'

            Returns
            -------
            A list of new ProcessCollection objects with the process splitting.

            Raises
            ------
            ValueError
                If *heuristic* is neither 'graph_color' nor 'left_edge'.
            """
            if heuristic == "graph_color":
                return self._graph_color_assignment(coloring_strategy)
            elif heuristic == "left_edge":
                return self._left_edge_assignment()
            else:
                raise ValueError(f"Invalid heuristic '{heuristic}'")
    
    
        def split_on_ports(
            self,
            heuristic: str = "graph_color",
            read_ports: Optional[int] = None,
            write_ports: Optional[int] = None,
            total_ports: Optional[int] = None,
        ) -> List["ProcessCollection"]:
            """
            Split this process storage based on concurrent read/write times.

            The port counts are passed through ``_sanitize_port_option`` before the
            selected heuristic is applied.

            Parameters
            ----------
            heuristic : str, default: "graph_color"
                The heuristic used when splitting this :class:`ProcessCollection`.
                The only accepted value is "graph_color".
            read_ports : int, optional
                The number of read ports used when splitting process collection based on
                memory variable access.
            write_ports : int, optional
                The number of write ports used when splitting process collection based on
                memory variable access.
            total_ports : int, optional
                The total number of ports used when splitting process collection based on
                memory variable access.

            Returns
            -------
            A list of new ProcessCollection objects with the process splitting.

            Raises
            ------
            ValueError
                If *heuristic* is not "graph_color".
            """
            n_read, n_write, n_total = _sanitize_port_option(
                read_ports, write_ports, total_ports
            )
            # Reject unknown heuristics up front; only graph coloring is implemented
            if heuristic != "graph_color":
                raise ValueError("Invalid heuristic provided.")
            return self._split_ports_graph_color(n_read, n_write, n_total)
    
    Mikael Henriksson's avatar
    Mikael Henriksson committed
    
    
        def _split_ports_graph_color(
            self,
            read_ports: int,
            write_ports: int,
            total_ports: int,
            coloring_strategy: str = "saturation_largest_first",
        ) -> List["ProcessCollection"]:
            """
            Split this collection by greedily coloring its port exclusion graph.

            Parameters
            ----------
            read_ports : int
                The number of read ports used when splitting process collection based on
                memory variable access.
            write_ports : int
                The number of write ports used when splitting process collection based on
                memory variable access.
            total_ports : int
                The total number of ports used when splitting process collection based on
                memory variable access.
            coloring_strategy : str, default: 'saturation_largest_first'
                Node ordering strategy passed to
                :func:`networkx.algorithms.coloring.greedy_color`
                One of

                * 'largest_first'
                * 'random_sequential'
                * 'smallest_last'
                * 'independent_set'
                * 'connected_sequential_bfs'
                * 'connected_sequential_dfs' or 'connected_sequential'
                * 'saturation_largest_first' or 'DSATUR'

            Returns
            -------
            The result of :meth:`_split_from_graph_coloring` applied to the coloring.
            """
            # Build the graph whose edges join processes that cannot share ports
            port_conflicts = self.create_exclusion_graph_from_ports(
                read_ports=read_ports,
                write_ports=write_ports,
                total_ports=total_ports,
            )
            # Color the graph and turn the color classes into collections
            return self._split_from_graph_coloring(
                nx.coloring.greedy_color(port_conflicts, strategy=coloring_strategy)
            )
    
        def _split_from_graph_coloring(
            self,
            coloring: Dict[Process, int],
    
        ) -> List["ProcessCollection"]:
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
            Split :class:`Process` objects into a set of :class:`ProcessesCollection`
            objects based on a provided graph coloring.
    
    
            Resulting :class:`ProcessCollection` will have the same schedule time and cyclic
            property as self.
    
            coloring : dict
    
    Mikael Henriksson's avatar
    Mikael Henriksson committed
    
    
            process_collection_set_list = [[] for _ in range(max(coloring.values()) + 1)]
    
    Mikael Henriksson's avatar
    Mikael Henriksson committed
            for process, color in coloring.items():
    
                process_collection_set_list[color].append(process)
    
                ProcessCollection(process_collection_set, self._schedule_time, self._cyclic)
    
                for process_collection_set in process_collection_set_list