Skip to content
Snippets Groups Projects
signal_flow_graph.py 88 KiB
Newer Older
  • Learn to ignore specific revisions
  • Samuel Fagerlund's avatar
    Samuel Fagerlund committed
                for path in self._dfs(dict_of_sfg, node, node)
    
            ]
            delay_loop_list = []
            for lista in cycles:
                if not len(lista) < 2:
                    temp_list = []
                    for element in lista:
                        temp_list.append(element)
    
                        if element[0] == "t" and len(temp_list) >= 2:
    
                            delay_loop_list.append(temp_list)
                            temp_list = [element]
            state_space_lista = []
            [
                state_space_lista.append(x)
                for x in delay_loop_list
                if x not in state_space_lista
            ]
            mat_row = len(delay_element_used) + len(output_index_used)
            mat_col = len(delay_element_used) + len(input_index_used)
            mat_content = np.zeros((mat_row, mat_col))
            matrix_in = [0] * mat_col
            matrix_answer = [0] * mat_row
            for in_signal in inputs_used:
    
                matrix_in[len(delay_element_used) + int(in_signal.replace("in", ""))] = (
                    in_signal.replace("in", "x")
    
                )
                for delay_element in delay_element_used:
                    matrix_answer[delay_element_used.index(delay_element)] = (
    
                        delay_element.replace("t", "v")
    
                    )
                    matrix_in[delay_element_used.index(delay_element)] = (
    
                        delay_element.replace("t", "v")
    
                    )
                    paths = self.find_all_paths(dict_of_sfg, in_signal, delay_element)
                    for lista in paths:
                        temp_list = []
                        for element in lista:
                            temp_list.append(element)
    
                            if element[0] == "t":
    
                                state_space_lista.append(temp_list)
                                temp_list = [element]
                for out_signal in outputs_used:
                    paths = self.find_all_paths(dict_of_sfg, in_signal, out_signal)
                    matrix_answer[
    
                        len(delay_element_used) + int(out_signal.replace("out", ""))
                    ] = out_signal.replace("out", "y")
    
                    for lista in paths:
                        temp_list1 = []
                        for element in lista:
                            temp_list1.append(element)
    
                            if element[0] == "t":
    
                                state_space_lista.append(temp_list1)
                                temp_list1 = [element]
                            if "out" in element:
                                state_space_lista.append(temp_list1)
                                temp_list1 = []
            state_space_list_no_dup = []
            [
                state_space_list_no_dup.append(x)
                for x in state_space_lista
                if x not in state_space_list_no_dup
            ]
            for lista in state_space_list_no_dup:
    
                if "in" in lista[0] and lista[-1][0] == "t":
    
                    row = int(lista[-1].replace("t", ""))
                    column = len(delay_element_used) + int(lista[0].replace("in", ""))
                    temp_value = 1
                    for element in lista:
                        if "cmul" in element:
                            temp_value *= self.find_by_id(element).value
    
    Samuel Fagerlund's avatar
    Samuel Fagerlund committed
                        for key, value in addition_with_constant.items():
                            if key == element:
                                temp_value += int(value)
    
                    mat_content[row, column] += temp_value
                elif "in" in lista[0] and "out" in lista[-1]:
                    row = len(delay_element_used) + int(lista[-1].replace("out", ""))
                    column = len(delay_element_used) + int(lista[0].replace("in", ""))
                    temp_value = 1
                    for element in lista:
                        if "cmul" in element:
                            temp_value *= self.find_by_id(element).value
    
    Samuel Fagerlund's avatar
    Samuel Fagerlund committed
                        for key, value in addition_with_constant.items():
                            if key == element:
                                temp_value += int(value)
    
                    mat_content[row, column] += temp_value
    
                elif lista[0][0] == "t" and lista[-1][0] == "t":
    
                    row = int(lista[-1].replace("t", ""))
                    column = int(lista[0].replace("t", ""))
                    temp_value = 1
                    for element in lista:
                        if "cmul" in element:
                            temp_value *= self.find_by_id(element).value
    
    Samuel Fagerlund's avatar
    Samuel Fagerlund committed
                        for key, value in addition_with_constant.items():
                            if key == element:
                                temp_value += int(value)
    
                    mat_content[row, column] += temp_value
    
                elif lista[0][0] == "t" and "out" in lista[-1]:
    
                    row = len(delay_element_used) + int(lista[-1].replace("out", ""))
                    column = int(lista[0].replace("t", ""))
                    temp_value = 1
                    for element in lista:
                        if "cmul" in element:
                            temp_value *= self.find_by_id(element).value
    
    Samuel Fagerlund's avatar
    Samuel Fagerlund committed
                        for key, value in addition_with_constant.items():
                            if key == element:
                                temp_value += int(value)
    
                    mat_content[row, column] += temp_value
            return matrix_answer, mat_content, matrix_in
    
    
        def find_all_paths(
            self, graph: dict, start: str, end: str, path: list | None = None
        ) -> list:
    
            """
            Returns all paths in graph from node start to node end
    
            Parameters
            ----------
            graph : dictionary
                The dictionary that are to be searched for loops.
            start : key in dictionary graph
                The "node" in the dictionary that are set as the start point.
            end : key in dictionary graph
                The "node" in the dictionary that are set as the end point.
    
            Returns
            -------
            The state-space representation of the SFG.
            """
    
            path = path + [start]
            if start == end:
                return [path]
            if start not in graph:
                return []
            paths = []
            for node in graph[start]:
                if node not in path:
                    newpaths = self.find_all_paths(graph, node, end, path)
                    for newpath in newpaths:
                        paths.append(newpath)
            return paths
    
    
        def operation_counter(self) -> Counter:
            """Count the operations in ``self.operations`` per type name."""
            tally = Counter()
            for operation in self.operations:
                tally[operation.type_name()] += 1
            return tally
    
    
        def edit(self) -> dict[str, "SFG"]:
            """Open this SFG in the GUI editor and return what the editor returns."""
            # The import is kept local to the method, as in the original code
            # (presumably so the GUI module is only loaded on demand -- confirm).
            from b_asic.GUI.main_window import start_editor

            editor_result = start_editor(self)
            return editor_result
    
        def unfold(self, factor: int) -> "SFG":
            """
            Unfold the SFG *factor* times. Return a new SFG without modifying the original.

            Inputs and outputs are ordered with early inputs first. That is for an SFG
            with n inputs, the first n inputs are the inputs at time t, the next n
            inputs are the inputs at time t+1, the next n at t+2 and so on.

            Parameters
            ----------
            factor : int
                Number of times to unfold.

            Returns
            -------
            SFG
                The unfolded SFG.

            Raises
            ------
            ValueError
                If *factor* is 0.
            """
            if factor == 0:
                raise ValueError("Unfolding 0 times removes the SFG")

            sfg = self()  # copy the sfg

            inputs = sfg.input_operations
            outputs = sfg.output_operations

            # Remove all delay elements in the SFG and replace each one
            # with one input operation and one output operation
            for delay in sfg.find_by_type(Delay):
                delay_input = Input(name="input_" + delay.graph_id)
                delay_output = Output(
                    src0=delay.input(0).signals[0].source, name="output_" + delay.graph_id
                )

                inputs.append(delay_input)
                outputs.append(delay_output)

                # move all outgoing signals from the delay to the new input operation
                while len(delay.output(0).signals) > 0:
                    signal = delay.output(0).signals[0]
                    destination = signal.destination
                    destination.remove_signal(signal)
                    signal.remove_source()
                    destination.connect(delay_input.output(0))

                delay.input(0).signals[0].remove_source()
                delay.input(0).clear()

            new_sfg = SFG(inputs, outputs)  # The new sfg without the delays

            sfgs = [new_sfg() for _ in range(factor)]  # Copy the SFG factor times

            # Add suffixes to all graphIDs and names in order to keep them separated
            for i in range(factor):
                suffix = f"_{i}"
                for operation in sfgs[i].operations:
                    operation.graph_id = operation.graph_id + suffix
                    # Unnamed operations and the delay replacements created above keep
                    # their names; the latter are looked up by name further down.
                    if operation.name[:7] not in ["", "input_t", "output_"]:
                        operation.name = operation.name + suffix

            input_name_to_idx = {}  # save the input port indices for future reference
            new_inputs = []
            # For each copy of the SFG, create new input operations for every "original"
            # input operation and connect them to begin creating the unfolded SFG
            for i in range(factor):
                for port, operation in zip(
                    sfgs[i].inputs, sfgs[i].input_operations, strict=True
                ):
                    if not operation.name.startswith("input_t"):
                        # Separate name so the loop index ``i`` is not shadowed.
                        new_input = Input()
                        new_inputs.append(new_input)
                        port.connect(new_input)
                    else:
                        # If the input was created earlier when removing the delays
                        # then just save the index
                        input_name_to_idx[operation.name] = port.index

            # Connect the original outputs in the same way as the inputs
            # Also connect the copies of the SFG together according to a formula
            # from the TSTE87 course material, and save the number of delays for
            # each interconnection
            new_outputs = []
            delay_placements = {}
            for i in range(factor):
                for port, operation in zip(
                    sfgs[i].outputs, sfgs[i].output_operations, strict=True
                ):
                    if not operation.name.startswith("output_t"):
                        new_outputs.append(Output(port))
                    else:
                        index = operation.name[8:]  # Remove the "output_t" prefix
                        # Copy i feeds copy i + 1; the last copy wraps around to
                        # copy 0, and only that wrap-around gets a delay.
                        j = (i + 1) % factor
                        number_of_delays_between = (i + 1) // factor
                        input_port = sfgs[j].input(input_name_to_idx["input_t" + index])
                        input_port.connect(port)
                        delay_placements[port] = [i, number_of_delays_between]
                # deterministically set the graphID of the sfgs
                sfgs[i].graph_id = f"sfg{i}"

            sfg = SFG(new_inputs, new_outputs)  # create a new SFG to remove floating nodes

            # Insert the interconnect delays according to what is saved in delay_placements
            for port, val in delay_placements.items():
                i, no_of_delays = val
                for _ in range(no_of_delays):
                    sfg = sfg.insert_operation_after(f"sfg{i}.{port.index}", Delay())

            # Flatten all the copies of the original SFG
            for i in range(factor):
                sfg.find_by_id(f"sfg{i}").connect_external_signals_to_components()

            # NOTE(review): the available text of this method ended without a return
            # statement and may have lost further trailing steps (e.g. re-copying the
            # SFG after flattening) -- confirm against the repository.
            return sfg
    
        @property
        def is_linear(self) -> bool:
            """True if every operation yielded by ``self.split()`` is linear."""
            for operation in self.split():
                if not operation.is_linear:
                    return False
            return True
    
        @property
        def is_constant(self) -> bool:
            """True if every operation in ``self._output_operations`` is constant."""
            for output in self._output_operations:
                if not output.is_constant:
                    return False
            return True
    
        def get_used_type_names(self) -> list[TypeName]:
            """Return the distinct type names of ``self.operations``, sorted ascending."""
            distinct_names = {operation.type_name() for operation in self.operations}
            return sorted(distinct_names)
    
        def get_used_graph_ids(self) -> set[GraphID]:
            """
            Get a set of all GraphID:s used in the SFG.

            Returns
            -------
            set
                The distinct ``graph_id`` values of ``self.operations``. A set is
                unordered; callers that need a stable order should sort the result.
            """
            # The previous ``sorted(ret)`` call discarded its result and had no effect
            # on the returned set, so it has been removed.
            return {op.graph_id for op in self.operations}