Newer
Older
Unfold the SFG *factor* times. Return a new SFG without modifying the original.
Inputs and outputs are ordered with early inputs first. That is for an SFG
with n inputs, the first n inputs are the inputs at time t, the next n
inputs are the inputs at time t+1, the next n at t+2 and so on.
Parameters
----------
Number of times to unfold.
raise ValueError("Unfolding 0 times removes the SFG")
inputs = sfg.input_operations
outputs = sfg.output_operations
# Remove all delay elements in the SFG and replace each one
# with one input operation and one output operation
for delay in sfg.find_by_type_name(Delay.type_name()):
i = Input(name="input_" + delay.graph_id)
o = Output(
src0=delay.input(0).signals[0].source, name="output_" + delay.graph_id
)
inputs.append(i)
outputs.append(o)
# move all outgoing signals from the delay to the new input operation
while len(delay.output(0).signals) > 0:
signal = delay.output(0).signals[0]
destination = signal.destination
destination.remove_signal(signal)
signal.remove_source()
destination.connect(i.output(0))
delay.input(0).signals[0].remove_source()
delay.input(0).clear()
new_sfg = SFG(inputs, outputs) # The new sfg without the delays
sfgs = [new_sfg() for _ in range(factor)] # Copy the SFG factor times
# Add suffixes to all graphIDs and names in order to keep them separated
for i in range(factor):
for operation in sfgs[i].operations:
suffix = f'_{i}'
operation.graph_id = operation.graph_id + suffix
if operation.name[:7] not in ['', 'input_t', 'output_']:
operation.name = operation.name + suffix
input_name_to_idx = {} # save the input port indices for future reference
new_inputs = []
# For each copy of the SFG, create new input operations for every "original"
# input operation and connect them to begin creating the unfolded SFG
for i in range(factor):
for port, operation in zip(sfgs[i].inputs, sfgs[i].input_operations):
if not operation.name.startswith("input_t"):
i = Input()
new_inputs.append(i)
port.connect(i)
# If the input was created earlier when removing the delays
# then just save the index
input_name_to_idx[operation.name] = port.index
# Connect the original outputs in the same way as the inputs
# Also connect the copies of the SFG together according to a formula
# from the TSTE87 course material, and save the number of delays for
# each interconnection
new_outputs = []
delay_placements = {}
for i in range(factor):
for port, operation in zip(sfgs[i].outputs, sfgs[i].output_operations):
if not operation.name.startswith("output_t"):
new_outputs.append(Output(port))
else:
index = operation.name[8:] # Remove the "output_t" prefix
input_port = sfgs[j].input(input_name_to_idx["input_t" + index])
input_port.connect(port)
delay_placements[port] = [i, number_of_delays_between]
sfgs[i].graph_id = (
f'sfg{i}' # deterministically set the graphID of the sfgs
)
sfg = SFG(new_inputs, new_outputs) # create a new SFG to remove floating nodes
# Insert the interconnect delays according to what is saved in delay_placements
i, no_of_delays = val
for _ in range(no_of_delays):
sfg = sfg.insert_operation_after(f'sfg{i}.{port.index}', Delay())
# Flatten all the copies of the original SFG
for i in range(factor):
sfg.find_by_id(f'sfg{i}').connect_external_signals_to_components()
sfg = sfg()
@property
def is_linear(self) -> bool:
    """
    Whether every part returned by ``self.split()`` is linear.

    Returns
    -------
    bool
        ``False`` as soon as one part reports a falsy ``is_linear``,
        ``True`` otherwise (including when ``split()`` yields nothing).
    """
    for part in self.split():
        if not part.is_linear:
            return False
    return True
@property
def is_constant(self) -> bool:
    """
    Whether every output operation of the SFG is constant.

    Returns
    -------
    bool
        ``True`` if each entry of ``self._output_operations`` reports a
        truthy ``is_constant`` (also when there are no entries),
        ``False`` otherwise.
    """
    for out_op in self._output_operations:
        if not out_op.is_constant:
            return False
    return True
2114
2115
2116
2117
2118
2119
2120
2121
2122
2123
2124
2125
2126
2127
2128
2129
2130
2131
2132
2133
2134
2135
2136
2137
2138
2139
2140
2141
2142
2143
2144
2145
2146
2147
2148
2149
2150
2151
2152
2153
2154
2155
2156
2157
2158
2159
2160
2161
2162
2163
2164
2165
2166
2167
2168
2169
2170
2171
2172
2173
2174
2175
2176
2177
2178
2179
2180
2181
2182
2183
2184
2185
2186
2187
2188
2189
2190
2191
2192
2193
2194
2195
2196
2197
2198
2199
2200
2201
2202
2203
2204
2205
2206
2207
2208
2209
2210
2211
2212
2213
2214
2215
2216
2217
2218
2219
2220
2221
2222
2223
2224
2225
2226
2227
2228
2229
2230
2231
2232
2233
2234
2235
2236
2237
2238
2239
2240
2241
2242
2243
2244
2245
2246
2247
2248
2249
2250
2251
2252
2253
2254
2255
2256
2257
2258
2259
2260
@property
def is_commutative(self) -> bool:
    """
    Check whether all operations in the SFG are commutative.

    Operations whose type name is ``"in"``, ``"out"``, ``"t"`` or ``"c"``
    are ignored; every other operation returned by ``self.split()`` must
    have a truthy ``is_commutative`` property.

    Returns
    -------
    bool
        ``True`` if all considered operations are commutative,
        ``False`` otherwise.
    """
    exempt_types = ("in", "out", "t", "c")
    for part in self.split():
        if part.type_name() in exempt_types:
            # Exempt operations never influence the result.
            continue
        if not part.is_commutative:
            return False
    return True
@property
def is_distributive(self) -> bool:
    """
    Check whether the SFG is distributive.

    An operation is considered distributive if it can be applied to each
    element of a set separately. For example, multiplication is
    distributive over addition: ``a * (b + c)`` equals ``a * b + a * c``.

    Operations of type ``"in"``, ``"out"``, ``"c"`` and ``"t"`` are left
    out. The remaining operations, taken in topological order, must all
    satisfy :meth:`has_distributive_structure`, and there must be more
    than one of them.

    Returns
    -------
    bool
        ``True`` if the SFG is distributive, ``False`` otherwise.

    Examples
    --------
    >>> Mad_op = MAD(Input(), Input(), Input())  # MAD is defined in b_asic.core_operations
    >>> Mad_sfg = Mad_op.to_sfg()  # The operation is turned into an SFG
    >>> Mad_sfg.is_distributive  # True if the distributive property holds, False otherwise
    """
    skipped_types = ("in", "out", "c", "t")
    candidates = [
        node
        for node in self.get_operations_topological_order()
        if node.type_name() not in skipped_types
    ]
    # Zero or one remaining operation is never reported as distributive.
    if len(candidates) <= 1:
        return False
    return all(self.has_distributive_structure(node) for node in candidates)
def has_distributive_structure(self, op: Operation) -> bool:
    """
    Check if a distributive structure starts at the given operation.

    NOTE: a*b + c = a*(b + c/a) is considered distributive, meaning that
    an algorithm transformation would require an additional operation.

    Parameters
    ----------
    op : Operation
        The operation that is the start of the structure to check for
        distributivity.

    Returns
    -------
    bool
        ``True`` if a distributive structure is found, ``False`` otherwise.
    """
    # TODO Butterfly and SymmetricTwoportAdaptor needs to be converted to a SF using to_sfg() in order to be evaluated
    kind = op.type_name()
    if kind == 'mac':
        return True

    # Each rule pairs a family of operation types with the successor types
    # that complete a distributive structure for that family.
    rules = (
        (
            ('mul', 'div'),
            ('add', 'sub', 'addsub', 'min', 'max', 'sqrt', 'abs', 'rec', 'out', 't'),
        ),
        (
            ('cmul', 'shift', 'rshift', 'lshift'),
            ('add', 'sub', 'addsub', 'min', 'max', 'out', 't'),
        ),
        (
            ('add', 'sub', 'addsub'),
            ('mul', 'div', 'min', 'max', 'out', 'cmul', 't'),
        ),
        (
            ('min', 'max'),
            ('add', 'sub', 'addsub', 'mul', 'div', 'cmul', 'out', 't'),
        ),
    )
    for family, partners in rules:
        if kind in family:
            break
    else:
        # The operation type belongs to no known family.
        return False

    for follower in op.subsequent_operations:
        follower_kind = follower.type_name()
        if follower_kind in partners:
            return True
        if follower_kind not in family:
            return False
        # Same family as ``op``: the verdict is delegated to the first
        # operation after the follower. If the follower has no successors
        # the next follower of ``op`` is examined instead.
        for next_op in follower.subsequent_operations:
            return self.has_distributive_structure(next_op)
    return False
def get_used_type_names(self) -> List[TypeName]:
    """
    Get all TypeNames used in the SFG.

    Returns
    -------
    list
        The distinct type names of ``self.operations`` in ascending order.
    """
    distinct_names = {operation.type_name() for operation in self.operations}
    return sorted(distinct_names)
def get_used_graph_ids(self) -> Set[GraphID]:
    """
    Get the set of all GraphID:s used in the SFG.

    Returns
    -------
    set
        The distinct ``graph_id`` values of ``self.operations``. A set has
        no defined order, so callers that need one should sort the result.
    """
    # The previous implementation called ``sorted(ret)`` and discarded the
    # result, which had no effect on the returned set; the dead call and
    # the redundant ``set(...)`` wrapper are dropped.
    return {operation.graph_id for operation in self.operations}