Skip to content
Snippets Groups Projects
schema.py 6.73 KiB
Newer Older
  • Learn to ignore specific revisions
  • """@package docstring
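    This module contains the Schema class, which pairs a signal flow graph (SFG)
    with the start times assigned to its operations by a scheduling algorithm.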
    TODO: More info
    """
    
    from typing import Dict, List
    
    from b_asic.signal_flow_graph import SFG
    from b_asic.graph_component import GraphID
    from b_asic.operation import Operation
    
    from b_asic.special_operations import *
    
    
    
    class Schema:
        """A signal flow graph (SFG) together with start times for its operations.

        On construction the operations of the SFG are scheduled with the selected
        algorithm (only "ASAP" is implemented) and the total schedule time is
        derived from the latest end time of any scheduled output port.
        """

        # The signal flow graph that was scheduled.
        _sfg: SFG
        # Start time of every scheduled operation, keyed by its graph id.
        _start_times: Dict[GraphID, int]
        # Initialised to an empty dict; not populated by any method of this class.
        _laps: Dict[GraphID, List[int]]
        # Total length of the schedule.
        _schedule_time: int
        # When False, a requested schedule time shorter than the latest end time
        # is rejected in __init__.
        _cyclic: bool
        # Stored as given; not read by any method of this class.
        _resolution: int
        # Maps "<producer name>-><consumer name>" to [write time, read time].
        _memory_elements: Dict[str, List[int]]

        def __init__(
            self,
            sfg: SFG,
            schedule_time: int = None,
            cyclic: bool = False,
            resolution: int = 1,
            scheduling_alg: str = "ASAP",
        ):
            """Schedule the operations of the given SFG.

            Args:
                sfg: The signal flow graph to schedule.
                schedule_time: Requested schedule length. When None, the latest
                    end time among all scheduled output ports is used.
                cyclic: Whether the schedule is cyclic. A cyclic schedule accepts
                    a schedule_time shorter than the latest end time.
                resolution: Stored on the instance.
                scheduling_alg: Name of the scheduling algorithm; only "ASAP"
                    is implemented.

            Raises:
                NotImplementedError: If scheduling_alg is not "ASAP".
                ValueError: If the schedule is non-cyclic and schedule_time is
                    shorter than the latest end time.
            """
            self._sfg = sfg
            self._start_times = {}
            self._laps = {}
            self._cyclic = cyclic
            self._resolution = resolution
            self._memory_elements = {}

            if scheduling_alg == "ASAP":
                self._schedule_asap()
            else:
                raise NotImplementedError(f"No algorithm with name: {scheduling_alg} defined.")

            # The schedule must at least cover the latest point in time at which
            # any scheduled operation produces an output.
            max_end_time = 0
            for op_id, op_start_time in self._start_times.items():
                op = self._sfg.find_by_id(op_id)
                for outport in op.outputs:
                    max_end_time = max(max_end_time, op_start_time + outport.latency_offset)

            # Always define _schedule_time (also for cyclic schedules), since
            # get_memory_elements() reads it for consumers without a start time.
            if schedule_time is None:
                self._schedule_time = max_end_time
            elif not self._cyclic and schedule_time < max_end_time:
                raise ValueError("Too short schedule time for non-cyclic Schedule entered.")
            else:
                self._schedule_time = schedule_time

        def start_time_of_operation(self, op_id: GraphID):
            """Return the start time of the operation with the given graph id.

            Raises:
                AssertionError: If no start time is stored for op_id.
            """
            assert op_id in self._start_times, "No operation with the specified op_id in this schema."
            return self._start_times[op_id]

        def forward_slack(self, op_id):
            """Not implemented yet; always raises NotImplementedError."""
            raise NotImplementedError

        def backward_slack(self, op_id):
            """Not implemented yet; always raises NotImplementedError."""
            raise NotImplementedError

        def print_slacks(self):
            """Not implemented yet; always raises NotImplementedError."""
            raise NotImplementedError

        def _schedule_asap(self):
            """Assign as-soon-as-possible start times to the operations of the SFG.

            Operations owning the ports in the first entry of the precedence list
            are treated as non-schedulable: they get no start time and their
            outputs count as available at time 0. Operations in the second entry
            start at time 0. Every later operation starts as early as all of its
            inputs allow, but never before time 0.

            Raises:
                AssertionError: If an input port is not driven by exactly one
                    signal with a source, or a required latency offset is None.
            """
            pl = self._sfg.get_precedence_list()

            if len(pl) < 2:
                print("Empty signal flow graph cannot be scheduled.")
                return

            non_schedulable_ops = {outp.operation.graph_id for outp in pl[0]}

            for outport in pl[1]:
                # Start times are keyed by graph id, so the membership test must
                # use the id as well (not the operation object).
                self._start_times.setdefault(outport.operation.graph_id, 0)

            for outports in pl[2:]:
                for outport in outports:
                    op = outport.operation
                    if op.graph_id in self._start_times:
                        # Already scheduled through another of its output ports.
                        continue

                    op_start_time = 0
                    for inport in op.inputs:
                        assert len(inport.signals) == 1, "Error in scheduling, dangling input port detected."
                        assert inport.signals[0].source is not None, "Error in scheduling, signal with no source detected."
                        # Checked for every input, because the offset is used
                        # below regardless of what kind of source drives the port.
                        assert inport.latency_offset is not None, (
                            f"Input port: {inport.index} of operation: "
                            f"{inport.operation.graph_id} has no latency-offset."
                        )

                        source_port = inport.signals[0].source
                        source_id = source_port.operation.graph_id

                        if source_id in non_schedulable_ops:
                            source_end_time = 0
                        else:
                            assert source_port.latency_offset is not None, (
                                f"Output port: {source_port.index} of operation: "
                                f"{source_id} has no latency-offset."
                            )
                            source_end_time = self._start_times[source_id] + source_port.latency_offset

                        # The operation must start late enough for this input to
                        # be available at the input port's latency offset.
                        op_start_time = max(op_start_time, source_end_time - inport.latency_offset)

                    self._start_times[op.graph_id] = op_start_time

        def get_memory_elements(self):
            """Compute the write and read time of every scheduled signal.

            For each output port of each scheduled operation, and each signal
            leaving that port, an entry "<producer name>-><consumer name>" is
            stored in self._memory_elements with the value
            [write time, read time]. Delay operations between producer and
            consumer are skipped via get_op_after_delay(). A consumer without a
            start time is read at the schedule time. Entries are keyed by
            operation names, so signals with identical name pairs overwrite
            each other.

            Returns:
                The dict of memory elements stored on the instance.
            """
            for op in self._sfg.get_operations_topological_order():
                if op.graph_id not in self._start_times:
                    # Operations without a start time produce no entries.
                    continue

                op_start_time = self._start_times[op.graph_id]
                for outport in op.outputs:
                    write_time = op_start_time + outport.latency_offset
                    for signal in outport.signals:
                        consumer, consumer_port = self.get_op_after_delay(
                            signal.destination.operation, signal.destination)

                        consumer_start = self._start_times.get(consumer.graph_id)
                        if consumer_start is None:
                            read_time = self._schedule_time
                        else:
                            # A missing latency offset counts as 0.
                            read_time = consumer_start + (consumer_port.latency_offset or 0)

                        # One fresh [write, read] pair per signal, so that every
                        # consumer of a fanned-out port gets its own read time.
                        self._memory_elements[f"{op.name}->{consumer.name}"] = [write_time, read_time]

            return self._memory_elements

        def get_op_after_delay(self, op, destination):
            """Skip over Delay operations and return the operation behind them.

            Only the first signal on the first output of each Delay is followed.
            A Delay whose first output has no signals is returned as is.

            Args:
                op: The operation to start from.
                destination: The input port of op that was reached.

            Returns:
                A two-element list [operation, input port].
            """
            if isinstance(op, Delay) and op.outputs[0].signals:
                next_port = op.outputs[0].signals[0].destination
                return self.get_op_after_delay(next_port.operation, next_port)

            return [op, destination]

        def print_memory_elements(self):
            """Recompute the memory elements and print one line per element."""
            self.get_memory_elements()
            output_string = "".join(
                f"{key}: start time: {times[0]} end time: {times[1]}\n"
                for key, times in self._memory_elements.items()
            )
            print(output_string)