Skip to content
Snippets Groups Projects
signal_flow_graph.py 48.4 KiB
Newer Older
  • Learn to ignore specific revisions
  •                         new_signal = self._add_component_unconnected_copy(
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                                original_signal
                            )
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                                new_op.input(original_input_port.index)
                            )
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                            original_connected_op = (
                                original_signal.source.operation
                            )
    
                            # Check if connected Operation has been added before.
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                            if (
                                original_connected_op
                                in self._original_components_to_new
                            ):
    
                                # Set source to the already added operations port.
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                                new_signal.set_source(
                                    self._original_components_to_new[
                                        original_connected_op
                                    ].output(original_signal.source.index)
                                )
    
                            else:
                                # Create new operation, set signal source to it.
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                                new_connected_op = (
                                    self._add_component_unconnected_copy(
                                        original_connected_op
                                    )
                                )
                                new_signal.set_source(
                                    new_connected_op.output(
                                        original_signal.source.index
                                    )
                                )
    
    
                                self._components_dfs_order.append(new_connected_op)
                                self._operations_dfs_order.append(new_connected_op)
    
                                # Add connected operation to queue of operations to visit.
                                op_stack.append(original_connected_op)
    
                # Connect output ports.
                for original_output_port in original_op.outputs:
                    for original_signal in original_output_port.signals:
                        # Check if the signal is one of the SFG's output signals.
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                        if (
                            original_signal
                            in self._original_output_signals_to_indices
                        ):
    
                            # New signal already created during first step of constructor.
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                            new_signal = self._original_components_to_new[
                                original_signal
                            ]
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                                new_op.output(original_output_port.index)
                            )
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                                [new_signal, new_signal.destination.operation]
                            )
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                                new_signal.destination.operation
                            )
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                        elif (
                            original_signal not in self._original_components_to_new
                        ):
    
                            if original_signal.source is None:
                                raise ValueError(
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                                    "Dangling signal without source in SFG"
                                )
    
    
                            new_signal = self._add_component_unconnected_copy(
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                                original_signal
                            )
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                                new_op.output(original_output_port.index)
                            )
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                            original_connected_op = (
                                original_signal.destination.operation
                            )
    
                            # Check if connected operation has been added.
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                            if (
                                original_connected_op
                                in self._original_components_to_new
                            ):
    
                                # Set destination to the already connected operations port.
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                                new_signal.set_destination(
                                    self._original_components_to_new[
                                        original_connected_op
                                    ].input(original_signal.destination.index)
                                )
    
                            else:
                                # Create new operation, set destination to it.
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                                new_connected_op = (
                                    self._add_component_unconnected_copy(
                                        original_connected_op
                                    )
                                )
                                new_signal.set_destination(
                                    new_connected_op.input(
                                        original_signal.destination.index
                                    )
                                )
    
    
                                self._components_dfs_order.append(new_connected_op)
                                self._operations_dfs_order.append(new_connected_op)
    
                                # Add connected operation to the queue of operations to visit.
                                op_stack.append(original_connected_op)
    
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
        def _evaluate_source(
            self,
            src: OutputPort,
            results: MutableResultMap,
            delays: MutableDelayMap,
            prefix: str,
            bits_override: Optional[int],
            truncate: bool,
            deferred_delays: DelayQueue,
        ) -> Number:
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                (prefix + "." + src.operation.graph_id)
                if prefix
                else src.operation.graph_id
            )
    
            key = src.operation.key(src.index, key_base)
            if key in results:
                value = results[key]
                if value is None:
                    raise RuntimeError(
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                        "Direct feedback loop detected when evaluating operation."
                    )
    
                return value
    
            value = src.operation.current_output(src.index, delays, key_base)
            results[key] = value
            if value is None:
                value = self._do_evaluate_source(
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                    key_base,
                    key,
                    src,
                    results,
                    delays,
                    prefix,
                    bits_override,
                    truncate,
                    deferred_delays,
                )
    
            else:
                # Evaluate later. Use current value for now.
                deferred_delays.append((key_base, key, src))
            return value
    
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
        def _do_evaluate_source(
            self,
            key_base: str,
            key: ResultKey,
            src: OutputPort,
            results: MutableResultMap,
            delays: MutableDelayMap,
            prefix: str,
            bits_override: Optional[int],
            truncate: bool,
            deferred_delays: DelayQueue,
        ) -> Number:
            input_values = [
                self._evaluate_source(
                    input_port.signals[0].source,
                    results,
                    delays,
                    prefix,
                    bits_override,
                    truncate,
                    deferred_delays,
                )
                for input_port in src.operation.inputs
            ]
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                src.index,
                input_values,
                results,
                delays,
                key_base,
                bits_override,
                truncate,
            )
    
    
        def sfg(self, show_id=False, engine=None) -> Digraph:
            """
            Returns a Digraph of the SFG. Can be directly displayed in IPython.
    
            Parameters
            ----------
            show_id : Boolean, optional
                If True, the graph_id:s of signals are shown. The default is False.
    
            engine: string, optional
                Graphviz layout engine to be used, see https://graphviz.org/documentation/.
                Most common are "dot" and "neato". Default is None leading to dot.
    
            Returns
            -------
            Digraph
                Digraph of the SFG.
    
            """
            dg = Digraph()
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
            dg.attr(rankdir="LR")
    
            if engine is not None:
                dg.engine = engine
    
            for op in self._components_by_id.values():
                if isinstance(op, Signal):
                    if show_id:
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                        dg.edge(
                            op.source.operation.graph_id,
                            op.destination.operation.graph_id,
                            label=op.graph_id,
                        )
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                        dg.edge(
                            op.source.operation.graph_id,
                            op.destination.operation.graph_id,
                        )
    
                else:
                    if op.type_name() == Delay.type_name():
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                        dg.node(op.graph_id, shape="square")
    
                    else:
                        dg.node(op.graph_id)
            return dg
    
        def _repr_svg_(self):
            return self.sfg()._repr_svg_()
    
        def show_sfg(self, format=None, show_id=False, engine=None) -> None:
            """
            Shows a visual representation of the SFG using the default system viewer.
    
            Parameters
            ----------
            format : string, optional
    
                File format of the generated graph. Output formats can be found at
                https://www.graphviz.org/doc/info/output.html
    
                Most common are "pdf", "eps", "png", and "svg". Default is None which leads to PDF.
    
            show_id : Boolean, optional
                If True, the graph_id:s of signals are shown. The default is False.
    
            engine: string, optional
                Graphviz layout engine to be used, see https://graphviz.org/documentation/.
                Most common are "dot" and "neato". Default is None leading to dot.
            """
    
            dg = self.sfg(show_id=show_id)
    
            if engine is not None:
                dg.engine = engine
            if format is not None:
                dg.format = format