Newer
Older
shape='rectangle',
height="0.1",
width="0.1",
# Creates edges for each output port and creates nodes for each operation
# and edges for them as well
Angus Lothian
committed
for port in ports:
source_label = port.operation.graph_id
Angus Lothian
committed
for signal in port.signals:
destination_label = destination.operation.graph_id
destination_node = (
destination_label + "In"
if isinstance(destination.operation, Delay)
else destination_label
)
pg.edge(node_node, destination_node)
destination_node,
label=destination_label,
shape=_OPERATION_SHAPE[destination.operation.type_name()],
source_node = (
source_label + "Out"
if port.operation.type_name() == Delay.type_name()
else source_label
)
pg.edge(source_node, node_node)
pg.node(
source_node,
label=source_label,
shape=_OPERATION_SHAPE[port.operation.type_name()],
)
Angus Lothian
committed
Angus Lothian
committed
def print_precedence_graph(self) -> None:
    """
    Print a representation of the SFG precedence list to the standard out.

    If the precedence list already has been calculated then it uses the
    cached version, otherwise it calculates the precedence list and then
    prints it.

    The text starts with a rule of 120 dashes. For every precedence step it
    then contains one ``<step>.<port> \\t<operation>`` line per output port,
    followed by another rule. Step and port numbers start at 1.
    """
    precedence_list = self.get_precedence_list()

    line = "-" * 120
    out_str = StringIO()
    out_str.write(line)

    # NOTE(review): the set holds output ports, not operations, so an
    # operation with several output ports still gets one line per port.
    # Confirm whether "once per operation" was the intended behaviour.
    printed_ops = set()

    for iter_num, iterable in enumerate(precedence_list, start=1):
        for outport_num, outport in enumerate(iterable, start=1):
            if outport in printed_ops:
                continue
            out_str.write(f"\n{iter_num}.{outport_num} \t{outport.operation!s}")
            printed_ops.add(outport)

        # Close every precedence step with a rule.
        out_str.write("\n")
        out_str.write(line)

    print(out_str.getvalue())
def get_operations_topological_order(self) -> Iterable[Operation]:
    """
    Return an Iterable of the Operations in the SFG in topological order.

    Feedback loops make an absolutely correct topological order impossible,
    so an approximate topological order is returned in such cases in this
    implementation.

    The computed order is stored in ``self._operations_topological_order``
    and returned directly on later calls while that attribute is non-empty.

    Raises
    ------
    ValueError
        If no operation has an input count of zero.
    RuntimeError
        If operations remain to be ordered, but there is neither an unused
        zero-input operation nor a seen-but-unvisited operation to continue
        from.
    """
    if self._operations_topological_order:
        return self._operations_topological_order

    no_inputs_queue = deque(op for op in self.operations if op.input_count == 0)
    if not no_inputs_queue:
        raise ValueError("Illegal SFG state, dangling signals in SFG.")

    remaining_inports_per_operation = {op: op.input_count for op in self.operations}

    # Maps number of input counts to a queue of seen objects with such a size.
    seen_with_inputs_dict: dict[int, Deque] = defaultdict(deque)
    # Operations reached through an edge but still waiting for other inputs.
    seen = set()
    # Operations already appended to top_order.
    visited = set()
    top_order = []

    p_queue = PriorityQueue()
    p_queue_entry_num = itertools.count()

    def enqueue(operation):
        # Negative priority as max-heap popping is wanted. The running entry
        # number makes every tuple unique, so operations are never compared.
        p_queue.put(
            (-operation.output_count, -next(p_queue_entry_num), operation)
        )

    enqueue(no_inputs_queue.popleft())

    # NOTE(review): the counter starts at len - 1 although the first
    # operation is also counted down when it is popped; confirm intended.
    operations_left = len(self.operations) - 1
    seen_but_not_visited_count = 0

    while operations_left > 0:
        while not p_queue.empty():
            op = p_queue.get()[2]

            operations_left -= 1
            top_order.append(op)
            visited.add(op)

            for neighbor_op in op.subsequent_operations:
                if neighbor_op in visited:
                    continue

                remaining_inports_per_operation[neighbor_op] -= 1
                remaining_inports = remaining_inports_per_operation[neighbor_op]

                if remaining_inports == 0:
                    enqueue(neighbor_op)
                elif remaining_inports > 0:
                    if neighbor_op in seen:
                        # Move it out of the bucket for its previous count.
                        seen_with_inputs_dict[remaining_inports + 1].remove(
                            neighbor_op
                        )
                    else:
                        seen.add(neighbor_op)
                        seen_but_not_visited_count += 1

                    seen_with_inputs_dict[remaining_inports].append(neighbor_op)

        # Check if have to fetch Operations from somewhere else since p_queue
        # is empty
        if operations_left > 0:
            # First check if can fetch from Operations with no input ports
            if no_inputs_queue:
                enqueue(no_inputs_queue.popleft())

            # Else fetch operation with the lowest input count that is not zero
            elif seen_but_not_visited_count > 0:
                for i in itertools.count(start=1):
                    seen_inputs_queue = seen_with_inputs_dict[i]
                    if seen_inputs_queue:
                        enqueue(seen_inputs_queue.popleft())
                        seen_but_not_visited_count -= 1
                        break
            else:
                raise RuntimeError("Disallowed structure in SFG detected")

    self._operations_topological_order = top_order
    return self._operations_topological_order
def set_latency_of_type(self, type_name: TypeName, latency: int) -> None:
"""
Set the latency of all components with the given type name.
Parameters
----------
type_name : TypeName
The type name of the operation. For example, obtained as
``Addition.type_name()``.
latency : int
The latency of the operation.
"""
Angus Lothian
committed
for op in self.find_by_type_name(type_name):
Angus Lothian
committed
def set_execution_time_of_type(
    self, type_name: TypeName, execution_time: int
) -> None:
    """
    Set the execution time of all operations with the given type name.

    Parameters
    ----------
    type_name : TypeName
        The type name of the operation. For example, obtained as
        ``Addition.type_name()``.
    execution_time : int
        The execution time of the operation.
    """
    # The cast only narrows the static type; the same value is assigned to
    # every component returned by find_by_type_name.
    for op in self.find_by_type_name(type_name):
        cast(Operation, op).execution_time = execution_time
self, type_name: TypeName, latency_offsets: dict[str, int]
"""
Set the latency offsets of all operations with the given type name.
Parameters
----------
type_name : TypeName
The type name of the operation. For example, obtained as
``Addition.type_name()``.
latency_offsets : {"in1": int, ...}
The latency offsets of the inputs and outputs.
Angus Lothian
committed
for op in self.find_by_type_name(type_name):
cast(Operation, op).set_latency_offsets(latency_offsets)
Angus Lothian
committed
self, first_iter_ports: list[OutputPort]
) -> list[list[OutputPort]]:
Angus Lothian
committed
# Find dependencies of output ports and input ports.
remaining_inports_per_operation = {op: op.input_count for op in self.operations}
Angus Lothian
committed
# Traverse output ports for precedence
curr_iter_ports = first_iter_ports
precedence_list = []
while curr_iter_ports:
# Add the found ports to the current iter
precedence_list.append(curr_iter_ports)
next_iter_ports = []
for outport in curr_iter_ports:
for signal in outport.signals:
if new_input_port is not None and not isinstance(
new_input_port.operation, Delay
Angus Lothian
committed
remaining_inports_per_operation[new_op] -= 1
if remaining_inports_per_operation[new_op] == 0:
next_iter_ports.extend(new_op.outputs)
curr_iter_ports = next_iter_ports
return precedence_list
def _add_component_unconnected_copy(
self, original_component: GraphComponent
) -> GraphComponent:
if original_component in self._original_components_to_new:
id = (
original_component.name
if original_component.name
else (
original_component.graph_id
if original_component.graph_id
else original_component.type_name()
)
)
raise ValueError(f"Tried to add duplicate SFG component: {id}")
new_component = original_component.copy()
Angus Lothian
committed
self._original_components_to_new[original_component] = new_component
if not new_component.graph_id or new_component.graph_id in self._used_ids:
new_id = self._graph_id_generator.next_id(
new_component.type_name(), self._used_ids
)
new_component.graph_id = new_id
self._used_ids.add(new_component.graph_id)
self._components_by_id[new_component.graph_id] = new_component
Angus Lothian
committed
self._components_by_name[new_component.name].append(new_component)
return new_component
def _add_operation_connected_tree_copy(self, start_op: Operation) -> None:
op_stack = deque([start_op])
while op_stack:
original_op = op_stack.pop()
# Add or get the new copy of the operation.
if original_op not in self._original_components_to_new:
new_op = cast(
Operation,
self._add_component_unconnected_copy(original_op),
)
Angus Lothian
committed
self._components_dfs_order.append(new_op)
self._operations_dfs_order.append(new_op)
else:
new_op = cast(Operation, self._original_components_to_new[original_op])
Angus Lothian
committed
# Connect input ports to new signals.
for original_input_port in original_op.inputs:
if original_input_port.signal_count < 1:
id = (
original_op.name
if original_op.name
else (
original_op.graph_id
if original_op.graph_id
else original_op.type_name()
)
)
raise ValueError(f"Unconnected input port in SFG. Operation: {id}")
Angus Lothian
committed
for original_signal in original_input_port.signals:
# Check if the signal is one of the SFG's input signals.
if original_signal in self._original_input_signals_to_indices:
Angus Lothian
committed
# New signal already created during first step of constructor.
new_signal = cast(
Signal,
self._original_components_to_new[original_signal],
)
Angus Lothian
committed
new_signal.set_destination(
Angus Lothian
committed
Angus Lothian
committed
self._components_dfs_order.extend(
if source.operation not in self._operations_dfs_order:
self._operations_dfs_order.append(source.operation)
Angus Lothian
committed
# Check if the signal has not been added before.
elif original_signal not in self._original_components_to_new:
Angus Lothian
committed
if original_signal.source is None:
dest = (
original_signal.destination.operation.name
if original_signal.destination is not None
else "None"
)
Angus Lothian
committed
raise ValueError(
Angus Lothian
committed
new_signal = cast(
Signal,
self._add_component_unconnected_copy(original_signal),
Angus Lothian
committed
new_signal.set_destination(
Angus Lothian
committed
self._components_dfs_order.append(new_signal)
original_connected_op = original_signal.source.operation
Angus Lothian
committed
# Check if connected Operation has been added before.
if original_connected_op in self._original_components_to_new:
component = cast(
Operation,
self._original_components_to_new[original_connected_op],
Angus Lothian
committed
# Set source to the already added operations port.
component.output(original_signal.source.index)
Angus Lothian
committed
else:
# Create new operation, set signal source to it.
new_connected_op = cast(
Operation,
self._add_component_unconnected_copy(
original_connected_op
Angus Lothian
committed
self._components_dfs_order.append(new_connected_op)
self._operations_dfs_order.append(new_connected_op)
# Add connected operation to queue of operations to visit.
op_stack.append(original_connected_op)
# Connect output ports.
for original_output_port in original_op.outputs:
for original_signal in original_output_port.signals:
# Check if the signal is one of the SFG's output signals.
if original_signal in self._original_output_signals_to_indices:
Angus Lothian
committed
# New signal already created during first step of constructor.
new_signal = cast(
Signal,
self._original_components_to_new[original_signal],
)
new_signal.set_source(new_op.output(original_output_port.index))
Angus Lothian
committed
destination = cast(InputPort, new_signal.destination)
Angus Lothian
committed
self._components_dfs_order.extend(
self._operations_dfs_order.append(destination.operation)
Angus Lothian
committed
# Check if signal has not been added before.
elif original_signal not in self._original_components_to_new:
Angus Lothian
committed
if original_signal.source is None:
raise ValueError(
"Dangling signal ({original_signal}) without"
" source in SFG"
Angus Lothian
committed
self._add_component_unconnected_copy(original_signal),
new_signal.set_source(new_op.output(original_output_port.index))
Angus Lothian
committed
self._components_dfs_order.append(new_signal)
original_destination = cast(
InputPort, original_signal.destination
if original_destination is None:
raise ValueError(
f"Signal ({original_signal}) without destination in SFG"
if original_connected_op is None:
raise ValueError(
"Signal with empty destination port"
f" ({original_destination}) in SFG"
Angus Lothian
committed
# Check if connected operation has been added.
if original_connected_op in self._original_components_to_new:
Angus Lothian
committed
# Set destination to the already connected operations port.
cast(
Operation,
self._original_components_to_new[
original_connected_op
],
).input(original_destination.index)
Angus Lothian
committed
else:
# Create new operation, set destination to it.
new_connected_op = cast(
Operation,
(
self._add_component_unconnected_copy(
original_connected_op
)
),
Angus Lothian
committed
self._components_dfs_order.append(new_connected_op)
self._operations_dfs_order.append(new_connected_op)
# Add connected operation to the queue of operations
# to visit.
Angus Lothian
committed
op_stack.append(original_connected_op)
def _evaluate_source(
self,
src: OutputPort,
results: MutableResultMap,
delays: MutableDelayMap,
prefix: str,
Angus Lothian
committed
key_base = (
(prefix + "." + src.operation.graph_id)
if prefix
else src.operation.graph_id
)
Angus Lothian
committed
key = src.operation.key(src.index, key_base)
if key in results:
value = results[key]
if value is None:
raise RuntimeError(
"Direct feedback loop detected when evaluating operation."
)
Angus Lothian
committed
return value
value = src.operation.current_output(src.index, delays, key_base)
results[key] = value
if value is None:
value = self._do_evaluate_source(
key_base,
key,
src,
results,
delays,
prefix,
bits_override,
Angus Lothian
committed
else:
# Evaluate later. Use current value for now.
deferred_delays.append((key_base, key, src))
return value
def _do_evaluate_source(
self,
key_base: str,
key: ResultKey,
src: OutputPort,
results: MutableResultMap,
delays: MutableDelayMap,
prefix: str,
input_values = [
self._evaluate_source(
input_port.signals[0].source,
results,
delays,
prefix,
bits_override,
deferred_delays,
)
for input_port in src.operation.inputs
]
Angus Lothian
committed
value = src.operation.evaluate_output(
src.index,
input_values,
results,
delays,
key_base,
bits_override,
Angus Lothian
committed
results[key] = value
return value
def sfg_digraph(
self,
port_numbering: bool = True,
splines: str = "spline",
) -> Digraph:
Can be directly displayed in IPython.
If True, the graph_id:s of signals are shown.
Graphviz layout engine to be used, see https://graphviz.org/documentation/.
Most common are "dot" and "neato". Default is None leading to dot.
Add a branch node in case the fan-out of a signal is two or more.
port_numbering : bool, default: True
Show the port number in case the number of ports (input or output) is two or
more.
splines : {"spline", "line", "ortho", "polyline", "curved"}, default: "spline"
Spline style, see https://graphviz.org/docs/attrs/splines/ for more info.
Returns
-------
Digraph
Digraph of the SFG.
"""
dg = Digraph()
dg.attr(rankdir="LR", splines=splines)
branch_nodes = set()
if engine is not None:
dg.engine = engine
for op in self._components_by_id.values():
if isinstance(op, Signal):
source = cast(OutputPort, op.source)
destination = cast(InputPort, op.destination)
source_name = (
source.name
if branch_node and source.signal_count > 1
else source.operation.graph_id
)
1594
1595
1596
1597
1598
1599
1600
1601
1602
1603
1604
1605
1606
1607
1608
1609
1610
1611
1612
1613
1614
1615
1616
1617
1618
1619
1620
1621
1622
1623
taillabel = (
str(source.index)
if source.operation.output_count > 1
and (not branch_node or source.signal_count == 1)
and port_numbering
else None
)
headlabel = (
str(destination.index)
if destination.operation.input_count > 1 and port_numbering
else None
)
dg.edge(
source_name,
destination.operation.graph_id,
label=label,
taillabel=taillabel,
headlabel=headlabel,
)
if (
branch_node
and source.signal_count > 1
and source_name not in branch_nodes
):
branch_nodes.add(source_name)
dg.node(source_name, shape='point')
taillabel = (
str(source.index)
if source.operation.output_count > 1 and port_numbering
else None
source_name,
arrowhead='none',
taillabel=taillabel,
dg.node(
op.graph_id,
shape=_OPERATION_SHAPE[op.type_name()],
label=f"{op.name}\n({op.graph_id})" if op.name else None,
)
def _repr_mimebundle_(self, include=None, exclude=None):
return self.sfg_digraph()._repr_mimebundle_(include=include, exclude=exclude)
return self.sfg_digraph()._repr_mimebundle_(include=["image/jpeg"])[
"image/jpeg"
]
def _repr_png_(self):
return self.sfg_digraph()._repr_mimebundle_(include=["image/png"])["image/png"]
def _repr_svg_(self):
return self.sfg_digraph()._repr_mimebundle_(include=["image/svg+xml"])[
"image/svg+xml"
]
# SVG is valid HTML. This is useful for e.g. sphinx-gallery
_repr_html_ = _repr_svg_
port_numbering: bool = True,
splines: str = "spline",
) -> None:
Display a visual representation of the SFG using the default system viewer.
File format of the generated graph. Output formats can be found at
https://www.graphviz.org/doc/info/output.html
Most common are "pdf", "eps", "png", and "svg". Default is None which
leads to PDF.
If True, the graph_id:s of signals are shown.
Graphviz layout engine to be used, see https://graphviz.org/documentation/.
Most common are "dot" and "neato". Default is None leading to dot.
Add a branch node in case the fan-out of a signal is two or more.
port_numbering : bool, default: True
Show the port number in case the number of ports (input or output) is two or
more.
splines : {"spline", "line", "ortho", "polyline", "curved"}, default: "spline"
Spline style, see https://graphviz.org/docs/attrs/splines/ for more info.
engine=engine,
branch_node=branch_node,
port_numbering=port_numbering,
splines=splines,
)
if fmt is not None:
dg.format = fmt
def critical_path_time(self) -> int:
    """
    Return the time of the critical path.

    The value is the ``schedule_time`` of a ``Schedule`` created for this
    SFG with an ``ASAPScheduler``.
    """
    # Import here needed to avoid circular imports
    from b_asic.schedule import Schedule
    from b_asic.scheduler import ASAPScheduler

    return Schedule(self, ASAPScheduler()).schedule_time
1711
1712
1713
1714
1715
1716
1717
1718
1719
1720
1721
1722
1723
1724
1725
1726
1727
1728
1729
1730
1731
1732
1733
"""
Find loop(s) in graph
Parameters
----------
graph : dictionary
The dictionary that are to be searched for loops.
start : key in dictionary graph
The "node" in the dictionary that are set as the start point.
end : key in dictionary graph
The "node" in the dictionary that are set as the end point.
"""
fringe = [(start, [])]
while fringe:
state, path = fringe.pop()
if path and state == end:
yield path
continue
for next_state in graph[state]:
if next_state in path:
continue
fringe.append((next_state, path + [next_state]))
Simon Bjurek
committed
1734
1735
1736
1737
1738
1739
1740
1741
1742
1743
1744
1745
1746
1747
1748
1749
1750
1751
1752
1753
1754
1755
1756
1757
1758
1759
def resource_lower_bound(self, type_name: str, schedule_time: int) -> int:
    """
    Return the lowest amount of resources of the given type needed to reach the scheduling time.

    The bound is the summed execution time of all operations of the type
    divided by the schedule time, rounded up.

    Parameters
    ----------
    type_name : str
        Type name of the given resource.
    schedule_time : int
        Scheduling time to evaluate for.

    Returns
    -------
    int
        The lower bound, or 0 if the SFG has no operation of the given type.

    Raises
    ------
    ValueError
        If *schedule_time* is not positive, or if any operation of the type
        has no execution time set.
    """
    ops = self.find_by_type_name(type_name)
    if not ops:
        # Checked first, so a missing type yields 0 for any schedule time.
        return 0
    if schedule_time <= 0:
        raise ValueError(
            f"Schedule time must be positive, current schedule time is: {schedule_time}."
        )

    # Collected once and reused for both the validation and the sum.
    exec_times = [op.execution_time for op in ops]
    if any(exec_time is None for exec_time in exec_times):
        raise ValueError(
            f"Execution times not set for all operations of type {type_name}."
        )
    return ceil(sum(exec_times) / schedule_time)
def iteration_period_bound(self) -> Fraction:
    """
    Return the iteration period bound of the SFG.

    If -1, the SFG does not have any loops and therefore no iteration period bound.

    For every loop the latencies of its operations are summed and divided
    by the number of delay elements in the loop; the largest quotient is
    returned. A loop without delay elements is treated like a loop with one.

    Returns
    -------
    The iteration period bound.
    """
    loops = self.loops
    if not loops:
        # Plain int kept for backward compatibility with existing callers.
        return -1

    # Latency per graph id, read only for operations that are part of a
    # loop. An exact lookup avoids adding e.g. the latency of "mul" to
    # "cmul1", which substring matching on type names would do.
    ids_in_loops = {element for loop in loops for element in loop}
    latency_by_id = {
        op.graph_id: op.latency
        for op in self.operations
        if op.graph_id in ids_in_loops
    }

    t_l_values = []
    for loop in loops:
        # The last entry is dropped before evaluating the loop; presumably
        # it repeats the loop's first node. TODO confirm against `loops`.
        loop.pop()
        time_of_loop = 0
        number_of_t_in_loop = 0
        for element in loop:
            # A graph id that is "t" plus digits is counted as a delay.
            if "".join(char for char in element if not char.isdigit()) == "t":
                number_of_t_in_loop += 1
            time_of_loop += latency_by_id.get(element, 0)
        t_l_values.append(Fraction(time_of_loop, max(number_of_t_in_loop, 1)))
    return max(t_l_values)
@property
def loops(self) -> list[list[GraphID]]:
"""
Return the recursive loops found in the SFG.
If the list is empty, the SFG does not have any loops.
Returns
-------
A list of the recursive loops.
"""
inputs_used = []
for used_input in self._used_ids:
if 'in' in str(used_input):
used_input = used_input.replace("in", "")
inputs_used.append(int(used_input))
if inputs_used == []:
Simon Bjurek
committed
return []
for input in inputs_used:
input_op = self._input_operations[input]
queue: Deque[Operation] = deque([input_op])
dict_of_sfg = {}
while queue:
op = queue.popleft()
for output_port in op.outputs:
if not (isinstance(op, Input) or isinstance(op, Output)):
dict_of_sfg[op.graph_id] = []
for signal in output_port.signals:
if signal.destination is not None:
new_op = signal.destination.operation
if not (isinstance(op, Input) or isinstance(op, Output)):
if not isinstance(new_op, Output):
dict_of_sfg[op.graph_id] += [new_op.graph_id]
if new_op not in visited:
queue.append(new_op)
visited.add(new_op)
else:
raise ValueError("Destination does not exist")
cycles = [
[node] + path
for node in dict_of_sfg
Simon Bjurek
committed
loops = self._get_non_redundant_cycles(cycles)
return loops
def _get_non_redundant_cycles(self, loops):
unique_lists = []
seen_cycles = set()
for loop in loops:
operation_set = frozenset(loop)
if operation_set not in seen_cycles:
unique_lists.append(loop)
seen_cycles.add(operation_set)
return unique_lists
def state_space_representation(self):
"""
Find the state-space representation of the SFG.
Returns
-------
The state-space representation.
"""
delay_element_used = []
for delay_element in self._used_ids:
if ''.join([i for i in delay_element if not i.isdigit()]) == 't':
delay_element_used.append(delay_element)
delay_element_used.sort()
input_index_used = []
inputs_used = []
output_index_used = []
outputs_used = []
for used_inout in self._used_ids:
if 'in' in str(used_inout):
inputs_used.append(used_inout)
input_index_used.append(int(used_inout.replace("in", "")))
elif 'out' in str(used_inout):
outputs_used.append(used_inout)
output_index_used.append(int(used_inout.replace("out", "")))
if input_index_used == []:
raise ValueError("No input(s) to sfg")
if output_index_used == []:
raise ValueError("No output(s) to sfg")
dict_of_sfg = {}
for output in output_index_used:
output_op = self._output_operations[output]
queue: Deque[Operation] = deque([output_op])
while queue:
op = queue.popleft()
for input_port in op.inputs:
for signal in input_port.signals:
if signal.source is not None:
new_op = signal.source.operation
dict_of_sfg[new_op.graph_id] = [op.graph_id]
if new_op not in visited:
queue.append(new_op)
visited.add(new_op)
else:
raise ValueError("Source does not exist")
for input in input_index_used:
input_op = self._input_operations[input]
while queue:
op = queue.popleft()
if isinstance(op, Output):
dict_of_sfg[op.graph_id] = []
for output_port in op.outputs:
dict_of_sfg[op.graph_id] = []
for signal in output_port.signals:
if signal.destination is not None:
new_op = signal.destination.operation
dict_of_sfg[op.graph_id] += [new_op.graph_id]
if new_op not in visited:
queue.append(new_op)
visited.add(new_op)
else:
raise ValueError("Destination does not exist")
addition_with_constant = {}
for key, item in dict_of_sfg.items():
if ''.join([i for i in key if not i.isdigit()]) == 'c':
addition_with_constant[item[0]] = self.find_by_id(key).value
cycles = [
[node] + path
for node in dict_of_sfg
if node[0] == 't'
]
delay_loop_list = []
for lista in cycles:
if not len(lista) < 2:
temp_list = []
for element in lista:
temp_list.append(element)
1939
1940
1941
1942
1943
1944
1945
1946
1947
1948
1949
1950
1951
1952
1953
1954
1955
1956
1957
1958
1959
1960
1961
1962
1963
1964
1965
1966
1967
1968
1969
1970
1971
1972
1973
1974
1975
1976
1977
1978
1979
1980
1981
1982
1983
1984
1985
1986
1987
1988
1989
1990
1991
1992
1993
1994
1995
1996
1997
1998
1999
delay_loop_list.append(temp_list)
temp_list = [element]
state_space_lista = []
[
state_space_lista.append(x)
for x in delay_loop_list
if x not in state_space_lista
]
mat_row = len(delay_element_used) + len(output_index_used)
mat_col = len(delay_element_used) + len(input_index_used)
mat_content = np.zeros((mat_row, mat_col))
matrix_in = [0] * mat_col
matrix_answer = [0] * mat_row
for in_signal in inputs_used:
matrix_in[len(delay_element_used) + int(in_signal.replace('in', ''))] = (
in_signal.replace('in', 'x')
)
for delay_element in delay_element_used:
matrix_answer[delay_element_used.index(delay_element)] = (
delay_element.replace('t', 'v')
)
matrix_in[delay_element_used.index(delay_element)] = (
delay_element.replace('t', 'v')
)
paths = self.find_all_paths(dict_of_sfg, in_signal, delay_element)
for lista in paths:
temp_list = []
for element in lista:
temp_list.append(element)
if element[0] == 't':
state_space_lista.append(temp_list)
temp_list = [element]
for out_signal in outputs_used:
paths = self.find_all_paths(dict_of_sfg, in_signal, out_signal)
matrix_answer[
len(delay_element_used) + int(out_signal.replace('out', ''))
] = out_signal.replace('out', 'y')
for lista in paths:
temp_list1 = []
for element in lista:
temp_list1.append(element)
if element[0] == 't':
state_space_lista.append(temp_list1)
temp_list1 = [element]
if "out" in element:
state_space_lista.append(temp_list1)
temp_list1 = []
state_space_list_no_dup = []
[
state_space_list_no_dup.append(x)
for x in state_space_lista
if x not in state_space_list_no_dup
]
for lista in state_space_list_no_dup:
if "in" in lista[0] and lista[-1][0] == 't':
row = int(lista[-1].replace("t", ""))
column = len(delay_element_used) + int(lista[0].replace("in", ""))
temp_value = 1
for element in lista:
if "cmul" in element:
temp_value *= self.find_by_id(element).value
for key, value in addition_with_constant.items():