Skip to content
Snippets Groups Projects
signal_flow_graph.py 84.9 KiB
Newer Older
"""
B-ASIC Signal Flow Graph Module.
import itertools
Oscar Gustafsson's avatar
Oscar Gustafsson committed
import re
import warnings
from collections import defaultdict, deque
from collections.abc import Iterable, MutableSet, Sequence
Oscar Gustafsson's avatar
Oscar Gustafsson committed
from numbers import Number
Oscar Gustafsson's avatar
Oscar Gustafsson committed
from typing import (
    DefaultDict,
    Deque,
Oscar Gustafsson's avatar
Oscar Gustafsson committed
    Optional,
Oscar Gustafsson's avatar
Oscar Gustafsson committed
    Union,
    cast,
Oscar Gustafsson's avatar
Oscar Gustafsson committed
)
Samuel Fagerlund's avatar
Samuel Fagerlund committed
import numpy as np
Oscar Gustafsson's avatar
Oscar Gustafsson committed
from graphviz import Digraph

from b_asic.graph_component import GraphComponent
Oscar Gustafsson's avatar
Oscar Gustafsson committed
from b_asic.operation import (
    AbstractOperation,
    MutableDelayMap,
    MutableResultMap,
    Operation,
    ResultKey,
)
from b_asic.port import InputPort, OutputPort, SignalSourceProvider
from b_asic.signal import Signal
Oscar Gustafsson's avatar
Oscar Gustafsson committed
from b_asic.special_operations import Delay, Input, Output
from b_asic.types import GraphID, GraphIDNumber, Name, Num, TypeName
DelayQueue = list[tuple[str, ResultKey, OutputPort]]
_OPERATION_SHAPE: DefaultDict[TypeName, str] = defaultdict(lambda: "ellipse")
_OPERATION_SHAPE.update(
    {
        Input.type_name(): "cds",
        Output.type_name(): "cds",
        Delay.type_name(): "square",
    }
)


class GraphIDGenerator:
    """Generates Graph IDs for objects."""

    _next_id_number: DefaultDict[TypeName, GraphIDNumber]

    def __init__(self, id_number_offset: GraphIDNumber = GraphIDNumber(0)):
        """Construct a GraphIDGenerator."""
        self._next_id_number = defaultdict(lambda: id_number_offset)

Oscar Gustafsson's avatar
Oscar Gustafsson committed
    def next_id(self, type_name: TypeName, used_ids: MutableSet = set()) -> GraphID:
        """Get the next graph id for a certain graph id type."""
        new_id = type_name + str(self._next_id_number[type_name])
Oscar Gustafsson's avatar
Oscar Gustafsson committed
        while new_id in used_ids:
            self._next_id_number[type_name] += 1
            new_id = type_name + str(self._next_id_number[type_name])
        used_ids.add(GraphID(new_id))
Oscar Gustafsson's avatar
Oscar Gustafsson committed
        return GraphID(new_id)

    @property
    def id_number_offset(self) -> GraphIDNumber:
        """Get the graph id number offset of this generator."""
Oscar Gustafsson's avatar
Oscar Gustafsson committed
        return GraphIDNumber(
Oscar Gustafsson's avatar
Oscar Gustafsson committed
            self._next_id_number.default_factory()
        )  # pylint: disable=not-callable


class SFG(AbstractOperation):
    Construct an SFG given its inputs and outputs.

    Contains a set of connected operations, forming a new operation.
    Used as a base for simulation, scheduling, etc.

    Inputs/outputs may be specified using either Input/Output operations
    directly with the *inputs*/*outputs* parameters, or using signals with the
    *input_signals*/*output_signals parameters*. If signals are used, the
    corresponding Input/Output operations will be created automatically.

    The *id_number_offset* parameter specifies what number graph IDs will be
    offset by for each new graph component type. IDs start at 1 by default,
    so the default offset of 0 will result in IDs like "c1", "c2", etc.
    while an offset of 3 will result in "c4", "c5", etc.

    Parameters
    ----------
    inputs : array of Input, optional

    outputs : array of Output, optional

    input_signals : array of Signal, optional

    output_signals : array of Signal, optional

    id_number_offset : GraphIDNumber, optional

    name : Name, optional

    input_sources :
    _components_by_id: dict[GraphID, GraphComponent]
    _components_by_name: DefaultDict[Name, list[GraphComponent]]
    _components_dfs_order: list[GraphComponent]
    _operations_dfs_order: list[Operation]
    _operations_topological_order: list[Operation]
    _graph_id_generator: GraphIDGenerator
    _input_operations: list[Input]
    _output_operations: list[Output]
    _original_components_to_new: dict[GraphComponent, GraphComponent]
    _original_input_signals_to_indices: dict[Signal, int]
    _original_output_signals_to_indices: dict[Signal, int]
    _precedence_list: list[list[OutputPort]] | None
    _used_ids: set[GraphID] = set()
Oscar Gustafsson's avatar
Oscar Gustafsson committed
    def __init__(
        self,
        inputs: Sequence[Input] | None = None,
        outputs: Sequence[Output] | None = None,
        input_signals: Sequence[Signal] | None = None,
        output_signals: Sequence[Signal] | None = None,
        id_number_offset: GraphIDNumber = GraphIDNumber(0),
        name: Name = Name(""),
        input_sources: Sequence[SignalSourceProvider | None] | None = None,
Oscar Gustafsson's avatar
Oscar Gustafsson committed
    ):
        input_signal_count = 0 if input_signals is None else len(input_signals)
        input_operation_count = 0 if inputs is None else len(inputs)
Oscar Gustafsson's avatar
Oscar Gustafsson committed
        output_signal_count = 0 if output_signals is None else len(output_signals)
        output_operation_count = 0 if outputs is None else len(outputs)
Oscar Gustafsson's avatar
Oscar Gustafsson committed
        super().__init__(
            input_count=input_signal_count + input_operation_count,
            output_count=output_signal_count + output_operation_count,
            name=name,
            input_sources=input_sources,
        )
Oscar Gustafsson's avatar
Oscar Gustafsson committed
        self._components_by_id = {}
        self._used_ids = set()
        self._components_by_name = defaultdict(list)
        self._components_dfs_order = []
        self._operations_dfs_order = []
        self._operations_topological_order = []
Oscar Gustafsson's avatar
Oscar Gustafsson committed
        self._graph_id_generator = GraphIDGenerator(GraphIDNumber(id_number_offset))
        self._input_operations = []
        self._output_operations = []
        self._original_components_to_new = {}
        self._original_input_signals_to_indices = {}
        self._original_output_signals_to_indices = {}
        self._precedence_list = None

        # Setup input signals.
        if input_signals is not None:
            for input_index, signal in enumerate(input_signals):
                if signal in self._original_components_to_new:
Oscar Gustafsson's avatar
Oscar Gustafsson committed
                    raise ValueError(f"Duplicate input signal {signal!r} in SFG")
                new_input_op = cast(
                    Input, self._add_component_unconnected_copy(Input())
                )
Oscar Gustafsson's avatar
Oscar Gustafsson committed
                new_signal = cast(Signal, self._add_component_unconnected_copy(signal))
                new_signal.set_source(new_input_op.output(0))
                self._input_operations.append(new_input_op)
                self._original_input_signals_to_indices[signal] = input_index
        # Setup input operations, starting from indices after input signals.
        if inputs is not None:
            for input_index, input_op in enumerate(inputs, input_signal_count):
                if input_op in self._original_components_to_new:
Oscar Gustafsson's avatar
Oscar Gustafsson committed
                    raise ValueError(f"Duplicate input operation {input_op!r} in SFG")
Oscar Gustafsson's avatar
Oscar Gustafsson committed
                new_input_op = cast(
                    Input, self._add_component_unconnected_copy(input_op)
                )
                    if signal in self._original_components_to_new:
                        raise ValueError(
                            "Duplicate input signals connected to input ports"
                            " supplied to SFG constructor."
                        )
Oscar Gustafsson's avatar
Oscar Gustafsson committed
                    new_signal = cast(
                        Signal, self._add_component_unconnected_copy(signal)
                    )
                    new_signal.set_source(new_input_op.output(0))
Oscar Gustafsson's avatar
Oscar Gustafsson committed
                    self._original_input_signals_to_indices[signal] = input_index
                self._input_operations.append(new_input_op)
        # Setup output signals.
        if output_signals is not None:
            for output_index, signal in enumerate(output_signals):
                    Output, self._add_component_unconnected_copy(Output())
                )
                if signal in self._original_components_to_new:
                    # Signal was already added when setting up inputs.
Oscar Gustafsson's avatar
Oscar Gustafsson committed
                    new_signal = cast(Signal, self._original_components_to_new[signal])
                    new_signal.set_destination(new_output_op.input(0))
                else:
                    # New signal has to be created.
                    new_signal = cast(
                        Signal, self._add_component_unconnected_copy(signal)
                    )
                    new_signal.set_destination(new_output_op.input(0))
                self._output_operations.append(new_output_op)
                self._original_output_signals_to_indices[signal] = output_index
        # Setup output operations, starting from indices after output signals.
        if outputs is not None:
Oscar Gustafsson's avatar
Oscar Gustafsson committed
            for output_index, output_op in enumerate(outputs, output_signal_count):
                if output_op in self._original_components_to_new:
Oscar Gustafsson's avatar
Oscar Gustafsson committed
                    raise ValueError(f"Duplicate output operation {output_op!r} in SFG")
                new_output_op = cast(
                    Output, self._add_component_unconnected_copy(output_op)
                )
                for signal in output_op.input(0).signals:
                    if signal in self._original_components_to_new:
                        # Signal was already added when setting up inputs.
Oscar Gustafsson's avatar
Oscar Gustafsson committed
                        new_signal = cast(
                            Signal, self._original_components_to_new[signal]
                        )
Oscar Gustafsson's avatar
Oscar Gustafsson committed
                        new_signal = cast(
                            Signal,
                            self._add_component_unconnected_copy(signal),
Oscar Gustafsson's avatar
Oscar Gustafsson committed
                        )
                    new_signal.set_destination(new_output_op.input(0))
Oscar Gustafsson's avatar
Oscar Gustafsson committed
                    self._original_output_signals_to_indices[signal] = output_index
                self._output_operations.append(new_output_op)

        output_operations_set = set(self._output_operations)

        # Search the graph inwards from each input signal.
Oscar Gustafsson's avatar
Oscar Gustafsson committed
        for (
            signal,
            input_index,
        ) in self._original_input_signals_to_indices.items():
Oscar Gustafsson's avatar
Oscar Gustafsson committed
            new_signal = cast(Signal, self._original_components_to_new[signal])
            if new_signal.destination is None:
                if signal.destination is None:
                    raise ValueError(
Oscar Gustafsson's avatar
Oscar Gustafsson committed
                        f"Input signal #{input_index} is missing destination in SFG"
Oscar Gustafsson's avatar
Oscar Gustafsson committed
                    )
Oscar Gustafsson's avatar
Oscar Gustafsson committed
                if signal.destination.operation not in self._original_components_to_new:
Oscar Gustafsson's avatar
Oscar Gustafsson committed
                        signal.destination.operation
                    )
            elif new_signal.destination.operation in output_operations_set:
                # Add directly connected input to output to ordered list.
Oscar Gustafsson's avatar
Oscar Gustafsson committed
                source = cast(OutputPort, new_signal.source)
Oscar Gustafsson's avatar
Oscar Gustafsson committed
                    [
Oscar Gustafsson's avatar
Oscar Gustafsson committed
                        source.operation,
Oscar Gustafsson's avatar
Oscar Gustafsson committed
                        new_signal,
                        new_signal.destination.operation,
                    ]
                )
Oscar Gustafsson's avatar
Oscar Gustafsson committed
                    [
Oscar Gustafsson's avatar
Oscar Gustafsson committed
                        source.operation,
Oscar Gustafsson's avatar
Oscar Gustafsson committed
                        new_signal.destination.operation,
                    ]
                )

        # Search the graph inwards from each output signal.
        output_sources = []
Oscar Gustafsson's avatar
Oscar Gustafsson committed
        for (
            signal,
            output_index,
        ) in self._original_output_signals_to_indices.items():
Oscar Gustafsson's avatar
Oscar Gustafsson committed
            new_signal = cast(Signal, self._original_components_to_new[signal])

            if new_signal.source in output_sources:
                warnings.warn("Two signals connected to the same output port")
            output_sources.append(new_signal.source)

            if new_signal.source is None:
                if signal.source is None:
                    raise ValueError(
Oscar Gustafsson's avatar
Oscar Gustafsson committed
                        f"Output signal #{output_index} is missing source in SFG"
Oscar Gustafsson's avatar
Oscar Gustafsson committed
                    )
Oscar Gustafsson's avatar
Oscar Gustafsson committed
                if signal.source.operation not in self._original_components_to_new:
                    self._add_operation_connected_tree_copy(signal.source.operation)
        if len(output_sources) != (output_operation_count + output_signal_count):
            raise ValueError(
                "At least one output operation is not connected!, Tips: Check for output ports that are connected to the same signal"
            )

        """Return a string representation of this SFG."""
        string_io = StringIO()
        string_io.write(super().__str__() + "\n")
        string_io.write("Internal Operations:\n")
        line = "-" * 100 + "\n"
        string_io.write(line)

        for operation in self.get_operations_topological_order():
Oscar Gustafsson's avatar
Oscar Gustafsson committed
            string_io.write(f"{operation}\n")
Oscar Gustafsson's avatar
Oscar Gustafsson committed
    def __call__(
        self, *src: SignalSourceProvider | None, name: Name = Name("")
Oscar Gustafsson's avatar
Oscar Gustafsson committed
    ) -> "SFG":
        Return a new independent SFG instance that is identical to this SFG
        except without any of its external connections.
Oscar Gustafsson's avatar
Oscar Gustafsson committed
        return SFG(
            inputs=self._input_operations,
            outputs=self._output_operations,
            id_number_offset=self.id_number_offset,
Oscar Gustafsson's avatar
Oscar Gustafsson committed
            name=Name(name),
Oscar Gustafsson's avatar
Oscar Gustafsson committed
            input_sources=src if src else None,
        )
        # doc-string inherited.
Oscar Gustafsson's avatar
Oscar Gustafsson committed
        return TypeName("sfg")

    def evaluate(self, *args):
        result = self.evaluate_outputs(args)
        n = len(result)
        return None if n == 0 else result[0] if n == 1 else result

Oscar Gustafsson's avatar
Oscar Gustafsson committed
    def evaluate_output(
        self,
        index: int,
        input_values: Sequence[Num],
        results: MutableResultMap | None = None,
        delays: MutableDelayMap | None = None,
Oscar Gustafsson's avatar
Oscar Gustafsson committed
        prefix: str = "",
        bits_override: int | None = None,
        quantize: bool = True,
Oscar Gustafsson's avatar
Oscar Gustafsson committed
    ) -> Number:
        # doc-string inherited
        if index < 0 or index >= self.output_count:
            raise IndexError(
Oscar Gustafsson's avatar
Oscar Gustafsson committed
                "Output index out of range (expected"
                f" 0-{self.output_count - 1}, got {index})"
            )
        if len(input_values) != self.input_count:
            raise ValueError(
Oscar Gustafsson's avatar
Oscar Gustafsson committed
                "Wrong number of inputs supplied to SFG for evaluation"
                f" (expected {self.input_count}, got {len(input_values)})"
            )
        if results is None:
            results = {}
        if delays is None:
            delays = {}

        # Set the values of our input operations to the given input values.
Oscar Gustafsson's avatar
Oscar Gustafsson committed
        for op, arg in zip(
            self._input_operations,
            (
                self.quantize_inputs(input_values, bits_override)
                if quantize
                else input_values
            ),
Oscar Gustafsson's avatar
Oscar Gustafsson committed
        ):
Oscar Gustafsson's avatar
Oscar Gustafsson committed
        value = self._evaluate_source(
            self._output_operations[index].input(0).signals[0].source,
            results,
            delays,
            prefix,
            bits_override,
            quantize,
Oscar Gustafsson's avatar
Oscar Gustafsson committed
            deferred_delays,
        )
        while deferred_delays:
            new_deferred_delays = []
            for key_base, key, src in deferred_delays:
                self._do_evaluate_source(
Oscar Gustafsson's avatar
Oscar Gustafsson committed
                    key_base,
                    key,
                    src,
                    results,
                    delays,
                    prefix,
                    bits_override,
                    quantize,
Loading
Loading full blame...