Skip to content
Snippets Groups Projects
signal_flow_graph.py 88 KiB
Newer Older
"""
B-ASIC Signal Flow Graph Module.
import itertools
Oscar Gustafsson's avatar
Oscar Gustafsson committed
import re
import warnings
from collections import Counter, defaultdict, deque
from collections.abc import Iterable, MutableSet, Sequence
Oscar Gustafsson's avatar
Oscar Gustafsson committed
from numbers import Number
Oscar Gustafsson's avatar
Oscar Gustafsson committed
from typing import (
    Optional,
Oscar Gustafsson's avatar
Oscar Gustafsson committed
    Union,
    cast,
Oscar Gustafsson's avatar
Oscar Gustafsson committed
)
Samuel Fagerlund's avatar
Samuel Fagerlund committed
import numpy as np
Oscar Gustafsson's avatar
Oscar Gustafsson committed
from graphviz import Digraph

from b_asic.graph_component import GraphComponent
Oscar Gustafsson's avatar
Oscar Gustafsson committed
from b_asic.operation import (
    AbstractOperation,
    MutableDelayMap,
    MutableResultMap,
    Operation,
    ResultKey,
)
from b_asic.port import InputPort, OutputPort, SignalSourceProvider
from b_asic.signal import Signal
Oscar Gustafsson's avatar
Oscar Gustafsson committed
from b_asic.special_operations import Delay, Input, Output
from b_asic.types import GraphID, GraphIDNumber, Name, Num, TypeName
DelayQueue = list[tuple[str, ResultKey, OutputPort]]
_OPERATION_SHAPE: defaultdict[TypeName, str] = defaultdict(lambda: "ellipse")
_OPERATION_SHAPE.update(
    {
        Input.type_name(): "cds",
        Output.type_name(): "cds",
        Delay.type_name(): "square",
    }
)


class GraphIDGenerator:
    """Generates Graph IDs for objects."""

    _next_id_number: defaultdict[TypeName, GraphIDNumber]
    def __init__(self, id_number_offset: GraphIDNumber = GraphIDNumber(0)):
        """Construct a GraphIDGenerator."""
        self._next_id_number = defaultdict(lambda: id_number_offset)

Oscar Gustafsson's avatar
Oscar Gustafsson committed
    def next_id(self, type_name: TypeName, used_ids: MutableSet = set()) -> GraphID:
        """Get the next graph id for a certain graph id type."""
        new_id = type_name + str(self._next_id_number[type_name])
Oscar Gustafsson's avatar
Oscar Gustafsson committed
        while new_id in used_ids:
            self._next_id_number[type_name] += 1
            new_id = type_name + str(self._next_id_number[type_name])
        used_ids.add(GraphID(new_id))
Oscar Gustafsson's avatar
Oscar Gustafsson committed
        return GraphID(new_id)

    @property
    def id_number_offset(self) -> GraphIDNumber:
        """Get the graph id number offset of this generator."""
Oscar Gustafsson's avatar
Oscar Gustafsson committed
        return GraphIDNumber(
Oscar Gustafsson's avatar
Oscar Gustafsson committed
            self._next_id_number.default_factory()
        )  # pylint: disable=not-callable


class SFG(AbstractOperation):
    Construct an SFG given its inputs and outputs.

    Contains a set of connected operations, forming a new operation.
    Used as a base for simulation, scheduling, etc.

    Inputs/outputs may be specified using either Input/Output operations
    directly with the *inputs*/*outputs* parameters, or using signals with the
    *input_signals*/*output_signals parameters*. If signals are used, the
    corresponding Input/Output operations will be created automatically.

    The *id_number_offset* parameter specifies what number graph IDs will be
    offset by for each new graph component type. IDs start at 1 by default,
    so the default offset of 0 will result in IDs like "c1", "c2", etc.
    while an offset of 3 will result in "c4", "c5", etc.

    Parameters
    ----------
    inputs : array of Input, optional

    outputs : array of Output, optional

    input_signals : array of Signal, optional

    output_signals : array of Signal, optional

    id_number_offset : GraphIDNumber, optional

    name : Name, optional

    input_sources :
    _components_by_id: dict[GraphID, GraphComponent]
    _components_by_name: defaultdict[Name, list[GraphComponent]]
    _components_dfs_order: list[GraphComponent]
    _operations_dfs_order: list[Operation]
    _operations_topological_order: list[Operation]
    _graph_id_generator: GraphIDGenerator
    _input_operations: list[Input]
    _output_operations: list[Output]
    _original_components_to_new: dict[GraphComponent, GraphComponent]
    _original_input_signals_to_indices: dict[Signal, int]
    _original_output_signals_to_indices: dict[Signal, int]
    _precedence_list: list[list[OutputPort]] | None
    _used_ids: set[GraphID] = set()
Oscar Gustafsson's avatar
Oscar Gustafsson committed
    def __init__(
        self,
        inputs: Sequence[Input] | None = None,
        outputs: Sequence[Output] | None = None,
        input_signals: Sequence[Signal] | None = None,
        output_signals: Sequence[Signal] | None = None,
        id_number_offset: GraphIDNumber = GraphIDNumber(0),
        name: Name = Name(""),
        input_sources: Sequence[SignalSourceProvider | None] | None = None,
Oscar Gustafsson's avatar
Oscar Gustafsson committed
    ):
        input_signal_count = 0 if input_signals is None else len(input_signals)
        input_operation_count = 0 if inputs is None else len(inputs)
Oscar Gustafsson's avatar
Oscar Gustafsson committed
        output_signal_count = 0 if output_signals is None else len(output_signals)
        output_operation_count = 0 if outputs is None else len(outputs)
Oscar Gustafsson's avatar
Oscar Gustafsson committed
        super().__init__(
            input_count=input_signal_count + input_operation_count,
            output_count=output_signal_count + output_operation_count,
            name=name,
            input_sources=input_sources,
        )
Oscar Gustafsson's avatar
Oscar Gustafsson committed
        self._components_by_id = {}
        self._used_ids = set()
        self._components_by_name = defaultdict(list)
        self._components_dfs_order = []
        self._operations_dfs_order = []
        self._operations_topological_order = []
Oscar Gustafsson's avatar
Oscar Gustafsson committed
        self._graph_id_generator = GraphIDGenerator(GraphIDNumber(id_number_offset))
        self._input_operations = []
        self._output_operations = []
        self._original_components_to_new = {}
        self._original_input_signals_to_indices = {}
        self._original_output_signals_to_indices = {}
        self._precedence_list = None

        # Setup input signals.
        if input_signals is not None:
            for input_index, signal in enumerate(input_signals):
                if signal in self._original_components_to_new:
Oscar Gustafsson's avatar
Oscar Gustafsson committed
                    raise ValueError(f"Duplicate input signal {signal!r} in SFG")
                new_input_op = cast(
                    Input, self._add_component_unconnected_copy(Input())
                )
Oscar Gustafsson's avatar
Oscar Gustafsson committed
                new_signal = cast(Signal, self._add_component_unconnected_copy(signal))
                new_signal.set_source(new_input_op.output(0))
                self._input_operations.append(new_input_op)
                self._original_input_signals_to_indices[signal] = input_index
        # Setup input operations, starting from indices after input signals.
        if inputs is not None:
            for input_index, input_op in enumerate(inputs, input_signal_count):
                if input_op in self._original_components_to_new:
Oscar Gustafsson's avatar
Oscar Gustafsson committed
                    raise ValueError(f"Duplicate input operation {input_op!r} in SFG")
Oscar Gustafsson's avatar
Oscar Gustafsson committed
                new_input_op = cast(
                    Input, self._add_component_unconnected_copy(input_op)
                )
                    if signal in self._original_components_to_new:
                        raise ValueError(
                            "Duplicate input signals connected to input ports"
                            " supplied to SFG constructor."
                        )
Oscar Gustafsson's avatar
Oscar Gustafsson committed
                    new_signal = cast(
                        Signal, self._add_component_unconnected_copy(signal)
                    )
                    new_signal.set_source(new_input_op.output(0))
Oscar Gustafsson's avatar
Oscar Gustafsson committed
                    self._original_input_signals_to_indices[signal] = input_index
                self._input_operations.append(new_input_op)
        # Setup output signals.
        if output_signals is not None:
            for output_index, signal in enumerate(output_signals):
                new_output_op = cast(
                    Output, self._add_component_unconnected_copy(Output())
Loading
Loading full blame...