Skip to content
Snippets Groups Projects
signal_flow_graph.py 81.3 KiB
Newer Older
  • Learn to ignore specific revisions
  •         # Remove all delay elements in the SFG and replace each one
            # with one input operation and one output operation
            for delay in sfg.find_by_type_name(Delay.type_name()):
                i = Input(name="input_" + delay.graph_id)
                o = Output(
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                    src0=delay.input(0).signals[0].source, name="output_" + delay.graph_id
                )
    
                inputs.append(i)
                outputs.append(o)
    
                # move all outgoing signals from the delay to the new input operation
                while len(delay.output(0).signals) > 0:
                    signal = delay.output(0).signals[0]
                    destination = signal.destination
                    destination.remove_signal(signal)
                    signal.remove_source()
                    destination.connect(i.output(0))
    
                delay.input(0).signals[0].remove_source()
                delay.input(0).clear()
    
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
            new_sfg = SFG(inputs, outputs)  # The new sfg without the delays
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
            sfgs = [new_sfg() for _ in range(factor)]  # Copy the SFG factor times
    
            # Add suffixes to all graphIDs and names in order to keep them separated
    
            for i in range(factor):
                for operation in sfgs[i].operations:
                    suffix = f'_{i}'
                    operation.graph_id = operation.graph_id + suffix
    
                    if operation.name[:7] not in ['', 'input_t', 'output_']:
                        operation.name = operation.name + suffix
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
            input_name_to_idx = {}  # save the input port indices for future reference
    
            new_inputs = []
            # For each copy of the SFG, create new input operations for every "original"
            # input operation and connect them to begin creating the unfolded SFG
            for i in range(factor):
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                for port, operation in zip(sfgs[i].inputs, sfgs[i].input_operations):
    
                    if not operation.name.startswith("input_t"):
                        i = Input()
                        new_inputs.append(i)
                        port.connect(i)
    
                        # If the input was created earlier when removing the delays
                        # then just save the index
                        input_name_to_idx[operation.name] = port.index
    
            # Connect the original outputs in the same way as the inputs
            # Also connect the copies of the SFG together according to a formula
            # from the TSTE87 course material, and save the number of delays for
            # each interconnection
            new_outputs = []
            delay_placements = {}
            for i in range(factor):
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                for port, operation in zip(sfgs[i].outputs, sfgs[i].output_operations):
    
                    if not operation.name.startswith("output_t"):
                        new_outputs.append(Output(port))
                    else:
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                        index = operation.name[8:]  # Remove the "output_t" prefix
    
                        j = (i + 1) % factor
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                        number_of_delays_between = (i + 1) // factor
    
                        input_port = sfgs[j].input(input_name_to_idx["input_t" + index])
                        input_port.connect(port)
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                        delay_placements[port] = [i, number_of_delays_between]
                sfgs[i].graph_id = (
                    f'sfg{i}'  # deterministically set the graphID of the sfgs
                )
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
            sfg = SFG(new_inputs, new_outputs)  # create a new SFG to remove floating nodes
    
            # Insert the interconnect delays according to what is saved in delay_placements
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
            for port, val in delay_placements.items():
    
                i, no_of_delays = val
                for _ in range(no_of_delays):
                    sfg = sfg.insert_operation_after(f'sfg{i}.{port.index}', Delay())
    
            # Flatten all the copies of the original SFG
            for i in range(factor):
                sfg.find_by_id(f'sfg{i}').connect_external_signals_to_components()
                sfg = sfg()
    
        @property
        def is_linear(self) -> bool:
            return all(op.is_linear for op in self.split())
    
        @property
        def is_constant(self) -> bool:
            return all(output.is_constant for output in self._output_operations)
    
    
        def get_used_type_names(self) -> List[TypeName]:
            """Get a list of all TypeNames used in the SFG."""
            ret = list({op.type_name() for op in self.operations})
            ret.sort()
            return ret