From abb484d72e0c5513408dcbe823c2d83806ef68f5 Mon Sep 17 00:00:00 2001 From: angloth <angus.lothian@hotmail.com> Date: Wed, 8 Apr 2020 15:39:56 +0200 Subject: [PATCH] Change code to be bit more readable and remove minor code duplication in SFG --- b_asic/operation.py | 16 ++++-- b_asic/signal_flow_graph.py | 110 +++++++++++++++++++++++------------- 2 files changed, 80 insertions(+), 46 deletions(-) diff --git a/b_asic/operation.py b/b_asic/operation.py index d644dbd3..eeabb2bf 100644 --- a/b_asic/operation.py +++ b/b_asic/operation.py @@ -166,13 +166,14 @@ class AbstractOperation(Operation, AbstractGraphComponent): if input_sources is not None: source_count = len(input_sources) if source_count != input_count: - raise ValueError(f"Operation expected {input_count} input sources but only got {source_count}") + raise ValueError( + f"Operation expected {input_count} input sources but only got {source_count}") for i, src in enumerate(input_sources): if src is not None: self._input_ports[i].connect(src.source) @abstractmethod - def evaluate(self, *inputs) -> Any: # pylint: disable=arguments-differ + def evaluate(self, *inputs) -> Any: # pylint: disable=arguments-differ """Evaluate the operation and generate a list of output values given a list of input values. """ @@ -246,11 +247,13 @@ class AbstractOperation(Operation, AbstractGraphComponent): result = self.evaluate(*input_values) if isinstance(result, collections.Sequence): if len(result) != self.output_count: - raise RuntimeError("Operation evaluated to incorrect number of outputs") + raise RuntimeError( + "Operation evaluated to incorrect number of outputs") return result if isinstance(result, Number): if self.output_count != 1: - raise RuntimeError("Operation evaluated to incorrect number of outputs") + raise RuntimeError( + "Operation evaluated to incorrect number of outputs") return [result] raise RuntimeError("Operation evaluated to invalid type") @@ -296,11 +299,12 @@ class AbstractOperation(Operation, AbstractGraphComponent): def source(self) -> OutputPort: if self.output_count != 1: diff = "more" if self.output_count > 1 else "less" - raise TypeError(f"{self.__class__.__name__} cannot be used as an input source because it has {diff} than 1 output") + raise TypeError( + f"{self.__class__.__name__} cannot be used as an input source because it has {diff} than 1 output") return self.output(0) def copy_unconnected(self) -> GraphComponent: new_comp: AbstractOperation = super().copy_unconnected() for name, value in self.params.items(): - new_comp.set_param(name, deepcopy(value)) # pylint: disable=no-member + value)) # pylint: disable=no-member return new_comp diff --git a/b_asic/signal_flow_graph.py b/b_asic/signal_flow_graph.py index a011653f..fb70e616 100644 --- a/b_asic/signal_flow_graph.py +++ b/b_asic/signal_flow_graph.py @@ -36,7 +36,7 @@ class SFG(AbstractOperation): """Signal flow graph. TODO: More info. """ - + _components_by_id: Dict[GraphID, GraphComponent] _components_by_name: DefaultDict[Name, List[GraphComponent]] _graph_id_generator: GraphIDGenerator @@ -46,15 +46,15 @@ class SFG(AbstractOperation): _original_input_signals: Dict[Signal, int] _original_output_signals: Dict[Signal, int] - def __init__(self, input_signals: Sequence[Signal] = [], output_signals: Sequence[Signal] = [], \ - inputs: Sequence[Input] = [], outputs: Sequence[Output] = [], operations: Sequence[Operation] = [], \ - id_number_offset: GraphIDNumber = 0, name: Name = "", \ + def __init__(self, input_signals: Sequence[Signal] = [], output_signals: Sequence[Signal] = [], + inputs: Sequence[Input] = [], outputs: Sequence[Output] = [], operations: Sequence[Operation] = [], + id_number_offset: GraphIDNumber = 0, name: Name = "", input_sources: Optional[Sequence[Optional[SignalSourceProvider]]] = None): super().__init__( - input_count = len(input_signals) + len(inputs), - output_count = len(output_signals) + len(outputs), - name = name, - input_sources = input_sources) + input_count=len(input_signals) + len(inputs), + output_count=len(output_signals) + len(outputs), + name=name, + input_sources=input_sources) self._components_by_id = dict() self._components_by_name = defaultdict(list) @@ -67,33 +67,39 @@ class SFG(AbstractOperation): # Setup input operations and signals. for i, s in enumerate(input_signals): - self._input_operations.append(self._add_component_copy_unconnected(Input())) + self._input_operations.append( + self._add_component_copy_unconnected(Input())) self._original_input_signals[s] = i for i, op in enumerate(inputs, len(input_signals)): - self._input_operations.append(self._add_component_copy_unconnected(op)) + self._input_operations.append( + self._add_component_copy_unconnected(op)) for s in op.output(0).signals: self._original_input_signals[s] = i # Setup output operations and signals. for i, s in enumerate(output_signals): - self._output_operations.append(self._add_component_copy_unconnected(Output())) + self._output_operations.append( + self._add_component_copy_unconnected(Output())) self._original_output_signals[s] = i for i, op in enumerate(outputs, len(output_signals)): - self._output_operations.append(self._add_component_copy_unconnected(op)) + self._output_operations.append( + self._add_component_copy_unconnected(op)) for s in op.input(0).signals: self._original_output_signals[s] = i - + # Search the graph inwards from each input signal. for s, i in self._original_input_signals.items(): if s.destination is None: - raise ValueError(f"Input signal #{i} is missing destination in SFG") + raise ValueError( + f"Input signal #{i} is missing destination in SFG") if s.destination.operation not in self._original_components_added: self._add_operation_copy_recursively(s.destination.operation) # Search the graph inwards from each output signal. for s, i in self._original_output_signals.items(): if s.source is None: - raise ValueError(f"Output signal #{i} is missing source in SFG") + raise ValueError( + f"Output signal #{i} is missing source in SFG") if s.source.operation not in self._original_components_added: self._add_operation_copy_recursively(s.source.operation) @@ -108,10 +114,11 @@ class SFG(AbstractOperation): def evaluate(self, *args): if len(args) != self.input_count: - raise ValueError("Wrong number of inputs supplied to SFG for evaluation") + raise ValueError( + "Wrong number of inputs supplied to SFG for evaluation") for arg, op in zip(args, self._input_operations): op.value = arg - + result = [] for op in self._output_operations: result.append(self._evaluate_source(op.input(0).signals[0].source)) @@ -122,7 +129,8 @@ class SFG(AbstractOperation): def evaluate_output(self, i: int, input_values: Sequence[Number]) -> Sequence[Optional[Number]]: assert i >= 0 and i < self.output_count, "Output index out of range" result = [None] * self.output_count - result[i] = self._evaluate_source(self._output_operations[i].input(0).signals[0].source) + result[i] = self._evaluate_source( + self._output_operations[i].input(0).signals[0].source) return result def split(self) -> Iterable[Operation]: @@ -157,7 +165,8 @@ class SFG(AbstractOperation): self._original_components_added.add(original_comp) new_comp = original_comp.copy_unconnected() - self._components_by_id[self._graph_id_generator.next_id(new_comp.type_name)] = new_comp + self._components_by_id[self._graph_id_generator.next_id( + new_comp.type_name)] = new_comp self._components_by_name[new_comp.name].append(new_comp) return new_comp @@ -169,41 +178,62 @@ class SFG(AbstractOperation): for original_input_port, new_input_port in zip(original_op.inputs, new_op.inputs): if original_input_port.signal_count < 1: raise ValueError("Unconnected input port in SFG") + for original_signal in original_input_port.signals: - if original_signal in self._original_input_signals: # Check if the signal is one of the SFG's input signals. - new_signal = self._add_component_copy_unconnected(original_signal) - new_signal.set_destination(new_input_port) - new_signal.set_source(self._input_operations[self._original_input_signals[original_signal]].output(0)) - elif original_signal not in self._original_components_added: # Only add the signal if it wasn't already added. - new_signal = self._add_component_copy_unconnected(original_signal) - new_signal.set_destination(new_input_port) + + new_signal = self._add_component_copy_unconnected( + original_signal) + new_signal.set_destination(new_input_port) + + # Check if the signal is one of the SFG's input signals. + if original_signal in self._original_input_signals: + new_signal.set_source( + self._input_operations[self._original_input_signals[original_signal]].output(0)) + + # Only add the signal if it wasn't already added. + elif original_signal not in self._original_components_added: if original_signal.source is None: - raise ValueError("Dangling signal without source in SFG") + raise ValueError( + "Dangling signal without source in SFG") + # Recursively add the connected operation. - new_connected_op = self._add_operation_copy_recursively(original_signal.source.operation) - new_signal.set_source(new_connected_op.output(original_signal.source.index)) + new_connected_op = self._add_operation_copy_recursively( + original_signal.source.operation) + new_signal.set_source(new_connected_op.output( + original_signal.source.index)) # Connect output ports. for original_output_port, new_output_port in zip(original_op.outputs, new_op.outputs): for original_signal in original_output_port.signals: - if original_signal in self._original_output_signals: # Check if the signal is one of the SFG's output signals. - new_signal = self._add_component_copy_unconnected(original_signal) - new_signal.set_source(new_output_port) - new_signal.set_destination(self._output_operations[self._original_output_signals[original_signal]].input(0)) - elif original_signal not in self._original_components_added: # Only add the signal if it wasn't already added. - new_signal = self._add_component_copy_unconnected(original_signal) + + new_signal = self._add_component_copy_unconnected( + original_signal) + new_signal.set_source(new_output_port) + + # Check if the signal is one of the SFG's output signals. + if original_signal in self._original_output_signals: + new_signal.set_destination( + self._output_operations[self._original_output_signals[original_signal]].input(0)) + + # Only add the signal if it wasn't already added. + elif original_signal not in self._original_components_added: + new_signal = self._add_component_copy_unconnected( + original_signal) new_signal.set_source(new_output_port) if original_signal.destination is None: - raise ValueError("Dangling signal without destination in SFG") + raise ValueError( + "Dangling signal without destination in SFG") # Recursively add the connected operation. - new_connected_op = self._add_operation_copy_recursively(original_signal.destination.operation) - new_signal.set_destination(new_connected_op.input(original_signal.destination.index)) + new_connected_op = self._add_operation_copy_recursively( + original_signal.destination.operation) + new_signal.set_destination(new_connected_op.input( + original_signal.destination.index)) return new_op - + def _evaluate_source(self, src: OutputPort) -> Number: input_values = [] for input_port in src.operation.inputs: input_src = input_port.signals[0].source input_values.append(self._evaluate_source(input_src)) - return src.operation.evaluate_output(src.index, input_values) \ No newline at end of file + return src.operation.evaluate_output(src.index, input_values) -- GitLab