Skip to content
Snippets Groups Projects
signal_flow_graph.py 50.6 KiB
Newer Older
  • Learn to ignore specific revisions
  • 
        def _add_operation_connected_tree_copy(self, start_op: Operation) -> None:
            """
            Copy everything connected to *start_op* into this SFG.

            The original graph is walked iteratively with a LIFO stack, starting
            at ``start_op``. For every visited operation a copy is created (or
            looked up in ``self._original_components_to_new`` when one already
            exists), and each signal attached to its input and output ports is
            copied and wired to the corresponding port of the copy. Operations
            that are reached for the first time through a signal are copied
            immediately and pushed on the stack so that their own ports are
            processed later.

            New components are recorded in ``self._components_dfs_order`` and new
            operations additionally in ``self._operations_dfs_order``.

            Parameters
            ----------
            start_op : Operation
                Operation of the original graph from which the traversal starts.

            Raises
            ------
            ValueError
                If an input port has no signal, if a signal has no source, or if
                a signal has no destination.
            """
            op_stack = deque([start_op])
            while op_stack:
                original_op = op_stack.pop()
                # Add or get the new copy of the operation.
                if original_op not in self._original_components_to_new:
                    new_op = cast(
                        Operation,
                        self._add_component_unconnected_copy(original_op),
                    )
                    self._components_dfs_order.append(new_op)
                    self._operations_dfs_order.append(new_op)
                else:
                    new_op = cast(
                        Operation, self._original_components_to_new[original_op]
                    )

                # Connect input ports to new signals.
                for original_input_port in original_op.inputs:
                    if original_input_port.signal_count < 1:
                        raise ValueError("Unconnected input port in SFG")

                    for original_signal in original_input_port.signals:
                        # Check if the signal is one of the SFG's input signals.
                        if (
                            original_signal
                            in self._original_input_signals_to_indices
                        ):
                            # New signal already created during first step of constructor.
                            new_signal = cast(
                                Signal,
                                self._original_components_to_new[original_signal],
                            )
                            new_signal.set_destination(
                                new_op.input(original_input_port.index)
                            )
                            source = cast(OutputPort, new_signal.source)
                            self._components_dfs_order.extend(
                                [new_signal, source.operation]
                            )
                            self._operations_dfs_order.append(source.operation)

                        # Check if the signal has not been added before.
                        elif (
                            original_signal not in self._original_components_to_new
                        ):
                            if original_signal.source is None:
                                raise ValueError(
                                    "Dangling signal without source in SFG"
                                )

                            new_signal = cast(
                                Signal,
                                self._add_component_unconnected_copy(
                                    original_signal
                                ),
                            )
                            new_signal.set_destination(
                                new_op.input(original_input_port.index)
                            )

                            # NOTE(review): this line was not visible in the
                            # damaged listing; it mirrors the boundary-signal
                            # branch above, which also records the signal.
                            self._components_dfs_order.append(new_signal)

                            original_connected_op = (
                                original_signal.source.operation
                            )
                            # Check if connected Operation has been added before.
                            if (
                                original_connected_op
                                in self._original_components_to_new
                            ):
                                component = cast(
                                    Operation,
                                    self._original_components_to_new[
                                        original_connected_op
                                    ],
                                )
                                # Set source to the already added operations port.
                                new_signal.set_source(
                                    component.output(original_signal.source.index)
                                )
                            else:
                                # Create new operation, set signal source to it.
                                new_connected_op = cast(
                                    Operation,
                                    self._add_component_unconnected_copy(
                                        original_connected_op
                                    ),
                                )
                                new_signal.set_source(
                                    new_connected_op.output(
                                        original_signal.source.index
                                    )
                                )

                                self._components_dfs_order.append(new_connected_op)
                                self._operations_dfs_order.append(new_connected_op)

                                # Add connected operation to queue of operations to visit.
                                op_stack.append(original_connected_op)

                # Connect output ports.
                for original_output_port in original_op.outputs:
                    for original_signal in original_output_port.signals:
                        # Check if the signal is one of the SFG's output signals.
                        if (
                            original_signal
                            in self._original_output_signals_to_indices
                        ):
                            # New signal already created during first step of constructor.
                            new_signal = cast(
                                Signal,
                                self._original_components_to_new[original_signal],
                            )
                            new_signal.set_source(
                                new_op.output(original_output_port.index)
                            )
                            destination = cast(InputPort, new_signal.destination)
                            self._components_dfs_order.extend(
                                [new_signal, destination.operation]
                            )
                            self._operations_dfs_order.append(
                                destination.operation
                            )

                        # Check if the signal has not been added before.
                        elif (
                            original_signal not in self._original_components_to_new
                        ):
                            if original_signal.source is None:
                                raise ValueError(
                                    "Dangling signal without source in SFG"
                                )

                            new_signal = cast(
                                Signal,
                                self._add_component_unconnected_copy(
                                    original_signal
                                ),
                            )
                            new_signal.set_source(
                                new_op.output(original_output_port.index)
                            )

                            # NOTE(review): this line was not visible in the
                            # damaged listing; it mirrors the boundary-signal
                            # branch above, which also records the signal.
                            self._components_dfs_order.append(new_signal)

                            # Validate the destination port itself before
                            # dereferencing it, so that a missing destination
                            # raises the intended ValueError rather than an
                            # AttributeError on ``None``.
                            original_destination = original_signal.destination
                            if original_destination is None:
                                raise ValueError(
                                    "Signal without destination in SFG"
                                )

                            original_connected_op = original_destination.operation
                            if original_connected_op is None:
                                raise ValueError(
                                    "Signal without destination in SFG"
                                )

                            # Check if connected operation has been added.
                            if (
                                original_connected_op
                                in self._original_components_to_new
                            ):
                                component = cast(
                                    Operation,
                                    self._original_components_to_new[
                                        original_connected_op
                                    ],
                                )
                                # Set destination to the already connected operations port.
                                new_signal.set_destination(
                                    component.input(original_destination.index)
                                )
                            else:
                                # Create new operation, set destination to it.
                                new_connected_op = cast(
                                    Operation,
                                    self._add_component_unconnected_copy(
                                        original_connected_op
                                    ),
                                )
                                new_signal.set_destination(
                                    new_connected_op.input(
                                        original_destination.index
                                    )
                                )

                                self._components_dfs_order.append(new_connected_op)
                                self._operations_dfs_order.append(new_connected_op)

                                # Add connected operation to the queue of operations to visit.
                                op_stack.append(original_connected_op)
    
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
        def _evaluate_source(
            self,
            src: OutputPort,
            results: MutableResultMap,
            delays: MutableDelayMap,
            prefix: str,
            bits_override: Optional[int],
            truncate: bool,
            deferred_delays: DelayQueue,
        ) -> Number:
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                (prefix + "." + src.operation.graph_id)
                if prefix
                else src.operation.graph_id
            )
    
            key = src.operation.key(src.index, key_base)
            if key in results:
                value = results[key]
                if value is None:
                    raise RuntimeError(
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                        "Direct feedback loop detected when evaluating operation."
                    )
    
                return value
    
            value = src.operation.current_output(src.index, delays, key_base)
            results[key] = value
            if value is None:
                value = self._do_evaluate_source(
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                    key_base,
                    key,
                    src,
                    results,
                    delays,
                    prefix,
                    bits_override,
                    truncate,
                    deferred_delays,
                )
    
            else:
                # Evaluate later. Use current value for now.
                deferred_delays.append((key_base, key, src))
            return value
    
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
        def _do_evaluate_source(
            self,
            key_base: str,
            key: ResultKey,
            src: OutputPort,
            results: MutableResultMap,
            delays: MutableDelayMap,
            prefix: str,
            bits_override: Optional[int],
            truncate: bool,
            deferred_delays: DelayQueue,
        ) -> Number:
            input_values = [
                self._evaluate_source(
                    input_port.signals[0].source,
                    results,
                    delays,
                    prefix,
                    bits_override,
                    truncate,
                    deferred_delays,
                )
                for input_port in src.operation.inputs
            ]
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                src.index,
                input_values,
                results,
                delays,
                key_base,
                bits_override,
                truncate,
            )
    
    
        def sfg(self, show_id: bool = False, engine: Optional[str] = None) -> Digraph:
            """
            Return a Digraph of the SFG. Can be directly displayed in IPython.

            Every signal becomes an edge from the operation at its source to the
            operation at its destination; every other component becomes a node
            named by its graph_id. Delay operations are drawn as squares.

            Parameters
            ----------
            show_id : bool, optional
                If True, the graph_id:s of signals are shown as edge labels.
                The default is False.
            engine : str, optional
                Graphviz layout engine to be used, see https://graphviz.org/documentation/.
                Most common are "dot" and "neato". Default is None leading to dot.

            Returns
            -------
            Digraph
                Digraph of the SFG.
            """
            dg = Digraph()
            dg.attr(rankdir="LR")
            if engine is not None:
                dg.engine = engine

            for op in self._components_by_id.values():
                if isinstance(op, Signal):
                    # Exactly one edge per signal, labelled only on request.
                    if show_id:
                        dg.edge(
                            op.source.operation.graph_id,
                            op.destination.operation.graph_id,
                            label=op.graph_id,
                        )
                    else:
                        dg.edge(
                            op.source.operation.graph_id,
                            op.destination.operation.graph_id,
                        )
                else:
                    if op.type_name() == Delay.type_name():
                        dg.node(op.graph_id, shape="square")
                    else:
                        dg.node(op.graph_id)
            return dg
    
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
        def _repr_mimebundle_(self, include=None, exclude=None):
            return self.sfg()._repr_mimebundle_(include=include, exclude=exclude)
    
    
        def show_sfg(self, format=None, show_id=False, engine=None) -> None:
            """
            Shows a visual representation of the SFG using the default system viewer.
    
            Parameters
            ----------
            format : string, optional
    
                File format of the generated graph. Output formats can be found at
                https://www.graphviz.org/doc/info/output.html
    
                Most common are "pdf", "eps", "png", and "svg". Default is None which leads to PDF.
    
            show_id : Boolean, optional
                If True, the graph_id:s of signals are shown. The default is False.
    
    
            engine : string, optional
    
                Graphviz layout engine to be used, see https://graphviz.org/documentation/.
                Most common are "dot" and "neato". Default is None leading to dot.
            """
    
            dg = self.sfg(show_id=show_id)
    
            if engine is not None:
                dg.engine = engine
            if format is not None:
                dg.format = format