Skip to content
Snippets Groups Projects
signal_flow_graph.py 84.9 KiB
Newer Older
  • Learn to ignore specific revisions
  •             elif lista[0][0] == 't' and "out" in lista[-1]:
                    row = len(delay_element_used) + int(lista[-1].replace("out", ""))
                    column = int(lista[0].replace("t", ""))
                    temp_value = 1
                    for element in lista:
                        if "cmul" in element:
                            temp_value *= self.find_by_id(element).value
    
    Samuel Fagerlund's avatar
    Samuel Fagerlund committed
                        for key, value in addition_with_constant.items():
                            if key == element:
                                temp_value += int(value)
    
                    mat_content[row, column] += temp_value
            return matrix_answer, mat_content, matrix_in
    
        def find_all_paths(
            self, graph: dict, start: str, end: str, path=None
        ) -> list[list[str]]:
            """
            Return all simple paths in *graph* from node *start* to node *end*.

            A node is never visited twice within the same path, so cyclic graphs
            are handled without infinite recursion.

            Parameters
            ----------
            graph : dictionary
                Adjacency mapping; each key maps to an iterable of the nodes that
                can be reached directly from that key.
            start : key in dictionary graph
                The node in the dictionary that is used as the start point.
            end : key in dictionary graph
                The node in the dictionary that is used as the end point.
            path : list, optional
                Nodes already visited before *start*. Used by the recursion; the
                list passed in is never modified.

            Returns
            -------
            list of list
                Every path found, each given as the list of nodes from the first
                node of *path* (or *start* if *path* is empty) to *end*. Empty if
                no path exists.
            """
            # A fresh list per call instead of a shared mutable default argument.
            visited = ([] if path is None else path) + [start]
            if start == end:
                return [visited]
            if start not in graph:
                return []
            paths = []
            for node in graph[start]:
                # Skip nodes already on the current path to avoid walking cycles.
                if node not in visited:
                    paths.extend(self.find_all_paths(graph, node, end, visited))
            return paths
    
    
        def edit(self) -> dict[str, "SFG"]:
            """
            Edit SFG in GUI.

            Returns
            -------
            Whatever ``start_editor`` returns when called with this SFG.
            """
            # The GUI module is imported only when the editor is actually requested.
            from b_asic.GUI.main_window import start_editor as launch_editor

            editor_result = launch_editor(self)
            return editor_result
    
        def unfold(self, factor: int) -> "SFG":
            """
            Unfold the SFG *factor* times. Return a new SFG without modifying the original.

            Inputs and outputs are ordered with early inputs first. That is for an SFG
            with n inputs, the first n inputs are the inputs at time t, the next n
            inputs are the inputs at time t+1, the next n at t+2 and so on.

            Parameters
            ----------
            factor : int
                Number of times to unfold.

            Returns
            -------
            SFG
                The unfolded SFG.

            Raises
            ------
            ValueError
                If *factor* is 0.
            """
            if factor == 0:
                raise ValueError("Unfolding 0 times removes the SFG")

            sfg = self()  # copy the sfg

            inputs = sfg.input_operations
            outputs = sfg.output_operations

            # Remove all delay elements in the SFG and replace each one
            # with one input operation and one output operation
            for delay in sfg.find_by_type_name(Delay.type_name()):
                delay_input = Input(name="input_" + delay.graph_id)
                delay_output = Output(
                    src0=delay.input(0).signals[0].source,
                    name="output_" + delay.graph_id,
                )

                inputs.append(delay_input)
                outputs.append(delay_output)

                # move all outgoing signals from the delay to the new input operation
                while len(delay.output(0).signals) > 0:
                    signal = delay.output(0).signals[0]
                    destination = signal.destination
                    destination.remove_signal(signal)
                    signal.remove_source()
                    destination.connect(delay_input.output(0))

                delay.input(0).signals[0].remove_source()
                delay.input(0).clear()

            new_sfg = SFG(inputs, outputs)  # The new sfg without the delays

            sfgs = [new_sfg() for _ in range(factor)]  # Copy the SFG factor times

            # Add suffixes to all graphIDs and names in order to keep them separated
            for i in range(factor):
                for operation in sfgs[i].operations:
                    suffix = f'_{i}'
                    operation.graph_id = operation.graph_id + suffix

                    # Unnamed operations and the inputs/outputs that replaced the
                    # delays keep their names; the latter are looked up by name below
                    if operation.name[:7] not in ['', 'input_t', 'output_']:
                        operation.name = operation.name + suffix

            input_name_to_idx = {}  # save the input port indices for future reference

            new_inputs = []
            # For each copy of the SFG, create new input operations for every "original"
            # input operation and connect them to begin creating the unfolded SFG
            for i in range(factor):
                for port, operation in zip(sfgs[i].inputs, sfgs[i].input_operations):
                    if not operation.name.startswith("input_t"):
                        # Separate name so the copy index ``i`` is not shadowed
                        new_input = Input()
                        new_inputs.append(new_input)
                        port.connect(new_input)
                    else:
                        # If the input was created earlier when removing the delays
                        # then just save the index
                        input_name_to_idx[operation.name] = port.index

            # Connect the original outputs in the same way as the inputs
            # Also connect the copies of the SFG together according to a formula
            # from the TSTE87 course material, and save the number of delays for
            # each interconnection
            new_outputs = []
            delay_placements = {}
            for i in range(factor):
                for port, operation in zip(sfgs[i].outputs, sfgs[i].output_operations):
                    if not operation.name.startswith("output_t"):
                        new_outputs.append(Output(port))
                    else:
                        index = operation.name[8:]  # Remove the "output_t" prefix

                        # Copy i feeds copy (i + 1) mod factor; only the wrap-around
                        # connection from the last copy gets a delay
                        j = (i + 1) % factor
                        number_of_delays_between = (i + 1) // factor

                        input_port = sfgs[j].input(input_name_to_idx["input_t" + index])
                        input_port.connect(port)

                        delay_placements[port] = [i, number_of_delays_between]
                sfgs[i].graph_id = (
                    f'sfg{i}'  # deterministically set the graphID of the sfgs
                )

            sfg = SFG(new_inputs, new_outputs)  # create a new SFG to remove floating nodes

            # Insert the interconnect delays according to what is saved in delay_placements
            for port, val in delay_placements.items():
                i, no_of_delays = val
                for _ in range(no_of_delays):
                    sfg = sfg.insert_operation_after(f'sfg{i}.{port.index}', Delay())

            # Flatten all the copies of the original SFG
            for i in range(factor):
                sfg.find_by_id(f'sfg{i}').connect_external_signals_to_components()
                sfg = sfg()

            return sfg
    
        @property
        def is_linear(self) -> bool:
            """Whether every operation yielded by ``self.split()`` reports ``is_linear``."""
            for component in self.split():
                if not component.is_linear:
                    return False
            return True
    
        @property
        def is_constant(self) -> bool:
            """Whether every output operation of this SFG reports ``is_constant``."""
            for out_op in self._output_operations:
                if not out_op.is_constant:
                    return False
            return True
    
        def get_used_type_names(self) -> list[TypeName]:
            """
            Get a list of all TypeNames used in the SFG.

            Each type name appears once and the list is sorted in ascending order.
            """
            unique_names = set()
            for op in self.operations:
                unique_names.add(op.type_name())
            return sorted(unique_names)
    
        def get_used_graph_ids(self) -> set[GraphID]:
            """
            Get the set of all GraphID:s used in the SFG.

            The result is an unordered set; use ``sorted()`` on it if a
            deterministic order is needed.
            """
            # A set comprehension already yields a set. The former ``sorted(ret)``
            # call discarded its result and had no effect, so it is removed.
            return {op.graph_id for op in self.operations}