Newer
Older
Angus Lothian
committed
# Set source to the already added operations port.
new_signal.set_source(
self._original_components_to_new[
original_connected_op
].output(original_signal.source.index)
)
Angus Lothian
committed
else:
# Create new operation, set signal source to it.
new_connected_op = (
self._add_component_unconnected_copy(
original_connected_op
)
)
new_signal.set_source(
new_connected_op.output(
original_signal.source.index
)
)
Angus Lothian
committed
self._components_dfs_order.append(new_connected_op)
self._operations_dfs_order.append(new_connected_op)
# Add connected operation to queue of operations to visit.
op_stack.append(original_connected_op)
# Connect output ports.
for original_output_port in original_op.outputs:
for original_signal in original_output_port.signals:
# Check if the signal is one of the SFG's output signals.
if (
original_signal
in self._original_output_signals_to_indices
):
Angus Lothian
committed
# New signal already created during first step of constructor.
new_signal = self._original_components_to_new[
original_signal
]
Angus Lothian
committed
new_signal.set_source(
Angus Lothian
committed
self._components_dfs_order.extend(
[new_signal, new_signal.destination.operation]
)
Angus Lothian
committed
self._operations_dfs_order.append(
Angus Lothian
committed
# Check if signal has not been added before.
elif (
original_signal not in self._original_components_to_new
):
Angus Lothian
committed
if original_signal.source is None:
raise ValueError(
Angus Lothian
committed
new_signal = self._add_component_unconnected_copy(
Angus Lothian
committed
new_signal.set_source(
Angus Lothian
committed
self._components_dfs_order.append(new_signal)
original_connected_op = (
original_signal.destination.operation
)
Angus Lothian
committed
# Check if connected operation has been added.
if (
original_connected_op
in self._original_components_to_new
):
Angus Lothian
committed
# Set destination to the already connected operations port.
new_signal.set_destination(
self._original_components_to_new[
original_connected_op
].input(original_signal.destination.index)
)
Angus Lothian
committed
else:
# Create new operation, set destination to it.
new_connected_op = (
self._add_component_unconnected_copy(
original_connected_op
)
)
new_signal.set_destination(
new_connected_op.input(
original_signal.destination.index
)
)
Angus Lothian
committed
self._components_dfs_order.append(new_connected_op)
self._operations_dfs_order.append(new_connected_op)
# Add connected operation to the queue of operations to visit.
op_stack.append(original_connected_op)
def _evaluate_source(
self,
src: OutputPort,
results: MutableResultMap,
delays: MutableDelayMap,
prefix: str,
bits_override: Optional[int],
truncate: bool,
deferred_delays: DelayQueue,
) -> Number:
Angus Lothian
committed
key_base = (
(prefix + "." + src.operation.graph_id)
if prefix
else src.operation.graph_id
)
Angus Lothian
committed
key = src.operation.key(src.index, key_base)
if key in results:
value = results[key]
if value is None:
raise RuntimeError(
"Direct feedback loop detected when evaluating operation."
)
Angus Lothian
committed
return value
value = src.operation.current_output(src.index, delays, key_base)
results[key] = value
if value is None:
value = self._do_evaluate_source(
key_base,
key,
src,
results,
delays,
prefix,
bits_override,
truncate,
deferred_delays,
)
Angus Lothian
committed
else:
# Evaluate later. Use current value for now.
deferred_delays.append((key_base, key, src))
return value
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
def _do_evaluate_source(
self,
key_base: str,
key: ResultKey,
src: OutputPort,
results: MutableResultMap,
delays: MutableDelayMap,
prefix: str,
bits_override: Optional[int],
truncate: bool,
deferred_delays: DelayQueue,
) -> Number:
input_values = [
self._evaluate_source(
input_port.signals[0].source,
results,
delays,
prefix,
bits_override,
truncate,
deferred_delays,
)
for input_port in src.operation.inputs
]
Angus Lothian
committed
value = src.operation.evaluate_output(
src.index,
input_values,
results,
delays,
key_base,
bits_override,
truncate,
)
Angus Lothian
committed
results[key] = value
return value
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
def sfg(self, show_id=False, engine=None) -> Digraph:
"""
Returns a Digraph of the SFG. Can be directly displayed in IPython.
Parameters
----------
show_id : Boolean, optional
If True, the graph_id:s of signals are shown. The default is False.
engine: string, optional
Graphviz layout engine to be used, see https://graphviz.org/documentation/.
Most common are "dot" and "neato". Default is None leading to dot.
Returns
-------
Digraph
Digraph of the SFG.
"""
dg = Digraph()
if engine:
assert engine in GRAPHVIZ_ENGINES, "Unknown layout engine"
dg.engine = engine
for op in self._components_by_id.values():
if isinstance(op, Signal):
if show_id:
dg.edge(
op.source.operation.graph_id,
op.destination.operation.graph_id,
label=op.graph_id,
)
dg.edge(
op.source.operation.graph_id,
op.destination.operation.graph_id,
)
else:
if op.type_name() == Delay.type_name():
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
else:
dg.node(op.graph_id)
return dg
def _repr_svg_(self):
return self.sfg()._repr_svg_()
def show_sfg(self, format=None, show_id=False, engine=None) -> None:
"""
Shows a visual representation of the SFG using the default system viewer.
Parameters
----------
format : string, optional
File format of the generated graph. Output formats can be found at https://www.graphviz.org/doc/info/output.html
Most common are "pdf", "eps", "png", and "svg". Default is None which leads to PDF.
show_id : Boolean, optional
If True, the graph_id:s of signals are shown. The default is False.
engine: string, optional
Graphviz layout engine to be used, see https://graphviz.org/documentation/.
Most common are "dot" and "neato". Default is None leading to dot.
"""
dg = self.sfg(show_id=show_id)
if format:
assert format in GRAPHVIZ_FORMATS, "Unknown file format"
dg.format = format
if engine:
assert engine in GRAPHVIZ_ENGINES, "Unknown layout engine"
dg.engine = engine
dg.view()