Skip to content
Snippets Groups Projects
schedule.py 23.9 KiB
Newer Older
  • Learn to ignore specific revisions
  • """
    B-ASIC Schedule Module.

    Contains the schedule class for scheduling operations in an SFG.
    """
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
    import io
    import sys
    
    from collections import defaultdict
    
    from typing import Dict, List, Optional, Tuple, cast
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
    
    
    import matplotlib.pyplot as plt
    
    import numpy as np
    
    from matplotlib.lines import Line2D
    
    from matplotlib.patches import PathPatch, Polygon
    
    from matplotlib.path import Path
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
    from matplotlib.ticker import MaxNLocator
    
    from b_asic import Signal
    
    from b_asic._preferences import (
        EXECUTION_TIME_COLOR,
        LATENCY_COLOR,
        OPERATION_GAP,
        SIGNAL_COLOR,
        SIGNAL_LINEWIDTH,
    )
    
    from b_asic.graph_component import GraphID
    
    from b_asic.operation import Operation
    from b_asic.port import InputPort, OutputPort
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
    from b_asic.process import MemoryVariable, Process
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
    from b_asic.signal_flow_graph import SFG
    
    from b_asic.special_operations import Delay, Output
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
    
    
    # Matplotlib wants RGB channels in the range 0 to 1, so every channel of
    # the preference colors is divided by 255 (presumably they are 0-255
    # values; confirm against b_asic._preferences).
    _EXECUTION_TIME_COLOR = tuple(
        channel / 255 for channel in EXECUTION_TIME_COLOR
    )
    _LATENCY_COLOR = tuple(channel / 255 for channel in LATENCY_COLOR)
    _SIGNAL_COLOR = tuple(channel / 255 for channel in SIGNAL_COLOR)
    
    
    
    class Schedule:
        """
        Schedule of an SFG with scheduled Operations.

        Holds a start time for every scheduled operation, a lap count for
        signals and the total schedule time.
        """

        # Lap count per signal graph ID; entries that are missing read as 0.
        _laps: Dict[GraphID, int]

        # Plot row per operation graph ID; None until a row has been assigned.
        _y_locations: Dict[GraphID, Optional[int]]
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
        def __init__(
            self,
            sfg: SFG,
            schedule_time: Optional[int] = None,
            cyclic: bool = False,
            scheduling_alg: str = "ASAP",
        ):
            """
            Construct a Schedule from an SFG.

            Parameters
            ----------
            sfg : SFG
                The signal flow graph to schedule.
            schedule_time : int, optional
                The schedule time. If not given, the maximum end time of the
                scheduled operations is used.
            cyclic : bool, default: False
                Stored and exposed through the ``cyclic`` property.
            scheduling_alg : str, default: "ASAP"
                Name of the scheduling algorithm. Only "ASAP" is implemented.

            Raises
            ------
            NotImplementedError
                If *scheduling_alg* is not the name of a known algorithm.
            ValueError
                If *schedule_time* is shorter than the maximum end time.
            """
            # The SFG has to be stored before scheduling: the scheduler and
            # most other methods access it through self._sfg.
            self._sfg = sfg
            self._cyclic = cyclic
            self._start_times = {}
            self._laps = defaultdict(lambda: 0)
            self._y_locations = defaultdict(lambda: None)

            if scheduling_alg == "ASAP":
                self._schedule_asap()
            else:
                raise NotImplementedError(
                    f"No algorithm with name: {scheduling_alg} defined."
                )

            max_end_time = self.get_max_end_time()

            if schedule_time is None:
                self._schedule_time = max_end_time
            elif schedule_time < max_end_time:
                raise ValueError(
                    f"Too short schedule time. Minimum is {max_end_time}."
                )
            else:
                self._schedule_time = schedule_time
    
        def start_time_of_operation(self, graph_id: GraphID) -> int:
            """
            Get the start time of the operation specified by *graph_id*.

            Raises
            ------
            ValueError
                If no operation with *graph_id* is part of the schedule.
            """
            if graph_id not in self._start_times:
                raise ValueError(
                    f"No operation with graph_id {graph_id} in schedule"
                )
            return self._start_times[graph_id]
    
    Andreas Bolin's avatar
    Andreas Bolin committed
        def get_max_end_time(self) -> int:
            """
            Return the current maximum end time among all operations.

            For every output port of every scheduled operation the end time is
            the start time of the operation plus the latency offset of the
            port. The largest of these is returned; the result is never less
            than 0.
            """
            max_end_time = 0
            for graph_id, op_start_time in self._start_times.items():
                op = cast(Operation, self._sfg.find_by_id(graph_id))
                for outport in op.outputs:
                    max_end_time = max(
                        max_end_time,
                        op_start_time + cast(int, outport.latency_offset),
                    )
            return max_end_time
    
    
        def forward_slack(self, graph_id: GraphID) -> int:
            """
            Return the smallest forward slack of an operation.

            The minimum is taken over every signal slack reported by
            ``_forward_slacks``. ``sys.maxsize`` is returned when there is no
            signal to bound the slack.

            Raises
            ------
            ValueError
                If no operation with *graph_id* is part of the schedule.
            """
            if graph_id not in self._start_times:
                raise ValueError(
                    f"No operation with graph_id {graph_id} in schedule"
                )

            per_signal = [
                signal_slack
                for port_slacks in self._forward_slacks(graph_id).values()
                for signal_slack in port_slacks.values()
            ]
            # sys.maxsize is the upper bound used when nothing limits the slack.
            return min([sys.maxsize, *per_signal])
    
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
        def _forward_slacks(
            self, graph_id: GraphID
        ) -> Dict["OutputPort", Dict["Signal", int]]:
            """
            Return the forward slack of every signal leaving an operation.

            The result maps each output port of the operation to a dictionary
            with the slack of each signal connected to that port. The slack is
            the time at which the destination uses the value (including the
            laps of the signal) minus the time the value becomes available.
            """
            ret = {}
            start_time = self._start_times[graph_id]
            op = cast(Operation, self._sfg.find_by_id(graph_id))
            for output_port in op.outputs:
                output_slacks = {}
                available_time = start_time + cast(int, output_port.latency_offset)

                for signal in output_port.signals:
                    destination = cast(InputPort, signal.destination)
                    usage_time = (
                        cast(int, destination.latency_offset)
                        + self._start_times[destination.operation.graph_id]
                        # Every lap moves the usage one schedule period later.
                        + self._schedule_time * self._laps[signal.graph_id]
                    )
                    output_slacks[signal] = usage_time - available_time
                ret[output_port] = output_slacks
            return ret
    
        def backward_slack(self, graph_id: GraphID) -> int:
            """
            Return the smallest backward slack of an operation.

            The minimum is taken over every signal slack reported by
            ``_backward_slacks``. ``sys.maxsize`` is returned when there is no
            signal to bound the slack.

            Raises
            ------
            ValueError
                If no operation with *graph_id* is part of the schedule.
            """
            if graph_id not in self._start_times:
                raise ValueError(
                    f"No operation with graph_id {graph_id} in schedule"
                )

            per_signal = [
                signal_slack
                for port_slacks in self._backward_slacks(graph_id).values()
                for signal_slack in port_slacks.values()
            ]
            # sys.maxsize is the upper bound used when nothing limits the slack.
            return min([sys.maxsize, *per_signal])
    
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
        def _backward_slacks(
            self, graph_id: GraphID
        ) -> Dict[InputPort, Dict[Signal, int]]:
            """
            Return the backward slack of every signal entering an operation.

            The result maps each input port of the operation to a dictionary
            with the slack of each signal connected to that port. The slack is
            the time at which the port uses the value minus the time the source
            makes it available (taking the laps of the signal into account).
            """
            ret = {}
            start_time = self._start_times[graph_id]
            op = cast(Operation, self._sfg.find_by_id(graph_id))
            for input_port in op.inputs:
                input_slacks = {}
                usage_time = start_time + cast(int, input_port.latency_offset)

                for signal in input_port.signals:
                    source = cast(OutputPort, signal.source)
                    available_time = (
                        cast(int, source.latency_offset)
                        + self._start_times[source.operation.graph_id]
                        # Every lap moves the source one schedule period earlier.
                        - self._schedule_time * self._laps[signal.graph_id]
                    )
                    input_slacks[signal] = usage_time - available_time
                ret[input_port] = input_slacks
            return ret
    
    
        def slacks(self, graph_id: GraphID) -> Tuple[int, int]:
            """
            Return the backward and forward slack of an operation as a tuple.

            Raises
            ------
            ValueError
                If no operation with *graph_id* is part of the schedule.
            """
            if graph_id in self._start_times:
                backward = self.backward_slack(graph_id)
                forward = self.forward_slack(graph_id)
                return backward, forward
            raise ValueError(
                f"No operation with graph_id {graph_id} in schedule"
            )
    
    
        def print_slacks(self) -> None:
            """Not implemented yet; always raises NotImplementedError."""
            raise NotImplementedError()
    
    
        def set_schedule_time(self, time: int) -> "Schedule":
            """
            Set a new schedule time.

            Parameters
            ----------
            time : int
                The new schedule time. It must not be shorter than the value
                returned by ``get_max_end_time``.

            Returns
            -------
            Schedule
                The schedule itself.

            Raises
            ------
            ValueError
                If *time* is shorter than the maximum end time.
            """
            max_end_time = self.get_max_end_time()
            if time < max_end_time:
                # f-strings are required here; without the prefix the
                # placeholders end up verbatim in the message.
                raise ValueError(
                    f"New schedule time ({time}) too short, minimum:"
                    f" ({max_end_time})."
                )
            self._schedule_time = time
            return self
    
    
    Andreas Bolin's avatar
    Andreas Bolin committed
        @property
        def sfg(self) -> SFG:
            """Return the SFG held by the schedule."""
            return self._sfg
    
    Andreas Bolin's avatar
    Andreas Bolin committed
        @property
        def start_times(self) -> Dict[GraphID, int]:
            """
            Return the start time of every scheduled operation.

            The internal dictionary is returned, not a copy.
            """
            return self._start_times
    
    Andreas Bolin's avatar
    Andreas Bolin committed
        @property
        def laps(self) -> Dict[GraphID, int]:
            """
            Return the lap count of the signals, keyed by signal graph ID.

            The internal dictionary is returned, not a copy.
            """
            return self._laps
    
        @property
        def schedule_time(self) -> int:
            """Return the current schedule time."""
            return self._schedule_time
    
    Andreas Bolin's avatar
    Andreas Bolin committed
        @property
        def cyclic(self) -> bool:
            """Return the value stored in ``self._cyclic``."""
            return self._cyclic
    
        def increase_time_resolution(self, factor: int) -> "Schedule":
            """
            Increase time resolution for a schedule.

            All start times, the time values of every scheduled operation and
            the schedule time are multiplied by *factor*.

            Parameters
            ----------
            factor : int
                The time resolution increment.

            Returns
            -------
            Schedule
                The schedule itself.
            """
            self._start_times = {
                k: factor * v for k, v in self._start_times.items()
            }
            for graph_id in self._start_times:
                cast(
                    Operation, self._sfg.find_by_id(graph_id)
                )._increase_time_resolution(factor)
            self._schedule_time *= factor
            return self
    
        def _get_all_times(self) -> List[int]:
            """
            Return a list of all times for the schedule. Used to check how the
            resolution can be modified.
            """
            # Local values
            ret = [self._schedule_time, *self._start_times.values()]
            # Loop over operations
    
            for graph_id in self._start_times:
                op = cast(Operation, self._sfg.find_by_id(graph_id))
    
                ret += [cast(int, op.execution_time), *op.latency_offsets.values()]
    
            # Remove not set values (None)
            ret = [v for v in ret if v is not None]
            return ret
    
        def get_possible_time_resolution_decrements(self) -> List[int]:
            """
            Return a list with possible factors to reduce time resolution.

            A factor is possible if it divides every time value of the
            schedule. 1 is always part of the result.
            """
            vals = self._get_all_times()
            # Zero is divisible by everything and therefore gives no bound.
            # With no non-zero time at all, only the trivial factor remains.
            maxloop = min((val for val in vals if val), default=1)
            if maxloop <= 1:
                return [1]
            return [1] + [
                candidate
                for candidate in range(2, maxloop + 1)
                if not any(val % candidate for val in vals)
            ]
    
        def decrease_time_resolution(self, factor: int) -> "Schedule":
            """
            Decrease time resolution for a schedule.

            All start times, the time values of every scheduled operation and
            the schedule time are divided by *factor*.

            Parameters
            ----------
            factor : int
                The time resolution decrement. Must be one of the values
                returned by ``get_possible_time_resolution_decrements``.

            Returns
            -------
            Schedule
                The schedule itself.

            Raises
            ------
            ValueError
                If *factor* is not a possible decrement.
            """
            possible_values = self.get_possible_time_resolution_decrements()
            if factor not in possible_values:
                raise ValueError(
                    f"Not possible to decrease resolution with {factor}. Possible"
                    f" values are {possible_values}"
                )
            self._start_times = {
                k: v // factor for k, v in self._start_times.items()
            }
            for graph_id in self._start_times:
                cast(
                    Operation, self._sfg.find_by_id(graph_id)
                )._decrease_time_resolution(factor)
            self._schedule_time = self._schedule_time // factor
            return self
    
        def move_operation(self, graph_id: GraphID, time: int) -> "Schedule":
            """
            Move an operation in time and update the laps of its signals.

            Parameters
            ----------
            graph_id : GraphID
                The graph ID of the operation to move.
            time : int
                The number of time units to add to the start time. It must
                lie between minus the backward slack and the forward slack of
                the operation.

            Returns
            -------
            Schedule
                The schedule itself.

            Raises
            ------
            ValueError
                If the operation is not part of the schedule or if *time* is
                outside the range allowed by the slacks.
            """
            if graph_id not in self._start_times:
                raise ValueError(
                    f"No operation with graph_id {graph_id} in schedule"
                )

            backward_slack, forward_slack = self.slacks(graph_id)
            if not -backward_slack <= time <= forward_slack:
                raise ValueError(
                    f"Operation {graph_id} cannot be moved {time} time units."
                    f" Allowed range is {-backward_slack} to {forward_slack}."
                )

            # The start time before wrapping is needed to compute the laps;
            # the stored start time is wrapped into the schedule period.
            tmp_start = self._start_times[graph_id] + time
            new_start = tmp_start % self._schedule_time

            # Update input laps
            input_slacks = self._backward_slacks(graph_id)
            for in_port, signal_slacks in input_slacks.items():
                tmp_usage = tmp_start + cast(int, in_port.latency_offset)
                new_usage = tmp_usage % self._schedule_time
                for signal, signal_slack in signal_slacks.items():
                    new_slack = signal_slack + time
                    tmp_prev_available = tmp_usage - new_slack
                    prev_available = tmp_prev_available % self._schedule_time
                    laps = new_slack // self._schedule_time
                    if new_usage < prev_available:
                        laps += 1
                    self._laps[signal.graph_id] = laps

            # Update output laps
            output_slacks = self._forward_slacks(graph_id)
            for out_port, signal_slacks in output_slacks.items():
                tmp_available = tmp_start + cast(int, out_port.latency_offset)
                new_available = tmp_available % self._schedule_time
                for signal, signal_slack in signal_slacks.items():
                    new_slack = signal_slack - time
                    tmp_next_usage = tmp_available + new_slack
                    next_usage = tmp_next_usage % self._schedule_time
                    laps = new_slack // self._schedule_time
                    if next_usage < new_available:
                        laps += 1
                    if new_available == 0 and new_slack > 0:
                        laps += 1
                    self._laps[signal.graph_id] = laps

            # Set new start time
            self._start_times[graph_id] = new_start
            return self
    
        def _remove_delays(self) -> None:
            """
            Remove all Delay operations from the SFG and turn them into laps.

            For every removed delay, each signal connected to the delay output
            gets one lap plus the laps stored for the delay input signal. The
            entry of the input signal is then removed.
            """
            delay_list = self._sfg.find_by_type_name(Delay.type_name())
            while delay_list:
                delay_op = cast(Delay, delay_list[0])
                delay_input_id = delay_op.input(0).signals[0].graph_id
                delay_output_ids = [
                    sig.graph_id for sig in delay_op.output(0).signals
                ]
                self._sfg = cast(
                    SFG, self._sfg.remove_operation(delay_op.graph_id)
                )
                # pop() both reads and removes the entry and does not fail if
                # the input signal has no entry.
                input_laps = self._laps.pop(delay_input_id, 0)
                for output_id in delay_output_ids:
                    self._laps[output_id] += 1 + input_laps
                # The SFG object was replaced, so the delays are looked up again.
                delay_list = self._sfg.find_by_type_name(Delay.type_name())
    
    
        def _schedule_asap(self) -> None:
            """
            Schedule every operation as soon as possible.

            The precedence list of the SFG is processed level by level. The
            operations of the first two levels start at time 0, except Delay
            operations which get no start time. Every later operation starts
            when all its inputs are available. Output operations are placed at
            the time their source value is ready. Finally, the delays are
            removed from the SFG.
            """
            pl = self._sfg.get_precedence_list()

            if len(pl) < 2:
                print("Empty signal flow graph cannot be scheduled.")
                return

            non_schedulable_ops = set()
            for outport in pl[0]:
                op = outport.operation
                if op.type_name() == Delay.type_name():
                    # Delays are not scheduled; their outputs count as
                    # available at time 0.
                    non_schedulable_ops.add(op.graph_id)
                elif op.graph_id not in self._start_times:
                    # Set start time of all operations in the first iter to 0
                    self._start_times[op.graph_id] = 0

            # NOTE(review): this loop header was lost in the copy and is
            # restored from context (pl[0] above, pl[2:] below); confirm.
            for outport in pl[1]:
                op = outport.operation
                if op.graph_id not in self._start_times:
                    # Set start time of all operations in the first iter to 0
                    self._start_times[op.graph_id] = 0

            for outports in pl[2:]:
                for outport in outports:
                    op = outport.operation
                    if op.graph_id in self._start_times:
                        continue

                    # Schedule the operation if it does not have a start time yet.
                    op_start_time = 0
                    for inport in op.inputs:
                        assert (
                            len(inport.signals) == 1
                        ), "Error in scheduling, dangling input port detected."
                        assert inport.signals[0].source is not None, (
                            "Error in scheduling, signal with no source"
                            " detected."
                        )
                        source_port = inport.signals[0].source

                        if source_port.operation.graph_id in non_schedulable_ops:
                            source_end_time = 0
                        else:
                            source_op_time = self._start_times[
                                source_port.operation.graph_id
                            ]
                            assert source_port.latency_offset is not None, (
                                f"Output port: {source_port.index} of"
                                f" operation: {source_port.operation.graph_id}"
                                " has no latency-offset."
                            )
                            source_end_time = (
                                source_op_time + source_port.latency_offset
                            )

                        assert inport.latency_offset is not None, (
                            f"Input port: {inport.index} of operation:"
                            f" {inport.operation.graph_id} has no"
                            " latency-offset."
                        )
                        op_start_time_from_in = (
                            source_end_time - inport.latency_offset
                        )
                        # The operation cannot start before its latest input.
                        op_start_time = max(
                            op_start_time, op_start_time_from_in
                        )

                    self._start_times[op.graph_id] = op_start_time

            for output in self._sfg.find_by_type_name(Output.type_name()):
                output = cast(Output, output)
                source_port = cast(OutputPort, output.inputs[0].signals[0].source)
                if source_port.operation.graph_id in non_schedulable_ops:
                    self._start_times[output.graph_id] = 0
                else:
                    self._start_times[output.graph_id] = self._start_times[
                        source_port.operation.graph_id
                    ] + cast(int, source_port.latency_offset)

            self._remove_delays()
    
    
        def _get_memory_variables_list(self) -> List['Process']:
            """
            Return one MemoryVariable for every output port in the schedule.

            Each memory variable is created from the time the value of the
            port becomes available, the port itself and a dictionary that maps
            every destination input port to the forward slack of its signal.
            """
            ret: List['Process'] = []
            for graph_id, start_time in self._start_times.items():
                slacks = self._forward_slacks(graph_id)
                for outport, signals in slacks.items():
                    reads = {
                        cast(InputPort, signal.destination): slack
                        for signal, slack in signals.items()
                    }
                    # NOTE(review): the end of this call was lost in the copy;
                    # confirm that MemoryVariable takes no further arguments.
                    ret.append(
                        MemoryVariable(
                            start_time + cast(int, outport.latency_offset),
                            outport,
                            reads,
                        )
                    )
            return ret
    
        def _get_y_position(self, graph_id):
            """
            Return the y position used when plotting an operation.

            An operation that has no row yet is assigned the lowest row number
            that is not in use, and the assignment is stored in
            ``self._y_locations``.
            """
            y_location = self._y_locations[graph_id]
            if y_location is None:
                # Assign the lowest row number not yet in use
                used = {
                    loc for loc in self._y_locations.values() if loc is not None
                }
                possible = set(range(len(self._start_times))) - used
                y_location = min(possible)
                self._y_locations[graph_id] = y_location
            return OPERATION_GAP + y_location * (1 + OPERATION_GAP)
    
    
        def _plot_schedule(self, ax):
            """
            Draw the schedule on a Matplotlib axes.

            Every operation is drawn on its own row as a latency polygon with
            an optional execution time line. Every signal is drawn from its
            source to its destination; a signal that wraps around the end of
            the schedule is drawn as two labeled line segments.

            Parameters
            ----------
            ax : matplotlib.axes.Axes
                The axes to draw on.
            """
            # Start points whose outgoing wrap-around segment is already drawn.
            line_cache = []

            def _draw_arrow(start, end, name="", laps=0):
                """Draw one signal from *start* to *end*, both given as [x, y]."""
                if end[0] < start[0] or laps > 0:  # Wrap around
                    if start not in line_cache:
                        line = Line2D(
                            [start[0], self._schedule_time + 0.2],
                            [start[1], start[1]],
                            color=_SIGNAL_COLOR,
                            lw=SIGNAL_LINEWIDTH,
                        )
                        ax.add_line(line)
                        ax.text(
                            self._schedule_time + 0.2,
                            start[1],
                            name,
                            verticalalignment="center",
                        )
                    line = Line2D(
                        [-0.2, end[0]],
                        [end[1], end[1]],
                        color=_SIGNAL_COLOR,
                        lw=SIGNAL_LINEWIDTH,
                    )
                    ax.add_line(line)
                    ax.text(
                        -0.2,
                        end[1],
                        f"{name}: {laps}",
                        verticalalignment="center",
                        horizontalalignment="right",
                    )
                    line_cache.append(start)

                elif end[0] == start[0]:
                    # Same x position: draw an S-shaped curve between the rows.
                    p = Path(
                        [
                            start,
                            [start[0] + 0.2, start[1]],
                            [start[0] + 0.2, (start[1] + end[1]) / 2],
                            [start[0], (start[1] + end[1]) / 2],
                            [start[0] - 0.2, (start[1] + end[1]) / 2],
                            [start[0] - 0.2, end[1]],
                            end,
                        ],
                        [
                            Path.MOVETO,
                            Path.CURVE4,
                            Path.CURVE4,
                            Path.CURVE4,
                            Path.CURVE4,
                            Path.CURVE4,
                            Path.CURVE4,
                        ],
                    )
                    pp = PathPatch(
                        p,
                        fc='none',
                        ec=_SIGNAL_COLOR,
                        lw=SIGNAL_LINEWIDTH,
                        zorder=10,
                    )
                    ax.add_patch(pp)
                else:
                    p = Path(
                        [
                            start,
                            [(start[0] + end[0]) / 2, start[1]],
                            [(start[0] + end[0]) / 2, end[1]],
                            end,
                        ],
                        [Path.MOVETO, Path.CURVE4, Path.CURVE4, Path.CURVE4],
                    )
                    pp = PathPatch(
                        p,
                        fc='none',
                        ec=_SIGNAL_COLOR,
                        lw=SIGNAL_LINEWIDTH,
                        zorder=10,
                    )
                    ax.add_patch(pp)

            def _draw_offset_arrow(
                start, end, start_offset, end_offset, name="", laps=0
            ):
                """Draw an arrow after adding the offsets to *start* and *end*."""
                _draw_arrow(
                    [start[0] + start_offset[0], start[1] + start_offset[1]],
                    [end[0] + end_offset[0], end[1] + end_offset[1]],
                    name=name,
                    laps=laps,
                )

            ytickpositions = []
            yticklabels = []
            ax.set_axisbelow(True)
            ax.grid()

            # Draw the operations
            for graph_id, op_start_time in self._start_times.items():
                ypos = -self._get_y_position(graph_id)
                op = self._sfg.find_by_id(graph_id)
                # Rewrite to make better use of NumPy
                latency_coords, execution_time_coords = op.get_plot_coordinates()
                _x, _y = zip(*latency_coords)
                x = np.array(_x)
                y = np.array(_y)
                xy = np.stack((x + op_start_time, y + ypos))
                p = Polygon(xy.T, fc=_LATENCY_COLOR)
                ax.add_patch(p)
                if execution_time_coords:
                    _x, _y = zip(*execution_time_coords)
                    x = np.array(_x)
                    y = np.array(_y)
                    ax.plot(
                        x + op_start_time,
                        y + ypos,
                        color=_EXECUTION_TIME_COLOR,
                        linewidth=3,
                    )
                ytickpositions.append(ypos + 0.5)
                yticklabels.append(op.name)

            # Draw the signals
            for graph_id, op_start_time in self._start_times.items():
                op = self._sfg.find_by_id(graph_id)
                _, out_coords = op.get_io_coordinates()
                source_ypos = -self._get_y_position(graph_id)
                for output_port in op.outputs:
                    for output_signal in output_port.signals:
                        dest_op = output_signal.destination.operation
                        dest_start_time = self._start_times[dest_op.graph_id]
                        dest_ypos = -self._get_y_position(dest_op.graph_id)
                        dest_in_coords, _ = dest_op.get_io_coordinates()
                        # NOTE(review): the name argument was lost in the copy;
                        # the source graph ID is assumed as label, confirm.
                        _draw_offset_arrow(
                            out_coords[output_port.index],
                            dest_in_coords[output_signal.destination.index],
                            [op_start_time, source_ypos],
                            [dest_start_time, dest_ypos],
                            name=graph_id,
                            laps=self._laps[output_signal.graph_id],
                        )

            ax.set_yticks(ytickpositions)
            ax.set_yticklabels(yticklabels)

            # Get operation with maximum position
            max_pos_graph_id = max(self._y_locations, key=self._y_locations.get)
            yposmin = -self._get_y_position(max_pos_graph_id) - OPERATION_GAP
            ax.axis([-1, self._schedule_time + 1, yposmin, 1])
            ax.xaxis.set_major_locator(MaxNLocator(integer=True))
            # Dashed lines mark the start and the end of the schedule.
            ax.add_line(
                Line2D([0, 0], [yposmin, 1], linestyle="--", color="black")
            )
            ax.add_line(
                Line2D(
                    [self._schedule_time, self._schedule_time],
                    [yposmin, 1],
                    linestyle="--",
                    color="black",
                )
            )
    
        def _reset_y_locations(self):
            self._y_locations = self._y_locations = defaultdict(lambda: None)
    
    
        def plot_schedule(self) -> None:
            """Plot the schedule in a new figure and show it."""
            self._get_figure().show()
    
        def _get_figure(self):
            """Create a new figure, draw the schedule in it and return it."""
            fig, ax = plt.subplots()
            self._plot_schedule(ax)
            return fig
    
    
        def _repr_svg_(self):
            """
            Return the schedule plot as an SVG string.

            A new figure is created for the rendering and closed again
            afterwards.
            """
            fig, ax = plt.subplots()
            try:
                self._plot_schedule(ax)
                buffer = io.StringIO()
                fig.savefig(buffer, format="svg")
                return buffer.getvalue()
            finally:
                # The figure is never handed to the caller, so it has to be
                # closed here; otherwise pyplot keeps one more figure alive
                # for every call.
                plt.close(fig)