Newer
Older
Angus Lothian
committed
# Find dependencies of output ports and input ports.
remaining_inports_per_operation = {op: op.input_count for op in self.operations}
Angus Lothian
committed
# Traverse output ports for precedence
curr_iter_ports = first_iter_ports
precedence_list = []
while curr_iter_ports:
# Add the found ports to the current iter
precedence_list.append(curr_iter_ports)
next_iter_ports = []
for outport in curr_iter_ports:
for signal in outport.signals:
new_inport = signal.destination
if new_inport is not None and not isinstance(
new_inport.operation, Delay
):
Angus Lothian
committed
new_op = new_inport.operation
remaining_inports_per_operation[new_op] -= 1
if remaining_inports_per_operation[new_op] == 0:
next_iter_ports.extend(new_op.outputs)
curr_iter_ports = next_iter_ports
return precedence_list
def _add_component_unconnected_copy(
self, original_component: GraphComponent
) -> GraphComponent:
if original_component in self._original_components_to_new:
raise ValueError("Tried to add duplicate SFG component")
Angus Lothian
committed
new_component = original_component.copy_component()
self._original_components_to_new[original_component] = new_component
if not new_component.graph_id or new_component.graph_id in self._used_ids:
new_id = self._graph_id_generator.next_id(
new_component.type_name(), self._used_ids
)
new_component.graph_id = new_id
self._used_ids.add(new_component.graph_id)
self._components_by_id[new_component.graph_id] = new_component
Angus Lothian
committed
self._components_by_name[new_component.name].append(new_component)
return new_component
def _add_operation_connected_tree_copy(self, start_op: Operation) -> None:
op_stack = deque([start_op])
while op_stack:
original_op = op_stack.pop()
# Add or get the new copy of the operation.
if original_op not in self._original_components_to_new:
new_op = cast(
Operation,
self._add_component_unconnected_copy(original_op),
)
Angus Lothian
committed
self._components_dfs_order.append(new_op)
self._operations_dfs_order.append(new_op)
else:
new_op = cast(Operation, self._original_components_to_new[original_op])
Angus Lothian
committed
# Connect input ports to new signals.
for original_input_port in original_op.inputs:
if original_input_port.signal_count < 1:
raise ValueError("Unconnected input port in SFG")
for original_signal in original_input_port.signals:
# Check if the signal is one of the SFG's input signals.
if original_signal in self._original_input_signals_to_indices:
Angus Lothian
committed
# New signal already created during first step of constructor.
new_signal = cast(
Signal,
self._original_components_to_new[original_signal],
)
Angus Lothian
committed
new_signal.set_destination(
Angus Lothian
committed
Angus Lothian
committed
self._components_dfs_order.extend(
if source.operation not in self._operations_dfs_order:
self._operations_dfs_order.append(source.operation)
Angus Lothian
committed
# Check if the signal has not been added before.
elif original_signal not in self._original_components_to_new:
Angus Lothian
committed
if original_signal.source is None:
dest = (
original_signal.destination.operation.name
if original_signal.destination is not None
else "None"
)
Angus Lothian
committed
raise ValueError(
Angus Lothian
committed
new_signal = cast(
Signal,
self._add_component_unconnected_copy(original_signal),
Angus Lothian
committed
new_signal.set_destination(
Angus Lothian
committed
self._components_dfs_order.append(new_signal)
original_connected_op = original_signal.source.operation
Angus Lothian
committed
# Check if connected Operation has been added before.
if original_connected_op in self._original_components_to_new:
component = cast(
Operation,
self._original_components_to_new[original_connected_op],
Angus Lothian
committed
# Set source to the already added operations port.
component.output(original_signal.source.index)
Angus Lothian
committed
else:
# Create new operation, set signal source to it.
new_connected_op = cast(
Operation,
self._add_component_unconnected_copy(
original_connected_op
Angus Lothian
committed
self._components_dfs_order.append(new_connected_op)
self._operations_dfs_order.append(new_connected_op)
# Add connected operation to queue of operations to visit.
op_stack.append(original_connected_op)
# Connect output ports.
for original_output_port in original_op.outputs:
for original_signal in original_output_port.signals:
# Check if the signal is one of the SFG's output signals.
if original_signal in self._original_output_signals_to_indices:
Angus Lothian
committed
# New signal already created during first step of constructor.
new_signal = cast(
Signal,
self._original_components_to_new[original_signal],
)
new_signal.set_source(new_op.output(original_output_port.index))
Angus Lothian
committed
destination = cast(InputPort, new_signal.destination)
Angus Lothian
committed
self._components_dfs_order.extend(
self._operations_dfs_order.append(destination.operation)
Angus Lothian
committed
# Check if signal has not been added before.
elif original_signal not in self._original_components_to_new:
Angus Lothian
committed
if original_signal.source is None:
raise ValueError(
"Dangling signal ({original_signal}) without"
" source in SFG"
Angus Lothian
committed
self._add_component_unconnected_copy(original_signal),
new_signal.set_source(new_op.output(original_output_port.index))
Angus Lothian
committed
self._components_dfs_order.append(new_signal)
original_destination = cast(
InputPort, original_signal.destination
if original_destination is None:
raise ValueError(
f"Signal ({original_signal}) without destination in SFG"
if original_connected_op is None:
raise ValueError(
"Signal with empty destination port"
f" ({original_destination}) in SFG"
Angus Lothian
committed
# Check if connected operation has been added.
if original_connected_op in self._original_components_to_new:
Angus Lothian
committed
# Set destination to the already connected operations port.
cast(
Operation,
self._original_components_to_new[
original_connected_op
],
).input(original_destination.index)
Angus Lothian
committed
else:
# Create new operation, set destination to it.
new_connected_op = cast(
Operation,
(
self._add_component_unconnected_copy(
original_connected_op
)
),
Angus Lothian
committed
self._components_dfs_order.append(new_connected_op)
self._operations_dfs_order.append(new_connected_op)
# Add connected operation to the queue of operations
# to visit.
Angus Lothian
committed
op_stack.append(original_connected_op)
def _evaluate_source(
self,
src: OutputPort,
results: MutableResultMap,
delays: MutableDelayMap,
prefix: str,
bits_override: Optional[int],
Angus Lothian
committed
key_base = (
(prefix + "." + src.operation.graph_id)
if prefix
else src.operation.graph_id
)
Angus Lothian
committed
key = src.operation.key(src.index, key_base)
if key in results:
value = results[key]
if value is None:
raise RuntimeError(
"Direct feedback loop detected when evaluating operation."
)
Angus Lothian
committed
return value
value = src.operation.current_output(src.index, delays, key_base)
results[key] = value
if value is None:
value = self._do_evaluate_source(
key_base,
key,
src,
results,
delays,
prefix,
bits_override,
Angus Lothian
committed
else:
# Evaluate later. Use current value for now.
deferred_delays.append((key_base, key, src))
return value
def _do_evaluate_source(
self,
key_base: str,
key: ResultKey,
src: OutputPort,
results: MutableResultMap,
delays: MutableDelayMap,
prefix: str,
bits_override: Optional[int],
input_values = [
self._evaluate_source(
input_port.signals[0].source,
results,
delays,
prefix,
bits_override,
deferred_delays,
)
for input_port in src.operation.inputs
]
Angus Lothian
committed
value = src.operation.evaluate_output(
src.index,
input_values,
results,
delays,
key_base,
bits_override,
Angus Lothian
committed
results[key] = value
return value
def sfg_digraph(self, show_id=False, engine=None) -> Digraph:
"""
Returns a Digraph of the SFG. Can be directly displayed in IPython.
Parameters
----------
show_id : Boolean, optional
If True, the graph_id:s of signals are shown. The default is False.
Graphviz layout engine to be used, see https://graphviz.org/documentation/.
Most common are "dot" and "neato". Default is None leading to dot.
Returns
-------
Digraph
Digraph of the SFG.
"""
dg = Digraph()
if engine is not None:
dg.engine = engine
for op in self._components_by_id.values():
if isinstance(op, Signal):
source = cast(OutputPort, op.source)
destination = cast(InputPort, op.destination)
source.operation.graph_id,
destination.operation.graph_id,
source.operation.graph_id,
destination.operation.graph_id,
dg.node(op.graph_id, shape=_OPERATION_SHAPE[op.type_name()])
def _repr_mimebundle_(self, include=None, exclude=None):
return self.sfg_digraph()._repr_mimebundle_(include=include, exclude=exclude)
return self.sfg_digraph()._repr_mimebundle_(include=["image/jpeg"])[
"image/jpeg"
]
def _repr_png_(self):
return self.sfg_digraph()._repr_mimebundle_(include=["image/png"])["image/png"]
def show(self, fmt=None, show_id=False, engine=None) -> None:
"""
Shows a visual representation of the SFG using the default system viewer.
Parameters
----------
File format of the generated graph. Output formats can be found at
https://www.graphviz.org/doc/info/output.html
Most common are "pdf", "eps", "png", and "svg". Default is None which
leads to PDF.
show_id : Boolean, optional
If True, the graph_id:s of signals are shown. The default is False.
Graphviz layout engine to be used, see https://graphviz.org/documentation/.
Most common are "dot" and "neato". Default is None leading to dot.
"""
if engine is not None:
dg.engine = engine
if fmt is not None:
dg.format = fmt
def critical_path_time(self) -> int:
"""Return the time of the critical path."""
# Import here needed to avoid circular imports
from b_asic.schedule import Schedule
return Schedule(self, scheduling_algorithm="ASAP").schedule_time
def unfold(self, factor: int) -> "SFG":
Unfold the SFG *factor* times. Return a new SFG without modifying the original.
Inputs and outputs are ordered with early inputs first. That is for an SFG
with n inputs, the first n inputs are the inputs at time t, the next n
inputs are the inputs at time t+1, the next n at t+2 and so on.
Parameters
----------
factor : string, optional
Number of times to unfold
"""
raise ValueError("Unfolding 0 times removes the SFG")
# Make `factor` copies of the sfg
new_ops = [
[cast(Operation, op.copy_component()) for op in self.operations]
for _ in range(factor)
]
id_idx_map = {op.graph_id: idx for (idx, op) in enumerate(self.operations)}
# The rest of the process is easier if we clear the connections of the inputs
# and outputs of all operations
for layer, op_list in enumerate(new_ops):
for op_idx, op in enumerate(op_list):
for input_ in op.inputs:
input_.clear()
for output in op.outputs:
output.clear()
new_ops[layer][op_idx].name = f"{new_ops[layer][op_idx].name}_{suffix}"
# NOTE: Since these IDs are what show up when printing the graph, it
# is helpful to set them. However, this can cause name collisions when
# names in a graph are already suffixed with _n
new_ops[layer][op_idx].graph_id = GraphID(
# Walk through the operations, replacing delay nodes with connections
for layer in range(factor):
for op_idx, op in enumerate(self.operations):
if isinstance(op, Delay):
# Port of the operation feeding into this delay
source_port = op.inputs[0].connected_source
if source_port is None:
raise ValueError("Dangling delay input port in sfg")
source_op_idx = id_idx_map[source_port.operation.graph_id]
source_op_output_index = source_port.index
new_source_op = new_ops[layer][source_op_idx]
source_op_output = new_source_op.outputs[source_op_output_index]
# If this is the last layer, we need to create a new delay element and connect it instead
# of the copied port
if layer == factor - 1:
delay = Delay(name=op.name)
delay.graph_id = op.graph_id
# Since we're adding a new operation instead of bypassing as in the
# common case, we also need to hook up the inputs to the delay.
delay.inputs[0].connect(source_op_output)
new_source_op = delay
new_source_port = new_source_op.outputs[0]
else:
# The new output port we should connect to
new_source_port = source_op_output
for out_signal in op.outputs[0].signals:
sink_port = out_signal.destination
if sink_port is None:
# It would be weird if we found a signal that wasn't connected anywhere
raise ValueError("Dangling output port in sfg")
sink_op_idx = id_idx_map[sink_port.operation.graph_id]
sink_op_output_index = sink_port.index
target_layer = 0 if layer == factor - 1 else layer + 1
new_dest_op = new_ops[target_layer][sink_op_idx]
new_destination = new_dest_op.inputs[sink_op_output_index]
new_destination.connect(new_source_port)
else:
# Other opreations need to be re-targeted to the corresponding output in the
# current layer, as long as that output is not a delay, as that has been solved
# above.
# To avoid double connections, we'll only re-connect inputs
for input_num, original_input in enumerate(op.inputs):
original_source = original_input.connected_source
# We may not always have something connected to the input, if we don't
# we can abort
if original_source is None:
continue
# delay connections are handled elsewhere
if not isinstance(original_source.operation, Delay):
source_op_idx = id_idx_map[
original_source.operation.graph_id
]
source_op_output_idx = original_source.index
target_output = new_ops[layer][source_op_idx].outputs[
source_op_output_idx
]
new_ops[layer][op_idx].inputs[input_num].connect(
target_output
)
all_ops = [op for op_list in new_ops for op in op_list]
1500
1501
1502
1503
1504
1505
1506
1507
1508
1509
1510
1511
1512
1513
1514
1515
1516
1517
1518
1519
1520
1521
1522
1523
1524
1525
1526
1527
1528
1529
1530
1531
# To get the input order correct, we need to know the input order in the original
# sfg and which operations they correspond to
input_ids = [op.graph_id for op in self.input_operations]
output_ids = [op.graph_id for op in self.output_operations]
# Re-order the inputs to the correct order. Internal order of the inputs should
# be preserved, i.e. for a graph with 2 inputs (in1, in2), in1 must occur before in2,
# but the "time" order should be reversed. I.e. the input from layer `factor-1` is the
# first input
all_inputs = list(
itertools.chain.from_iterable(
[
[ops[id_idx_map[input_id]] for input_id in input_ids]
for ops in new_ops
]
)
)
# Outputs are not reversed, but need the same treatment
all_outputs = list(
itertools.chain.from_iterable(
[
[ops[id_idx_map[output_id]] for output_id in output_ids]
for ops in new_ops
]
)
)
# Sanity check to ensure that no duplicate graph IDs have been created
ids = [op.graph_id for op in all_ops]
assert len(ids) == len(set(ids))
return SFG(inputs=all_inputs, outputs=all_outputs)
@property
def is_linear(self) -> bool:
return all(op.is_linear for op in self.split())
@property
def is_constant(self) -> bool:
return all(output.is_constant for output in self._output_operations)