diff --git a/b_asic/signal_flow_graph.py b/b_asic/signal_flow_graph.py index fb70e6167786bf9acd676fdba752fafaece96bbd..6aedd296ead4aa4e369548a2b61d1473ef9ac186 100644 --- a/b_asic/signal_flow_graph.py +++ b/b_asic/signal_flow_graph.py @@ -5,7 +5,8 @@ TODO: More info. from typing import NewType, List, Iterable, Sequence, Dict, Optional, DefaultDict, Set from numbers import Number -from collections import defaultdict +from collections import defaultdict, deque +from pprint import pprint from b_asic.port import SignalSourceProvider, OutputPort from b_asic.operation import Operation, AbstractOperation @@ -47,7 +48,7 @@ class SFG(AbstractOperation): _original_output_signals: Dict[Signal, int] def __init__(self, input_signals: Sequence[Signal] = [], output_signals: Sequence[Signal] = [], - inputs: Sequence[Input] = [], outputs: Sequence[Output] = [], operations: Sequence[Operation] = [], + inputs: Sequence[Input] = [], outputs: Sequence[Output] = [], id_number_offset: GraphIDNumber = 0, name: Name = "", input_sources: Optional[Sequence[Optional[SignalSourceProvider]]] = None): super().__init__( @@ -61,52 +62,49 @@ class SFG(AbstractOperation): self._graph_id_generator = GraphIDGenerator(id_number_offset) self._input_operations = [] self._output_operations = [] - self._original_components_added = set() - self._original_input_signals = {} - self._original_output_signals = {} + # Maps original components to new copied components + self._added_components_mapping = {} + self._original_input_signals_indexes = {} + self._original_output_signals_indexes = {} # Setup input operations and signals. for i, s in enumerate(input_signals): self._input_operations.append( self._add_component_copy_unconnected(Input())) - self._original_input_signals[s] = i + self._original_input_signals_indexes[s] = i for i, op in enumerate(inputs, len(input_signals)): self._input_operations.append( self._add_component_copy_unconnected(op)) for s in op.output(0).signals: - self._original_input_signals[s] = i + self._original_input_signals_indexes[s] = i # Setup output operations and signals. for i, s in enumerate(output_signals): self._output_operations.append( self._add_component_copy_unconnected(Output())) - self._original_output_signals[s] = i + self._original_output_signals_indexes[s] = i for i, op in enumerate(outputs, len(output_signals)): self._output_operations.append( self._add_component_copy_unconnected(op)) for s in op.input(0).signals: - self._original_output_signals[s] = i + self._original_output_signals_indexes[s] = i # Search the graph inwards from each input signal. - for s, i in self._original_input_signals.items(): + for s, i in self._original_input_signals_indexes.items(): if s.destination is None: raise ValueError( f"Input signal #{i} is missing destination in SFG") - if s.destination.operation not in self._original_components_added: - self._add_operation_copy_recursively(s.destination.operation) + if s.destination.operation not in self._added_components_mapping: + self._copy_structure_from_operation_bfs( + s.destination.operation) # Search the graph inwards from each output signal. - for s, i in self._original_output_signals.items(): + for s, i in self._original_output_signals_indexes.items(): if s.source is None: raise ValueError( f"Output signal #{i} is missing source in SFG") - if s.source.operation not in self._original_components_added: - self._add_operation_copy_recursively(s.source.operation) - - # Search the graph outwards from each operation. - for op in operations: - if op not in self._original_components_added: - self._add_operation_copy_recursively(op) + if s.source.operation not in self._added_components_mapping: + self._copy_structure_from_operation_bfs(s.source.operation) @property def type_name(self) -> TypeName: @@ -161,75 +159,125 @@ class SFG(AbstractOperation): return self._components_by_name.get(name, []) def _add_component_copy_unconnected(self, original_comp: GraphComponent) -> GraphComponent: - assert original_comp not in self._original_components_added, "Tried to add duplicate SFG component" - self._original_components_added.add(original_comp) + + assert original_comp not in self._added_components_mapping, "Tried to add duplicate SFG component" new_comp = original_comp.copy_unconnected() + + self._added_components_mapping[original_comp] = new_comp self._components_by_id[self._graph_id_generator.next_id( new_comp.type_name)] = new_comp self._components_by_name[new_comp.name].append(new_comp) + return new_comp - def _add_operation_copy_recursively(self, original_op: Operation) -> Operation: - # Add a copy of the operation without any connections. - new_op = self._add_component_copy_unconnected(original_op) - - # Connect input ports. - for original_input_port, new_input_port in zip(original_op.inputs, new_op.inputs): - if original_input_port.signal_count < 1: - raise ValueError("Unconnected input port in SFG") - - for original_signal in original_input_port.signals: - - new_signal = self._add_component_copy_unconnected( - original_signal) - new_signal.set_destination(new_input_port) - - # Check if the signal is one of the SFG's input signals. - if original_signal in self._original_input_signals: - new_signal.set_source( - self._input_operations[self._original_input_signals[original_signal]].output(0)) - - # Only add the signal if it wasn't already added. - elif original_signal not in self._original_components_added: - if original_signal.source is None: - raise ValueError( - "Dangling signal without source in SFG") - - # Recursively add the connected operation. - new_connected_op = self._add_operation_copy_recursively( - original_signal.source.operation) - new_signal.set_source(new_connected_op.output( - original_signal.source.index)) - - # Connect output ports. - for original_output_port, new_output_port in zip(original_op.outputs, new_op.outputs): - for original_signal in original_output_port.signals: - - new_signal = self._add_component_copy_unconnected( - original_signal) - new_signal.set_source(new_output_port) - - # Check if the signal is one of the SFG's output signals. - if original_signal in self._original_output_signals: - new_signal.set_destination( - self._output_operations[self._original_output_signals[original_signal]].input(0)) - - # Only add the signal if it wasn't already added. - elif original_signal not in self._original_components_added: - new_signal = self._add_component_copy_unconnected( - original_signal) - new_signal.set_source(new_output_port) - if original_signal.destination is None: - raise ValueError( - "Dangling signal without destination in SFG") - # Recursively add the connected operation. - new_connected_op = self._add_operation_copy_recursively( - original_signal.destination.operation) - new_signal.set_destination(new_connected_op.input( - original_signal.destination.index)) - - return new_op + def _copy_structure_from_operation_bfs(self, start_op: Operation): + op_queue = deque([start_op]) + + while op_queue: + original_op = op_queue.popleft() + print("CURRENT:", original_op.name, "-------------------") + # Add a copy of the operation without any connections. + new_op = None + if original_op not in self._added_components_mapping: + new_op = self._add_component_copy_unconnected(original_op) + else: + new_op = self._added_components_mapping[original_op] + + # Connect input ports to new signals + for original_input_port in original_op.inputs: + if original_input_port.signal_count < 1: + raise ValueError("Unconnected input port in SFG") + + for original_signal in original_input_port.signals: + + # Check if the signal is one of the SFG's input signals + if original_signal in self._original_input_signals_indexes: + + new_signal = self._add_component_copy_unconnected( + original_signal) + new_signal.set_destination( + new_op.input(original_input_port.index)) + new_signal.set_source( + self._input_operations[self._original_input_signals_indexes[ + original_signal]].output(0)) + + # Check if the signal has not been added before + elif original_signal not in self._added_components_mapping: + if original_signal.source is None: + raise ValueError( + "Dangling signal without source in SFG") + + new_signal = self._add_component_copy_unconnected( + original_signal) + new_signal.set_destination( + new_op.input(original_input_port.index)) + + original_connected_op = original_signal.source.operation + # Check if connected Operation has been added before + if original_connected_op in self._added_components_mapping: + # Set source to the already added operations port + new_signal.set_source( + self._added_components_mapping[original_connected_op].output( + original_signal.source.index)) + else: + # Create new operation, set signal source to it + new_connected_op = self._add_component_copy_unconnected( + original_connected_op) + new_signal.set_source(new_connected_op.output( + original_signal.source.index)) + + # Add connected operation to queue of operations to visit + op_queue.append(original_connected_op) + + # Connect output ports + for original_output_port in original_op.outputs: + + for original_signal in original_output_port.signals: + # Check if the signal is one of the SFG's output signals. + if original_signal in self._original_output_signals_indexes: + + new_signal = self._add_component_copy_unconnected( + original_signal) + new_signal.set_source( + new_op.output(original_output_port.index)) + new_signal.set_destination( + self._output_operations[self._original_output_signals_indexes[ + original_signal]].input(0)) + + # Check if signal has not been added before. + elif original_signal not in self._added_components_mapping: + if original_signal.source is None: + raise ValueError( + "Dangling signal without source in SFG") + + new_signal = self._add_component_copy_unconnected( + original_signal) + new_signal.set_source( + new_op.output(original_output_port.index)) + + original_connected_op = original_signal.destination.operation + # Check if connected operation has been added. + if original_connected_op in self._added_components_mapping: + # Set destination to the already connected operations port + new_signal.set_destination( + self._added_components_mapping[original_connected_op].input( + original_signal.destination.index)) + + else: + # Create new operation, set destination to it. + new_connected_op = self._add_component_copy_unconnected( + original_connected_op) + new_signal.set_destination(new_connected_op.input( + original_signal.destination.index)) + + print("Adding signal:", new_signal.name, + "to op:", new_connected_op.name) + print( + [inport.signals for inport in new_connected_op.inputs]) + + # Add connected operation to the queue of operations to visist + op_queue.append(original_connected_op) def _evaluate_source(self, src: OutputPort) -> Number: input_values = []