Skip to content
Snippets Groups Projects
signal_flow_graph.py 70 KiB
Newer Older
"""
B-ASIC Signal Flow Graph Module.
import itertools
Oscar Gustafsson's avatar
Oscar Gustafsson committed
import re
from collections import defaultdict, deque
from io import StringIO
Oscar Gustafsson's avatar
Oscar Gustafsson committed
from numbers import Number
Oscar Gustafsson's avatar
Oscar Gustafsson committed
from typing import (
    DefaultDict,
    Deque,
Oscar Gustafsson's avatar
Oscar Gustafsson committed
    Dict,
    Iterable,
    List,
    MutableSet,
    Optional,
    Sequence,
    Set,
Oscar Gustafsson's avatar
Oscar Gustafsson committed
    Tuple,
Oscar Gustafsson's avatar
Oscar Gustafsson committed
    Union,
    cast,
Oscar Gustafsson's avatar
Oscar Gustafsson committed
)
Oscar Gustafsson's avatar
Oscar Gustafsson committed
from graphviz import Digraph

from b_asic.graph_component import GraphComponent
Oscar Gustafsson's avatar
Oscar Gustafsson committed
from b_asic.operation import (
    AbstractOperation,
    MutableDelayMap,
    MutableResultMap,
    Operation,
    ResultKey,
)
from b_asic.port import InputPort, OutputPort, SignalSourceProvider
from b_asic.signal import Signal
Oscar Gustafsson's avatar
Oscar Gustafsson committed
from b_asic.special_operations import Delay, Input, Output
from b_asic.types import GraphID, GraphIDNumber, Name, Num, TypeName

DelayQueue = List[Tuple[str, ResultKey, OutputPort]]


_OPERATION_SHAPE: DefaultDict[TypeName, str] = defaultdict(lambda: "ellipse")
_OPERATION_SHAPE.update(
    {
        Input.type_name(): "cds",
        Output.type_name(): "cds",
        Delay.type_name(): "square",
    }
)


class GraphIDGenerator:
    """Generates Graph IDs for objects."""

    _next_id_number: DefaultDict[TypeName, GraphIDNumber]

    def __init__(self, id_number_offset: GraphIDNumber = GraphIDNumber(0)):
        """Construct a GraphIDGenerator."""
        self._next_id_number = defaultdict(lambda: id_number_offset)

Oscar Gustafsson's avatar
Oscar Gustafsson committed
    def next_id(self, type_name: TypeName, used_ids: MutableSet = set()) -> GraphID:
        """Get the next graph id for a certain graph id type."""
        new_id = type_name + str(self._next_id_number[type_name])
        self._next_id_number[type_name] += 1
Oscar Gustafsson's avatar
Oscar Gustafsson committed
        while new_id in used_ids:
            self._next_id_number[type_name] += 1
            new_id = type_name + str(self._next_id_number[type_name])
Oscar Gustafsson's avatar
Oscar Gustafsson committed
        return GraphID(new_id)

    @property
    def id_number_offset(self) -> GraphIDNumber:
        """Get the graph id number offset of this generator."""
Oscar Gustafsson's avatar
Oscar Gustafsson committed
        return GraphIDNumber(
Oscar Gustafsson's avatar
Oscar Gustafsson committed
            self._next_id_number.default_factory()
        )  # pylint: disable=not-callable


class SFG(AbstractOperation):
    Construct an SFG given its inputs and outputs.

    Contains a set of connected operations, forming a new operation.
    Used as a base for simulation, scheduling, etc.

    Inputs/outputs may be specified using either Input/Output operations
    directly with the *inputs*/*outputs* parameters, or using signals with the
    *input_signals*/*output_signals parameters*. If signals are used, the
    corresponding Input/Output operations will be created automatically.

    The *id_number_offset* parameter specifies what number graph IDs will be
    offset by for each new graph component type. IDs start at 1 by default,
    so the default offset of 0 will result in IDs like "c1", "c2", etc.
    while an offset of 3 will result in "c4", "c5", etc.

    Parameters
    ----------
    inputs : array of Input, optional

    outputs : array of Output, optional

    input_signals : array of Signal, optional

    output_signals : array of Signal, optional

    id_number_offset : GraphIDNumber, optional

    name : Name, optional

    input_sources :
    _components_by_id: Dict[GraphID, GraphComponent]
    _components_by_name: DefaultDict[Name, List[GraphComponent]]
    _components_dfs_order: List[GraphComponent]
    _operations_dfs_order: List[Operation]
    _operations_topological_order: List[Operation]
    _graph_id_generator: GraphIDGenerator
    _input_operations: List[Input]
    _output_operations: List[Output]
    _original_components_to_new: Dict[GraphComponent, GraphComponent]
    _original_input_signals_to_indices: Dict[Signal, int]
    _original_output_signals_to_indices: Dict[Signal, int]
    _precedence_list: Optional[List[List[OutputPort]]]
    _used_ids: Set[GraphID] = set()
Oscar Gustafsson's avatar
Oscar Gustafsson committed
    def __init__(
        self,
        inputs: Optional[Sequence[Input]] = None,
        outputs: Optional[Sequence[Output]] = None,
        input_signals: Optional[Sequence[Signal]] = None,
        output_signals: Optional[Sequence[Signal]] = None,
        id_number_offset: GraphIDNumber = GraphIDNumber(0),
        name: Name = Name(""),
Oscar Gustafsson's avatar
Oscar Gustafsson committed
        input_sources: Optional[Sequence[Optional[SignalSourceProvider]]] = None,
Oscar Gustafsson's avatar
Oscar Gustafsson committed
    ):
        input_signal_count = 0 if input_signals is None else len(input_signals)
        input_operation_count = 0 if inputs is None else len(inputs)
Oscar Gustafsson's avatar
Oscar Gustafsson committed
        output_signal_count = 0 if output_signals is None else len(output_signals)
        output_operation_count = 0 if outputs is None else len(outputs)
Oscar Gustafsson's avatar
Oscar Gustafsson committed
        super().__init__(
            input_count=input_signal_count + input_operation_count,
            output_count=output_signal_count + output_operation_count,
            name=name,
            input_sources=input_sources,
        )
Oscar Gustafsson's avatar
Oscar Gustafsson committed
        self._components_by_id = {}
        self._used_ids = set()
        self._components_by_name = defaultdict(list)
        self._components_dfs_order = []
        self._operations_dfs_order = []
        self._operations_topological_order = []
Oscar Gustafsson's avatar
Oscar Gustafsson committed
        self._graph_id_generator = GraphIDGenerator(GraphIDNumber(id_number_offset))
        self._input_operations = []
        self._output_operations = []
        self._original_components_to_new = {}
        self._original_input_signals_to_indices = {}
        self._original_output_signals_to_indices = {}
        self._precedence_list = None

        # Setup input signals.
        if input_signals is not None:
            for input_index, signal in enumerate(input_signals):
                if signal in self._original_components_to_new:
Oscar Gustafsson's avatar
Oscar Gustafsson committed
                    raise ValueError(f"Duplicate input signal {signal!r} in SFG")
                new_input_op = cast(
                    Input, self._add_component_unconnected_copy(Input())
                )
Oscar Gustafsson's avatar
Oscar Gustafsson committed
                new_signal = cast(Signal, self._add_component_unconnected_copy(signal))
                new_signal.set_source(new_input_op.output(0))
                self._input_operations.append(new_input_op)
                self._original_input_signals_to_indices[signal] = input_index
        # Setup input operations, starting from indices after input signals.
        if inputs is not None:
            for input_index, input_op in enumerate(inputs, input_signal_count):
                if input_op in self._original_components_to_new:
Oscar Gustafsson's avatar
Oscar Gustafsson committed
                    raise ValueError(f"Duplicate input operation {input_op!r} in SFG")
Oscar Gustafsson's avatar
Oscar Gustafsson committed
                new_input_op = cast(
                    Input, self._add_component_unconnected_copy(input_op)
                )
                    if signal in self._original_components_to_new:
                        raise ValueError(
                            "Duplicate input signals connected to input ports"
                            " supplied to SFG constructor."
                        )
Oscar Gustafsson's avatar
Oscar Gustafsson committed
                    new_signal = cast(
                        Signal, self._add_component_unconnected_copy(signal)
                    )
                    new_signal.set_source(new_input_op.output(0))
Oscar Gustafsson's avatar
Oscar Gustafsson committed
                    self._original_input_signals_to_indices[signal] = input_index
                self._input_operations.append(new_input_op)
Loading
Loading full blame...