Skip to content
Snippets Groups Projects
signal_flow_graph.py 87.7 KiB
Newer Older
  • Learn to ignore specific revisions
  •                             shape='rectangle',
                                height="0.1",
                                width="0.1",
                            )
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                            sub.node(
    
                                port_string,
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                                label=port.operation.graph_id,
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                            )
    
            # Creates edges for each output port and creates nodes for each operation
            # and edges for them as well
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
            for i, ports in enumerate(p_list):
    
                    source_label = port.operation.graph_id
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                    node_node = port.name
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                        destination = cast(InputPort, signal.destination)
    
                        destination_label = destination.operation.graph_id
                        destination_node = (
                            destination_label + "In"
                            if isinstance(destination.operation, Delay)
                            else destination_label
                        )
                        pg.edge(node_node, destination_node)
    
                            destination_node,
                            label=destination_label,
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                            shape=_OPERATION_SHAPE[destination.operation.type_name()],
    
                    source_node = (
                        source_label + "Out"
                        if port.operation.type_name() == Delay.type_name()
                        else source_label
                    )
    
                    pg.edge(source_node, node_node)
    
                    pg.node(
                        source_node,
                        label=source_label,
                        shape=_OPERATION_SHAPE[port.operation.type_name()],
                    )
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
            Print a representation of the SFG precedence list to the standard out.
    
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
            If the precedence list already has been calculated then it uses the
            cached version, otherwise it calculates the precedence list and then
            prints it.
            """
    
            precedence_list = self.get_precedence_list()
    
            line = "-" * 120
            out_str = StringIO()
            out_str.write(line)
    
            printed_ops = set()
    
    
            for iter_num, iterable in enumerate(precedence_list, start=1):
                for outport_num, outport in enumerate(iterable, start=1):
    
                    if outport not in printed_ops:
                        # Only print once per operation, even if it has multiple outports
                        out_str.write("\n")
                        out_str.write(str(iter_num))
                        out_str.write(".")
                        out_str.write(str(outport_num))
                        out_str.write(" \t")
                        out_str.write(str(outport.operation))
                        printed_ops.add(outport)
    
                out_str.write("\n")
                out_str.write(line)
    
            print(out_str.getvalue())
    
        def get_operations_topological_order(self) -> Iterable[Operation]:
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
            """
            Return an Iterable of the Operations in the SFG in topological order.
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
            Feedback loops makes an absolutely correct topological order impossible,
            so an approximate topological Order is returned in such cases in this
            implementation.
    
            if self._operations_topological_order:
                return self._operations_topological_order
    
            no_inputs_queue = deque(
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                list(filter(lambda op: op.input_count == 0, self.operations))
            )
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
            remaining_inports_per_operation = {op: op.input_count for op in self.operations}
    
    
            # Maps number of input counts to a queue of seen objects with such a size.
    
            seen_with_inputs_dict: Dict[int, Deque] = defaultdict(deque)
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
            if len(no_inputs_queue) == 0:
                raise ValueError("Illegal SFG state, dangling signals in SFG.")
    
            visited = {first_op}
    
            p_queue_entry_num = itertools.count()
    
            # Negative priority as max-heap popping is wanted
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
            p_queue.put((-first_op.output_count, -next(p_queue_entry_num), first_op))
    
    
            operations_left = len(self.operations) - 1
    
            seen_but_not_visited_count = 0
    
            while operations_left > 0:
                while not p_queue.empty():
                    op = p_queue.get()[2]
    
                    operations_left -= 1
                    top_order.append(op)
                    visited.add(op)
    
                    for neighbor_op in op.subsequent_operations:
                        if neighbor_op not in visited:
                            remaining_inports_per_operation[neighbor_op] -= 1
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                            remaining_inports = remaining_inports_per_operation[neighbor_op]
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                                    (
                                        -neighbor_op.output_count,
                                        -next(p_queue_entry_num),
                                        neighbor_op,
                                    )
                                )
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                                    seen_with_inputs_dict[remaining_inports + 1].remove(
                                        neighbor_op
                                    )
    
                                else:
                                    seen.add(neighbor_op)
                                    seen_but_not_visited_count += 1
    
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                                seen_with_inputs_dict[remaining_inports].append(neighbor_op)
    
                # Check if have to fetch Operations from somewhere else since p_queue
                # is empty
    
                if operations_left > 0:
                    # First check if can fetch from Operations with no input ports
                    if no_inputs_queue:
                        new_op = no_inputs_queue.popleft()
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                        p_queue.put(
                            (
                                -new_op.output_count,
                                -next(p_queue_entry_num),
                                new_op,
                            )
                        )
    
                    # Else fetch operation with the lowest input count that is not zero
    
                        for i in itertools.count(start=1):
    
                            seen_inputs_queue = seen_with_inputs_dict[i]
                            if seen_inputs_queue:
                                new_op = seen_inputs_queue.popleft()
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                                p_queue.put(
                                    (
                                        -new_op.output_count,
                                        -next(p_queue_entry_num),
                                        new_op,
                                    )
                                )
    
                        raise RuntimeError("Disallowed structure in SFG detected")
    
    
            self._operations_topological_order = top_order
            return self._operations_topological_order
    
        def set_latency_of_type(self, type_name: TypeName, latency: int) -> None:
    
            """
            Set the latency of all components with the given type name.
    
            Parameters
            ----------
            type_name : TypeName
    
                The type name of the operation. For example, obtained as
                ``Addition.type_name()``.
    
            latency : int
                The latency of the operation.
            """
    
            for op in self.find_by_type_name(type_name):
    
                cast(Operation, op).set_latency(latency)
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
        def set_execution_time_of_type(
            self, type_name: TypeName, execution_time: int
        ) -> None:
    
            """
            Set the execution time of all operations with the given type name.
    
            Parameters
            ----------
            type_name : TypeName
    
                The type name of the operation. For example, obtained as
                ``Addition.type_name()``.
    
            execution_time : int
                The execution time of the operation.
    
            for op in self.find_by_type_name(type_name):
    
                cast(Operation, op).execution_time = execution_time
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
        def set_latency_offsets_of_type(
            self, type_name: TypeName, latency_offsets: Dict[str, int]
        ) -> None:
    
            """
            Set the latency offsets of all operations with the given type name.
    
            Parameters
            ----------
            type_name : TypeName
    
                The type name of the operation. For example, obtained as
                ``Addition.type_name()``.
    
            latency_offsets : {"in1": int, ...}
                The latency offsets of the inputs and outputs.
    
            for op in self.find_by_type_name(type_name):
    
                cast(Operation, op).set_latency_offsets(latency_offsets)
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
        def _traverse_for_precedence_list(
            self, first_iter_ports: List[OutputPort]
        ) -> List[List[OutputPort]]:
    
            # Find dependencies of output ports and input ports.
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
            remaining_inports_per_operation = {op: op.input_count for op in self.operations}
    
    
            # Traverse output ports for precedence
            curr_iter_ports = first_iter_ports
            precedence_list = []
    
            while curr_iter_ports:
                # Add the found ports to the current iter
                precedence_list.append(curr_iter_ports)
    
                next_iter_ports = []
    
                for outport in curr_iter_ports:
                    for signal in outport.signals:
    
                        new_input_port = signal.destination
    
                        # Do not traverse over delays.
    
                        if new_input_port is not None and not isinstance(
                            new_input_port.operation, Delay
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                        ):
    
                            new_op = new_input_port.operation
    
                            remaining_inports_per_operation[new_op] -= 1
                            if remaining_inports_per_operation[new_op] == 0:
                                next_iter_ports.extend(new_op.outputs)
    
                curr_iter_ports = next_iter_ports
    
            return precedence_list
    
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
        def _add_component_unconnected_copy(
            self, original_component: GraphComponent
        ) -> GraphComponent:
    
            if original_component in self._original_components_to_new:
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                id = (
                    original_component.name
                    if original_component.name
                    else (
                        original_component.graph_id
                        if original_component.graph_id
                        else original_component.type_name()
                    )
                )
                raise ValueError(f"Tried to add duplicate SFG component: {id}")
    
            new_component = original_component.copy()
    
            self._original_components_to_new[original_component] = new_component
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
            if not new_component.graph_id or new_component.graph_id in self._used_ids:
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                new_id = self._graph_id_generator.next_id(
                    new_component.type_name(), self._used_ids
                )
    
                new_component.graph_id = new_id
            self._used_ids.add(new_component.graph_id)
            self._components_by_id[new_component.graph_id] = new_component
    
            self._components_by_name[new_component.name].append(new_component)
            return new_component
    
        def _add_operation_connected_tree_copy(self, start_op: Operation) -> None:
            op_stack = deque([start_op])
            while op_stack:
                original_op = op_stack.pop()
                # Add or get the new copy of the operation.
                if original_op not in self._original_components_to_new:
    
                    new_op = cast(
                        Operation,
                        self._add_component_unconnected_copy(original_op),
                    )
    
                    self._components_dfs_order.append(new_op)
                    self._operations_dfs_order.append(new_op)
                else:
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                    new_op = cast(Operation, self._original_components_to_new[original_op])
    
    
                # Connect input ports to new signals.
                for original_input_port in original_op.inputs:
                    if original_input_port.signal_count < 1:
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                        id = (
                            original_op.name
                            if original_op.name
                            else (
                                original_op.graph_id
                                if original_op.graph_id
                                else original_op.type_name()
                            )
                        )
                        raise ValueError(f"Unconnected input port in SFG. Operation: {id}")
    
    
                    for original_signal in original_input_port.signals:
                        # Check if the signal is one of the SFG's input signals.
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                        if original_signal in self._original_input_signals_to_indices:
    
                            # New signal already created during first step of constructor.
    
                            new_signal = cast(
                                Signal,
                                self._original_components_to_new[original_signal],
                            )
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                                new_op.input(original_input_port.index)
                            )
    
                            source = cast(OutputPort, new_signal.source)
    
                                [new_signal, source.operation]
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                            )
    
                            if source.operation not in self._operations_dfs_order:
    
                                self._operations_dfs_order.append(source.operation)
    
    
                        # Check if the signal has not been added before.
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                        elif original_signal not in self._original_components_to_new:
    
    Frans Skarman's avatar
    Frans Skarman committed
                                dest = (
                                    original_signal.destination.operation.name
                                    if original_signal.destination is not None
                                    else "None"
                                )
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                                    "Dangling signal without source in SFG"
    
    Frans Skarman's avatar
    Frans Skarman committed
                                    f" (destination: {dest})"
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                                )
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                                self._add_component_unconnected_copy(original_signal),
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                                new_op.input(original_input_port.index)
                            )
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                            original_connected_op = original_signal.source.operation
    
                            # Check if connected Operation has been added before.
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                            if original_connected_op in self._original_components_to_new:
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                                    self._original_components_to_new[original_connected_op],
    
                                # Set source to the already added operations port.
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                                new_signal.set_source(
    
                                    component.output(original_signal.source.index)
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                                )
    
                            else:
                                # Create new operation, set signal source to it.
    
                                new_connected_op = cast(
                                    Operation,
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                                    self._add_component_unconnected_copy(
                                        original_connected_op
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                                )
                                new_signal.set_source(
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                                    new_connected_op.output(original_signal.source.index)
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                                )
    
    
                                self._components_dfs_order.append(new_connected_op)
                                self._operations_dfs_order.append(new_connected_op)
    
                                # Add connected operation to queue of operations to visit.
                                op_stack.append(original_connected_op)
    
                # Connect output ports.
                for original_output_port in original_op.outputs:
                    for original_signal in original_output_port.signals:
                        # Check if the signal is one of the SFG's output signals.
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                        if original_signal in self._original_output_signals_to_indices:
    
                            # New signal already created during first step of constructor.
    
                            new_signal = cast(
                                Signal,
                                self._original_components_to_new[original_signal],
                            )
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                            new_signal.set_source(new_op.output(original_output_port.index))
    
                            destination = cast(InputPort, new_signal.destination)
    
                                [new_signal, destination.operation]
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                            )
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                            self._operations_dfs_order.append(destination.operation)
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                        elif original_signal not in self._original_components_to_new:
    
                            if original_signal.source is None:
                                raise ValueError(
    
                                    "Dangling signal ({original_signal}) without"
                                    " source in SFG"
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                                )
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                            new_signal = cast(
                                Signal,
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                                self._add_component_unconnected_copy(original_signal),
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                            )
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                            new_signal.set_source(new_op.output(original_output_port.index))
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                            original_destination = cast(
                                InputPort, original_signal.destination
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                            )
    
                            if original_destination is None:
                                raise ValueError(
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                                    f"Signal ({original_signal}) without destination in SFG"
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                            original_connected_op = original_destination.operation
    
                            if original_connected_op is None:
                                raise ValueError(
    
                                    "Signal with empty destination port"
                                    f" ({original_destination}) in SFG"
    
                            # Check if connected operation has been added.
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                            if original_connected_op in self._original_components_to_new:
    
                                # Set destination to the already connected operations port.
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                                new_signal.set_destination(
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                                    cast(
                                        Operation,
                                        self._original_components_to_new[
                                            original_connected_op
                                        ],
                                    ).input(original_destination.index)
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                                )
    
                            else:
                                # Create new operation, set destination to it.
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                                new_connected_op = cast(
                                    Operation,
                                    (
                                        self._add_component_unconnected_copy(
                                            original_connected_op
                                        )
                                    ),
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                                )
                                new_signal.set_destination(
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                                    new_connected_op.input(original_destination.index)
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                                )
    
    
                                self._components_dfs_order.append(new_connected_op)
                                self._operations_dfs_order.append(new_connected_op)
    
    
                                # Add connected operation to the queue of operations
                                # to visit.
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
        def _evaluate_source(
            self,
            src: OutputPort,
            results: MutableResultMap,
            delays: MutableDelayMap,
            prefix: str,
            bits_override: Optional[int],
    
            quantize: bool,
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
            deferred_delays: DelayQueue,
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                (prefix + "." + src.operation.graph_id)
                if prefix
                else src.operation.graph_id
            )
    
            key = src.operation.key(src.index, key_base)
            if key in results:
                value = results[key]
                if value is None:
                    raise RuntimeError(
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                        "Direct feedback loop detected when evaluating operation."
                    )
    
                return value
    
            value = src.operation.current_output(src.index, delays, key_base)
            results[key] = value
            if value is None:
                value = self._do_evaluate_source(
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                    key_base,
                    key,
                    src,
                    results,
                    delays,
                    prefix,
                    bits_override,
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                    deferred_delays,
                )
    
            else:
                # Evaluate later. Use current value for now.
                deferred_delays.append((key_base, key, src))
            return value
    
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
        def _do_evaluate_source(
            self,
            key_base: str,
            key: ResultKey,
            src: OutputPort,
            results: MutableResultMap,
            delays: MutableDelayMap,
            prefix: str,
            bits_override: Optional[int],
    
            quantize: bool,
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
            deferred_delays: DelayQueue,
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
            input_values = [
                self._evaluate_source(
                    input_port.signals[0].source,
                    results,
                    delays,
                    prefix,
                    bits_override,
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                    deferred_delays,
                )
                for input_port in src.operation.inputs
            ]
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                src.index,
                input_values,
                results,
                delays,
                key_base,
                bits_override,
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
            )
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
            show_signal_id: bool = False,
    
            engine: Optional[str] = None,
    
            branch_node: bool = True,
    
            port_numbering: bool = True,
            splines: str = "spline",
        ) -> Digraph:
    
            Return a Digraph of the SFG.
    
    
            Can be directly displayed in IPython.
    
    
            Parameters
            ----------
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
            show_signal_id : bool, default: False
    
                If True, the graph_id:s of signals are shown.
    
            engine : str, optional
    
                Graphviz layout engine to be used, see https://graphviz.org/documentation/.
                Most common are "dot" and "neato". Default is None leading to dot.
    
            branch_node : bool, default: True
    
                Add a branch node in case the fan-out of a signal is two or more.
            port_numbering : bool, default: True
    
                Show the port number in case the number of ports (input or output) is two or
                more.
    
            splines : {"spline", "line", "ortho", "polyline", "curved"}, default: "spline"
                Spline style, see https://graphviz.org/docs/attrs/splines/ for more info.
    
    
            Returns
            -------
            Digraph
                Digraph of the SFG.
            """
            dg = Digraph()
    
            dg.attr(rankdir="LR", splines=splines)
            branch_nodes = set()
    
            if engine is not None:
                dg.engine = engine
    
            for op in self._components_by_id.values():
                if isinstance(op, Signal):
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                    source = cast(OutputPort, op.source)
                    destination = cast(InputPort, op.destination)
    
                    source_name = (
                        source.name
                        if branch_node and source.signal_count > 1
                        else source.operation.graph_id
                    )
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                    label = op.graph_id if show_signal_id else None
    
                    taillabel = (
                        str(source.index)
                        if source.operation.output_count > 1
                        and (not branch_node or source.signal_count == 1)
                        and port_numbering
                        else None
                    )
                    headlabel = (
                        str(destination.index)
                        if destination.operation.input_count > 1 and port_numbering
                        else None
                    )
                    dg.edge(
                        source_name,
                        destination.operation.graph_id,
                        label=label,
                        taillabel=taillabel,
                        headlabel=headlabel,
                    )
                    if (
                        branch_node
                        and source.signal_count > 1
                        and source_name not in branch_nodes
                    ):
                        branch_nodes.add(source_name)
                        dg.node(source_name, shape='point')
                        taillabel = (
                            str(source.index)
                            if source.operation.output_count > 1 and port_numbering
                            else None
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                        )
                        dg.edge(
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                            source.operation.graph_id,
    
                            source_name,
                            arrowhead='none',
                            taillabel=taillabel,
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                        )
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                    dg.node(
                        op.graph_id,
                        shape=_OPERATION_SHAPE[op.type_name()],
                        label=f"{op.name}\n({op.graph_id})" if op.name else None,
                    )
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
        def _repr_mimebundle_(self, include=None, exclude=None):
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
            return self.sfg_digraph()._repr_mimebundle_(include=include, exclude=exclude)
    
        def _repr_jpeg_(self):
    
    Frans Skarman's avatar
    Frans Skarman committed
            return self.sfg_digraph()._repr_mimebundle_(include=["image/jpeg"])[
    
                "image/jpeg"
            ]
    
        def _repr_png_(self):
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
            return self.sfg_digraph()._repr_mimebundle_(include=["image/png"])["image/png"]
    
        def _repr_svg_(self):
            return self.sfg_digraph()._repr_mimebundle_(include=["image/svg+xml"])[
                "image/svg+xml"
            ]
    
        # SVG is valid HTML. This is useful for e.g. sphinx-gallery
        _repr_html_ = _repr_svg_
    
    
        def show(
            self,
            fmt: Optional[str] = None,
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
            show_signal_id: bool = False,
    
            engine: Optional[str] = None,
    
            branch_node: bool = True,
    
            port_numbering: bool = True,
            splines: str = "spline",
        ) -> None:
    
            Display a visual representation of the SFG using the default system viewer.
    
    
            Parameters
            ----------
    
            fmt : str, optional
    
                File format of the generated graph. Output formats can be found at
                https://www.graphviz.org/doc/info/output.html
    
                Most common are "pdf", "eps", "png", and "svg". Default is None which
                leads to PDF.
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
            show_signal_id : bool, default: False
    
                If True, the graph_id:s of signals are shown.
    
            engine : str, optional
    
                Graphviz layout engine to be used, see https://graphviz.org/documentation/.
                Most common are "dot" and "neato". Default is None leading to dot.
    
            branch_node : bool, default: True
    
                Add a branch node in case the fan-out of a signal is two or more.
            port_numbering : bool, default: True
    
                Show the port number in case the number of ports (input or output) is two or
                more.
    
            splines : {"spline", "line", "ortho", "polyline", "curved"}, default: "spline"
                Spline style, see https://graphviz.org/docs/attrs/splines/ for more info.
    
            dg = self.sfg_digraph(
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                show_signal_id=show_signal_id,
    
                engine=engine,
                branch_node=branch_node,
                port_numbering=port_numbering,
                splines=splines,
            )
    
            if fmt is not None:
                dg.format = fmt
    
        def critical_path_time(self) -> int:
            """Return the time of the critical path."""
    
            # Import here needed to avoid circular imports
            from b_asic.schedule import Schedule
    
    
            return Schedule(self, algorithm="ASAP").schedule_time
    
    Samuel Fagerlund's avatar
    Samuel Fagerlund committed
        def _dfs(self, graph, start, end):
    
    Samuel Fagerlund's avatar
    Samuel Fagerlund committed
            """
            Find loop(s) in graph
    
            Parameters
            ----------
            graph : dictionary
                The dictionary that are to be searched for loops.
            start : key in dictionary graph
                The "node" in the dictionary that are set as the start point.
            end : key in dictionary graph
                The "node" in the dictionary that are set as the end point.
            """
            fringe = [(start, [])]
            while fringe:
                state, path = fringe.pop()
                if path and state == end:
                    yield path
                    continue
                for next_state in graph[state]:
                    if next_state in path:
                        continue
                    fringe.append((next_state, path + [next_state]))
    
    
        def iteration_period_bound(self) -> int:
            """
            Return the iteration period bound of the SFG.
    
            If -1, the SFG does not have any loops and therefore no iteration period bound.
    
    Samuel Fagerlund's avatar
    Samuel Fagerlund committed
    
            Returns
            -------
            The iteration period bound.
    
    Samuel Fagerlund's avatar
    Samuel Fagerlund committed
            inputs_used = []
            for used_input in self._used_ids:
                if 'in' in str(used_input):
                    used_input = used_input.replace("in", "")
                    inputs_used.append(int(used_input))
            if inputs_used == []:
                raise ValueError("No inputs to sfg")
            for input in inputs_used:
                input_op = self._input_operations[input]
            queue: Deque[Operation] = deque([input_op])
            visited: Set[Operation] = {input_op}
            dict_of_sfg = {}
            while queue:
                op = queue.popleft()
                for output_port in op.outputs:
                    if not (isinstance(op, Input) or isinstance(op, Output)):
                        dict_of_sfg[op.graph_id] = []
                    for signal in output_port.signals:
                        if signal.destination is not None:
                            new_op = signal.destination.operation
                            if not (isinstance(op, Input) or isinstance(op, Output)):
                                if not isinstance(new_op, Output):
                                    dict_of_sfg[op.graph_id] += [new_op.graph_id]
                            if new_op not in visited:
                                queue.append(new_op)
                                visited.add(new_op)
                        else:
                            raise ValueError("Destination does not exist")
    
    Samuel Fagerlund's avatar
    Samuel Fagerlund committed
            if not dict_of_sfg:
    
    Samuel Fagerlund's avatar
    Samuel Fagerlund committed
                raise ValueError(
                    "the SFG does not have any loops and therefore no iteration period bound."
                )
            cycles = [
                [node] + path
                for node in dict_of_sfg
    
    Samuel Fagerlund's avatar
    Samuel Fagerlund committed
                for path in self._dfs(dict_of_sfg, node, node)
    
    Samuel Fagerlund's avatar
    Samuel Fagerlund committed
            ]
    
    Samuel Fagerlund's avatar
    Samuel Fagerlund committed
            if not cycles:
    
    Samuel Fagerlund's avatar
    Samuel Fagerlund committed
                return -1
            op_and_latency = {}
            for op in self.operations:
                for lista in cycles:
                    for element in lista:
                        if op.type_name() not in op_and_latency:
                            op_and_latency[op.type_name()] = op.latency
            t_l_values = []
            for loop in cycles:
                loop.pop()
                time_of_loop = 0
                number_of_t_in_loop = 0
                for element in loop:
                    if ''.join([i for i in element if not i.isdigit()]) == 't':
                        number_of_t_in_loop += 1
                    for key, item in op_and_latency.items():
                        if key in element:
                            time_of_loop += item
                if number_of_t_in_loop in (0, 1):
                    t_l_values.append(time_of_loop)
                else:
                    t_l_values.append(time_of_loop / number_of_t_in_loop)
            return max(t_l_values)
    
        def state_space_representation(self):
            """
            Find the state-space representation of the SFG.
    
            Returns
            -------
            The state-space representation.
            """
            delay_element_used = []
            for delay_element in self._used_ids:
                if ''.join([i for i in delay_element if not i.isdigit()]) == 't':
                    delay_element_used.append(delay_element)
            delay_element_used.sort()
            input_index_used = []
            inputs_used = []
            output_index_used = []
            outputs_used = []
            for used_input in self._used_ids:
                if 'in' in str(used_input):
                    inputs_used.append(used_input)
                    input_index_used.append(int(used_input.replace("in", "")))
            for used_output in self._used_ids:
                if 'out' in str(used_output):
                    outputs_used.append(used_output)
                    output_index_used.append(int(used_output.replace("out", "")))
            if input_index_used == []:
                raise ValueError("No input(s) to sfg")
            if output_index_used == []:
                raise ValueError("No output(s) to sfg")
            for input in input_index_used:
                input_op = self._input_operations[input]
            queue: Deque[Operation] = deque([input_op])
            visited: Set[Operation] = {input_op}
            dict_of_sfg = {}
            while queue:
                op = queue.popleft()
                if isinstance(op, Output):
                    dict_of_sfg[op.graph_id] = []
                for output_port in op.outputs:
                    dict_of_sfg[op.graph_id] = []
                    for signal in output_port.signals:
                        if signal.destination is not None:
                            new_op = signal.destination.operation
                            dict_of_sfg[op.graph_id] += [new_op.graph_id]
                            if new_op not in visited:
                                queue.append(new_op)
                                visited.add(new_op)
                        else:
                            raise ValueError("Destination does not exist")
    
    Samuel Fagerlund's avatar
    Samuel Fagerlund committed
            if not dict_of_sfg:
                raise ValueError("Empty SFG")
    
            cycles = [
                [node] + path
                for node in dict_of_sfg
                if node[0] == 't'
    
    Samuel Fagerlund's avatar
    Samuel Fagerlund committed
                for path in self._dfs(dict_of_sfg, node, node)
    
            ]
            delay_loop_list = []
            for lista in cycles:
                if not len(lista) < 2:
                    temp_list = []
                    for element in lista:
                        temp_list.append(element)
                        if element[0] == 't' and len(temp_list) > 2:
                            delay_loop_list.append(temp_list)
                            temp_list = [element]
            state_space_lista = []
            [
                state_space_lista.append(x)
                for x in delay_loop_list
                if x not in state_space_lista
            ]
    
            mat_row = len(delay_element_used) + len(output_index_used)
            mat_col = len(delay_element_used) + len(input_index_used)
            mat_content = np.zeros((mat_row, mat_col))
            matrix_in = [0] * mat_col
            matrix_answer = [0] * mat_row
            for in_signal in inputs_used:
                matrix_in[len(delay_element_used) + int(in_signal.replace('in', ''))] = (
                    in_signal.replace('in', 'x')
                )
                for delay_element in delay_element_used:
                    matrix_answer[delay_element_used.index(delay_element)] = (
                        delay_element.replace('t', 'v')
                    )
                    matrix_in[delay_element_used.index(delay_element)] = (
                        delay_element.replace('t', 'v')
                    )
                    paths = self.find_all_paths(dict_of_sfg, in_signal, delay_element)
                    for lista in paths:
                        temp_list = []
                        for element in lista:
                            temp_list.append(element)
                            if element[0] == 't':
                                state_space_lista.append(temp_list)
                                temp_list = [element]
                for out_signal in outputs_used:
                    paths = self.find_all_paths(dict_of_sfg, in_signal, out_signal)
                    matrix_answer[
                        len(delay_element_used) + int(out_signal.replace('out', ''))
                    ] = out_signal.replace('out', 'y')
                    for lista in paths:
                        temp_list1 = []
                        for element in lista:
                            temp_list1.append(element)
                            if element[0] == 't':
                                state_space_lista.append(temp_list1)
                                temp_list1 = [element]
                            if "out" in element:
                                state_space_lista.append(temp_list1)
                                temp_list1 = []
            state_space_list_no_dup = []
            [
                state_space_list_no_dup.append(x)
                for x in state_space_lista
                if x not in state_space_list_no_dup
            ]
            for lista in state_space_list_no_dup:
                if "in" in lista[0] and lista[-1][0] == 't':
                    row = int(lista[-1].replace("t", ""))
                    column = len(delay_element_used) + int(lista[0].replace("in", ""))
                    temp_value = 1
                    for element in lista:
                        if "cmul" in element:
                            temp_value *= self.find_by_id(element).value
                    mat_content[row, column] += temp_value
                elif "in" in lista[0] and "out" in lista[-1]:
                    row = len(delay_element_used) + int(lista[-1].replace("out", ""))
                    column = len(delay_element_used) + int(lista[0].replace("in", ""))
                    temp_value = 1
                    for element in lista:
                        if "cmul" in element:
                            temp_value *= self.find_by_id(element).value
                    mat_content[row, column] += temp_value
                elif lista[0][0] == 't' and lista[-1][0] == 't':
                    row = int(lista[-1].replace("t", ""))
                    column = int(lista[0].replace("t", ""))
                    temp_value = 1
                    for element in lista:
                        if "cmul" in element:
                            temp_value *= self.find_by_id(element).value
                    mat_content[row, column] += temp_value
                elif lista[0][0] == 't' and "out" in lista[-1]:
                    row = len(delay_element_used) + int(lista[-1].replace("out", ""))
                    column = int(lista[0].replace("t", ""))
                    temp_value = 1
                    for element in lista:
                        if "cmul" in element:
                            temp_value *= self.find_by_id(element).value
                    mat_content[row, column] += temp_value
            return matrix_answer, mat_content, matrix_in
    
        def find_all_paths(self, graph: dict, start: str, end: str, path=[]) -> list:
            """
            Returns all paths in graph from node start to node end
    
            Parameters
            ----------
            graph : dictionary
                The dictionary that are to be searched for loops.
            start : key in dictionary graph
                The "node" in the dictionary that are set as the start point.
            end : key in dictionary graph
                The "node" in the dictionary that are set as the end point.
    
            Returns
            -------
            The state-space representation of the SFG.
            """
            path = path + [start]
            if start == end:
                return [path]
            if start not in graph:
                return []
            paths = []
            for node in graph[start]:
                if node not in path:
                    newpaths = self.find_all_paths(graph, node, end, path)
                    for newpath in newpaths:
                        paths.append(newpath)
            return paths
    
    
        def edit(self) -> Dict[str, "SFG"]:
    
            """Edit SFG in GUI."""
            from b_asic.GUI.main_window import start_editor
    
    
            return start_editor(self)
    
        def unfold(self, factor: int) -> "SFG":