"""
B-ASIC Signal Flow Graph Module.
TODO: More info.
"""
from typing import NewType, List, Iterable, Sequence, Dict, Optional, DefaultDict, Set
from numbers import Number
angloth
committed
from collections import defaultdict, deque
from pprint import pprint
from b_asic.port import SignalSourceProvider, OutputPort
from b_asic.operation import Operation, AbstractOperation
Angus Lothian
committed
from b_asic.graph_component import GraphComponent, Name, TypeName
from b_asic.special_operations import Input, Output
GraphID = NewType("GraphID", str)
GraphIDNumber = NewType("GraphIDNumber", int)
class GraphIDGenerator:
    """Issue sequential graph IDs, with one independent counter per type name."""

    # Last number handed out for each type name; unseen type names start
    # at the offset given to the constructor.
    _next_id_number: DefaultDict[TypeName, GraphIDNumber]

    def __init__(self, id_number_offset: GraphIDNumber = 0):
        """Create a generator whose counters all start at id_number_offset.

        Keyword arguments:
        id_number_offset: Starting value of every counter. The first ID issued
                          for a type name carries the number offset + 1.
        """
        self._next_id_number = defaultdict(lambda: id_number_offset)

    def next_id(self, type_name: TypeName) -> GraphID:
        """Advance the counter for type_name and return the resulting graph ID.

        Keyword arguments:
        type_name: Type name used both as counter key and as ID prefix.
        """
        number = self._next_id_number[type_name] + 1
        self._next_id_number[type_name] = number
        return type_name + str(number)
Angus Lothian
committed
class SFG(AbstractOperation):
_components_by_id: Dict[GraphID, GraphComponent]
_components_by_name: DefaultDict[Name, List[GraphComponent]]
_input_operations: List[Input]
_output_operations: List[Output]
_original_components_added: Set[GraphComponent]
_original_input_signals: Dict[Signal, int]
_original_output_signals: Dict[Signal, int]
def __init__(self, input_signals: Sequence[Signal] = [], output_signals: Sequence[Signal] = [],
             inputs: Sequence[Input] = [], outputs: Sequence[Output] = [],
             id_number_offset: GraphIDNumber = 0, name: Name = "",
             input_sources: Optional[Sequence[Optional[SignalSourceProvider]]] = None):
    """Build an SFG by copying the graph connected to the given signals/operations.

    The list defaults are only read here, never mutated.

    Keyword arguments:
    input_signals: Signals entering the SFG. A new Input operation is created
                   for each one, in order.
    output_signals: Signals leaving the SFG. A new Output operation is created
                    for each one, in order.
    inputs: Input operations to copy into the SFG; indexed after input_signals.
    outputs: Output operations to copy into the SFG; indexed after output_signals.
    id_number_offset: Offset passed to the graph ID generator.
    name: Name forwarded to the base class constructor.
    input_sources: Forwarded to the base class constructor.

    Raises:
    ValueError: If an input signal has no destination or an output signal
                has no source.
    """
    super().__init__(
        input_count=len(input_signals) + len(inputs),
        output_count=len(output_signals) + len(outputs),
        name=name,
        input_sources=input_sources)

    self._components_by_id = dict()
    self._components_by_name = defaultdict(list)
    self._graph_id_generator = GraphIDGenerator(id_number_offset)
    self._input_operations = []
    self._output_operations = []

    # Maps original components to new copied components
    self._added_components_mapping = {}
    self._original_input_signals_indexes = {}
    self._original_output_signals_indexes = {}

    # Setup input operations and signals.
    for i, s in enumerate(input_signals):
        self._input_operations.append(
            self._add_component_copy_unconnected(Input()))
        self._original_input_signals_indexes[s] = i
    for i, op in enumerate(inputs, len(input_signals)):
        self._input_operations.append(
            self._add_component_copy_unconnected(op))
        # NOTE(review): registers the signals attached to output port 0 of
        # the supplied Input operation, instead of reusing the stale loop
        # variable from the loop above -- confirm against the port API.
        for s in op.output(0).signals:
            self._original_input_signals_indexes[s] = i

    # Setup output operations and signals.
    for i, s in enumerate(output_signals):
        self._output_operations.append(
            self._add_component_copy_unconnected(Output()))
        self._original_output_signals_indexes[s] = i
    for i, op in enumerate(outputs, len(output_signals)):
        self._output_operations.append(
            self._add_component_copy_unconnected(op))
        # NOTE(review): same assumption as for inputs, using input port 0
        # of the supplied Output operation -- confirm.
        for s in op.input(0).signals:
            self._original_output_signals_indexes[s] = i

    # Search the graph inwards from each input signal.
    for s, i in self._original_input_signals_indexes.items():
        if s.destination is None:
            raise ValueError(
                f"Input signal #{i} is missing destination in SFG")
        if s.destination.operation not in self._added_components_mapping:
            self._copy_structure_from_operation_bfs(
                s.destination.operation)

    # Search the graph inwards from each output signal.
    for s, i in self._original_output_signals_indexes.items():
        if s.source is None:
            raise ValueError(
                f"Output signal #{i} is missing source in SFG")
        if s.source.operation not in self._added_components_mapping:
            self._copy_structure_from_operation_bfs(s.source.operation)
@property
def type_name(self) -> TypeName:
    """Return the type name of an SFG, which is always "sfg"."""
    return "sfg"
def evaluate(self, *args):
    """Evaluate every output of the SFG for the given input values.

    Keyword arguments:
    *args: One value per SFG input, in input order.

    Returns None when the SFG has no outputs, the single value when it has
    exactly one output, and otherwise a list with one value per output.

    Raises:
    ValueError: If the number of values differs from the input count.
    """
    if len(args) != self.input_count:
        raise ValueError(
            "Wrong number of inputs supplied to SFG for evaluation")

    # Store each supplied value on its input operation, pairwise in order.
    for input_op, value in zip(self._input_operations, args):
        input_op.value = value

    # Each output is evaluated from the source feeding its first input port.
    outputs = [
        self._evaluate_source(output_op.input(0).signals[0].source)
        for output_op in self._output_operations
    ]

    if not outputs:
        return None
    if len(outputs) == 1:
        return outputs[0]
    return outputs
def evaluate_output(self, i: int, input_values: Sequence[Number]) -> Sequence[Optional[Number]]:
    """Evaluate only output i of the SFG.

    Keyword arguments:
    i: Index of the output to evaluate; must be in [0, output_count).
    input_values: Not read by this method.

    Returns a list with one slot per output. Slot i holds the evaluated
    value and every other slot is None.
    """
    assert i >= 0 and i < self.output_count, "Output index out of range"
    result = [None] * self.output_count
    result[i] = self._evaluate_source(
        self._output_operations[i].input(0).signals[0].source)
    # Previously the computed list was dropped and None was returned.
    return result
def split(self) -> Iterable[Operation]:
    """Return a lazy iterable over the components of this SFG that are Operations."""
    return (
        component
        for component in self._components_by_id.values()
        if isinstance(component, Operation)
    )
@property
def components(self) -> Iterable[GraphComponent]:
    """Return a view of every component stored in this graph."""
    all_components = self._components_by_id.values()
    return all_components
def find_by_id(self, graph_id: GraphID) -> Optional[GraphComponent]:
    """Find a graph object based on the entered Graph ID and return it. If no graph
    object with the entered ID was found then return None.

    Keyword arguments:
    graph_id: Graph ID of the wanted object.
    """
    # The lookup was missing, so the method always returned None.
    return self._components_by_id.get(graph_id, None)
def find_by_name(self, name: Name) -> List[GraphComponent]:
    """Look up every graph object registered under the given name.

    Returns the stored list for that name, or a new empty list when no
    object has that name.

    Keyword arguments:
    name: Name of the wanted object.
    """
    # Membership test first: it does not create an entry in the defaultdict.
    if name not in self._components_by_name:
        return []
    return self._components_by_name[name]
def _add_component_copy_unconnected(self, original_comp: GraphComponent) -> GraphComponent:
    """Copy a component into this SFG without connections and register the copy.

    The copy is recorded in the original-to-copy mapping, given a freshly
    generated graph ID, and indexed by its name.

    Keyword arguments:
    original_comp: Component to copy; must not have been added before.

    Returns the new component.
    """
    assert original_comp not in self._added_components_mapping, "Tried to add duplicate SFG component"

    # NOTE(review): the statement creating new_comp was missing. This assumes
    # GraphComponent provides copy_unconnected() -- confirm against
    # b_asic.graph_component.
    new_comp = original_comp.copy_unconnected()

    self._added_components_mapping[original_comp] = new_comp
    self._components_by_id[self._graph_id_generator.next_id(
        new_comp.type_name)] = new_comp
    self._components_by_name[new_comp.name].append(new_comp)
    # Callers append or use the returned copy, so it must be returned.
    return new_comp
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
def _copy_structure_from_operation_bfs(self, start_op: Operation):
    """Copy the graph connected to start_op into this SFG, breadth first.

    Every visited operation and signal is copied unconnected and then wired
    to the copies of its neighbours. Signals registered as SFG input or
    output signals are wired to this SFG's own Input/Output operations
    instead of being followed further.

    Keyword arguments:
    start_op: Original operation from which the traversal starts.

    Raises:
    ValueError: If an input port has no signal, or a followed signal lacks
                a source (input side) or a destination (output side).
    """
    op_queue = deque([start_op])
    while op_queue:
        original_op = op_queue.popleft()

        # Neighbours are copied before they are queued, so only the start
        # operation can still be missing from the mapping here.
        if original_op in self._added_components_mapping:
            new_op = self._added_components_mapping[original_op]
        else:
            new_op = self._add_component_copy_unconnected(original_op)

        # Connect input ports to new signals
        for original_input_port in original_op.inputs:
            if original_input_port.signal_count < 1:
                raise ValueError("Unconnected input port in SFG")

            for original_signal in original_input_port.signals:
                # Check if the signal is one of the SFG's input signals
                if original_signal in self._original_input_signals_indexes:
                    new_signal = self._add_component_copy_unconnected(
                        original_signal)
                    new_signal.set_destination(
                        new_op.input(original_input_port.index))
                    new_signal.set_source(
                        self._input_operations[self._original_input_signals_indexes[
                            original_signal]].output(0))

                # Check if the signal has not been added before
                elif original_signal not in self._added_components_mapping:
                    if original_signal.source is None:
                        raise ValueError(
                            "Dangling signal without source in SFG")

                    new_signal = self._add_component_copy_unconnected(
                        original_signal)
                    new_signal.set_destination(
                        new_op.input(original_input_port.index))

                    original_connected_op = original_signal.source.operation
                    # Check if connected Operation has been added before
                    if original_connected_op in self._added_components_mapping:
                        # Set source to the already added operations port
                        new_signal.set_source(
                            self._added_components_mapping[original_connected_op].output(
                                original_signal.source.index))
                    else:
                        # Create new operation, set signal source to it
                        new_connected_op = self._add_component_copy_unconnected(
                            original_connected_op)
                        new_signal.set_source(new_connected_op.output(
                            original_signal.source.index))
                        # Add connected operation to queue of operations to visit
                        op_queue.append(original_connected_op)

        # Connect output ports
        for original_output_port in original_op.outputs:
            for original_signal in original_output_port.signals:
                # Check if the signal is one of the SFG's output signals.
                if original_signal in self._original_output_signals_indexes:
                    new_signal = self._add_component_copy_unconnected(
                        original_signal)
                    new_signal.set_source(
                        new_op.output(original_output_port.index))
                    new_signal.set_destination(
                        self._output_operations[self._original_output_signals_indexes[
                            original_signal]].input(0))

                # Check if signal has not been added before.
                elif original_signal not in self._added_components_mapping:
                    # This branch dereferences the destination, so that is
                    # the end that must be checked (the old code tested the
                    # source and then failed on a missing destination).
                    if original_signal.destination is None:
                        raise ValueError(
                            "Dangling signal without destination in SFG")

                    new_signal = self._add_component_copy_unconnected(
                        original_signal)
                    new_signal.set_source(
                        new_op.output(original_output_port.index))

                    original_connected_op = original_signal.destination.operation
                    # Check if connected operation has been added.
                    if original_connected_op in self._added_components_mapping:
                        # Set destination to the already connected operations port
                        new_signal.set_destination(
                            self._added_components_mapping[original_connected_op].input(
                                original_signal.destination.index))
                    else:
                        # Create new operation, set destination to it.
                        new_connected_op = self._add_component_copy_unconnected(
                            original_connected_op)
                        new_signal.set_destination(new_connected_op.input(
                            original_signal.destination.index))
                        # Add connected operation to the queue of operations to visit
                        op_queue.append(original_connected_op)
def _evaluate_source(self, src: OutputPort) -> Number:
    """Recursively evaluate the value produced at the output port src.

    Each input port of the operation owning src is evaluated first, through
    the source of its first signal. The operation is then asked to evaluate
    the output with index src.index from those values.

    Keyword arguments:
    src: Output port whose value is wanted.
    """
    producer = src.operation
    operands = [
        self._evaluate_source(port.signals[0].source)
        for port in producer.inputs
    ]
    return producer.evaluate_output(src.index, operands)