Newer
Older
Angus Lothian
committed
def _add_operation_connected_tree_copy(self, start_op: Operation) -> None:
op_stack = deque([start_op])
while op_stack:
original_op = op_stack.pop()
# Add or get the new copy of the operation.
if original_op not in self._original_components_to_new:
new_op = cast(
Operation,
self._add_component_unconnected_copy(original_op),
)
Angus Lothian
committed
self._components_dfs_order.append(new_op)
self._operations_dfs_order.append(new_op)
else:
new_op = cast(
Operation, self._original_components_to_new[original_op]
)
Angus Lothian
committed
# Connect input ports to new signals.
for original_input_port in original_op.inputs:
if original_input_port.signal_count < 1:
raise ValueError("Unconnected input port in SFG")
for original_signal in original_input_port.signals:
# Check if the signal is one of the SFG's input signals.
if (
original_signal
in self._original_input_signals_to_indices
):
Angus Lothian
committed
# New signal already created during first step of constructor.
new_signal = cast(
Signal,
self._original_components_to_new[original_signal],
)
Angus Lothian
committed
new_signal.set_destination(
Angus Lothian
committed
Angus Lothian
committed
self._components_dfs_order.extend(
self._operations_dfs_order.append(source.operation)
Angus Lothian
committed
# Check if the signal has not been added before.
elif (
original_signal not in self._original_components_to_new
):
Angus Lothian
committed
if original_signal.source is None:
raise ValueError(
Angus Lothian
committed
new_signal = cast(
Signal,
self._add_component_unconnected_copy(
original_signal
),
)
Angus Lothian
committed
new_signal.set_destination(
Angus Lothian
committed
self._components_dfs_order.append(new_signal)
original_connected_op = (
original_signal.source.operation
)
Angus Lothian
committed
# Check if connected Operation has been added before.
if (
original_connected_op
in self._original_components_to_new
):
component = cast(
Operation,
self._original_components_to_new[
original_connected_op
],
)
Angus Lothian
committed
# Set source to the already added operations port.
component.output(original_signal.source.index)
Angus Lothian
committed
else:
# Create new operation, set signal source to it.
new_connected_op = cast(
Operation,
self._add_component_unconnected_copy(
original_connected_op
)
new_signal.set_source(
new_connected_op.output(
original_signal.source.index
)
)
Angus Lothian
committed
self._components_dfs_order.append(new_connected_op)
self._operations_dfs_order.append(new_connected_op)
# Add connected operation to queue of operations to visit.
op_stack.append(original_connected_op)
# Connect output ports.
for original_output_port in original_op.outputs:
for original_signal in original_output_port.signals:
# Check if the signal is one of the SFG's output signals.
if (
original_signal
in self._original_output_signals_to_indices
):
Angus Lothian
committed
# New signal already created during first step of constructor.
new_signal = cast(
Signal,
self._original_components_to_new[original_signal],
)
Angus Lothian
committed
new_signal.set_source(
Angus Lothian
committed
destination = cast(InputPort, new_signal.destination)
Angus Lothian
committed
self._components_dfs_order.extend(
Angus Lothian
committed
self._operations_dfs_order.append(
Angus Lothian
committed
# Check if signal has not been added before.
elif (
original_signal not in self._original_components_to_new
):
Angus Lothian
committed
if original_signal.source is None:
raise ValueError(
Angus Lothian
committed
new_signal = self._add_component_unconnected_copy(
Angus Lothian
committed
new_signal.set_source(
Angus Lothian
committed
self._components_dfs_order.append(new_signal)
original_connected_op = (
original_signal.destination.operation
)
if original_connected_op is None:
raise ValueError(
"Signal without destination in SFG"
)
Angus Lothian
committed
# Check if connected operation has been added.
if (
original_connected_op
in self._original_components_to_new
):
Angus Lothian
committed
# Set destination to the already connected operations port.
new_signal.set_destination(
self._original_components_to_new[
original_connected_op
].input(original_signal.destination.index)
)
Angus Lothian
committed
else:
# Create new operation, set destination to it.
new_connected_op = (
self._add_component_unconnected_copy(
original_connected_op
)
)
new_signal.set_destination(
new_connected_op.input(
original_signal.destination.index
)
)
Angus Lothian
committed
self._components_dfs_order.append(new_connected_op)
self._operations_dfs_order.append(new_connected_op)
# Add connected operation to the queue of operations to visit.
op_stack.append(original_connected_op)
def _evaluate_source(
self,
src: OutputPort,
results: MutableResultMap,
delays: MutableDelayMap,
prefix: str,
bits_override: Optional[int],
truncate: bool,
deferred_delays: DelayQueue,
) -> Number:
Angus Lothian
committed
key_base = (
(prefix + "." + src.operation.graph_id)
if prefix
else src.operation.graph_id
)
Angus Lothian
committed
key = src.operation.key(src.index, key_base)
if key in results:
value = results[key]
if value is None:
raise RuntimeError(
"Direct feedback loop detected when evaluating operation."
)
Angus Lothian
committed
return value
value = src.operation.current_output(src.index, delays, key_base)
results[key] = value
if value is None:
value = self._do_evaluate_source(
key_base,
key,
src,
results,
delays,
prefix,
bits_override,
truncate,
deferred_delays,
)
Angus Lothian
committed
else:
# Evaluate later. Use current value for now.
deferred_delays.append((key_base, key, src))
return value
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
def _do_evaluate_source(
self,
key_base: str,
key: ResultKey,
src: OutputPort,
results: MutableResultMap,
delays: MutableDelayMap,
prefix: str,
bits_override: Optional[int],
truncate: bool,
deferred_delays: DelayQueue,
) -> Number:
input_values = [
self._evaluate_source(
input_port.signals[0].source,
results,
delays,
prefix,
bits_override,
truncate,
deferred_delays,
)
for input_port in src.operation.inputs
]
Angus Lothian
committed
value = src.operation.evaluate_output(
src.index,
input_values,
results,
delays,
key_base,
bits_override,
truncate,
)
Angus Lothian
committed
results[key] = value
return value
def sfg(self, show_id=False, engine=None) -> Digraph:
"""
Returns a Digraph of the SFG. Can be directly displayed in IPython.
Parameters
----------
show_id : Boolean, optional
If True, the graph_id:s of signals are shown. The default is False.
Graphviz layout engine to be used, see https://graphviz.org/documentation/.
Most common are "dot" and "neato". Default is None leading to dot.
Returns
-------
Digraph
Digraph of the SFG.
"""
dg = Digraph()
if engine is not None:
dg.engine = engine
for op in self._components_by_id.values():
if isinstance(op, Signal):
if show_id:
dg.edge(
op.source.operation.graph_id,
op.destination.operation.graph_id,
label=op.graph_id,
)
dg.edge(
op.source.operation.graph_id,
op.destination.operation.graph_id,
)
else:
if op.type_name() == Delay.type_name():
else:
dg.node(op.graph_id)
return dg
def _repr_mimebundle_(self, include=None, exclude=None):
return self.sfg()._repr_mimebundle_(include=include, exclude=exclude)
def show_sfg(self, format=None, show_id=False, engine=None) -> None:
"""
Shows a visual representation of the SFG using the default system viewer.
Parameters
----------
format : string, optional
File format of the generated graph. Output formats can be found at
https://www.graphviz.org/doc/info/output.html
Most common are "pdf", "eps", "png", and "svg". Default is None which leads to PDF.
show_id : Boolean, optional
If True, the graph_id:s of signals are shown. The default is False.
Graphviz layout engine to be used, see https://graphviz.org/documentation/.
Most common are "dot" and "neato". Default is None leading to dot.
"""
dg = self.sfg(show_id=show_id)
if engine is not None:
dg.engine = engine
if format is not None:
dg.format = format