Skip to content
Snippets Groups Projects
schedule.py 15.7 KiB
Newer Older
  • Learn to ignore specific revisions
  • """B-ASIC Schedule Module.
    
    Contains the schedule class for scheduling operations in an SFG.
    
    from collections import defaultdict
    from typing import Dict, List, Optional, Tuple
    import matplotlib.pyplot as plt
    from matplotlib.ticker import MaxNLocator
    import numpy as np
    from scipy import interpolate
    import sys
    import io
    
    
    from b_asic.signal_flow_graph import SFG
    from b_asic.graph_component import GraphID
    
    from b_asic.special_operations import Delay, Output
    
    from b_asic import OutputPort, Signal
    
    class Schedule:
        """Schedule of an SFG with scheduled Operations."""
    
    
        _sfg: SFG
        _start_times: Dict[GraphID, int]
        _laps: Dict[GraphID, List[int]]
        _schedule_time: int
        _cyclic: bool
        _resolution: int
    
        def __init__(self, sfg: SFG, schedule_time: Optional[int] = None, cyclic: bool = False, resolution: int = 1, scheduling_alg: str = "ASAP"):
    
            """Construct a Schedule from an SFG."""
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
            self._start_times = {}
    
            self._laps = defaultdict(lambda: 0)
    
            self._cyclic = cyclic
            self._resolution = resolution
    
            if scheduling_alg == "ASAP":
                self._schedule_asap()
            else:
                raise NotImplementedError(
                    f"No algorithm with name: {scheduling_alg} defined.")
    
    
            max_end_time = self._get_max_end_time()
    
            if schedule_time is None:
                self._schedule_time = max_end_time
            elif schedule_time < max_end_time:
                raise ValueError(
                    f"Too short schedule time. Minimum is {max_end_time}.")
            else:
                self._schedule_time = schedule_time
    
    
        def start_time_of_operation(self, op_id: GraphID) -> int:
            """Get the start time of the operation with the specified by the op_id."""
    
            assert op_id in self._start_times, "No operation with the specified op_id in this schedule."
    
        def _get_max_end_time(self) -> int:
            max_end_time = 0
            for op_id, op_start_time in self._start_times.items():
                op = self._sfg.find_by_id(op_id)
                for outport in op.outputs:
                    max_end_time = max(
                        max_end_time, op_start_time + outport.latency_offset)
            return max_end_time
    
    
        def forward_slack(self, op_id: GraphID) -> int:
    
            assert op_id in self._start_times, "No operation with the specified op_id in this schedule."
    
            slack = sys.maxsize
            output_slacks = self._forward_slacks(op_id)
            # Make more pythonic
            for signal_slacks in output_slacks.values():
                for signal_slack in signal_slacks.values():
                    slack = min(slack, signal_slack)
            return slack
    
        def _forward_slacks(self, op_id: GraphID) -> Dict["OutputPort", Dict["Signal", int]]:
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
            ret = {}
    
            start_time = self._start_times[op_id]
            op = self._sfg.find_by_id(op_id)
            for output_port in op.outputs:
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                output_slacks = {}
    
                available_time = start_time + output_port.latency_offset
    
                for signal in output_port.signals:
                    usage_time = (signal.destination.latency_offset +
                                 self._start_times[signal.destination.operation.graph_id] +
                                 self._schedule_time*self._laps[signal.graph_id])
                    output_slacks[signal] = usage_time - available_time
                ret[output_port] = output_slacks
            return ret
    
    
        def backward_slack(self, op_id: GraphID) -> int:
    
            assert op_id in self._start_times, "No operation with the specified op_id in this schedule."
    
            slack = sys.maxsize
            input_slacks = self._backward_slacks(op_id)
            # Make more pythonic
            for signal_slacks in input_slacks.values():
                for signal_slack in signal_slacks.values():
                    slack = min(slack, signal_slack)
            return slack
    
        def _backward_slacks(self, op_id: GraphID) -> Dict["OutputPort", Dict["Signal", int]]:
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
            ret = {}
    
            start_time = self._start_times[op_id]
            op = self._sfg.find_by_id(op_id)
            for input_port in op.inputs:
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                input_slacks = {}
    
                usage_time = start_time + input_port.latency_offset
    
                for signal in input_port.signals:
                    available_time = (signal.source.latency_offset +
                                 self._start_times[signal.source.operation.graph_id] -
                                 self._schedule_time*self._laps[signal.graph_id])
                    input_slacks[signal] = usage_time - available_time
                ret[input_port] = input_slacks
            return ret
    
    
        def slacks(self, op_id: GraphID) -> Tuple[int, int]:
    
            assert op_id in self._start_times, "No operation with the specified op_id in this schedule."
    
            return (self.backward_slack(op_id), self.forward_slack(op_id))
    
    
        def print_slacks(self) -> None:
            raise NotImplementedError
    
    
        def set_schedule_time(self, time: int) -> "Schedule":
    
            assert self._get_max_end_time() < time, "New schedule time to short."
            self._schedule_time = time
            return self
    
        @property
        def schedule_time(self) -> int:
            return self._schedule_time
    
    
        def increase_time_resolution(self, factor: int) -> "Schedule":
    
            raise NotImplementedError
    
    
        def decrease_time_resolution(self, factor: int) -> "Schedule":
    
            raise NotImplementedError
    
    
        def move_operation(self, op_id: GraphID, time: int) -> "Schedule":
            assert op_id in self._start_times, "No operation with the specified op_id in this schedule."
    
    
            (backward_slack, forward_slack) = self.slacks(op_id)
            if time < 0:
                if -time > backward_slack:
                    raise ValueError
            else:
                if time > forward_slack:
                    raise ValueError
    
            tmp_start = self._start_times[op_id] + time
            new_start = tmp_start % self._schedule_time
    
            # Update input laps
            input_slacks = self._backward_slacks(op_id)
            for in_port, signal_slacks in input_slacks.items():
                tmp_usage = tmp_start + in_port.latency_offset
                new_usage = tmp_usage % self._schedule_time
                for signal, signal_slack in signal_slacks.items():
                    new_slack = signal_slack + time
                    old_laps = self._laps[signal.graph_id]
                    tmp_prev_available = tmp_usage - new_slack
                    prev_available = tmp_prev_available % self._schedule_time
                    laps = new_slack // self._schedule_time
                    if new_usage < prev_available:
                        laps += 1
                    print([signal_slack, new_slack, old_laps, laps, new_usage, prev_available, tmp_usage, tmp_prev_available])
                    self._laps[signal.graph_id] = laps
    
            # Update output laps
            output_slacks = self._forward_slacks(op_id)
            for out_port, signal_slacks in output_slacks.items():
                tmp_available = tmp_start + out_port.latency_offset
                new_available = tmp_available % self._schedule_time
                for signal, signal_slack in signal_slacks.items():
                    new_slack = signal_slack - time
                    tmp_next_usage = tmp_available + new_slack
                    next_usage = tmp_next_usage % self._schedule_time
                    laps = new_slack // self._schedule_time
                    if next_usage < new_available:
                        laps += 1
                    if new_available == 0 and new_slack > 0:
                        laps += 1
                    self._laps[signal.graph_id] = laps
    
    
            # Set new start time
            self._start_times[op_id] = new_start
            return self
    
        def _remove_delays(self) -> None:
            delay_list = self._sfg.find_by_type_name(Delay.type_name())
            while delay_list:
                delay_op = delay_list[0]
                delay_input_id = delay_op.input(0).signals[0].graph_id
                delay_output_ids = [sig.graph_id for sig in delay_op.output(0).signals]
                self._sfg = self._sfg.remove_operation(delay_op.graph_id)
                for output_id in delay_output_ids:
                    self._laps[output_id] += 1 + self._laps[delay_input_id]
                del self._laps[delay_input_id]
                delay_list = self._sfg.find_by_type_name(Delay.type_name())
    
    
        def _schedule_asap(self) -> None:
            pl = self._sfg.get_precedence_list()
    
            if len(pl) < 2:
                print("Empty signal flow graph cannot be scheduled.")
                return
    
    
            non_schedulable_ops = set()
            for outport in pl[0]:
                op = outport.operation
                if op.type_name()  not in [Delay.type_name()]:
                    if op.graph_id not in self._start_times:
                        # Set start time of all operations in the first iter to 0
                        self._start_times[op.graph_id] = 0
                else:
                    non_schedulable_ops.add(op.graph_id)
    
                if op.graph_id not in self._start_times:
    
                    # Set start time of all operations in the first iter to 0
                    self._start_times[op.graph_id] = 0
    
            for outports in pl[2:]:
                for outport in outports:
                    op = outport.operation
                    if op.graph_id not in self._start_times:
                        # Schedule the operation if it doesn't have a start time yet.
                        op_start_time = 0
                        for inport in op.inputs:
                            assert len(
                                inport.signals) == 1, "Error in scheduling, dangling input port detected."
                            assert inport.signals[0].source is not None, "Error in scheduling, signal with no source detected."
                            source_port = inport.signals[0].source
    
                            source_end_time = None
                            if source_port.operation.graph_id in non_schedulable_ops:
                                source_end_time = 0
                            else:
                                source_op_time = self._start_times[source_port.operation.graph_id]
    
                                assert source_port.latency_offset is not None, f"Output port: {source_port.index} of operation: \
                                        {source_port.operation.graph_id} has no latency-offset."
                                assert inport.latency_offset is not None, f"Input port: {inport.index} of operation: \
                                        {inport.operation.graph_id} has no latency-offset."
    
                                source_end_time = source_op_time + source_port.latency_offset
    
                            op_start_time_from_in = source_end_time - inport.latency_offset
                            op_start_time = max(
                                op_start_time, op_start_time_from_in)
    
                        self._start_times[op.graph_id] = op_start_time
    
            for output in self._sfg.find_by_type_name(Output.type_name()):
                source_port = output.inputs[0].signals[0].source
                if source_port.operation.graph_id in non_schedulable_ops:
                    self._start_times[output.graph_id] = 0
                else:
                    self._start_times[output.graph_id] = self._start_times[source_port.operation.graph_id] + source_port.latency_offset
            self._remove_delays()
    
        def _plot_schedule(self):
            def _draw_arrow2(start, end):
                if end[0] < start[0]: # Wrap around
                    plt.plot([start[0], self._schedule_time], [start[1], start[1]])
                    plt.plot([0, end[0]], [end[1], end[1]])
                elif end[0] == start[0]:
                    plt.plot([start[0], start[0] + 0.2, start[0] + 0.2, start[0] - 0.2, start[0] - 0.2, start[0]],
                             [start[1], start[1], (start[1] + end[1])/2, (start[1] + end[1])/2, end[1], end[1]])
                else:
                    plt.plot([start[0], (start[0] + end[0])/2, (start[0] + end[0])/2, end[0]],
                             [start[1], start[1], end[1], end[1]])
    
            def _draw_spline(x, y):
                l = len(x)
                t = np.linspace(0, 1, l-2, endpoint=True)
                t = np.append([0, 0, 0], t)
                t = np.append(t, [1, 1, 1])
                tck = [t, [x, y], 3]
                u3 = np.linspace(0, 1, 50, endpoint=True)
                out = interpolate.splev(u3, tck)
                plt.plot(out[0], out[1], color='black')
    
            def _draw_arrow(start, end, name="", laps=0):
                if end[0] < start[0] or laps > 0: # Wrap around
                    plt.plot([start[0], self._schedule_time + 0.2], [start[1], start[1]], color='black')
                    plt.plot([-0.2, end[0]], [end[1], end[1]], color='black')
                    plt.text(self._schedule_time + 0.2, start[1], name, verticalalignment='center')
                    plt.text(-0.2, end[1], "{}: {}".format(name, laps), verticalalignment='center', horizontalalignment='right')
    
                elif end[0] == start[0]:
                    _draw_spline([start[0], start[0] + 0.2, start[0] + 0.2, start[0] - 0.2, start[0] - 0.2, start[0]],
                                 [start[1], start[1], (start[1] + end[1])/2, (start[1] + end[1])/2, end[1], end[1]])
                else:
                    _draw_spline([start[0], (start[0] + end[0])/2, (start[0] + end[0])/2, end[0]],
                                 [start[1], start[1], end[1], end[1]])
    
            def _draw_offset_arrow(start, end, start_offset, end_offset, name="", laps=0):
                _draw_arrow([start[0] + start_offset[0], start[1] + start_offset[1]],
                            [end[0] + end_offset[0], end[1] + end_offset[1]], name=name, laps=laps)
    
            ypos = 0.5
            ytickpositions = []
            yticklabels = []
            plt.grid(zorder=0.5)
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
            ypositions = {}
    
            for op_id, op_start_time in self._start_times.items():
                op = self._sfg.find_by_id(op_id)
                latency_coords, execution_time_coords = op.get_plot_coordinates()
                _x, _y = zip(*latency_coords)
                x = np.array(_x)
                y = np.array(_y)
                plt.fill(x + op_start_time, y + ypos)
    
                if execution_time_coords:
                    _x, _y = zip(*execution_time_coords)
                    x = np.array(_x)
                    y = np.array(_y)
                    plt.plot(x + op_start_time, y + ypos, color='black', linewidth=3, alpha=0.5)
    
                ytickpositions.append(ypos + 0.5)
                yticklabels.append(self._sfg.find_by_id(op_id).name)
                ypositions[op_id] = ypos
                ypos += 1.5
    
            for op_id, op_start_time in self._start_times.items():
                op = self._sfg.find_by_id(op_id)
                _, out_coords = op.get_io_coordinates()
                source_ypos = ypositions[op_id]
                for output_port in op.outputs:
                    for output_signal in output_port.signals:
                        dest_op = output_signal.destination.operation
                        dest_start_time = self._start_times[dest_op.graph_id]
                        dest_ypos = ypositions[dest_op.graph_id]
                        dest_in_coords, _ = output_signal.destination.operation.get_io_coordinates()
                        _draw_offset_arrow(out_coords[output_port.index],
                                           dest_in_coords[output_signal.destination.index],
                                           [op_start_time, source_ypos],
                                           [dest_start_time, dest_ypos], name=op_id,
                                           laps=self._laps[output_signal.graph_id])
    
            plt.yticks(ytickpositions, yticklabels)
            plt.axis([-1, self._schedule_time+1, 0, ypos])
            plt.gca().xaxis.set_major_locator(MaxNLocator(integer=True))
            plt.plot([0, 0], [0, ypos], linestyle='--', color='black')
            plt.plot([self._schedule_time, self._schedule_time], [0, ypos], linestyle='--', color='black')
    
        def plot_schedule(self) -> None:
            plt.figure()
            self._plot_schedule()
            plt.show()
    
        def _repr_svg_(self):
            plt.figure()
            self._plot_schedule()
            f = io.StringIO()
            plt.savefig(f, format='svg')
    
            return f.getvalue()