Newer
Older
# Remove all delay elements in the SFG and replace each one
# with one input operation and one output operation
for delay in sfg.find_by_type_name(Delay.type_name()):
i = Input(name="input_" + delay.graph_id)
o = Output(
src0=delay.input(0).signals[0].source, name="output_" + delay.graph_id
)
inputs.append(i)
outputs.append(o)
# move all outgoing signals from the delay to the new input operation
while len(delay.output(0).signals) > 0:
signal = delay.output(0).signals[0]
destination = signal.destination
destination.remove_signal(signal)
signal.remove_source()
destination.connect(i.output(0))
delay.input(0).signals[0].remove_source()
delay.input(0).clear()
new_sfg = SFG(inputs, outputs) # The new sfg without the delays
sfgs = [new_sfg() for _ in range(factor)] # Copy the SFG factor times
# Add suffixes to all graphIDs and names in order to keep them separated
for i in range(factor):
for operation in sfgs[i].operations:
suffix = f'_{i}'
operation.graph_id = operation.graph_id + suffix
if operation.name[:7] not in ['', 'input_t', 'output_']:
operation.name = operation.name + suffix
input_name_to_idx = {} # save the input port indices for future reference
new_inputs = []
# For each copy of the SFG, create new input operations for every "original"
# input operation and connect them to begin creating the unfolded SFG
for i in range(factor):
for port, operation in zip(sfgs[i].inputs, sfgs[i].input_operations):
if not operation.name.startswith("input_t"):
i = Input()
new_inputs.append(i)
port.connect(i)
# If the input was created earlier when removing the delays
# then just save the index
input_name_to_idx[operation.name] = port.index
# Connect the original outputs in the same way as the inputs
# Also connect the copies of the SFG together according to a formula
# from the TSTE87 course material, and save the number of delays for
# each interconnection
new_outputs = []
delay_placements = {}
for i in range(factor):
for port, operation in zip(sfgs[i].outputs, sfgs[i].output_operations):
if not operation.name.startswith("output_t"):
new_outputs.append(Output(port))
else:
index = operation.name[8:] # Remove the "output_t" prefix
input_port = sfgs[j].input(input_name_to_idx["input_t" + index])
input_port.connect(port)
delay_placements[port] = [i, number_of_delays_between]
sfgs[i].graph_id = (
f'sfg{i}' # deterministically set the graphID of the sfgs
)
sfg = SFG(new_inputs, new_outputs) # create a new SFG to remove floating nodes
# Insert the interconnect delays according to what is saved in delay_placements
i, no_of_delays = val
for _ in range(no_of_delays):
sfg = sfg.insert_operation_after(f'sfg{i}.{port.index}', Delay())
# Flatten all the copies of the original SFG
for i in range(factor):
sfg.find_by_id(f'sfg{i}').connect_external_signals_to_components()
sfg = sfg()
@property
def is_linear(self) -> bool:
return all(op.is_linear for op in self.split())
@property
def is_constant(self) -> bool:
return all(output.is_constant for output in self._output_operations)
def get_used_type_names(self) -> List[TypeName]:
"""Get a list of all TypeNames used in the SFG."""
ret = list({op.type_name() for op in self.operations})
ret.sort()
return ret