Skip to content
Snippets Groups Projects
signal_flow_graph.py 60.1 KiB
Newer Older
  • Learn to ignore specific revisions
  • 
            # Traverse output ports for precedence
            curr_iter_ports = first_iter_ports
            precedence_list = []
    
            while curr_iter_ports:
                # Add the found ports to the current iter
                precedence_list.append(curr_iter_ports)
    
                next_iter_ports = []
    
                for outport in curr_iter_ports:
                    for signal in outport.signals:
                        new_inport = signal.destination
    
                        # Do not traverse over delays.
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                        if new_inport is not None and not isinstance(
                            new_inport.operation, Delay
                        ):
    
                            new_op = new_inport.operation
                            remaining_inports_per_operation[new_op] -= 1
                            if remaining_inports_per_operation[new_op] == 0:
                                next_iter_ports.extend(new_op.outputs)
    
                curr_iter_ports = next_iter_ports
    
            return precedence_list
    
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
        def _add_component_unconnected_copy(
            self, original_component: GraphComponent
        ) -> GraphComponent:
    
            if original_component in self._original_components_to_new:
                raise ValueError("Tried to add duplicate SFG component")
    
            new_component = original_component.copy()
    
            self._original_components_to_new[original_component] = new_component
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
            if not new_component.graph_id or new_component.graph_id in self._used_ids:
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                new_id = self._graph_id_generator.next_id(
                    new_component.type_name(), self._used_ids
                )
    
                new_component.graph_id = new_id
            self._used_ids.add(new_component.graph_id)
            self._components_by_id[new_component.graph_id] = new_component
    
            self._components_by_name[new_component.name].append(new_component)
            return new_component
    
        def _add_operation_connected_tree_copy(self, start_op: Operation) -> None:
            op_stack = deque([start_op])
            while op_stack:
                original_op = op_stack.pop()
                # Add or get the new copy of the operation.
                if original_op not in self._original_components_to_new:
    
                    new_op = cast(
                        Operation,
                        self._add_component_unconnected_copy(original_op),
                    )
    
                    self._components_dfs_order.append(new_op)
                    self._operations_dfs_order.append(new_op)
                else:
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                    new_op = cast(Operation, self._original_components_to_new[original_op])
    
    
                # Connect input ports to new signals.
                for original_input_port in original_op.inputs:
                    if original_input_port.signal_count < 1:
                        raise ValueError("Unconnected input port in SFG")
    
                    for original_signal in original_input_port.signals:
                        # Check if the signal is one of the SFG's input signals.
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                        if original_signal in self._original_input_signals_to_indices:
    
                            # New signal already created during first step of constructor.
    
                            new_signal = cast(
                                Signal,
                                self._original_components_to_new[original_signal],
                            )
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                                new_op.input(original_input_port.index)
                            )
    
                            source = cast(OutputPort, new_signal.source)
    
                                [new_signal, source.operation]
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                            )
    
                            if source.operation not in self._operations_dfs_order:
    
                                self._operations_dfs_order.append(source.operation)
    
    
                        # Check if the signal has not been added before.
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                        elif original_signal not in self._original_components_to_new:
    
    Frans Skarman's avatar
    Frans Skarman committed
                                dest = (
                                    original_signal.destination.operation.name
                                    if original_signal.destination is not None
                                    else "None"
                                )
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                                    "Dangling signal without source in SFG"
    
    Frans Skarman's avatar
    Frans Skarman committed
                                    f" (destination: {dest})"
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                                )
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                                self._add_component_unconnected_copy(original_signal),
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                                new_op.input(original_input_port.index)
                            )
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                            original_connected_op = original_signal.source.operation
    
                            # Check if connected Operation has been added before.
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                            if original_connected_op in self._original_components_to_new:
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                                    self._original_components_to_new[original_connected_op],
    
                                # Set source to the already added operations port.
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                                new_signal.set_source(
    
                                    component.output(original_signal.source.index)
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                                )
    
                            else:
                                # Create new operation, set signal source to it.
    
                                new_connected_op = cast(
                                    Operation,
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                                    self._add_component_unconnected_copy(
                                        original_connected_op
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                                )
                                new_signal.set_source(
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                                    new_connected_op.output(original_signal.source.index)
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                                )
    
    
                                self._components_dfs_order.append(new_connected_op)
                                self._operations_dfs_order.append(new_connected_op)
    
                                # Add connected operation to queue of operations to visit.
                                op_stack.append(original_connected_op)
    
                # Connect output ports.
                for original_output_port in original_op.outputs:
                    for original_signal in original_output_port.signals:
                        # Check if the signal is one of the SFG's output signals.
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                        if original_signal in self._original_output_signals_to_indices:
    
                            # New signal already created during first step of constructor.
    
                            new_signal = cast(
                                Signal,
                                self._original_components_to_new[original_signal],
                            )
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                            new_signal.set_source(new_op.output(original_output_port.index))
    
                            destination = cast(InputPort, new_signal.destination)
    
                                [new_signal, destination.operation]
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                            )
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                            self._operations_dfs_order.append(destination.operation)
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                        elif original_signal not in self._original_components_to_new:
    
                            if original_signal.source is None:
                                raise ValueError(
    
                                    "Dangling signal ({original_signal}) without"
                                    " source in SFG"
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                                )
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                            new_signal = cast(
                                Signal,
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                                self._add_component_unconnected_copy(original_signal),
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                            )
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                            new_signal.set_source(new_op.output(original_output_port.index))
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                            original_destination = cast(
                                InputPort, original_signal.destination
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                            )
    
                            if original_destination is None:
                                raise ValueError(
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                                    f"Signal ({original_signal}) without destination in SFG"
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                            original_connected_op = original_destination.operation
    
                            if original_connected_op is None:
                                raise ValueError(
    
                                    "Signal with empty destination port"
                                    f" ({original_destination}) in SFG"
    
                            # Check if connected operation has been added.
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                            if original_connected_op in self._original_components_to_new:
    
                                # Set destination to the already connected operations port.
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                                new_signal.set_destination(
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                                    cast(
                                        Operation,
                                        self._original_components_to_new[
                                            original_connected_op
                                        ],
                                    ).input(original_destination.index)
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                                )
    
                            else:
                                # Create new operation, set destination to it.
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                                new_connected_op = cast(
                                    Operation,
                                    (
                                        self._add_component_unconnected_copy(
                                            original_connected_op
                                        )
                                    ),
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                                )
                                new_signal.set_destination(
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                                    new_connected_op.input(original_destination.index)
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                                )
    
    
                                self._components_dfs_order.append(new_connected_op)
                                self._operations_dfs_order.append(new_connected_op)
    
    
                                # Add connected operation to the queue of operations
                                # to visit.
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
        def _evaluate_source(
            self,
            src: OutputPort,
            results: MutableResultMap,
            delays: MutableDelayMap,
            prefix: str,
            bits_override: Optional[int],
    
            quantize: bool,
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
            deferred_delays: DelayQueue,
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                (prefix + "." + src.operation.graph_id)
                if prefix
                else src.operation.graph_id
            )
    
            key = src.operation.key(src.index, key_base)
            if key in results:
                value = results[key]
                if value is None:
                    raise RuntimeError(
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                        "Direct feedback loop detected when evaluating operation."
                    )
    
                return value
    
            value = src.operation.current_output(src.index, delays, key_base)
            results[key] = value
            if value is None:
                value = self._do_evaluate_source(
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                    key_base,
                    key,
                    src,
                    results,
                    delays,
                    prefix,
                    bits_override,
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                    deferred_delays,
                )
    
            else:
                # Evaluate later. Use current value for now.
                deferred_delays.append((key_base, key, src))
            return value
    
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
        def _do_evaluate_source(
            self,
            key_base: str,
            key: ResultKey,
            src: OutputPort,
            results: MutableResultMap,
            delays: MutableDelayMap,
            prefix: str,
            bits_override: Optional[int],
    
            quantize: bool,
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
            deferred_delays: DelayQueue,
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
            input_values = [
                self._evaluate_source(
                    input_port.signals[0].source,
                    results,
                    delays,
                    prefix,
                    bits_override,
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                    deferred_delays,
                )
                for input_port in src.operation.inputs
            ]
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                src.index,
                input_values,
                results,
                delays,
                key_base,
                bits_override,
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
            )
    
    Frans Skarman's avatar
    Frans Skarman committed
        def sfg_digraph(self, show_id=False, engine=None) -> Digraph:
            """
            Return a Digraph of the SFG. Can be directly displayed in IPython.

            Parameters
            ----------
            show_id : Boolean, optional
                If True, the graph_id:s of signals are shown. The default is False.

            engine : string, optional
                Graphviz layout engine to be used, see https://graphviz.org/documentation/.
                Most common are "dot" and "neato". Default is None leading to dot.

            Returns
            -------
            Digraph
                Digraph of the SFG.
            """
            dg = Digraph()
            dg.attr(rankdir="LR")
            if engine is not None:
                dg.engine = engine

            for component in self._components_by_id.values():
                if isinstance(component, Signal):
                    # A signal becomes an edge between the operations that own
                    # its source and destination ports.
                    source = cast(OutputPort, component.source)
                    destination = cast(InputPort, component.destination)
                    if show_id:
                        dg.edge(
                            source.operation.graph_id,
                            destination.operation.graph_id,
                            label=component.graph_id,
                        )
                    else:
                        dg.edge(
                            source.operation.graph_id,
                            destination.operation.graph_id,
                        )
                else:
                    # Every other component becomes a node whose shape is
                    # looked up by its type name.
                    dg.node(
                        component.graph_id,
                        shape=_OPERATION_SHAPE[component.type_name()],
                    )

            return dg
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
        def _repr_mimebundle_(self, include=None, exclude=None):
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
            return self.sfg_digraph()._repr_mimebundle_(include=include, exclude=exclude)
    
        def _repr_jpeg_(self):
    
    Frans Skarman's avatar
    Frans Skarman committed
            return self.sfg_digraph()._repr_mimebundle_(include=["image/jpeg"])[
    
                "image/jpeg"
            ]
    
        def _repr_png_(self):
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
            return self.sfg_digraph()._repr_mimebundle_(include=["image/png"])["image/png"]
    
        def show(self, fmt=None, show_id=False, engine=None) -> None:
            """
            Show a visual representation of the SFG using the default system viewer.

            Parameters
            ----------
            fmt : string, optional
                File format of the generated graph. Output formats can be found at
                https://www.graphviz.org/doc/info/output.html
                Most common are "pdf", "eps", "png", and "svg". Default is None which
                leads to PDF.

            show_id : Boolean, optional
                If True, the graph_id:s of signals are shown. The default is False.

            engine : string, optional
                Graphviz layout engine to be used, see https://graphviz.org/documentation/.
                Most common are "dot" and "neato". Default is None leading to dot.
            """
            # sfg_digraph applies the layout engine itself, so it is forwarded
            # rather than set a second time on the returned graph.
            dg = self.sfg_digraph(show_id=show_id, engine=engine)
            if fmt is not None:
                dg.format = fmt

            # Without this call the graph is built but never displayed.
            dg.view()
    
        def critical_path_time(self) -> int:
            """
            Return the time of the critical path.

            The value is the ``schedule_time`` of a ``Schedule`` created for this
            SFG with the "ASAP" scheduling algorithm.
            """
            # Imported lazily: a module-level import would be circular.
            from b_asic.schedule import Schedule

            asap_schedule = Schedule(self, scheduling_algorithm="ASAP")
            return asap_schedule.schedule_time
    
        def edit(self) -> None:
            """Open this SFG in the GUI editor by passing it to ``start_editor``."""
            # The GUI module is only imported when the editor is requested.
            from b_asic.GUI.main_window import start_editor as launch_editor

            launch_editor(self)
    
    
        def unfold(self, factor: int) -> "SFG":
            """
            Unfold the SFG *factor* times.

            Return a new SFG without modifying the original.

            Inputs and outputs are ordered with early inputs first. That is for an SFG
            with n inputs, the first n inputs are the inputs at time t, the next n
            inputs are the inputs at time t+1, the next n at t+2 and so on.

            Parameters
            ----------
            factor : int
                Number of times to unfold. Must be at least 1.

            Returns
            -------
            SFG
                A new SFG built from *factor* copies of the operations.

            Raises
            ------
            ValueError
                If *factor* is less than 1, if a delay has no source connected to
                its input, or if a signal leaving a delay has no destination.
            """
            if factor == 0:
                raise ValueError("Unfolding 0 times removes the SFG")
            if factor < 0:
                # A negative factor would otherwise silently produce zero layers.
                raise ValueError("Unfolding factor must be a positive integer")

            # Snapshot the operations once so every pass below sees the same order.
            original_ops = list(self.operations)

            # Make `factor` copies of the sfg
            new_ops = [
                [cast(Operation, op.copy()) for op in original_ops]
                for _ in range(factor)
            ]

            id_idx_map = {op.graph_id: idx for (idx, op) in enumerate(original_ops)}

            # The rest of the process is easier if we clear the connections of the
            # inputs and outputs of all operations
            for layer, op_list in enumerate(new_ops):
                for new_op in op_list:
                    for input_ in new_op.inputs:
                        input_.clear()
                    for output in new_op.outputs:
                        output.clear()

                    new_op.name = f"{new_op.name}_{layer}"
                    # NOTE: Since these IDs are what show up when printing the graph,
                    # it is helpful to set them. However, this can cause name
                    # collisions when names in a graph are already suffixed with _n
                    new_op.graph_id = GraphID(f"{new_op.graph_id}_{layer}")

            # Walk through the operations, replacing delay nodes with connections
            last_layer = factor - 1
            for layer in range(factor):
                for op_idx, op in enumerate(original_ops):
                    if isinstance(op, Delay):
                        # Port of the operation feeding into this delay
                        source_port = op.inputs[0].connected_source
                        if source_port is None:
                            raise ValueError("Dangling delay input port in sfg")

                        source_op_idx = id_idx_map[source_port.operation.graph_id]
                        source_op_output = new_ops[layer][source_op_idx].outputs[
                            source_port.index
                        ]

                        if layer == last_layer:
                            # In the last layer a real delay element is kept: create
                            # it, feed it from the copied source, and wrap around to
                            # layer 0.
                            delay = Delay(name=op.name)
                            delay.graph_id = op.graph_id
                            delay.inputs[0].connect(source_op_output)
                            new_source_port = delay.outputs[0]
                            target_layer = 0
                        else:
                            # Otherwise the delay is bypassed: the source output
                            # drives the sinks of the next layer directly.
                            new_source_port = source_op_output
                            target_layer = layer + 1

                        for out_signal in op.outputs[0].signals:
                            sink_port = out_signal.destination
                            if sink_port is None:
                                # It would be weird if we found a signal that wasn't
                                # connected anywhere
                                raise ValueError("Dangling output port in sfg")

                            sink_op_idx = id_idx_map[sink_port.operation.graph_id]
                            new_destination = new_ops[target_layer][sink_op_idx].inputs[
                                sink_port.index
                            ]
                            new_destination.connect(new_source_port)
                    else:
                        # Other operations need to be re-targeted to the
                        # corresponding output in the current layer, as long as that
                        # output is not a delay, as that has been solved above.
                        # To avoid double connections, we'll only re-connect inputs
                        for input_num, original_input in enumerate(op.inputs):
                            original_source = original_input.connected_source
                            # Nothing connected to this input: nothing to re-target.
                            if original_source is None:
                                continue

                            # Delay connections are handled in the branch above.
                            if isinstance(original_source.operation, Delay):
                                continue

                            source_op_idx = id_idx_map[original_source.operation.graph_id]
                            target_output = new_ops[layer][source_op_idx].outputs[
                                original_source.index
                            ]
                            new_ops[layer][op_idx].inputs[input_num].connect(
                                target_output
                            )

            # To get the input order correct, we need to know the input order in the
            # original sfg and which operations they correspond to
            input_ids = [op.graph_id for op in self.input_operations]
            output_ids = [op.graph_id for op in self.output_operations]

            # Layers are emitted in order (layer 0 first) and the internal order of
            # the inputs is preserved within each layer, i.e. for a graph with
            # 2 inputs (in1, in2), in1 occurs before in2 in every layer.
            all_inputs = [
                ops[id_idx_map[input_id]] for ops in new_ops for input_id in input_ids
            ]

            # Outputs get the same treatment
            all_outputs = [
                ops[id_idx_map[output_id]] for ops in new_ops for output_id in output_ids
            ]

            # Sanity check to ensure that no duplicate graph IDs have been created
            ids = [op.graph_id for op_list in new_ops for op in op_list]
            assert len(ids) == len(set(ids)), "Duplicate graph IDs created by unfold"

            return SFG(inputs=all_inputs, outputs=all_outputs)
    
    
        @property
        def is_linear(self) -> bool:
            """Whether every operation yielded by ``self.split()`` is linear."""
            for part in self.split():
                if not part.is_linear:
                    return False
            return True
    
        @property
        def is_constant(self) -> bool:
            """Whether every output operation of the SFG is constant."""
            for output_op in self._output_operations:
                if not output_op.is_constant:
                    return False
            return True