"""
B-ASIC Signal Flow Graph Module.
TODO: More info.
"""
from typing import NewType, List, Iterable, Sequence, Dict, Optional, DefaultDict, Set
from numbers import Number

Adam Jakobsson
committed
from collections import defaultdict, deque
from b_asic.port import SignalSourceProvider, OutputPort
from b_asic.operation import Operation, AbstractOperation
Angus Lothian
committed
from b_asic.graph_component import GraphComponent, Name, TypeName
from b_asic.special_operations import Input, Output
# Distinct static types for graph identifiers: GraphID is the string ID of a
# component, GraphIDNumber is the integer counter used when generating IDs.
GraphID = NewType("GraphID", str)
GraphIDNumber = NewType("GraphIDNumber", int)
class GraphIDGenerator:
    """Hand out sequential graph IDs, counted separately for each type name."""

    # Most recently issued ID number per type name.
    _next_id_number: DefaultDict[TypeName, GraphIDNumber]

    def __init__(self, id_number_offset: GraphIDNumber = 0):
        """Start every per-type counter at ``id_number_offset``."""
        self._next_id_number = defaultdict(lambda: id_number_offset)

    def next_id(self, type_name: TypeName) -> GraphID:
        """Advance the counter for ``type_name`` and return the new ID.

        The ID is ``type_name`` immediately followed by the incremented
        counter value, so the first ID issued for a type is offset + 1.
        """
        id_number = self._next_id_number[type_name] + 1
        self._next_id_number[type_name] = id_number
        return type_name + str(id_number)
Angus Lothian
committed
class SFG(AbstractOperation):
    """Signal flow graph.

    An operation that keeps its own copies of a set of connected graph
    components, bounded by input and output operations, and evaluates them
    as a unit.
    """

    # The declarations below name the attributes that __init__ assigns.

    # Copied components keyed by their generated graph ID.
    _components_by_id: Dict[GraphID, GraphComponent]
    # Copied components grouped by name.
    _components_by_name: DefaultDict[Name, List[GraphComponent]]
    # Copied components in the order they were recorded while copying.
    _components_in_dfs_order: List[GraphComponent]
    _graph_id_generator: GraphIDGenerator
    _input_operations: List[Input]
    _output_operations: List[Output]
    # Maps original components to new copied components.
    _added_components_mapping: Dict[GraphComponent, GraphComponent]
    # Input/output index of each original boundary signal.
    _original_input_signals_indexes: Dict[Signal, int]
    _original_output_signals_indexes: Dict[Signal, int]
    _id_number_offset: GraphIDNumber

Adam Jakobsson
committed
def __init__(self, input_signals: Sequence[Signal] = [], output_signals: Sequence[Signal] = [],
             inputs: Sequence[Input] = [], outputs: Sequence[Output] = [],
             id_number_offset: GraphIDNumber = 0, name: Name = "",
             input_sources: Optional[Sequence[Optional[SignalSourceProvider]]] = None):
    """Build the SFG by copying the graph between the given boundaries.

    The list defaults are only read (``len``/``enumerate``), never mutated.

    Keyword arguments:
    input_signals: Signals entering the graph; each gets a new Input operation.
    output_signals: Signals leaving the graph; each gets a new Output operation.
    inputs: Existing Input operations; indexed after ``input_signals``.
    outputs: Existing Output operations; indexed after ``output_signals``.
    id_number_offset: Starting offset for generated graph ID numbers.
    name: Name passed on to the base operation.
    input_sources: Passed on to the base operation unchanged.

    Raises:
    ValueError: if an input signal has no destination or an output signal
        has no source.
    AssertionError: if the same signal or operation is supplied twice.
    """
    super().__init__(
        input_count=len(input_signals) + len(inputs),
        output_count=len(output_signals) + len(outputs),
        name=name,
        input_sources=input_sources)

    self._components_by_id = dict()
    self._components_by_name = defaultdict(list)
    self._components_in_dfs_order = []
    self._graph_id_generator = GraphIDGenerator(id_number_offset)
    self._input_operations = []
    self._output_operations = []
    # Maps original components to new copied components
    self._added_components_mapping = {}
    self._original_input_signals_indexes = {}
    self._original_output_signals_indexes = {}
    self._id_number_offset = id_number_offset

    # Setup input signals.
    for input_index, sig in enumerate(input_signals):
        assert sig not in self._added_components_mapping, "Duplicate input signals sent to SFG construcctor."
        new_input_op = self._add_component_copy_unconnected(Input())
        new_sig = self._add_component_copy_unconnected(sig)
        new_sig.set_source(new_input_op.output(0))
        self._input_operations.append(new_input_op)
        self._original_input_signals_indexes[sig] = input_index

    # Setup input operations, starting from indexes after input signals.
    for input_index, input_op in enumerate(inputs, len(input_signals)):
        assert input_op not in self._added_components_mapping, "Duplicate input operations sent to SFG constructor."
        new_input_op = self._add_component_copy_unconnected(input_op)
        for sig in input_op.output(0).signals:
            assert sig not in self._added_components_mapping, "Duplicate input signals connected to input ports sent to SFG construcctor."
            new_sig = self._add_component_copy_unconnected(sig)
            new_sig.set_source(new_input_op.output(0))
            self._original_input_signals_indexes[sig] = input_index
        self._input_operations.append(new_input_op)

    # Setup output signals.
    for output_ind, sig in enumerate(output_signals):
        new_out = self._add_component_copy_unconnected(Output())
        if sig in self._added_components_mapping:
            # Signal already added when setting up inputs
            new_sig = self._added_components_mapping[sig]
        else:
            # New signal has to be created
            new_sig = self._add_component_copy_unconnected(sig)
        new_sig.set_destination(new_out.input(0))
        self._output_operations.append(new_out)
        self._original_output_signals_indexes[sig] = output_ind

    # Setup output operations, starting from indexes after output signals.
    for output_ind, output_op in enumerate(outputs, len(output_signals)):
        assert output_op not in self._added_components_mapping, "Duplicate output operations sent to SFG constructor."
        new_out = self._add_component_copy_unconnected(output_op)
        for sig in output_op.input(0).signals:
            if sig in self._added_components_mapping:
                # Signal already added when setting up inputs
                new_sig = self._added_components_mapping[sig]
            else:
                # New signal has to be created
                new_sig = self._add_component_copy_unconnected(sig)
            new_sig.set_destination(new_out.input(0))
            self._original_output_signals_indexes[sig] = output_ind
        self._output_operations.append(new_out)

    output_operations_set = set(self._output_operations)

    # Search the graph inwards from each input signal.
    for sig, input_index in self._original_input_signals_indexes.items():
        # Check if already added destination.
        new_sig = self._added_components_mapping[sig]
        if new_sig.destination is not None and new_sig.destination.operation in output_operations_set:
            # Add directly connected input to output to dfs order list
            self._components_in_dfs_order.extend([
                new_sig.source.operation, new_sig, new_sig.destination.operation])
        elif sig.destination is None:
            raise ValueError(
                f"Input signal #{input_index} is missing destination in SFG")
        elif sig.destination.operation not in self._added_components_mapping:
            self._copy_structure_from_operation_dfs(
                sig.destination.operation)

    # Search the graph inwards from each output signal.
    for sig, output_index in self._original_output_signals_indexes.items():
        # Check if already added source. The copy must be looked up for
        # *this* signal; reusing a leftover variable from an earlier loop
        # would skip the traversal or test the wrong signal.
        new_sig = self._added_components_mapping[sig]
        if new_sig.source is None:
            if sig.source is None:
                raise ValueError(
                    f"Output signal #{output_index} is missing source in SFG")
            if sig.source.operation not in self._added_components_mapping:
                self._copy_structure_from_operation_dfs(
                    sig.source.operation)
@property
def type_name(self) -> TypeName:
    """Return the type name of this component, which is always ``"sfg"``."""
    return "sfg"
def evaluate(self, *args):
    """Evaluate every output of the SFG for the given input values.

    Each positional argument is stored as the ``value`` of the input
    operation at the same position before the outputs are evaluated.

    Returns None when the SFG has no outputs, the bare value when it has
    exactly one, and a list of values otherwise.

    Raises:
    ValueError: if the number of arguments differs from ``input_count``.
    """
    if len(args) != self.input_count:
        raise ValueError(
            "Wrong number of inputs supplied to SFG for evaluation")

    for input_op, value in zip(self._input_operations, args):
        input_op.value = value

    outputs = [
        self._evaluate_source(output_op.input(0).signals[0].source)
        for output_op in self._output_operations
    ]
    if not outputs:
        return None
    if len(outputs) == 1:
        return outputs[0]
    return outputs
def evaluate_output(self, i: int, input_values: Sequence[Number]) -> Sequence[Optional[Number]]:
    """Evaluate only output ``i`` of the SFG.

    Keyword arguments:
    i: Index of the output to evaluate; must be within ``output_count``.
    input_values: Not read by this method.
        NOTE(review): ``evaluate`` stores its arguments on the input
        operations first; confirm whether the same is intended here.

    Returns a list of length ``output_count`` that is None everywhere
    except at index ``i``, which holds the evaluated value.
    """
    assert 0 <= i < self.output_count, "Output index out of range"
    result = [None] * self.output_count
    result[i] = self._evaluate_source(
        self._output_operations[i].input(0).signals[0].source)
    # The computed list has to be handed back; without this the method
    # returned None despite its declared Sequence return type.
    return result
def split(self) -> Iterable[Operation]:
    """Lazily iterate over the registered components that are operations."""
    return (
        component
        for component in self._components_by_id.values()
        if isinstance(component, Operation)
    )
@property
def components(self) -> Iterable[GraphComponent]:
    """Get all components of this graph in the dfs-traversal order.

    The internal list itself is returned, not a copy.
    """
    return self._components_in_dfs_order
def find_by_id(self, graph_id: GraphID) -> Optional[GraphComponent]:
    """Find a graph object based on the entered Graph ID and return it. If no graph
    object with the entered ID was found then return None.

    Keyword arguments:
    graph_id: Graph ID of the wanted object.
    """
    return self._components_by_id.get(graph_id, None)
def find_by_name(self, name: Name) -> List[GraphComponent]:
    """Find all graph objects that have the entered name and return them
    in a list. If no graph object with the entered name was found then return an
    empty list.

    Keyword arguments:
    name: Name of the wanted object.
    """
    # .get() rather than indexing: indexing the defaultdict would insert an
    # empty entry for every name that is looked up and not found.
    return self._components_by_name.get(name, [])

Adam Jakobsson
committed
def deep_copy(self) -> "SFG":
    """Return a new SFG constructed from this graph's input and output
    operations, reusing its ID number offset and name."""
    return SFG(
        inputs=self._input_operations,
        outputs=self._output_operations,
        id_number_offset=self._id_number_offset,
        name=super().name,
    )
def _add_component_copy_unconnected(self, original_comp: GraphComponent) -> GraphComponent:
    """Register a copy of ``original_comp`` in this SFG's lookup tables.

    The copy is recorded in the original-to-copy mapping, stored under a
    newly generated graph ID for its type name, and appended to the list of
    components that share its name.

    NOTE(review): the visible text never binds ``new_comp`` and has no
    ``return`` statement, although every caller uses the returned component.
    The statement that creates the copy from ``original_comp`` and a trailing
    ``return new_comp`` appear to be missing - confirm against the repository.
    """
    assert original_comp not in self._added_components_mapping, "Tried to add duplicate SFG component"
    # TODO(review): `new_comp` must be created from `original_comp` before this point.
    self._added_components_mapping[original_comp] = new_comp
    self._components_by_id[self._graph_id_generator.next_id(
        new_comp.type_name)] = new_comp
    self._components_by_name[new_comp.name].append(new_comp)

Adam Jakobsson
committed

Adam Jakobsson
committed
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
def _copy_structure_from_operation_dfs(self, start_op: Operation):
    """Copy the graph reachable from ``start_op`` into this SFG.

    Operations are visited depth-first using an explicit stack. For each
    visited operation, the signals on its input and output ports are copied
    and connected to the copied operations; operations discovered through
    those signals are copied and pushed onto the stack. Signals that were
    already created by the constructor as SFG inputs/outputs are reused and
    only get their missing end connected.

    Raises:
    ValueError: if an input port has no signal, or a signal is missing its
        source (input side) or destination (output side).
    """
    op_stack = deque([start_op])
    while op_stack:
        original_op = op_stack.pop()

        # Add or get the new copy of the operation.
        if original_op not in self._added_components_mapping:
            new_op = self._add_component_copy_unconnected(original_op)
            self._components_in_dfs_order.append(new_op)
        else:
            new_op = self._added_components_mapping[original_op]

        # Connect input ports to new signals.
        for original_input_port in original_op.inputs:
            if original_input_port.signal_count < 1:
                raise ValueError("Unconnected input port in SFG")

            for original_signal in original_input_port.signals:
                # Check if the signal is one of the SFG's input signals.
                if original_signal in self._original_input_signals_indexes:
                    # New signal already created during first step of constructor.
                    new_signal = self._added_components_mapping[
                        original_signal]
                    new_signal.set_destination(
                        new_op.input(original_input_port.index))
                    self._components_in_dfs_order.extend(
                        [new_signal, new_signal.source.operation])

                # Check if the signal has not been added before.
                elif original_signal not in self._added_components_mapping:
                    if original_signal.source is None:
                        raise ValueError(
                            "Dangling signal without source in SFG")

                    new_signal = self._add_component_copy_unconnected(
                        original_signal)
                    new_signal.set_destination(
                        new_op.input(original_input_port.index))
                    self._components_in_dfs_order.append(new_signal)

                    original_connected_op = original_signal.source.operation
                    # Check if connected operation has been added before.
                    if original_connected_op in self._added_components_mapping:
                        # Set source to the already added operation's port.
                        new_signal.set_source(
                            self._added_components_mapping[original_connected_op].output(
                                original_signal.source.index))
                    else:
                        # Create new operation, set signal source to it.
                        new_connected_op = self._add_component_copy_unconnected(
                            original_connected_op)
                        new_signal.set_source(new_connected_op.output(
                            original_signal.source.index))
                        self._components_in_dfs_order.append(
                            new_connected_op)

                        # Add connected operation to the stack of operations to visit.
                        op_stack.append(original_connected_op)

        # Connect output ports.
        for original_output_port in original_op.outputs:
            for original_signal in original_output_port.signals:
                # Check if the signal is one of the SFG's output signals.
                if original_signal in self._original_output_signals_indexes:
                    # New signal already created during first step of constructor.
                    new_signal = self._added_components_mapping[
                        original_signal]
                    new_signal.set_source(
                        new_op.output(original_output_port.index))
                    self._components_in_dfs_order.extend(
                        [new_signal, new_signal.destination.operation])

                # Check if signal has not been added before.
                elif original_signal not in self._added_components_mapping:
                    # On the output side it is the destination that is
                    # dereferenced below, so that is the end to validate.
                    if original_signal.destination is None:
                        raise ValueError(
                            "Dangling signal without destination in SFG")

                    new_signal = self._add_component_copy_unconnected(
                        original_signal)
                    new_signal.set_source(
                        new_op.output(original_output_port.index))
                    self._components_in_dfs_order.append(new_signal)

                    original_connected_op = original_signal.destination.operation
                    # Check if connected operation has been added.
                    if original_connected_op in self._added_components_mapping:
                        # Set destination to the already added operation's port.
                        new_signal.set_destination(
                            self._added_components_mapping[original_connected_op].input(
                                original_signal.destination.index))
                    else:
                        # Create new operation, set destination to it.
                        new_connected_op = self._add_component_copy_unconnected(
                            original_connected_op)
                        new_signal.set_destination(new_connected_op.input(
                            original_signal.destination.index))
                        self._components_in_dfs_order.append(
                            new_connected_op)

                        # Add connected operation to the stack of operations to visit.
                        op_stack.append(original_connected_op)
def _evaluate_source(self, src: OutputPort) -> Number:
    """Recursively evaluate the value produced at output port ``src``.

    The first signal on each input port of the port's operation is followed
    back to its source and evaluated; the collected values are then passed
    to the operation's ``evaluate_output`` for the port's index.
    """
    input_values = [
        self._evaluate_source(input_port.signals[0].source)
        for input_port in src.operation.inputs
    ]
    return src.operation.evaluate_output(src.index, input_values)
def __str__(self):
    """Return a textual listing of the operations in the SFG.

    One line is produced per operation entry in the dfs-order list, in the
    form ``id: <id>, name: <name>, [value: <value>, ]input: [<signal ids>],
    output: [<signal ids>]``. A name of None is printed as ``-``; the value
    part is only printed for components whose type name is ``"c"``.
    """
    # Reverse lookup by object identity, built once instead of scanning the
    # whole ID table for every operation and every signal.
    graph_id_of = {
        id(component): graph_id
        for graph_id, component in self._components_by_id.items()
    }

    def signal_ids(ports):
        # Joining only the IDs that exist keeps the brackets intact even
        # when a port carries no signals.
        return ", ".join(
            graph_id_of[id(signal)]
            for port in ports
            for signal in port.signals
            if id(signal) in graph_id_of)

    lines = []
    for comp in self._components_in_dfs_order:
        if not isinstance(comp, Operation):
            continue

        parts = []
        graph_id = graph_id_of.get(id(comp))
        if graph_id is not None:
            parts.append("id: " + graph_id + ", name: ")
        parts.append(comp.name + ", " if comp.name is not None else "-, ")
        # Compare by value; identity comparison with a string literal is
        # not reliable.
        if comp.type_name == "c":
            parts.append("value: " + str(comp.value) + ", ")
        parts.append("input: [" + signal_ids(comp.inputs) + "], output: ["
                     + signal_ids(comp.outputs) + "]\n")
        lines.append("".join(parts))

    return "".join(lines)