Skip to content
Snippets Groups Projects
signal_flow_graph.py 54.3 KiB
Newer Older
  • Learn to ignore specific revisions
  •             The type name of the operation. For example, obtained as
                ``Addition.type_name()``.
    
            latency : int
                The latency of the operation.
    
            """
    
            for op in self.find_by_type_name(type_name):
    
                cast(Operation, op).set_latency(latency)
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
        def set_execution_time_of_type(
            self, type_name: TypeName, execution_time: int
        ) -> None:
            """
            Set the execution time of all operations with the given type name.

            Parameters
            ----------
            type_name : TypeName
                The type name of the operation. For example, obtained as
                ``Addition.type_name()``.
            execution_time : int
                The execution time of the operation.
            """
            for op in self.find_by_type_name(type_name):
                cast(Operation, op).execution_time = execution_time
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
        def set_latency_offsets_of_type(
            self, type_name: TypeName, latency_offsets: Dict[str, int]
        ) -> None:
            """
            Set the latency offsets of all operations with the given type name.

            Parameters
            ----------
            type_name : TypeName
                The type name of the operation. For example, obtained as
                ``Addition.type_name()``.
            latency_offsets : {"in1": int, ...}
                The latency offsets of the inputs and outputs.
            """
            for op in self.find_by_type_name(type_name):
                cast(Operation, op).set_latency_offsets(latency_offsets)
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
        def _traverse_for_precedence_list(
            self, first_iter_ports: List[OutputPort]
        ) -> List[List[OutputPort]]:
            """
            Group output ports into successive precedence levels.

            Starting from *first_iter_ports*, the signals of each port are
            followed to their destination operations. The outputs of an
            operation are placed in the next level once all of its inputs have
            been reached. Destinations that are ``Delay`` operations are not
            traversed.

            Parameters
            ----------
            first_iter_ports : list of OutputPort
                The output ports making up the first level.

            Returns
            -------
            list of list of OutputPort
                One list of output ports per level, in traversal order. Empty
                if *first_iter_ports* is empty.
            """
            # Number of inputs of each operation that have not been reached yet.
            remaining_inports_per_operation = {
                op: op.input_count for op in self.operations
            }

            # Traverse output ports for precedence.
            precedence_list = []
            curr_iter_ports = first_iter_ports
            while curr_iter_ports:
                # Add the found ports to the current iter.
                precedence_list.append(curr_iter_ports)

                next_iter_ports = []
                for outport in curr_iter_ports:
                    for signal in outport.signals:
                        new_inport = signal.destination
                        # Skip unconnected signals and do not traverse over
                        # delays.
                        if new_inport is None or isinstance(
                            new_inport.operation, Delay
                        ):
                            continue

                        new_op = new_inport.operation
                        remaining_inports_per_operation[new_op] -= 1
                        # All inputs reached: the outputs belong to the next
                        # level.
                        if remaining_inports_per_operation[new_op] == 0:
                            next_iter_ports.extend(new_op.outputs)

                curr_iter_ports = next_iter_ports

            return precedence_list
    
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
        def _add_component_unconnected_copy(
            self, original_component: GraphComponent
        ) -> GraphComponent:
            """
            Copy a component, register the copy in this SFG and return it.

            The copy is created with ``copy_component()`` and recorded in the
            original-to-new mapping, the used-id set, the by-id lookup and the
            by-name lookup. If the copy has no graph id, or its graph id is
            already used, a new id is requested from the graph id generator.

            Parameters
            ----------
            original_component : GraphComponent
                The component to copy.

            Returns
            -------
            GraphComponent
                The newly created copy.

            Raises
            ------
            ValueError
                If *original_component* has already been copied.
            """
            if original_component in self._original_components_to_new:
                raise ValueError("Tried to add duplicate SFG component")

            new_component = original_component.copy_component()
            self._original_components_to_new[original_component] = new_component

            # Replace a missing or clashing graph id with a freshly generated one.
            if (
                not new_component.graph_id
                or new_component.graph_id in self._used_ids
            ):
                new_component.graph_id = self._graph_id_generator.next_id(
                    new_component.type_name(), self._used_ids
                )

            self._used_ids.add(new_component.graph_id)
            self._components_by_id[new_component.graph_id] = new_component
            self._components_by_name[new_component.name].append(new_component)
            return new_component
    
        def _add_operation_connected_tree_copy(self, start_op: Operation) -> None:
            """
            Copy *start_op* and everything connected to it into this SFG.

            Operations are visited using a stack. For each visited operation a
            copy is created (or looked up if it already exists), and every
            signal on its input and output ports is copied and connected to the
            corresponding port of the copied operations. Operations found at the
            other end of a signal that have not been copied yet are copied and
            pushed on the stack to be visited later.

            Parameters
            ----------
            start_op : Operation
                The original operation to start the traversal from.

            Raises
            ------
            ValueError
                If an input port is unconnected, a signal lacks a source or a
                destination, or a destination port has no operation.
            """
            # NOTE(review): several statements in this method were missing from
            # the reviewed copy and have been reconstructed from the parallel
            # branches; each such place is marked below - confirm against the
            # repository version.
            op_stack = deque([start_op])
            while op_stack:
                original_op = op_stack.pop()
                # Add or get the new copy of the operation.
                if original_op not in self._original_components_to_new:
                    new_op = cast(
                        Operation,
                        self._add_component_unconnected_copy(original_op),
                    )
                    self._components_dfs_order.append(new_op)
                    self._operations_dfs_order.append(new_op)
                else:
                    new_op = cast(
                        Operation, self._original_components_to_new[original_op]
                    )

                # Connect input ports to new signals.
                for original_input_port in original_op.inputs:
                    if original_input_port.signal_count < 1:
                        raise ValueError("Unconnected input port in SFG")

                    for original_signal in original_input_port.signals:
                        # Check if the signal is one of the SFG's input signals.
                        if (
                            original_signal
                            in self._original_input_signals_to_indices
                        ):
                            # New signal already created during first step of
                            # constructor.
                            new_signal = cast(
                                Signal,
                                self._original_components_to_new[original_signal],
                            )
                            # NOTE(review): reconstructed call head.
                            new_signal.set_destination(
                                new_op.input(original_input_port.index)
                            )

                            source = cast(OutputPort, new_signal.source)
                            # NOTE(review): reconstructed call head.
                            self._components_dfs_order.extend(
                                [new_signal, source.operation]
                            )
                            if (
                                source.operation
                                not in self._operations_dfs_order
                            ):
                                self._operations_dfs_order.append(
                                    source.operation
                                )

                        # Check if the signal has not been added before.
                        elif (
                            original_signal not in self._original_components_to_new
                        ):
                            if original_signal.source is None:
                                raise ValueError(
                                    "Dangling signal without source in SFG"
                                )

                            new_signal = cast(
                                Signal,
                                self._add_component_unconnected_copy(
                                    original_signal
                                ),
                            )
                            # NOTE(review): reconstructed call head.
                            new_signal.set_destination(
                                new_op.input(original_input_port.index)
                            )
                            # NOTE(review): not visible in the reviewed copy;
                            # added so that newly copied signals are recorded
                            # like the SFG input signals above - confirm.
                            self._components_dfs_order.append(new_signal)

                            original_connected_op = (
                                original_signal.source.operation
                            )
                            # Check if connected Operation has been added before.
                            if (
                                original_connected_op
                                in self._original_components_to_new
                            ):
                                component = cast(
                                    Operation,
                                    self._original_components_to_new[
                                        original_connected_op
                                    ],
                                )
                                # Set source to the already added operations
                                # port.
                                new_signal.set_source(
                                    component.output(original_signal.source.index)
                                )
                            else:
                                # Create new operation, set signal source to it.
                                new_connected_op = cast(
                                    Operation,
                                    self._add_component_unconnected_copy(
                                        original_connected_op
                                    ),
                                )
                                new_signal.set_source(
                                    new_connected_op.output(
                                        original_signal.source.index
                                    )
                                )

                                self._components_dfs_order.append(new_connected_op)
                                self._operations_dfs_order.append(new_connected_op)

                                # Add connected operation to queue of operations
                                # to visit.
                                op_stack.append(original_connected_op)

                # Connect output ports.
                for original_output_port in original_op.outputs:
                    for original_signal in original_output_port.signals:
                        # Check if the signal is one of the SFG's output signals.
                        if (
                            original_signal
                            in self._original_output_signals_to_indices
                        ):
                            # New signal already created during first step of
                            # constructor.
                            new_signal = cast(
                                Signal,
                                self._original_components_to_new[original_signal],
                            )
                            # NOTE(review): reconstructed call head.
                            new_signal.set_source(
                                new_op.output(original_output_port.index)
                            )

                            destination = cast(InputPort, new_signal.destination)
                            # NOTE(review): reconstructed call heads.
                            self._components_dfs_order.extend(
                                [new_signal, destination.operation]
                            )
                            self._operations_dfs_order.append(
                                destination.operation
                            )

                        # Check if the signal has not been added before.
                        elif (
                            original_signal not in self._original_components_to_new
                        ):
                            if original_signal.source is None:
                                # The f-prefix was missing, so the placeholder
                                # was emitted literally.
                                raise ValueError(
                                    f"Dangling signal ({original_signal}) without"
                                    " source in SFG"
                                )

                            new_signal = cast(
                                Signal,
                                self._add_component_unconnected_copy(
                                    original_signal
                                ),
                            )
                            # NOTE(review): reconstructed call head.
                            new_signal.set_source(
                                new_op.output(original_output_port.index)
                            )
                            # NOTE(review): not visible in the reviewed copy;
                            # added for symmetry with the input side - confirm.
                            self._components_dfs_order.append(new_signal)

                            original_destination = cast(
                                InputPort, original_signal.destination
                            )
                            if original_destination is None:
                                raise ValueError(
                                    f"Signal ({original_signal}) without"
                                    " destination in SFG"
                                )

                            original_connected_op = original_destination.operation
                            if original_connected_op is None:
                                raise ValueError(
                                    "Signal with empty destination port"
                                    f" ({original_destination}) in SFG"
                                )

                            # Check if connected operation has been added.
                            if (
                                original_connected_op
                                in self._original_components_to_new
                            ):
                                # Set destination to the already connected
                                # operations port.
                                new_signal.set_destination(
                                    cast(
                                        Operation,
                                        self._original_components_to_new[
                                            original_connected_op
                                        ],
                                    ).input(original_destination.index)
                                )
                            else:
                                # Create new operation, set destination to it.
                                new_connected_op = cast(
                                    Operation,
                                    self._add_component_unconnected_copy(
                                        original_connected_op
                                    ),
                                )
                                new_signal.set_destination(
                                    new_connected_op.input(
                                        original_destination.index
                                    )
                                )

                                self._components_dfs_order.append(new_connected_op)
                                self._operations_dfs_order.append(new_connected_op)

                                # Add connected operation to the queue of
                                # operations to visit.
                                # NOTE(review): reconstructed from the matching
                                # statement on the input side.
                                op_stack.append(original_connected_op)
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
        def _evaluate_source(
            self,
            src: OutputPort,
            results: MutableResultMap,
            delays: MutableDelayMap,
            prefix: str,
            bits_override: Optional[int],
            truncate: bool,
            deferred_delays: DelayQueue,
        ) -> Number:
            """
            Return the value of the output port *src*, evaluating it if needed.

            If a value for the port is already stored in *results* it is
            returned directly. Otherwise the operation is asked for its current
            output. When that is ``None`` the operation is evaluated through
            ``_do_evaluate_source``. Otherwise the current output is returned
            and the port is appended to *deferred_delays* for later evaluation.

            Parameters
            ----------
            src : OutputPort
                The output port to evaluate.
            results : MutableResultMap
                Results computed so far, updated in place.
            delays : MutableDelayMap
                Passed on to the operations.
            prefix : str
                Prefix for the result keys. When empty, the graph id of the
                operation is used on its own as key base.
            bits_override : int, optional
                Passed on to the evaluation.
            truncate : bool
                Passed on to the evaluation.
            deferred_delays : DelayQueue
                Collects ``(key_base, key, src)`` for ports to evaluate later.

            Returns
            -------
            Number
                The value of the output port.

            Raises
            ------
            RuntimeError
                If the port is reached again while it is being evaluated, i.e.
                a direct feedback loop.
            """
            # NOTE(review): the "key_base = (" line was missing from the
            # reviewed copy and has been restored from its later uses.
            key_base = (
                (prefix + "." + src.operation.graph_id)
                if prefix
                else src.operation.graph_id
            )
            key = src.operation.key(src.index, key_base)
            if key in results:
                value = results[key]
                # A stored None means this port is currently being evaluated.
                if value is None:
                    raise RuntimeError(
                        "Direct feedback loop detected when evaluating operation."
                    )
                return value

            value = src.operation.current_output(src.index, delays, key_base)
            # Stored before recursing so that re-entry for this key is detected.
            results[key] = value
            if value is None:
                value = self._do_evaluate_source(
                    key_base,
                    key,
                    src,
                    results,
                    delays,
                    prefix,
                    bits_override,
                    truncate,
                    deferred_delays,
                )
            else:
                # Evaluate later. Use current value for now.
                deferred_delays.append((key_base, key, src))
            return value
    
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
        def _do_evaluate_source(
            self,
            key_base: str,
            key: ResultKey,
            src: OutputPort,
            results: MutableResultMap,
            delays: MutableDelayMap,
            prefix: str,
            bits_override: Optional[int],
            truncate: bool,
            deferred_delays: DelayQueue,
        ) -> Number:
            """
            Evaluate the operation owning *src* from the values of its inputs.

            The source of the first signal on each input port of the operation
            is evaluated with ``_evaluate_source``, and the resulting values are
            used to evaluate the requested output.

            Parameters
            ----------
            key_base : str
                Key base of the operation, passed on to the evaluation.
            key : ResultKey
                Key in *results* under which the value is stored.
            src : OutputPort
                The output port to evaluate.
            results : MutableResultMap
                Results computed so far, updated in place.
            delays : MutableDelayMap
                Passed on to the operations.
            prefix : str
                Prefix used when evaluating the input sources.
            bits_override : int, optional
                Passed on to the evaluation.
            truncate : bool
                Passed on to the evaluation.
            deferred_delays : DelayQueue
                Passed on when evaluating the input sources.

            Returns
            -------
            Number
                The evaluated value of the output port.
            """
            input_values = [
                self._evaluate_source(
                    input_port.signals[0].source,
                    results,
                    delays,
                    prefix,
                    bits_override,
                    truncate,
                    deferred_delays,
                )
                for input_port in src.operation.inputs
            ]
            # NOTE(review): the head of this call and the two statements after
            # it were missing from the reviewed copy. ``evaluate_output`` and
            # the store/return are reconstructed from the argument list, the
            # otherwise unused ``key`` and the declared return type - confirm.
            value = src.operation.evaluate_output(
                src.index,
                input_values,
                results,
                delays,
                key_base,
                bits_override,
                truncate,
            )
            results[key] = value
            return value
    
    Frans Skarman's avatar
    Frans Skarman committed
        def sfg_digraph(self, show_id=False, engine=None) -> Digraph:
            """
            Returns a Digraph of the SFG. Can be directly displayed in IPython.

            Parameters
            ----------
            show_id : Boolean, optional
                If True, the graph_id:s of signals are shown. The default is False.
            engine : string, optional
                Graphviz layout engine to be used, see
                https://graphviz.org/documentation/. Most common are "dot" and
                "neato". Default is None leading to dot.

            Returns
            -------
            Digraph
                Digraph of the SFG.
            """
            dg = Digraph()
            dg.attr(rankdir="LR")
            if engine is not None:
                dg.engine = engine

            for component in self._components_by_id.values():
                if isinstance(component, Signal):
                    # Signals become edges between the operations they connect.
                    source = cast(OutputPort, component.source)
                    destination = cast(InputPort, component.destination)
                    if show_id:
                        dg.edge(
                            source.operation.graph_id,
                            destination.operation.graph_id,
                            label=component.graph_id,
                        )
                    else:
                        dg.edge(
                            source.operation.graph_id,
                            destination.operation.graph_id,
                        )
                # Everything else becomes a node, shaped by kind.
                elif isinstance(component, Delay):
                    dg.node(component.graph_id, shape="square")
                elif isinstance(component, (Input, Output)):
                    dg.node(component.graph_id, shape="cds")
                else:
                    dg.node(component.graph_id)
            return dg
    
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
        def _repr_mimebundle_(self, include=None, exclude=None):
            """
            Return the MIME bundle of the SFG's Digraph.

            *include* and *exclude* are forwarded unchanged to the
            ``_repr_mimebundle_`` method of the Digraph from ``sfg_digraph()``.
            """
            return self.sfg_digraph()._repr_mimebundle_(
                include=include, exclude=exclude
            )
    
        def _repr_jpeg_(self):
            """Return the ``"image/jpeg"`` entry of the Digraph's MIME bundle."""
            return self.sfg_digraph()._repr_mimebundle_(include=["image/jpeg"])[
                "image/jpeg"
            ]
    
        def _repr_png_(self):
            """Return the ``"image/png"`` entry of the Digraph's MIME bundle."""
            return self.sfg_digraph()._repr_mimebundle_(include=["image/png"])[
                "image/png"
            ]
    
    Frans Skarman's avatar
    Frans Skarman committed
        def show(self, format=None, show_id=False, engine=None) -> None:
            """
            Shows a visual representation of the SFG using the default system viewer.

            Parameters
            ----------
            format : string, optional
                File format of the generated graph. Output formats can be found at
                https://www.graphviz.org/doc/info/output.html
                Most common are "pdf", "eps", "png", and "svg". Default is None which
                leads to PDF.
            show_id : Boolean, optional
                If True, the graph_id:s of signals are shown. The default is False.
            engine : string, optional
                Graphviz layout engine to be used, see
                https://graphviz.org/documentation/. Most common are "dot" and
                "neato". Default is None leading to dot.
            """
            # sfg_digraph applies the engine itself when it is not None.
            dg = self.sfg_digraph(show_id=show_id, engine=engine)
            if format is not None:
                dg.format = format

            # Render the graph and open it in the system viewer. Without this
            # call the method only built the graph and discarded it.
            dg.view()
    
    
        def critical_path(self):
            """Return the ``schedule_time`` of an ASAP schedule of this SFG."""
            # Imported locally because a module-level import would be circular.
            from b_asic.schedule import Schedule

            asap_schedule = Schedule(self, scheduling_algorithm="ASAP")
            return asap_schedule.schedule_time