
Unfolding

Merged: Frans Skarman requested to merge unfolding into master
All threads resolved!
Files changed: 4 (+166, -0)
@@ -26,6 +26,7 @@ from typing import (
)
from graphviz import Digraph
import itertools
from b_asic.graph_component import GraphComponent
from b_asic.operation import (
@@ -1164,8 +1165,14 @@ class SFG(AbstractOperation):
original_signal not in self._original_components_to_new
):
if original_signal.source is None:
dest = (
original_signal.destination.operation.name
if original_signal.destination is not None
else "None"
)
raise ValueError(
"Dangling signal without source in SFG"
f" (destination: {dest})"
)
new_signal = cast(
@@ -1486,3 +1493,162 @@ class SFG(AbstractOperation):
from b_asic.schedule import Schedule
return Schedule(self, scheduling_algorithm="ASAP").schedule_time
def unfold(self, factor: int) -> "SFG":
"""
Unfold the SFG `factor` times. Returns a new SFG without modifying the original.
Inputs and outputs are ordered with early inputs first, i.e. for an SFG
with n inputs, the first n inputs are the inputs at time t, the next n
inputs are the inputs at time t+1, the next n at t+2, and so on.
Parameters
----------
factor : int
Number of times to unfold.
"""
if factor == 0:
raise ValueError("Unrollnig 0 times removes the SFG")
# Make `factor` copies of the sfg
new_ops = [
[cast(Operation, op.copy_component()) for op in self.operations]
for _ in range(factor)
]
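# Map each original operation's graph ID to its index in self.operations,
# so the corresponding copy in any layer can be looked up as
# new_ops[layer][idx].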
id_idx_map = {
op.graph_id: idx for (idx, op) in enumerate(self.operations)
}
# The rest of the process is easier if we clear the connections of the inputs
# and outputs of all operations
for layer, op_list in enumerate(new_ops):
for op_idx, op in enumerate(op_list):
for input in op.inputs:
input.clear()
for output in op.outputs:
output.clear()
suffix = layer
new_ops[layer][
op_idx
].name = f"{new_ops[layer][op_idx].name}_{suffix}"
# NOTE: Since these IDs are what show up when printing the graph, it
# is helpful to set them. However, this can cause name collisions when
# names in a graph are already suffixed with _n
new_ops[layer][op_idx].graph_id = GraphID(
f"{new_ops[layer][op_idx].graph_id}_{suffix}"
)
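# In the unfolded SFG, a delay of one sample becomes a direct connection
# between layers: the value produced in layer k is consumed in layer k + 1
# of the same unfolded iteration. Only the last layer keeps an actual Delay
# element, which carries its value over to layer 0 of the next iteration.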
# Walk through the operations, replacing delay nodes with connections
for layer in range(factor):
for op_idx, op in enumerate(self.operations):
if isinstance(op, Delay):
# Port of the operation feeding into this delay
source_port = op.inputs[0].connected_source
if source_port is None:
raise ValueError("Dangling delay input port in sfg")
source_op_idx = id_idx_map[source_port.operation.graph_id]
source_op_output_index = source_port.index
new_source_op = new_ops[layer][source_op_idx]
source_op_output = new_source_op.outputs[
source_op_output_index
]
# If this is the last layer, we need to create a new delay element and connect it instead
# of the copied port
if layer == factor - 1:
delay = Delay(name=op.name)
delay.graph_id = op.graph_id
# Since we're adding a new operation instead of bypassing as in the
# common case, we also need to hook up the inputs to the delay.
delay.inputs[0].connect(source_op_output)
new_source_op = delay
new_source_port = new_source_op.outputs[0]
else:
# The new output port we should connect to
new_source_port = source_op_output
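# Re-point every signal that left the original delay so that its consumer
# in the next layer (or in layer 0, via the new Delay, when wrapping
# around) reads directly from the producing operation.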
for out_signal in op.outputs[0].signals:
sink_port = out_signal.destination
if sink_port is None:
# It would be weird if we found a signal but it wasn't connected anywhere
raise ValueError("Dangling output port in sfg")
sink_op_idx = id_idx_map[sink_port.operation.graph_id]
sink_op_input_index = sink_port.index
target_layer = 0 if layer == factor - 1 else layer + 1
new_dest_op = new_ops[target_layer][sink_op_idx]
new_destination = new_dest_op.inputs[sink_op_input_index]
new_destination.connect(new_source_port)
else:
# Other operations need to be re-targeted to the corresponding output in the
# current layer, unless that output comes from a delay, since delays have
# already been handled above.
# To avoid double connections, we only re-connect inputs.
for input_num, original_input in enumerate(op.inputs):
original_source = original_input.connected_source
# The input may be unconnected; if so, there is nothing to re-connect
# and we can skip it.
if original_source is None:
continue
# delay connections are handled elsewhere
if not isinstance(original_source.operation, Delay):
source_op_idx = id_idx_map[
original_source.operation.graph_id
]
source_op_output_idx = original_source.index
target_output = new_ops[layer][
source_op_idx
].outputs[source_op_output_idx]
new_ops[layer][op_idx].inputs[input_num].connect(
target_output
)
all_ops = [op for op_list in new_ops for op in op_list]
# To get the input order correct, we need to know the input order in the original
# SFG and which operations they correspond to.
input_ids = [op.graph_id for op in self.input_operations]
output_ids = [op.graph_id for op in self.output_operations]
# Re-order the inputs to the correct order. Within each layer, the internal
# order of the inputs is preserved, i.e. for a graph with two inputs
# (in1, in2), in1 occurs before in2, and the layers are concatenated in time
# order, so the inputs of layer 0 (time t) come first.
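# For example, with factor = 2 and original inputs (in1, in2), the unfolded
# input order is [in1@t, in2@t, in1@t+1, in2@t+1].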
all_inputs = list(
itertools.chain.from_iterable(
[
[ops[id_idx_map[input_id]] for input_id in input_ids]
for ops in new_ops
]
)
)
# The outputs get the same treatment
all_outputs = list(
itertools.chain.from_iterable(
[
[ops[id_idx_map[output_id]] for output_id in output_ids]
for ops in new_ops
]
)
)
# Sanity check to ensure that no duplicate graph IDs have been created
ids = [op.graph_id for op in all_ops]
assert len(ids) == len(set(ids))
return SFG(inputs=all_inputs, outputs=all_outputs)
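For reference, a rough usage sketch of the new method (not part of the diff; the module paths and constructors below are assumed from the rest of the b_asic code base and may differ in detail):

from b_asic.core_operations import Addition, ConstantMultiplication
from b_asic.signal_flow_graph import SFG
from b_asic.special_operations import Delay, Input, Output

# First-order recursive filter: y(t) = x(t) + 0.5 * y(t - 1)
x = Input()
feedback = Delay()
y = Addition(x, feedback)
mul = ConstantMultiplication(0.5, y)
feedback.inputs[0].connect(mul.outputs[0])
out = Output(y)

sfg = SFG(inputs=[x], outputs=[out])

# Unfold twice: the result has two inputs ordered x(t), x(t+1) and two
# outputs ordered y(t), y(t+1); the single delay now spans a whole
# unfolded iteration.
unfolded = sfg.unfold(2)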