Skip to content
Snippets Groups Projects
schema.py 8.29 KiB
Newer Older
  • Learn to ignore specific revisions
  • """@package docstring
    This module contains the Schema class.
    TODO: More info
    """
    
    
    from io import StringIO
    from typing import Dict, List, Set
    import math
    
    
    from b_asic.signal_flow_graph import SFG
    from b_asic.graph_component import GraphID
    from b_asic.operation import Operation
    
    
    class Schema:
        """A class that represents an SFG with scheduled Operations."""
    
        _sfg: SFG
        _start_times: Dict[GraphID, int]
        _laps: Dict[GraphID, List[int]]
        _schedule_time: int
        _cyclic: bool
        _resolution: int
    
        _max_end_op_id: GraphID
        _max_end_time: int
        _non_schedulable_ops: Set[GraphID]
    
    
        def __init__(self, sfg: SFG, schedule_time: int = None, cyclic: bool = False, resolution: int = 1, scheduling_alg: str = "ASAP"):
    
            self._sfg = sfg
            self._start_times = dict()
            self._laps = dict()
            self._cyclic = cyclic
            self._resolution = resolution
    
            if scheduling_alg == "ASAP":
                self._schedule_asap()
            else:
    
                raise ValueError(f"No algorithm with name: {scheduling_alg} defined.")
    
            # Set latest time of the schema.
            self._latest_op_time = 0
    
            for op_id, op_start_time in self._start_times.items():
                op = self._sfg.find_by_id(op_id)
                for outport in op.outputs:
    
                    if op_start_time + outport.latency_offset > self._latest_op_time:
                        self._max_end_op_id = op_id
                        self._max_end_time = op_start_time + outport.latency_offset
    
    
            if not self._cyclic:
                if schedule_time is None:
    
                    self.set_schedule_time(self._max_end_time)
    
                    self.set_schedule_time(schedule_time)
            else:
                raise NotImplementedError("Cyclic schedules not implemented yet.")
    
        def __str__(self):
            string_io = StringIO()
            for op in self._sfg.get_operations_topological_order():
                if op.graph_id in self._start_times:
                    string_io.write("name: " + op.name + ", \tid: " + op.graph_id + ", \t")
                    string_io.write("start_time: " + str(self._start_times[op.graph_id]) + ", \t")
                    string_io.write("backwards_slack: " + str(self.backwards_slack(op.graph_id)) + ", \t")
                    string_io.write("forwards_slack: " + str(self.forwards_slack(op.graph_id)) + "\n")
            return string_io.getvalue()
    
        @property
        def schedule_time(self) -> int:
            """Get the schedule time of the Schema."""
            return self._schedule_time
    
        def set_schedule_time(self, schedule_time: int) -> None:
            """Set the schedule time to the specified value, if the specified value is shorter then 
            what is required by the schedule then raises a ValueError."""
            if not self._cyclic:
                if schedule_time >= self._max_end_time:
    
                else:
                    raise ValueError("Specified schedule time is shorter then the max length of the ")
            else:
                raise NotImplementedError("Cyclic schedules not implemented yet.")
    
        def get_start_time_of_operation(self, op_id: GraphID) -> int:
    
            """Get the start time of the operation with the specified by the op_id."""
            assert op_id in self._start_times, "No operation with the specified op_id in this schema."
            return self._start_times[op_id]
    
    
        def get_laps_of_operation_input_port(self, op_id: GraphID, input_index: int) -> int:
            """Get the laps of input port with the entered index of the operation with the specified op_id."""
            assert op_id in self._laps, "No operation with the specidied op_id in this schema."
            assert input_index >= 0 and input_index < len(
                self._laps[op_id]), "Input index for the specified operation out of bounds."
            return self._laps[op_id][input_index]
    
        def change_start_time_of_operation(self, op_id: GraphID, relative_change: int = None, new_start_time: int = None) -> None:
            """Changes the start time of the operation with the specified op_id either by the 
            relative_change if specified or to the new_start_time if that is specified. If both 
            are specified then the new_start_time value is used.
            A negative relative_change represents decreasing the operations start time.
            If the change to the start time would break dependencies then the change is cancelled
            and a message is printed to the user in the terminal.
            """
            assert op_id in self._start_times, "No operation with the specified op_id in this schema."
            if new_start_time is not None:
                relative_change = new_start_time - self._start_times[op_id]
    
            if relative_change is not None:
                if relative_change > 0:
                    forwards_slack = self.forwards_slack(op_id)
                    if relative_change > forwards_slack:
                        print(f"The specified relative change of {relative_change} is larger than the forwards \
                            slack of the operation of {forwards_slack}.")
                        return
                else:
                    backwards_slack = self.backwards_slack(op_id)
                    if abs(relative_change) > backwards_slack:
                        print(f"The specified relative change of {abs(relative_change)} is larger than the forwards \
                            slack of the operation of {forwards_slack}.")
                        return
    
                self._start_times[op_id] += relative_change
    
            else:
                raise ValueError("At least one of the parameters relative_change or new_start_time has to be specified.")
    
        def backwards_slack(self, op_id: GraphID) -> int:
            return 0
            op_start_time = self.get_start_time_of_operation(op_id)
            min_backwards_slack = math.inf
            for i, inp in enumerate(self._sfg.find_by_id(op_id).inputs):
                inp_time = op_start_time + inp.latency_offset
    
                source_op_time = self.get_start_time_of_operation(inp.connected_source.operation.graph_id)
                source_outp_time = source_op_time + inp.connected_source.latency_offset
    
        def forwards_slack(self, op_id: GraphID) -> int:
            return 0
            assert op_id in self._start_times, "No operation with specified op_id in this schema."
    
            pl = self._sfg.get_precedence_list()
    
            if len(pl) < 2:
                print("Empty signal flow graph cannot be scheduled.")
                return
    
    
            self._non_schedulable_ops = set((outp.operation.graph_id for outp in pl[0]))
    
    
            for outport in pl[1]:
                op = outport.operation
                if op not in self._start_times:
                    # Set start time of all operations in the first iter to 0
                    self._start_times[op.graph_id] = 0
    
            for outports in pl[2:]:
                for outport in outports:
                    op = outport.operation
                    if op.graph_id not in self._start_times:
                        # Schedule the operation if it doesn't have a start time yet.
                        op_start_time = 0
                        for inport in op.inputs:
                            assert len(inport.signals) == 1, "Error in scheduling, dangling input port detected."
                            assert inport.signals[0].source is not None, "Error in scheduling, signal with no source detected."
                            source_port = inport.signals[0].source
    
                            source_end_time = None
    
                            if source_port.operation.graph_id in self._non_schedulable_ops:
    
                                source_end_time = 0
                            else:
                                source_op_time = self._start_times[source_port.operation.graph_id]
    
                                assert source_port.latency_offset is not None, f"Output port: {source_port.index} of operation: \
                                        {source_port.operation.graph_id} has no latency-offset."
                                assert inport.latency_offset is not None, f"Input port: {inport.index} of operation: \
                                        {inport.operation.graph_id} has no latency-offset."
    
                                source_end_time = source_op_time + source_port.latency_offset
    
                            op_start_time_from_in = source_end_time - inport.latency_offset
                            op_start_time = max(op_start_time, op_start_time_from_in)
    
                        self._start_times[op.graph_id] = op_start_time