Newer
Older
shape='rectangle',
height="0.1",
width="0.1",
)
shape='rectangle',
height="0.1",
width="0.1",
# Creates edges for each output port and creates nodes for each operation
# and edges for them as well
Angus Lothian
committed
for port in ports:
source_label = port.operation.graph_id
Angus Lothian
committed
for signal in port.signals:
destination_label = destination.operation.graph_id
destination_node = (
destination_label + "In"
if isinstance(destination.operation, Delay)
else destination_label
)
pg.edge(node_node, destination_node)
destination_node,
label=destination_label,
shape=_OPERATION_SHAPE[destination.operation.type_name()],
source_node = (
source_label + "Out"
if port.operation.type_name() == Delay.type_name()
else source_label
)
pg.edge(source_node, node_node)
pg.node(
source_node,
label=source_label,
shape=_OPERATION_SHAPE[port.operation.type_name()],
)
Angus Lothian
committed
Angus Lothian
committed
def print_precedence_graph(self) -> None:
    """
    Print a representation of the SFG precedence list to the standard out.

    The precedence list is obtained from ``self.get_precedence_list()``.
    Each precedence level is printed as a group of lines of the form
    ``<level>.<port number> \\t<operation>`` and the groups are separated by
    a line of 120 dashes. An operation is listed only once, even if several
    of its output ports occur in the precedence list.
    """
    precedence_list = self.get_precedence_list()

    separator = "-" * 120
    out_str = StringIO()
    out_str.write(separator)

    # Track operations (not ports): an operation with multiple outports
    # must only be printed once.
    printed_ops = set()

    for iter_num, outports in enumerate(precedence_list, start=1):
        for outport_num, outport in enumerate(outports, start=1):
            operation = outport.operation
            if operation in printed_ops:
                continue
            printed_ops.add(operation)
            out_str.write(f"\n{iter_num}.{outport_num} \t{operation!s}")

        # Close every precedence level with a separator line.
        out_str.write("\n")
        out_str.write(separator)

    print(out_str.getvalue())
def get_operations_topological_order(self) -> Iterable[Operation]:
"""
Return an Iterable of the Operations in the SFG in topological order.
Feedback loops makes an absolutely correct topological order impossible,
so an approximate topological Order is returned in such cases in this
implementation.
Angus Lothian
committed
if self._operations_topological_order:
return self._operations_topological_order
no_inputs_queue = deque(
list(filter(lambda op: op.input_count == 0, self.operations))
)
remaining_inports_per_operation = {op: op.input_count for op in self.operations}
Angus Lothian
committed
# Maps number of input counts to a queue of seen objects with such a size.
seen_with_inputs_dict: Dict[int, Deque] = defaultdict(deque)
Angus Lothian
committed
seen = set()
top_order = []
if len(no_inputs_queue) == 0:
raise ValueError("Illegal SFG state, dangling signals in SFG.")
Angus Lothian
committed
first_op = no_inputs_queue.popleft()
Angus Lothian
committed
p_queue = PriorityQueue()
p_queue_entry_num = itertools.count()
Angus Lothian
committed
# Negative priority as max-heap popping is wanted
p_queue.put((-first_op.output_count, -next(p_queue_entry_num), first_op))
Angus Lothian
committed
operations_left = len(self.operations) - 1
seen_but_not_visited_count = 0
while operations_left > 0:
while not p_queue.empty():
op = p_queue.get()[2]
operations_left -= 1
top_order.append(op)
visited.add(op)
for neighbor_op in op.subsequent_operations:
if neighbor_op not in visited:
remaining_inports_per_operation[neighbor_op] -= 1
remaining_inports = remaining_inports_per_operation[neighbor_op]
Angus Lothian
committed
if remaining_inports == 0:
p_queue.put(
(
-neighbor_op.output_count,
-next(p_queue_entry_num),
neighbor_op,
)
)
Angus Lothian
committed
elif remaining_inports > 0:
if neighbor_op in seen:
seen_with_inputs_dict[remaining_inports + 1].remove(
neighbor_op
)
Angus Lothian
committed
else:
seen.add(neighbor_op)
seen_but_not_visited_count += 1
seen_with_inputs_dict[remaining_inports].append(neighbor_op)
Angus Lothian
committed
# Check if have to fetch Operations from somewhere else since p_queue
# is empty
Angus Lothian
committed
if operations_left > 0:
# First check if can fetch from Operations with no input ports
if no_inputs_queue:
new_op = no_inputs_queue.popleft()
p_queue.put(
(
-new_op.output_count,
-next(p_queue_entry_num),
new_op,
)
)
Angus Lothian
committed
# Else fetch operation with the lowest input count that is not zero
Angus Lothian
committed
elif seen_but_not_visited_count > 0:
for i in itertools.count(start=1):
Angus Lothian
committed
seen_inputs_queue = seen_with_inputs_dict[i]
if seen_inputs_queue:
new_op = seen_inputs_queue.popleft()
p_queue.put(
(
-new_op.output_count,
-next(p_queue_entry_num),
new_op,
)
)
Angus Lothian
committed
seen_but_not_visited_count -= 1
break
else:
raise RuntimeError("Disallowed structure in SFG detected")
Angus Lothian
committed
self._operations_topological_order = top_order
return self._operations_topological_order
def set_latency_of_type(self, type_name: TypeName, latency: int) -> None:
"""
Set the latency of all components with the given type name.
Parameters
----------
type_name : TypeName
The type name of the operation. For example, obtained as
``Addition.type_name()``.
latency : int
The latency of the operation.
"""
Angus Lothian
committed
for op in self.find_by_type_name(type_name):
Angus Lothian
committed
def set_execution_time_of_type(
    self, type_name: TypeName, execution_time: int
) -> None:
    """
    Set the execution time of all operations with the given type name.

    Every component returned by ``self.find_by_type_name(type_name)`` gets
    its ``execution_time`` attribute assigned.

    Parameters
    ----------
    type_name : TypeName
        The type name of the operation. For example, obtained as
        ``Addition.type_name()``.
    execution_time : int
        The execution time of the operation.
    """
    for op in self.find_by_type_name(type_name):
        # cast() only informs the type checker; it has no runtime effect.
        cast(Operation, op).execution_time = execution_time
def set_latency_offsets_of_type(
    self, type_name: TypeName, latency_offsets: Dict[str, int]
) -> None:
    """
    Set the latency offsets of all operations with the given type name.

    ``set_latency_offsets(latency_offsets)`` is called on every component
    returned by ``self.find_by_type_name(type_name)``.

    Parameters
    ----------
    type_name : TypeName
        The type name of the operation. For example, obtained as
        ``Addition.type_name()``.
    latency_offsets : {"in1": int, ...}
        The latency offsets of the inputs and outputs.
    """
    for op in self.find_by_type_name(type_name):
        # cast() only informs the type checker; it has no runtime effect.
        cast(Operation, op).set_latency_offsets(latency_offsets)
Angus Lothian
committed
def _traverse_for_precedence_list(
self, first_iter_ports: List[OutputPort]
) -> List[List[OutputPort]]:
Angus Lothian
committed
# Find dependencies of output ports and input ports.
remaining_inports_per_operation = {op: op.input_count for op in self.operations}
Angus Lothian
committed
# Traverse output ports for precedence
curr_iter_ports = first_iter_ports
precedence_list = []
while curr_iter_ports:
# Add the found ports to the current iter
precedence_list.append(curr_iter_ports)
next_iter_ports = []
for outport in curr_iter_ports:
for signal in outport.signals:
if new_input_port is not None and not isinstance(
new_input_port.operation, Delay
Angus Lothian
committed
remaining_inports_per_operation[new_op] -= 1
if remaining_inports_per_operation[new_op] == 0:
next_iter_ports.extend(new_op.outputs)
curr_iter_ports = next_iter_ports
return precedence_list
def _add_component_unconnected_copy(
self, original_component: GraphComponent
) -> GraphComponent:
if original_component in self._original_components_to_new:
id = (
original_component.name
if original_component.name
else (
original_component.graph_id
if original_component.graph_id
else original_component.type_name()
)
)
raise ValueError(f"Tried to add duplicate SFG component: {id}")
new_component = original_component.copy()
Angus Lothian
committed
self._original_components_to_new[original_component] = new_component
if not new_component.graph_id or new_component.graph_id in self._used_ids:
new_id = self._graph_id_generator.next_id(
new_component.type_name(), self._used_ids
)
new_component.graph_id = new_id
self._used_ids.add(new_component.graph_id)
self._components_by_id[new_component.graph_id] = new_component
Angus Lothian
committed
self._components_by_name[new_component.name].append(new_component)
return new_component
def _add_operation_connected_tree_copy(self, start_op: Operation) -> None:
op_stack = deque([start_op])
while op_stack:
original_op = op_stack.pop()
# Add or get the new copy of the operation.
if original_op not in self._original_components_to_new:
new_op = cast(
Operation,
self._add_component_unconnected_copy(original_op),
)
Angus Lothian
committed
self._components_dfs_order.append(new_op)
self._operations_dfs_order.append(new_op)
else:
new_op = cast(Operation, self._original_components_to_new[original_op])
Angus Lothian
committed
# Connect input ports to new signals.
for original_input_port in original_op.inputs:
if original_input_port.signal_count < 1:
id = (
original_op.name
if original_op.name
else (
original_op.graph_id
if original_op.graph_id
else original_op.type_name()
)
)
raise ValueError(f"Unconnected input port in SFG. Operation: {id}")
Angus Lothian
committed
for original_signal in original_input_port.signals:
# Check if the signal is one of the SFG's input signals.
if original_signal in self._original_input_signals_to_indices:
Angus Lothian
committed
# New signal already created during first step of constructor.
new_signal = cast(
Signal,
self._original_components_to_new[original_signal],
)
Angus Lothian
committed
new_signal.set_destination(
Angus Lothian
committed
Angus Lothian
committed
self._components_dfs_order.extend(
if source.operation not in self._operations_dfs_order:
self._operations_dfs_order.append(source.operation)
Angus Lothian
committed
# Check if the signal has not been added before.
elif original_signal not in self._original_components_to_new:
Angus Lothian
committed
if original_signal.source is None:
dest = (
original_signal.destination.operation.name
if original_signal.destination is not None
else "None"
)
Angus Lothian
committed
raise ValueError(
Angus Lothian
committed
new_signal = cast(
Signal,
self._add_component_unconnected_copy(original_signal),
Angus Lothian
committed
new_signal.set_destination(
Angus Lothian
committed
self._components_dfs_order.append(new_signal)
original_connected_op = original_signal.source.operation
Angus Lothian
committed
# Check if connected Operation has been added before.
if original_connected_op in self._original_components_to_new:
component = cast(
Operation,
self._original_components_to_new[original_connected_op],
Angus Lothian
committed
# Set source to the already added operations port.
component.output(original_signal.source.index)
Angus Lothian
committed
else:
# Create new operation, set signal source to it.
new_connected_op = cast(
Operation,
self._add_component_unconnected_copy(
original_connected_op
Angus Lothian
committed
self._components_dfs_order.append(new_connected_op)
self._operations_dfs_order.append(new_connected_op)
# Add connected operation to queue of operations to visit.
op_stack.append(original_connected_op)
# Connect output ports.
for original_output_port in original_op.outputs:
for original_signal in original_output_port.signals:
# Check if the signal is one of the SFG's output signals.
if original_signal in self._original_output_signals_to_indices:
Angus Lothian
committed
# New signal already created during first step of constructor.
new_signal = cast(
Signal,
self._original_components_to_new[original_signal],
)
new_signal.set_source(new_op.output(original_output_port.index))
Angus Lothian
committed
destination = cast(InputPort, new_signal.destination)
Angus Lothian
committed
self._components_dfs_order.extend(
self._operations_dfs_order.append(destination.operation)
Angus Lothian
committed
# Check if signal has not been added before.
elif original_signal not in self._original_components_to_new:
Angus Lothian
committed
if original_signal.source is None:
raise ValueError(
"Dangling signal ({original_signal}) without"
" source in SFG"
Angus Lothian
committed
self._add_component_unconnected_copy(original_signal),
new_signal.set_source(new_op.output(original_output_port.index))
Angus Lothian
committed
self._components_dfs_order.append(new_signal)
original_destination = cast(
InputPort, original_signal.destination
if original_destination is None:
raise ValueError(
f"Signal ({original_signal}) without destination in SFG"
if original_connected_op is None:
raise ValueError(
"Signal with empty destination port"
f" ({original_destination}) in SFG"
Angus Lothian
committed
# Check if connected operation has been added.
if original_connected_op in self._original_components_to_new:
Angus Lothian
committed
# Set destination to the already connected operations port.
cast(
Operation,
self._original_components_to_new[
original_connected_op
],
).input(original_destination.index)
Angus Lothian
committed
else:
# Create new operation, set destination to it.
new_connected_op = cast(
Operation,
(
self._add_component_unconnected_copy(
original_connected_op
)
),
Angus Lothian
committed
self._components_dfs_order.append(new_connected_op)
self._operations_dfs_order.append(new_connected_op)
# Add connected operation to the queue of operations
# to visit.
Angus Lothian
committed
op_stack.append(original_connected_op)
def _evaluate_source(
self,
src: OutputPort,
results: MutableResultMap,
delays: MutableDelayMap,
prefix: str,
bits_override: Optional[int],
Angus Lothian
committed
key_base = (
(prefix + "." + src.operation.graph_id)
if prefix
else src.operation.graph_id
)
Angus Lothian
committed
key = src.operation.key(src.index, key_base)
if key in results:
value = results[key]
if value is None:
raise RuntimeError(
"Direct feedback loop detected when evaluating operation."
)
Angus Lothian
committed
return value
value = src.operation.current_output(src.index, delays, key_base)
results[key] = value
if value is None:
value = self._do_evaluate_source(
key_base,
key,
src,
results,
delays,
prefix,
bits_override,
Angus Lothian
committed
else:
# Evaluate later. Use current value for now.
deferred_delays.append((key_base, key, src))
return value
def _do_evaluate_source(
self,
key_base: str,
key: ResultKey,
src: OutputPort,
results: MutableResultMap,
delays: MutableDelayMap,
prefix: str,
bits_override: Optional[int],
input_values = [
self._evaluate_source(
input_port.signals[0].source,
results,
delays,
prefix,
bits_override,
deferred_delays,
)
for input_port in src.operation.inputs
]
Angus Lothian
committed
value = src.operation.evaluate_output(
src.index,
input_values,
results,
delays,
key_base,
bits_override,
Angus Lothian
committed
results[key] = value
return value
def sfg_digraph(
self,
port_numbering: bool = True,
splines: str = "spline",
) -> Digraph:
Can be directly displayed in IPython.
If True, the graph_id:s of signals are shown.
Graphviz layout engine to be used, see https://graphviz.org/documentation/.
Most common are "dot" and "neato". Default is None leading to dot.
Add a branch node in case the fan-out of a signal is two or more.
port_numbering : bool, default: True
Show the port number in case the number of ports (input or output) is two or
more.
splines : {"spline", "line", "ortho", "polyline", "curved"}, default: "spline"
Spline style, see https://graphviz.org/docs/attrs/splines/ for more info.
Returns
-------
Digraph
Digraph of the SFG.
"""
dg = Digraph()
dg.attr(rankdir="LR", splines=splines)
branch_nodes = set()
if engine is not None:
dg.engine = engine
for op in self._components_by_id.values():
if isinstance(op, Signal):
source = cast(OutputPort, op.source)
destination = cast(InputPort, op.destination)
source_name = (
source.name
if branch_node and source.signal_count > 1
else source.operation.graph_id
)
1599
1600
1601
1602
1603
1604
1605
1606
1607
1608
1609
1610
1611
1612
1613
1614
1615
1616
1617
1618
1619
1620
1621
1622
1623
1624
1625
1626
1627
1628
taillabel = (
str(source.index)
if source.operation.output_count > 1
and (not branch_node or source.signal_count == 1)
and port_numbering
else None
)
headlabel = (
str(destination.index)
if destination.operation.input_count > 1 and port_numbering
else None
)
dg.edge(
source_name,
destination.operation.graph_id,
label=label,
taillabel=taillabel,
headlabel=headlabel,
)
if (
branch_node
and source.signal_count > 1
and source_name not in branch_nodes
):
branch_nodes.add(source_name)
dg.node(source_name, shape='point')
taillabel = (
str(source.index)
if source.operation.output_count > 1 and port_numbering
else None
source_name,
arrowhead='none',
taillabel=taillabel,
dg.node(
op.graph_id,
shape=_OPERATION_SHAPE[op.type_name()],
label=f"{op.name}\n({op.graph_id})" if op.name else None,
)
def _repr_mimebundle_(self, include=None, exclude=None):
return self.sfg_digraph()._repr_mimebundle_(include=include, exclude=exclude)
return self.sfg_digraph()._repr_mimebundle_(include=["image/jpeg"])[
"image/jpeg"
]
def _repr_png_(self):
return self.sfg_digraph()._repr_mimebundle_(include=["image/png"])["image/png"]
def _repr_svg_(self):
return self.sfg_digraph()._repr_mimebundle_(include=["image/svg+xml"])[
"image/svg+xml"
]
# SVG is valid HTML. This is useful for e.g. sphinx-gallery
_repr_html_ = _repr_svg_
def show(
self,
fmt: Optional[str] = None,
port_numbering: bool = True,
splines: str = "spline",
) -> None:
Display a visual representation of the SFG using the default system viewer.
File format of the generated graph. Output formats can be found at
https://www.graphviz.org/doc/info/output.html
Most common are "pdf", "eps", "png", and "svg". Default is None which
leads to PDF.
If True, the graph_id:s of signals are shown.
Graphviz layout engine to be used, see https://graphviz.org/documentation/.
Most common are "dot" and "neato". Default is None leading to dot.
Add a branch node in case the fan-out of a signal is two or more.
port_numbering : bool, default: True
Show the port number in case the number of ports (input or output) is two or
more.
splines : {"spline", "line", "ortho", "polyline", "curved"}, default: "spline"
Spline style, see https://graphviz.org/docs/attrs/splines/ for more info.
engine=engine,
branch_node=branch_node,
port_numbering=port_numbering,
splines=splines,
)
if fmt is not None:
dg.format = fmt
def critical_path_time(self) -> int:
    """
    Return the time of the critical path.

    The value is the ``schedule_time`` of a ``Schedule`` created for this
    SFG with the ``"ASAP"`` algorithm.
    """
    # Import here needed to avoid circular imports
    from b_asic.schedule import Schedule

    return Schedule(self, algorithm="ASAP").schedule_time
1715
1716
1717
1718
1719
1720
1721
1722
1723
1724
1725
1726
1727
1728
1729
1730
1731
1732
1733
1734
1735
1736
1737
"""
Find loop(s) in graph
Parameters
----------
graph : dictionary
The dictionary that are to be searched for loops.
start : key in dictionary graph
The "node" in the dictionary that are set as the start point.
end : key in dictionary graph
The "node" in the dictionary that are set as the end point.
"""
fringe = [(start, [])]
while fringe:
state, path = fringe.pop()
if path and state == end:
yield path
continue
for next_state in graph[state]:
if next_state in path:
continue
fringe.append((next_state, path + [next_state]))
def iteration_period_bound(self) -> int:
"""
Return the iteration period bound of the SFG.
If -1, the SFG does not have any loops and therefore no iteration period bound.
Returns
-------
The iteration period bound.
1748
1749
1750
1751
1752
1753
1754
1755
1756
1757
1758
1759
1760
1761
1762
1763
1764
1765
1766
1767
1768
1769
1770
1771
1772
1773
1774
1775
inputs_used = []
for used_input in self._used_ids:
if 'in' in str(used_input):
used_input = used_input.replace("in", "")
inputs_used.append(int(used_input))
if inputs_used == []:
raise ValueError("No inputs to sfg")
for input in inputs_used:
input_op = self._input_operations[input]
queue: Deque[Operation] = deque([input_op])
visited: Set[Operation] = {input_op}
dict_of_sfg = {}
while queue:
op = queue.popleft()
for output_port in op.outputs:
if not (isinstance(op, Input) or isinstance(op, Output)):
dict_of_sfg[op.graph_id] = []
for signal in output_port.signals:
if signal.destination is not None:
new_op = signal.destination.operation
if not (isinstance(op, Input) or isinstance(op, Output)):
if not isinstance(new_op, Output):
dict_of_sfg[op.graph_id] += [new_op.graph_id]
if new_op not in visited:
queue.append(new_op)
visited.add(new_op)
else:
raise ValueError("Destination does not exist")
raise ValueError(
"the SFG does not have any loops and therefore no iteration period bound."
)
cycles = [
[node] + path
for node in dict_of_sfg
1786
1787
1788
1789
1790
1791
1792
1793
1794
1795
1796
1797
1798
1799
1800
1801
1802
1803
1804
1805
1806
1807
1808
return -1
op_and_latency = {}
for op in self.operations:
for lista in cycles:
for element in lista:
if op.type_name() not in op_and_latency:
op_and_latency[op.type_name()] = op.latency
t_l_values = []
for loop in cycles:
loop.pop()
time_of_loop = 0
number_of_t_in_loop = 0
for element in loop:
if ''.join([i for i in element if not i.isdigit()]) == 't':
number_of_t_in_loop += 1
for key, item in op_and_latency.items():
if key in element:
time_of_loop += item
if number_of_t_in_loop in (0, 1):
t_l_values.append(time_of_loop)
else:
t_l_values.append(time_of_loop / number_of_t_in_loop)
return max(t_l_values)
1810
1811
1812
1813
1814
1815
1816
1817
1818
1819
1820
1821
1822
1823
1824
1825
1826
1827
1828
1829
1830
1831
1832
1833
1834
1835
1836
1837
1838
1839
1840
1841
1842
1843
1844
1845
1846
1847
1848
1849
1850
1851
1852
1853
1854
1855
1856
1857
1858
def state_space_representation(self):
"""
Find the state-space representation of the SFG.
Returns
-------
The state-space representation.
"""
delay_element_used = []
for delay_element in self._used_ids:
if ''.join([i for i in delay_element if not i.isdigit()]) == 't':
delay_element_used.append(delay_element)
delay_element_used.sort()
input_index_used = []
inputs_used = []
output_index_used = []
outputs_used = []
for used_input in self._used_ids:
if 'in' in str(used_input):
inputs_used.append(used_input)
input_index_used.append(int(used_input.replace("in", "")))
for used_output in self._used_ids:
if 'out' in str(used_output):
outputs_used.append(used_output)
output_index_used.append(int(used_output.replace("out", "")))
if input_index_used == []:
raise ValueError("No input(s) to sfg")
if output_index_used == []:
raise ValueError("No output(s) to sfg")
for input in input_index_used:
input_op = self._input_operations[input]
queue: Deque[Operation] = deque([input_op])
visited: Set[Operation] = {input_op}
dict_of_sfg = {}
while queue:
op = queue.popleft()
if isinstance(op, Output):
dict_of_sfg[op.graph_id] = []
for output_port in op.outputs:
dict_of_sfg[op.graph_id] = []
for signal in output_port.signals:
if signal.destination is not None:
new_op = signal.destination.operation
dict_of_sfg[op.graph_id] += [new_op.graph_id]
if new_op not in visited:
queue.append(new_op)
visited.add(new_op)
else:
raise ValueError("Destination does not exist")
cycles = [
[node] + path
for node in dict_of_sfg
if node[0] == 't'
1866
1867
1868
1869
1870
1871
1872
1873
1874
1875
1876
1877
1878
1879
1880
1881
1882
1883
1884
1885
1886
1887
1888
1889
1890
1891
1892
1893
1894
1895
1896
1897
1898
1899
1900
1901
1902
1903
1904
1905
1906
1907
1908
1909
1910
1911
1912
1913
1914
1915
1916
1917
1918
1919
1920
1921
1922
1923
1924
1925
1926
1927
1928
1929
1930
1931
1932
1933
1934
1935
1936
1937
1938
1939
1940
1941
1942
1943
1944
1945
1946
1947
1948
1949
1950
1951
1952
1953
1954
1955
1956
1957
1958
1959
1960
1961
1962
1963
1964
1965
1966
1967
1968
1969
1970
1971
1972
1973
1974
1975
1976
1977
1978
1979
1980
1981
1982
1983
1984
1985
1986
1987
1988
1989
1990
1991
1992
]
delay_loop_list = []
for lista in cycles:
if not len(lista) < 2:
temp_list = []
for element in lista:
temp_list.append(element)
if element[0] == 't' and len(temp_list) > 2:
delay_loop_list.append(temp_list)
temp_list = [element]
state_space_lista = []
[
state_space_lista.append(x)
for x in delay_loop_list
if x not in state_space_lista
]
mat_row = len(delay_element_used) + len(output_index_used)
mat_col = len(delay_element_used) + len(input_index_used)
mat_content = np.zeros((mat_row, mat_col))
matrix_in = [0] * mat_col
matrix_answer = [0] * mat_row
for in_signal in inputs_used:
matrix_in[len(delay_element_used) + int(in_signal.replace('in', ''))] = (
in_signal.replace('in', 'x')
)
for delay_element in delay_element_used:
matrix_answer[delay_element_used.index(delay_element)] = (
delay_element.replace('t', 'v')
)
matrix_in[delay_element_used.index(delay_element)] = (
delay_element.replace('t', 'v')
)
paths = self.find_all_paths(dict_of_sfg, in_signal, delay_element)
for lista in paths:
temp_list = []
for element in lista:
temp_list.append(element)
if element[0] == 't':
state_space_lista.append(temp_list)
temp_list = [element]
for out_signal in outputs_used:
paths = self.find_all_paths(dict_of_sfg, in_signal, out_signal)
matrix_answer[
len(delay_element_used) + int(out_signal.replace('out', ''))
] = out_signal.replace('out', 'y')
for lista in paths:
temp_list1 = []
for element in lista:
temp_list1.append(element)
if element[0] == 't':
state_space_lista.append(temp_list1)
temp_list1 = [element]
if "out" in element:
state_space_lista.append(temp_list1)
temp_list1 = []
state_space_list_no_dup = []
[
state_space_list_no_dup.append(x)
for x in state_space_lista
if x not in state_space_list_no_dup
]
for lista in state_space_list_no_dup:
if "in" in lista[0] and lista[-1][0] == 't':
row = int(lista[-1].replace("t", ""))
column = len(delay_element_used) + int(lista[0].replace("in", ""))
temp_value = 1
for element in lista:
if "cmul" in element:
temp_value *= self.find_by_id(element).value
mat_content[row, column] += temp_value
elif "in" in lista[0] and "out" in lista[-1]:
row = len(delay_element_used) + int(lista[-1].replace("out", ""))
column = len(delay_element_used) + int(lista[0].replace("in", ""))
temp_value = 1
for element in lista:
if "cmul" in element:
temp_value *= self.find_by_id(element).value
mat_content[row, column] += temp_value
elif lista[0][0] == 't' and lista[-1][0] == 't':
row = int(lista[-1].replace("t", ""))
column = int(lista[0].replace("t", ""))
temp_value = 1
for element in lista:
if "cmul" in element:
temp_value *= self.find_by_id(element).value
mat_content[row, column] += temp_value
elif lista[0][0] == 't' and "out" in lista[-1]:
row = len(delay_element_used) + int(lista[-1].replace("out", ""))
column = int(lista[0].replace("t", ""))
temp_value = 1
for element in lista:
if "cmul" in element:
temp_value *= self.find_by_id(element).value
mat_content[row, column] += temp_value
return matrix_answer, mat_content, matrix_in
def find_all_paths(self, graph: dict, start: str, end: str, path=None) -> list:
    """
    Return all paths in graph from node start to node end.

    The search is depth-first and never visits a node that is already on
    the current path, so every returned path is free of repeated nodes.

    Parameters
    ----------
    graph : dictionary
        The dictionary to search; each key maps to an iterable of the nodes
        that can be reached directly from it.
    start : key in dictionary graph
        The "node" in the dictionary that is set as the start point.
    end : key in dictionary graph
        The "node" in the dictionary that is set as the end point.
    path : list, optional
        The nodes visited before *start*; used by the recursion. Callers
        normally leave it out.

    Returns
    -------
    list
        A list of paths, each path being a list of nodes beginning with the
        first node of *path* (or *start*) and ending with *end*. Empty if no
        path exists.
    """
    # A None default avoids sharing one mutable list between calls.
    path = [start] if path is None else path + [start]
    if start == end:
        return [path]
    if start not in graph:
        return []
    paths = []
    for node in graph[start]:
        if node not in path:
            paths.extend(self.find_all_paths(graph, node, end, path))
    return paths
def edit(self) -> Dict[str, "SFG"]:
"""Edit SFG in GUI."""
from b_asic.GUI.main_window import start_editor
def unfold(self, factor: int) -> "SFG":