Skip to content
Snippets Groups Projects
signal_flow_graph.py 17.7 KiB
Newer Older
  • Learn to ignore specific revisions
  • """@package docstring
    
    B-ASIC Signal Flow Graph Module.
    TODO: More info.
    """
    
    
    Ivar Härnqvist's avatar
    Ivar Härnqvist committed
    from typing import NewType, List, Iterable, Sequence, Dict, Optional, DefaultDict, Set
    from numbers import Number
    
    from collections import defaultdict, deque
    
    Ivar Härnqvist's avatar
    Ivar Härnqvist committed
    from b_asic.port import SignalSourceProvider, OutputPort
    from b_asic.operation import Operation, AbstractOperation
    
    Kevin Scott's avatar
    Kevin Scott committed
    from b_asic.signal import Signal
    
    from b_asic.graph_component import GraphComponent, Name, TypeName
    
    Ivar Härnqvist's avatar
    Ivar Härnqvist committed
    from b_asic.special_operations import Input, Output
    
    
    GraphID = NewType("GraphID", str)
    GraphIDNumber = NewType("GraphIDNumber", int)
    
    
    class GraphIDGenerator:
        """A class that generates Graph IDs for objects."""
    
        _next_id_number: DefaultDict[TypeName, GraphIDNumber]
    
        def __init__(self, id_number_offset: GraphIDNumber = 0):
            self._next_id_number = defaultdict(lambda: id_number_offset)
    
        def next_id(self, type_name: TypeName) -> GraphID:
            """Return the next graph id for a certain graph id type."""
            self._next_id_number[type_name] += 1
            return type_name + str(self._next_id_number[type_name])
    
    Jacob Wahlman's avatar
    Jacob Wahlman committed
        """Signal flow graph.
        TODO: More info.
        """
    
    Ivar Härnqvist's avatar
    Ivar Härnqvist committed
        _components_by_id: Dict[GraphID, GraphComponent]
        _components_by_name: DefaultDict[Name, List[GraphComponent]]
    
    Jacob Wahlman's avatar
    Jacob Wahlman committed
        _graph_id_generator: GraphIDGenerator
    
    Ivar Härnqvist's avatar
    Ivar Härnqvist committed
        _input_operations: List[Input]
        _output_operations: List[Output]
        _original_components_added: Set[GraphComponent]
        _original_input_signals: Dict[Signal, int]
        _original_output_signals: Dict[Signal, int]
    
    
        def __init__(self, input_signals: Sequence[Signal] = [], output_signals: Sequence[Signal] = [],
                     inputs: Sequence[Input] = [], outputs: Sequence[Output] = [],
                     id_number_offset: GraphIDNumber = 0, name: Name = "",
    
    Ivar Härnqvist's avatar
    Ivar Härnqvist committed
                     input_sources: Optional[Sequence[Optional[SignalSourceProvider]]] = None):
            super().__init__(
    
                input_count=len(input_signals) + len(inputs),
                output_count=len(output_signals) + len(outputs),
                name=name,
                input_sources=input_sources)
    
    Ivar Härnqvist's avatar
    Ivar Härnqvist committed
    
            self._components_by_id = dict()
            self._components_by_name = defaultdict(list)
    
    Ivar Härnqvist's avatar
    Ivar Härnqvist committed
            self._graph_id_generator = GraphIDGenerator(id_number_offset)
            self._input_operations = []
            self._output_operations = []
    
            # Maps original components to new copied components
            self._added_components_mapping = {}
            self._original_input_signals_indexes = {}
            self._original_output_signals_indexes = {}
            self._id_number_offset = id_number_offset
    
            # Setup input signals.
            for input_index, sig in enumerate(input_signals):
                assert sig not in self._added_components_mapping, "Duplicate input signals sent to SFG construcctor."
    
                new_input_op = self._add_component_copy_unconnected(Input())
                new_sig = self._add_component_copy_unconnected(sig)
                new_sig.set_source(new_input_op.output(0))
    
                self._input_operations.append(new_input_op)
                self._original_input_signals_indexes[sig] = input_index
    
            # Setup input operations, starting from indexes ater input signals.
            for input_index, input_op in enumerate(inputs, len(input_signals)):
                assert input_op not in self._added_components_mapping, "Duplicate input operations sent to SFG constructor."
                new_input_op = self._add_component_copy_unconnected(input_op)
    
                for sig in input_op.output(0).signals:
                    assert sig not in self._added_components_mapping, "Duplicate input signals connected to input ports sent to SFG construcctor."
                    new_sig = self._add_component_copy_unconnected(sig)
                    new_sig.set_source(new_input_op.output(0))
    
                    self._original_input_signals_indexes[sig] = input_index
    
                self._input_operations.append(new_input_op)
    
            # Setup output signals.
            for output_ind, sig in enumerate(output_signals):
                new_out = self._add_component_copy_unconnected(Output())
                if sig in self._added_components_mapping:
                    # Signal already added when setting up inputs
                    new_sig = self._added_components_mapping[sig]
                    new_sig.set_destination(new_out.input(0))
                else:
                    # New signal has to be created
                    new_sig = self._add_component_copy_unconnected(sig)
                    new_sig.set_destination(new_out.input(0))
    
                self._output_operations.append(new_out)
                self._original_output_signals_indexes[sig] = output_ind
    
            # Setup output operations, starting from indexes after output signals.
            for output_ind, output_op in enumerate(outputs, len(output_signals)):
                assert output_op not in self._added_components_mapping, "Duplicate output operations sent to SFG constructor."
    
                new_out = self._add_component_copy_unconnected(output_op)
                for sig in output_op.input(0).signals:
                    if sig in self._added_components_mapping:
                        # Signal already added when setting up inputs
                        new_sig = self._added_components_mapping[sig]
                        new_sig.set_destination(new_out.input(0))
                    else:
                        # New signal has to be created
                        new_sig = self._add_component_copy_unconnected(sig)
                        new_sig.set_destination(new_out.input(0))
    
                    self._original_output_signals_indexes[sig] = output_ind
    
                self._output_operations.append(new_out)
    
            output_operations_set = set(self._output_operations)
    
    
    Ivar Härnqvist's avatar
    Ivar Härnqvist committed
            # Search the graph inwards from each input signal.
    
            for sig, input_index in self._original_input_signals_indexes.items():
                # Check if already added destination.
                new_sig = self._added_components_mapping[sig]
                if new_sig.destination is not None and new_sig.destination.operation in output_operations_set:
                    # Add directly connected input to output to dfs order list
                    self._components_in_dfs_order.extend([
                        new_sig.source.operation, new_sig, new_sig.destination.operation])
                elif sig.destination is None:
                    raise ValueError(
                        f"Input signal #{input_index} is missing destination in SFG")
                elif sig.destination.operation not in self._added_components_mapping:
                    self._copy_structure_from_operation_dfs(
                        sig.destination.operation)
    
    Ivar Härnqvist's avatar
    Ivar Härnqvist committed
    
            # Search the graph inwards from each output signal.
    
            for sig, output_index in self._original_output_signals_indexes.items():
                # Check if already added source.
                mew_sig = self._added_components_mapping[sig]
                if new_sig.source is None:
                    if sig.source is None:
                        raise ValueError(
                            f"Output signal #{output_index} is missing source in SFG")
                    if sig.source.operation not in self._added_components_mapping:
                        self._copy_structure_from_operation_dfs(
                            sig.source.operation)
    
    Jacob Wahlman's avatar
    Jacob Wahlman committed
    
    
    Ivar Härnqvist's avatar
    Ivar Härnqvist committed
        @property
        def type_name(self) -> TypeName:
            return "sfg"
    
    Jacob Wahlman's avatar
    Jacob Wahlman committed
    
    
    Ivar Härnqvist's avatar
    Ivar Härnqvist committed
        def evaluate(self, *args):
            if len(args) != self.input_count:
    
                raise ValueError(
                    "Wrong number of inputs supplied to SFG for evaluation")
    
    Ivar Härnqvist's avatar
    Ivar Härnqvist committed
            for arg, op in zip(args, self._input_operations):
                op.value = arg
    
    Ivar Härnqvist's avatar
    Ivar Härnqvist committed
            result = []
            for op in self._output_operations:
                result.append(self._evaluate_source(op.input(0).signals[0].source))
    
    Jacob Wahlman's avatar
    Jacob Wahlman committed
    
    
    Ivar Härnqvist's avatar
    Ivar Härnqvist committed
            n = len(result)
            return None if n == 0 else result[0] if n == 1 else result
    
    Jacob Wahlman's avatar
    Jacob Wahlman committed
    
    
    Ivar Härnqvist's avatar
    Ivar Härnqvist committed
        def evaluate_output(self, i: int, input_values: Sequence[Number]) -> Sequence[Optional[Number]]:
            assert i >= 0 and i < self.output_count, "Output index out of range"
            result = [None] * self.output_count
    
            result[i] = self._evaluate_source(
                self._output_operations[i].input(0).signals[0].source)
    
    Ivar Härnqvist's avatar
    Ivar Härnqvist committed
            return result
    
    Jacob Wahlman's avatar
    Jacob Wahlman committed
    
    
    Ivar Härnqvist's avatar
    Ivar Härnqvist committed
        def split(self) -> Iterable[Operation]:
            return filter(lambda comp: isinstance(comp, Operation), self._components_by_id.values())
    
    Jacob Wahlman's avatar
    Jacob Wahlman committed
    
    
    Ivar Härnqvist's avatar
    Ivar Härnqvist committed
        @property
        def components(self) -> Iterable[GraphComponent]:
    
            """Get all components of this graph in the dfs-traversal order."""
            return self._components_in_dfs_order
    
    Jacob Wahlman's avatar
    Jacob Wahlman committed
    
        def find_by_id(self, graph_id: GraphID) -> Optional[GraphComponent]:
    
            """Find a graph object based on the entered Graph ID and return it. If no graph
            object with the entered ID was found then return None.
    
    Jacob Wahlman's avatar
    Jacob Wahlman committed
    
            Keyword arguments:
            graph_id: Graph ID of the wanted object.
            """
    
    Ivar Härnqvist's avatar
    Ivar Härnqvist committed
            return self._components_by_id.get(graph_id, None)
    
    Jacob Wahlman's avatar
    Jacob Wahlman committed
    
        def find_by_name(self, name: Name) -> List[GraphComponent]:
    
            """Find all graph objects that have the entered name and return them
            in a list. If no graph object with the entered name was found then return an
    
    Jacob Wahlman's avatar
    Jacob Wahlman committed
            empty list.
    
            Keyword arguments:
            name: Name of the wanted object.
            """
    
    Ivar Härnqvist's avatar
    Ivar Härnqvist committed
            return self._components_by_name.get(name, [])
    
    
        def deep_copy(self) -> "SFG":
            """Returns a deep copy of self."""
            copy = SFG(inputs=self._input_operations, outputs=self._output_operations,
                       id_number_offset=self._id_number_offset, name=super().name)
    
            return copy
    
    
    Ivar Härnqvist's avatar
    Ivar Härnqvist committed
        def _add_component_copy_unconnected(self, original_comp: GraphComponent) -> GraphComponent:
    
    
            assert original_comp not in self._added_components_mapping, "Tried to add duplicate SFG component"
    
    Ivar Härnqvist's avatar
    Ivar Härnqvist committed
    
            new_comp = original_comp.copy_unconnected()
    
    
            self._added_components_mapping[original_comp] = new_comp
            self._components_by_id[self._graph_id_generator.next_id(
                new_comp.type_name)] = new_comp
    
    Ivar Härnqvist's avatar
    Ivar Härnqvist committed
            self._components_by_name[new_comp.name].append(new_comp)
    
    Ivar Härnqvist's avatar
    Ivar Härnqvist committed
            return new_comp
    
    
        def _copy_structure_from_operation_dfs(self, start_op: Operation):
            op_stack = deque([start_op])
    
            while op_stack:
                original_op = op_stack.pop()
                # Add or get the new copy of the operation..
                new_op = None
                if original_op not in self._added_components_mapping:
                    new_op = self._add_component_copy_unconnected(original_op)
                    self._components_in_dfs_order.append(new_op)
                else:
                    new_op = self._added_components_mapping[original_op]
    
                # Connect input ports to new signals
                for original_input_port in original_op.inputs:
                    if original_input_port.signal_count < 1:
                        raise ValueError("Unconnected input port in SFG")
    
                    for original_signal in original_input_port.signals:
    
                        # Check if the signal is one of the SFG's input signals
                        if original_signal in self._original_input_signals_indexes:
    
                            # New signal already created during first step of constructor
                            new_signal = self._added_components_mapping[
                                original_signal]
                            new_signal.set_destination(
                                new_op.input(original_input_port.index))
    
                            self._components_in_dfs_order.extend(
                                [new_signal, new_signal.source.operation])
    
                        # Check if the signal has not been added before
                        elif original_signal not in self._added_components_mapping:
                            if original_signal.source is None:
                                raise ValueError(
                                    "Dangling signal without source in SFG")
    
                            new_signal = self._add_component_copy_unconnected(
                                original_signal)
                            new_signal.set_destination(
                                new_op.input(original_input_port.index))
    
                            self._components_in_dfs_order.append(new_signal)
    
                            original_connected_op = original_signal.source.operation
                            # Check if connected Operation has been added before
                            if original_connected_op in self._added_components_mapping:
                                # Set source to the already added operations port
                                new_signal.set_source(
                                    self._added_components_mapping[original_connected_op].output(
                                        original_signal.source.index))
                            else:
                                # Create new operation, set signal source to it
                                new_connected_op = self._add_component_copy_unconnected(
                                    original_connected_op)
                                new_signal.set_source(new_connected_op.output(
                                    original_signal.source.index))
    
                                self._components_in_dfs_order.append(
                                    new_connected_op)
    
                                # Add connected operation to queue of operations to visit
                                op_stack.append(original_connected_op)
    
                # Connect output ports
                for original_output_port in original_op.outputs:
    
                    for original_signal in original_output_port.signals:
                        # Check if the signal is one of the SFG's output signals.
                        if original_signal in self._original_output_signals_indexes:
    
                            # New signal already created during first step of constructor.
                            new_signal = self._added_components_mapping[
                                original_signal]
                            new_signal.set_source(
                                new_op.output(original_output_port.index))
    
                            self._components_in_dfs_order.extend(
                                [new_signal, new_signal.destination.operation])
    
                        # Check if signal has not been added before.
                        elif original_signal not in self._added_components_mapping:
                            if original_signal.source is None:
                                raise ValueError(
                                    "Dangling signal without source in SFG")
    
                            new_signal = self._add_component_copy_unconnected(
                                original_signal)
                            new_signal.set_source(
                                new_op.output(original_output_port.index))
    
                            self._components_in_dfs_order.append(new_signal)
    
                            original_connected_op = original_signal.destination.operation
                            # Check if connected operation has been added.
                            if original_connected_op in self._added_components_mapping:
                                # Set destination to the already connected operations port
                                new_signal.set_destination(
                                    self._added_components_mapping[original_connected_op].input(
                                        original_signal.destination.index))
    
                            else:
                                # Create new operation, set destination to it.
                                new_connected_op = self._add_component_copy_unconnected(
                                    original_connected_op)
                                new_signal.set_destination(new_connected_op.input(
                                    original_signal.destination.index))
    
                                self._components_in_dfs_order.append(
                                    new_connected_op)
    
                                # Add connected operation to the queue of operations to visist
                                op_stack.append(original_connected_op)
    
    
    Ivar Härnqvist's avatar
    Ivar Härnqvist committed
        def _evaluate_source(self, src: OutputPort) -> Number:
            input_values = []
            for input_port in src.operation.inputs:
                input_src = input_port.signals[0].source
                input_values.append(self._evaluate_source(input_src))
    
            return src.operation.evaluate_output(src.index, input_values)
    
        
        def __str__(self):
            """Prints operations, inputs and outputs in a SFG 
            """
    
            output_string = ""
    
            for comp in self._components_in_dfs_order:
                if isinstance(comp, Operation):
                    for key, value in self._components_by_id.items():
                        if value is comp:
                            output_string += "id: " + key + ", name: "
                    
                    if comp.name != None:
                        output_string += comp.name + ", "
                    else:
                        output_string += "-, "
                    
                    if comp.type_name is "c":
                        output_string += "value: " + str(comp.value) + ", input: ["
                    else:
                        output_string += "input: [" 
    
                    counter_input = 0
                    for input in comp.inputs:
                        counter_input += 1
                        for signal in input.signals:
                            for key, value in self._components_by_id.items():
                                if value is signal:
                                    output_string += key + ", "
                    
                    if counter_input > 0:
                        output_string = output_string[:-2]
                    output_string += "], output: ["
                    counter_output = 0
                    for output in comp.outputs:
                        counter_output += 1
                        for signal in output.signals:
                            for key, value in self._components_by_id.items():
                                if value is signal:
                                    output_string += key + ", "
                    if counter_output > 0:
                        output_string = output_string[:-2]
                    output_string += "]\n"
    
            return output_string