Skip to content
Snippets Groups Projects
signal_flow_graph.py 12.8 KiB
Newer Older
  • Learn to ignore specific revisions
  • """@package docstring
    
    B-ASIC Signal Flow Graph Module.
    TODO: More info.
    """
    
    
    Ivar Härnqvist's avatar
    Ivar Härnqvist committed
    from typing import NewType, List, Iterable, Sequence, Dict, Optional, DefaultDict, Set
    from numbers import Number
    
    from collections import defaultdict, deque
    from pprint import pprint
    
    Ivar Härnqvist's avatar
    Ivar Härnqvist committed
    from b_asic.port import SignalSourceProvider, OutputPort
    from b_asic.operation import Operation, AbstractOperation
    
    Kevin Scott's avatar
    Kevin Scott committed
    from b_asic.signal import Signal
    
    from b_asic.graph_component import GraphComponent, Name, TypeName
    
    Ivar Härnqvist's avatar
    Ivar Härnqvist committed
    from b_asic.special_operations import Input, Output
    
    
    GraphID = NewType("GraphID", str)
    GraphIDNumber = NewType("GraphIDNumber", int)
    
    
    class GraphIDGenerator:
        """Hand out sequential graph IDs, counted separately for each type name."""

        # Last number issued per type name; unseen type names start at the offset.
        _next_id_number: DefaultDict[TypeName, GraphIDNumber]

        def __init__(self, id_number_offset: GraphIDNumber = 0):
            """Create a generator whose per-type counters all start at id_number_offset."""

            def initial_number():
                return id_number_offset

            self._next_id_number = defaultdict(initial_number)

        def next_id(self, type_name: TypeName) -> GraphID:
            """Advance the counter of type_name and return the resulting graph ID.

            The ID is the type name directly followed by the new counter value, so
            the first ID issued for a type is numbered id_number_offset + 1.
            """
            number = self._next_id_number[type_name] + 1
            self._next_id_number[type_name] = number
            return type_name + str(number)
    
    Jacob Wahlman's avatar
    Jacob Wahlman committed
        """Signal flow graph.
        TODO: More info.
        """
    
    Ivar Härnqvist's avatar
    Ivar Härnqvist committed
        # Copied components of this graph, indexed by generated graph ID and by name.
        _components_by_id: Dict[GraphID, GraphComponent]
        _components_by_name: DefaultDict[Name, List[GraphComponent]]
        # Source of the graph IDs used as keys in _components_by_id.
        _graph_id_generator: GraphIDGenerator
        # Copied Input/Output operations, ordered by SFG input/output index.
        _input_operations: List[Input]
        _output_operations: List[Output]
        # Maps each original component to the copy stored in this graph.
        _added_components_mapping: Dict[GraphComponent, GraphComponent]
        # Map each original boundary signal to the index of the SFG input/output
        # it is attached to.
        _original_input_signals_indexes: Dict[Signal, int]
        _original_output_signals_indexes: Dict[Signal, int]
    
    
        def __init__(self, input_signals: Sequence[Signal] = (), output_signals: Sequence[Signal] = (),
                     inputs: Sequence[Input] = (), outputs: Sequence[Output] = (),
                     id_number_offset: GraphIDNumber = 0, name: Name = "",
                     input_sources: Optional[Sequence[Optional[SignalSourceProvider]]] = None):
            """Build a signal flow graph by copying the structure attached to its boundary.

            The SFG gets one input per entry of input_signals followed by one input
            per entry of inputs, and likewise for outputs. Every component reached
            from those boundary signals is copied into this graph; the originals are
            only read.

            Keyword arguments:
            input_signals: Original signals entering the graph; a new Input operation
                is created for each of them.
            output_signals: Original signals leaving the graph; a new Output operation
                is created for each of them.
            inputs: Original Input operations; each is copied and the signals on its
                output port 0 are treated as input signals.
            outputs: Original Output operations; each is copied and the signals on its
                input port 0 are treated as output signals.
            id_number_offset: Starting offset handed to the graph ID generator.
            name: Name forwarded to the base class initializer.
            input_sources: Forwarded to the base class initializer.

            Raises:
            ValueError: If an input signal has no destination, an output signal has
                no source, or the structure copy finds an unconnected port or a
                dangling signal.
            """
            super().__init__(
                input_count=len(input_signals) + len(inputs),
                output_count=len(output_signals) + len(outputs),
                name=name,
                input_sources=input_sources)

            self._components_by_id = dict()
            self._components_by_name = defaultdict(list)
            self._graph_id_generator = GraphIDGenerator(id_number_offset)
            self._input_operations = []
            self._output_operations = []

            # Maps original components to new copied components
            self._added_components_mapping = {}
            self._original_input_signals_indexes = {}
            self._original_output_signals_indexes = {}

            # Setup input operations and signals.
            for i, s in enumerate(input_signals):
                self._input_operations.append(
                    self._add_component_copy_unconnected(Input()))
                self._original_input_signals_indexes[s] = i

            # Explicit Input operations are numbered after the bare input signals.
            for i, op in enumerate(inputs, len(input_signals)):
                self._input_operations.append(
                    self._add_component_copy_unconnected(op))
                for s in op.output(0).signals:
                    self._original_input_signals_indexes[s] = i

            # Setup output operations and signals.
            for i, s in enumerate(output_signals):
                self._output_operations.append(
                    self._add_component_copy_unconnected(Output()))
                self._original_output_signals_indexes[s] = i

            # Explicit Output operations are numbered after the bare output signals.
            for i, op in enumerate(outputs, len(output_signals)):
                self._output_operations.append(
                    self._add_component_copy_unconnected(op))
                for s in op.input(0).signals:
                    self._original_output_signals_indexes[s] = i

            # Search the graph inwards from each input signal.
            for s, i in self._original_input_signals_indexes.items():
                if s.destination is None:
                    raise ValueError(
                        f"Input signal #{i} is missing destination in SFG")
                if s.destination.operation not in self._added_components_mapping:
                    self._copy_structure_from_operation_bfs(
                        s.destination.operation)

            # Search the graph inwards from each output signal.
            for s, i in self._original_output_signals_indexes.items():
                if s.source is None:
                    raise ValueError(
                        f"Output signal #{i} is missing source in SFG")
                if s.source.operation not in self._added_components_mapping:
                    self._copy_structure_from_operation_bfs(s.source.operation)
    
    Jacob Wahlman's avatar
    Jacob Wahlman committed
    
    
    Ivar Härnqvist's avatar
    Ivar Härnqvist committed
        @property
        def type_name(self) -> TypeName:
            """Return the type name used for signal flow graphs, "sfg"."""
            sfg_type_name = "sfg"
            return sfg_type_name
    
    Jacob Wahlman's avatar
    Jacob Wahlman committed
    
    
    Ivar Härnqvist's avatar
    Ivar Härnqvist committed
        def evaluate(self, *args):
            """Evaluate every output of the SFG for the given input values.

            Each positional argument is assigned, in order, to the `value` of the
            corresponding input operation before the outputs are evaluated.

            Returns None when the SFG has no outputs, the bare value when it has
            exactly one output, and otherwise a list with one value per output.

            Raises:
            ValueError: If the number of arguments differs from input_count.
            """
            if len(args) != self.input_count:
                raise ValueError(
                    "Wrong number of inputs supplied to SFG for evaluation")

            for arg, op in zip(args, self._input_operations):
                op.value = arg

            # Each output operation is driven by the first signal on its input port 0.
            result = [self._evaluate_source(op.input(0).signals[0].source)
                      for op in self._output_operations]

            if not result:
                return None
            if len(result) == 1:
                return result[0]
            return result
    
    Jacob Wahlman's avatar
    Jacob Wahlman committed
    
    
    Ivar Härnqvist's avatar
    Ivar Härnqvist committed
        def evaluate_output(self, i: int, input_values: Sequence[Number]) -> Sequence[Optional[Number]]:
            """Evaluate only output number i of the SFG for the given input values.

            The input values are assigned, in order, to the `value` of the input
            operations before the requested output is evaluated, in the same way
            as evaluate() does.

            Keyword arguments:
            i: Index of the output to evaluate.
            input_values: One value per SFG input.

            Returns a list with one slot per SFG output; slot i holds the evaluated
            value and every other slot is None.

            Raises:
            IndexError: If i is not a valid output index.
            ValueError: If the number of input values differs from input_count.
            """
            # Explicit raise instead of assert so the check survives `python -O`.
            if i < 0 or i >= self.output_count:
                raise IndexError("Output index out of range")
            if len(input_values) != self.input_count:
                raise ValueError(
                    "Wrong number of inputs supplied to SFG for evaluation")

            # Feed the input operations; without this the supplied values are unused.
            for arg, op in zip(input_values, self._input_operations):
                op.value = arg

            result = [None] * self.output_count
            result[i] = self._evaluate_source(
                self._output_operations[i].input(0).signals[0].source)
            return result
    
    Jacob Wahlman's avatar
    Jacob Wahlman committed
    
    
    Ivar Härnqvist's avatar
    Ivar Härnqvist committed
        def split(self) -> Iterable[Operation]:
            """Return a lazy iterable over the components of this graph that are operations."""

            def is_operation(comp) -> bool:
                return isinstance(comp, Operation)

            return filter(is_operation, self._components_by_id.values())
    
    Jacob Wahlman's avatar
    Jacob Wahlman committed
    
    
    Ivar Härnqvist's avatar
    Ivar Härnqvist committed
        @property
        def components(self) -> Iterable[GraphComponent]:
            """Return a view of every component stored in this graph."""
            all_components = self._components_by_id.values()
            return all_components
    
    Jacob Wahlman's avatar
    Jacob Wahlman committed
    
        def find_by_id(self, graph_id: GraphID) -> Optional[GraphComponent]:
            """Find a graph object based on the entered Graph ID and return it. If no graph
            object with the entered ID was found then return None.

            Keyword arguments:
            graph_id: Graph ID of the wanted object.
            """
            return self._components_by_id.get(graph_id)
    
    Jacob Wahlman's avatar
    Jacob Wahlman committed
    
        def find_by_name(self, name: Name) -> List[GraphComponent]:
            """Find all graph objects that have the entered name and return them
            in a list. If no graph object with the entered name was found then return an
            empty list.

            Keyword arguments:
            name: Name of the wanted object.
            """
            # .get() avoids inserting an empty entry into the defaultdict on a miss.
            return self._components_by_name.get(name, [])
    
        def _add_component_copy_unconnected(self, original_comp: GraphComponent) -> GraphComponent:
            """Copy original_comp without its connections and register the copy.

            The copy is recorded in the original-to-copy mapping, stored under a
            newly generated graph ID, and appended to the list of components that
            share its name.

            Keyword arguments:
            original_comp: Component of the original graph to copy; it must not have
                been added before.

            Returns the new, unconnected copy.
            """
            # Internal invariant: each original component is copied at most once.
            assert original_comp not in self._added_components_mapping, "Tried to add duplicate SFG component"

            new_comp = original_comp.copy_unconnected()
            self._added_components_mapping[original_comp] = new_comp

            new_id = self._graph_id_generator.next_id(new_comp.type_name)
            self._components_by_id[new_id] = new_comp
            self._components_by_name[new_comp.name].append(new_comp)
            return new_comp
    
    
        def _copy_structure_from_operation_bfs(self, start_op: Operation):
            """Copy the structure connected to start_op into this SFG, breadth first.

            Starting at start_op, every visited operation is copied (unless a copy
            already exists) and each signal on its input and output ports is copied
            and wired to the matching ports of the copied operations. Signals
            registered as SFG input/output signals are wired to this graph's own
            input/output operations instead, which is where the traversal stops.

            Keyword arguments:
            start_op: Operation of the original graph to start copying from.

            Raises:
            ValueError: If an input port has no signal, or a signal lacks the
                source or destination needed to continue the copy.
            """
            op_queue = deque([start_op])

            while op_queue:
                original_op = op_queue.popleft()

                # Reuse the existing copy when the operation was added while wiring
                # one of its neighbours; otherwise add an unconnected copy now.
                if original_op in self._added_components_mapping:
                    new_op = self._added_components_mapping[original_op]
                else:
                    new_op = self._add_component_copy_unconnected(original_op)

                # Connect input ports to new signals
                for original_input_port in original_op.inputs:
                    if original_input_port.signal_count < 1:
                        raise ValueError("Unconnected input port in SFG")

                    for original_signal in original_input_port.signals:
                        # Check if the signal is one of the SFG's input signals
                        if original_signal in self._original_input_signals_indexes:
                            new_signal = self._add_component_copy_unconnected(
                                original_signal)
                            new_signal.set_destination(
                                new_op.input(original_input_port.index))
                            new_signal.set_source(
                                self._input_operations[self._original_input_signals_indexes[
                                    original_signal]].output(0))

                        # Check if the signal has not been added before
                        elif original_signal not in self._added_components_mapping:
                            if original_signal.source is None:
                                raise ValueError(
                                    "Dangling signal without source in SFG")

                            new_signal = self._add_component_copy_unconnected(
                                original_signal)
                            new_signal.set_destination(
                                new_op.input(original_input_port.index))

                            original_connected_op = original_signal.source.operation
                            # Check if connected Operation has been added before
                            if original_connected_op in self._added_components_mapping:
                                # Set source to the already added operations port
                                new_signal.set_source(
                                    self._added_components_mapping[original_connected_op].output(
                                        original_signal.source.index))
                            else:
                                # Create new operation, set signal source to it
                                new_connected_op = self._add_component_copy_unconnected(
                                    original_connected_op)
                                new_signal.set_source(new_connected_op.output(
                                    original_signal.source.index))

                                # Add connected operation to queue of operations to visit
                                op_queue.append(original_connected_op)

                # Connect output ports
                for original_output_port in original_op.outputs:
                    for original_signal in original_output_port.signals:
                        # Check if the signal is one of the SFG's output signals.
                        if original_signal in self._original_output_signals_indexes:
                            new_signal = self._add_component_copy_unconnected(
                                original_signal)
                            new_signal.set_source(
                                new_op.output(original_output_port.index))
                            new_signal.set_destination(
                                self._output_operations[self._original_output_signals_indexes[
                                    original_signal]].input(0))

                        # Check if signal has not been added before.
                        elif original_signal not in self._added_components_mapping:
                            if original_signal.source is None:
                                raise ValueError(
                                    "Dangling signal without source in SFG")
                            # The destination is dereferenced below, so it is the
                            # end that must be validated on the output side.
                            if original_signal.destination is None:
                                raise ValueError(
                                    "Dangling signal without destination in SFG")

                            new_signal = self._add_component_copy_unconnected(
                                original_signal)
                            new_signal.set_source(
                                new_op.output(original_output_port.index))

                            original_connected_op = original_signal.destination.operation
                            # Check if connected operation has been added.
                            if original_connected_op in self._added_components_mapping:
                                # Set destination to the already connected operations port
                                new_signal.set_destination(
                                    self._added_components_mapping[original_connected_op].input(
                                        original_signal.destination.index))
                            else:
                                # Create new operation, set destination to it.
                                new_connected_op = self._add_component_copy_unconnected(
                                    original_connected_op)
                                new_signal.set_destination(new_connected_op.input(
                                    original_signal.destination.index))

                                # Add connected operation to the queue of operations to visit
                                op_queue.append(original_connected_op)
    
    Ivar Härnqvist's avatar
    Ivar Härnqvist committed
        def _evaluate_source(self, src: OutputPort) -> Number:
            """Recursively evaluate the value produced at the output port src.

            The first signal on each input port of the port's operation is traced
            back to its source and evaluated; the collected values are then passed
            to the operation's evaluate_output for the output index of src.
            """
            operand_values = [
                self._evaluate_source(port.signals[0].source)
                for port in src.operation.inputs
            ]
            return src.operation.evaluate_output(src.index, operand_values)