Newer
Older
if isinstance(destination.operation, Delay)
else destination_label
)
pg.edge(node_node, destination_node)
destination_node,
label=destination_label,
shape=_OPERATION_SHAPE[destination.operation.type_name()],
source_node = (
source_label + "Out"
if port.operation.type_name() == Delay.type_name()
else source_label
)
pg.edge(source_node, node_node)
pg.node(
source_node,
label=source_label,
shape=_OPERATION_SHAPE[port.operation.type_name()],
)
Angus Lothian
committed
Angus Lothian
committed
def print_precedence_graph(self) -> None:
Print a representation of the SFG precedence list to the standard out.
If the precedence list already has been calculated then it uses the
cached version, otherwise it calculates the precedence list and then
prints it.
"""
Angus Lothian
committed
precedence_list = self.get_precedence_list()
line = "-" * 120
out_str = StringIO()
out_str.write(line)
printed_ops = set()
for iter_num, iterable in enumerate(precedence_list, start=1):
for outport_num, outport in enumerate(iterable, start=1):
Angus Lothian
committed
if outport not in printed_ops:
# Only print once per operation, even if it has multiple outports
out_str.write("\n")
out_str.write(str(iter_num))
out_str.write(".")
out_str.write(str(outport_num))
out_str.write(" \t")
out_str.write(str(outport.operation))
printed_ops.add(outport)
out_str.write("\n")
out_str.write(line)
print(out_str.getvalue())
def get_operations_topological_order(self) -> Iterable[Operation]:
"""
Return an Iterable of the Operations in the SFG in topological order.
Feedback loops makes an absolutely correct topological order impossible,
so an approximate topological Order is returned in such cases in this
implementation.
Angus Lothian
committed
if self._operations_topological_order:
return self._operations_topological_order
no_inputs_queue = deque(
list(filter(lambda op: op.input_count == 0, self.operations))
)
remaining_inports_per_operation = {op: op.input_count for op in self.operations}
Angus Lothian
committed
# Maps number of input counts to a queue of seen objects with such a size.
seen_with_inputs_dict: Dict[int, Deque] = defaultdict(deque)
Angus Lothian
committed
seen = set()
top_order = []
if len(no_inputs_queue) == 0:
raise ValueError("Illegal SFG state, dangling signals in SFG.")
Angus Lothian
committed
first_op = no_inputs_queue.popleft()
Angus Lothian
committed
p_queue = PriorityQueue()
p_queue_entry_num = itertools.count()
Angus Lothian
committed
# Negative priority as max-heap popping is wanted
p_queue.put((-first_op.output_count, -next(p_queue_entry_num), first_op))
Angus Lothian
committed
operations_left = len(self.operations) - 1
seen_but_not_visited_count = 0
while operations_left > 0:
while not p_queue.empty():
op = p_queue.get()[2]
operations_left -= 1
top_order.append(op)
visited.add(op)
for neighbor_op in op.subsequent_operations:
if neighbor_op not in visited:
remaining_inports_per_operation[neighbor_op] -= 1
remaining_inports = remaining_inports_per_operation[neighbor_op]
Angus Lothian
committed
if remaining_inports == 0:
p_queue.put(
(
-neighbor_op.output_count,
-next(p_queue_entry_num),
neighbor_op,
)
)
Angus Lothian
committed
elif remaining_inports > 0:
if neighbor_op in seen:
seen_with_inputs_dict[remaining_inports + 1].remove(
neighbor_op
)
Angus Lothian
committed
else:
seen.add(neighbor_op)
seen_but_not_visited_count += 1
seen_with_inputs_dict[remaining_inports].append(neighbor_op)
Angus Lothian
committed
# Check if have to fetch Operations from somewhere else since p_queue
# is empty
Angus Lothian
committed
if operations_left > 0:
# First check if can fetch from Operations with no input ports
if no_inputs_queue:
new_op = no_inputs_queue.popleft()
p_queue.put(
(
-new_op.output_count,
-next(p_queue_entry_num),
new_op,
)
)
Angus Lothian
committed
# Else fetch operation with the lowest input count that is not zero
Angus Lothian
committed
elif seen_but_not_visited_count > 0:
for i in itertools.count(start=1):
Angus Lothian
committed
seen_inputs_queue = seen_with_inputs_dict[i]
if seen_inputs_queue:
new_op = seen_inputs_queue.popleft()
p_queue.put(
(
-new_op.output_count,
-next(p_queue_entry_num),
new_op,
)
)
Angus Lothian
committed
seen_but_not_visited_count -= 1
break
else:
raise RuntimeError("Disallowed structure in SFG detected")
Angus Lothian
committed
self._operations_topological_order = top_order
return self._operations_topological_order
def set_latency_of_type(self, type_name: TypeName, latency: int) -> None:
"""
Set the latency of all components with the given type name.
Parameters
----------
type_name : TypeName
The type name of the operation. For example, obtained as
``Addition.type_name()``.
latency : int
The latency of the operation.
"""
Angus Lothian
committed
for op in self.find_by_type_name(type_name):
Angus Lothian
committed
def set_execution_time_of_type(
self, type_name: TypeName, execution_time: int
) -> None:
"""
Set the execution time of all operations with the given type name.
Parameters
----------
type_name : TypeName
The type name of the operation. For example, obtained as
``Addition.type_name()``.
execution_time : int
The execution time of the operation.
for op in self.find_by_type_name(type_name):
cast(Operation, op).execution_time = execution_time
def set_latency_offsets_of_type(
self, type_name: TypeName, latency_offsets: Dict[str, int]
) -> None:
"""
Set the latency offsets of all operations with the given type name.
Parameters
----------
type_name : TypeName
The type name of the operation. For example, obtained as
``Addition.type_name()``.
latency_offsets : {"in1": int, ...}
The latency offsets of the inputs and outputs.
Angus Lothian
committed
for op in self.find_by_type_name(type_name):
cast(Operation, op).set_latency_offsets(latency_offsets)
Angus Lothian
committed
def _traverse_for_precedence_list(
self, first_iter_ports: List[OutputPort]
) -> List[List[OutputPort]]:
Angus Lothian
committed
# Find dependencies of output ports and input ports.
remaining_inports_per_operation = {op: op.input_count for op in self.operations}
Angus Lothian
committed
# Traverse output ports for precedence
curr_iter_ports = first_iter_ports
precedence_list = []
while curr_iter_ports:
# Add the found ports to the current iter
precedence_list.append(curr_iter_ports)
next_iter_ports = []
for outport in curr_iter_ports:
for signal in outport.signals:
new_inport = signal.destination
if new_inport is not None and not isinstance(
new_inport.operation, Delay
):
Angus Lothian
committed
new_op = new_inport.operation
remaining_inports_per_operation[new_op] -= 1
if remaining_inports_per_operation[new_op] == 0:
next_iter_ports.extend(new_op.outputs)
curr_iter_ports = next_iter_ports
return precedence_list
def _add_component_unconnected_copy(
self, original_component: GraphComponent
) -> GraphComponent:
if original_component in self._original_components_to_new:
raise ValueError("Tried to add duplicate SFG component")
new_component = original_component.copy()
Angus Lothian
committed
self._original_components_to_new[original_component] = new_component
if not new_component.graph_id or new_component.graph_id in self._used_ids:
new_id = self._graph_id_generator.next_id(
new_component.type_name(), self._used_ids
)
new_component.graph_id = new_id
self._used_ids.add(new_component.graph_id)
self._components_by_id[new_component.graph_id] = new_component
Angus Lothian
committed
self._components_by_name[new_component.name].append(new_component)
return new_component
def _add_operation_connected_tree_copy(self, start_op: Operation) -> None:
op_stack = deque([start_op])
while op_stack:
original_op = op_stack.pop()
# Add or get the new copy of the operation.
if original_op not in self._original_components_to_new:
new_op = cast(
Operation,
self._add_component_unconnected_copy(original_op),
)
Angus Lothian
committed
self._components_dfs_order.append(new_op)
self._operations_dfs_order.append(new_op)
else:
new_op = cast(Operation, self._original_components_to_new[original_op])
Angus Lothian
committed
# Connect input ports to new signals.
for original_input_port in original_op.inputs:
if original_input_port.signal_count < 1:
raise ValueError("Unconnected input port in SFG")
for original_signal in original_input_port.signals:
# Check if the signal is one of the SFG's input signals.
if original_signal in self._original_input_signals_to_indices:
Angus Lothian
committed
# New signal already created during first step of constructor.
new_signal = cast(
Signal,
self._original_components_to_new[original_signal],
)
Angus Lothian
committed
new_signal.set_destination(
Angus Lothian
committed
Angus Lothian
committed
self._components_dfs_order.extend(
if source.operation not in self._operations_dfs_order:
self._operations_dfs_order.append(source.operation)
Angus Lothian
committed
# Check if the signal has not been added before.
elif original_signal not in self._original_components_to_new:
Angus Lothian
committed
if original_signal.source is None:
dest = (
original_signal.destination.operation.name
if original_signal.destination is not None
else "None"
)
Angus Lothian
committed
raise ValueError(
Angus Lothian
committed
new_signal = cast(
Signal,
self._add_component_unconnected_copy(original_signal),
Angus Lothian
committed
new_signal.set_destination(
Angus Lothian
committed
self._components_dfs_order.append(new_signal)
original_connected_op = original_signal.source.operation
Angus Lothian
committed
# Check if connected Operation has been added before.
if original_connected_op in self._original_components_to_new:
component = cast(
Operation,
self._original_components_to_new[original_connected_op],
Angus Lothian
committed
# Set source to the already added operations port.
component.output(original_signal.source.index)
Angus Lothian
committed
else:
# Create new operation, set signal source to it.
new_connected_op = cast(
Operation,
self._add_component_unconnected_copy(
original_connected_op
Angus Lothian
committed
self._components_dfs_order.append(new_connected_op)
self._operations_dfs_order.append(new_connected_op)
# Add connected operation to queue of operations to visit.
op_stack.append(original_connected_op)
# Connect output ports.
for original_output_port in original_op.outputs:
for original_signal in original_output_port.signals:
# Check if the signal is one of the SFG's output signals.
if original_signal in self._original_output_signals_to_indices:
Angus Lothian
committed
# New signal already created during first step of constructor.
new_signal = cast(
Signal,
self._original_components_to_new[original_signal],
)
new_signal.set_source(new_op.output(original_output_port.index))
Angus Lothian
committed
destination = cast(InputPort, new_signal.destination)
Angus Lothian
committed
self._components_dfs_order.extend(
self._operations_dfs_order.append(destination.operation)
Angus Lothian
committed
# Check if signal has not been added before.
elif original_signal not in self._original_components_to_new:
Angus Lothian
committed
if original_signal.source is None:
raise ValueError(
"Dangling signal ({original_signal}) without"
" source in SFG"
Angus Lothian
committed
self._add_component_unconnected_copy(original_signal),
new_signal.set_source(new_op.output(original_output_port.index))
Angus Lothian
committed
self._components_dfs_order.append(new_signal)
original_destination = cast(
InputPort, original_signal.destination
if original_destination is None:
raise ValueError(
f"Signal ({original_signal}) without destination in SFG"
if original_connected_op is None:
raise ValueError(
"Signal with empty destination port"
f" ({original_destination}) in SFG"
Angus Lothian
committed
# Check if connected operation has been added.
if original_connected_op in self._original_components_to_new:
Angus Lothian
committed
# Set destination to the already connected operations port.
cast(
Operation,
self._original_components_to_new[
original_connected_op
],
).input(original_destination.index)
Angus Lothian
committed
else:
# Create new operation, set destination to it.
new_connected_op = cast(
Operation,
(
self._add_component_unconnected_copy(
original_connected_op
)
),
Angus Lothian
committed
self._components_dfs_order.append(new_connected_op)
self._operations_dfs_order.append(new_connected_op)
# Add connected operation to the queue of operations
# to visit.
Angus Lothian
committed
op_stack.append(original_connected_op)
def _evaluate_source(
self,
src: OutputPort,
results: MutableResultMap,
delays: MutableDelayMap,
prefix: str,
bits_override: Optional[int],
Angus Lothian
committed
key_base = (
(prefix + "." + src.operation.graph_id)
if prefix
else src.operation.graph_id
)
Angus Lothian
committed
key = src.operation.key(src.index, key_base)
if key in results:
value = results[key]
if value is None:
raise RuntimeError(
"Direct feedback loop detected when evaluating operation."
)
Angus Lothian
committed
return value
value = src.operation.current_output(src.index, delays, key_base)
results[key] = value
if value is None:
value = self._do_evaluate_source(
key_base,
key,
src,
results,
delays,
prefix,
bits_override,
Angus Lothian
committed
else:
# Evaluate later. Use current value for now.
deferred_delays.append((key_base, key, src))
return value
def _do_evaluate_source(
self,
key_base: str,
key: ResultKey,
src: OutputPort,
results: MutableResultMap,
delays: MutableDelayMap,
prefix: str,
bits_override: Optional[int],
input_values = [
self._evaluate_source(
input_port.signals[0].source,
results,
delays,
prefix,
bits_override,
deferred_delays,
)
for input_port in src.operation.inputs
]
Angus Lothian
committed
value = src.operation.evaluate_output(
src.index,
input_values,
results,
delays,
key_base,
bits_override,
Angus Lothian
committed
results[key] = value
return value
def sfg_digraph(
self,
show_id: bool = False,
port_numbering: bool = True,
splines: str = "spline",
) -> Digraph:
Can be directly displayed in IPython.
show_id : bool, default: False
If True, the graph_id:s of signals are shown.
Graphviz layout engine to be used, see https://graphviz.org/documentation/.
Most common are "dot" and "neato". Default is None leading to dot.
Add a branch node in case the fan-out of a signal is two or more.
port_numbering : bool, default: True
Show the port number in case the number of ports (input or output) is two or
more.
splines : {"spline", "line", "ortho", "polyline", "curved"}, default: "spline"
Spline style, see https://graphviz.org/docs/attrs/splines/ for more info.
Returns
-------
Digraph
Digraph of the SFG.
"""
dg = Digraph()
dg.attr(rankdir="LR", splines=splines)
branch_nodes = set()
if engine is not None:
dg.engine = engine
for op in self._components_by_id.values():
if isinstance(op, Signal):
source = cast(OutputPort, op.source)
destination = cast(InputPort, op.destination)
1552
1553
1554
1555
1556
1557
1558
1559
1560
1561
1562
1563
1564
1565
1566
1567
1568
1569
1570
1571
1572
1573
1574
1575
1576
1577
1578
1579
1580
1581
1582
1583
1584
1585
1586
1587
source_name = (
source.name
if branch_node and source.signal_count > 1
else source.operation.graph_id
)
label = op.graph_id if show_id else None
taillabel = (
str(source.index)
if source.operation.output_count > 1
and (not branch_node or source.signal_count == 1)
and port_numbering
else None
)
headlabel = (
str(destination.index)
if destination.operation.input_count > 1 and port_numbering
else None
)
dg.edge(
source_name,
destination.operation.graph_id,
label=label,
taillabel=taillabel,
headlabel=headlabel,
)
if (
branch_node
and source.signal_count > 1
and source_name not in branch_nodes
):
branch_nodes.add(source_name)
dg.node(source_name, shape='point')
taillabel = (
str(source.index)
if source.operation.output_count > 1 and port_numbering
else None
source_name,
arrowhead='none',
taillabel=taillabel,
dg.node(op.graph_id, shape=_OPERATION_SHAPE[op.type_name()])
def _repr_mimebundle_(self, include=None, exclude=None):
return self.sfg_digraph()._repr_mimebundle_(include=include, exclude=exclude)
return self.sfg_digraph()._repr_mimebundle_(include=["image/jpeg"])[
"image/jpeg"
]
def _repr_png_(self):
return self.sfg_digraph()._repr_mimebundle_(include=["image/png"])["image/png"]
def _repr_svg_(self):
return self.sfg_digraph()._repr_mimebundle_(include=["image/svg+xml"])[
"image/svg+xml"
]
# SVG is valid HTML. This is useful for e.g. sphinx-gallery
_repr_html_ = _repr_svg_
def show(
self,
fmt: Optional[str] = None,
show_id: bool = False,
engine: Optional[str] = None,
port_numbering: bool = True,
splines: str = "spline",
) -> None:
Display a visual representation of the SFG using the default system viewer.
File format of the generated graph. Output formats can be found at
https://www.graphviz.org/doc/info/output.html
Most common are "pdf", "eps", "png", and "svg". Default is None which
leads to PDF.
show_id : bool, default: False
If True, the graph_id:s of signals are shown.
Graphviz layout engine to be used, see https://graphviz.org/documentation/.
Most common are "dot" and "neato". Default is None leading to dot.
Add a branch node in case the fan-out of a signal is two or more.
port_numbering : bool, default: True
Show the port number in case the number of ports (input or output) is two or
more.
splines : {"spline", "line", "ortho", "polyline", "curved"}, default: "spline"
Spline style, see https://graphviz.org/docs/attrs/splines/ for more info.
dg = self.sfg_digraph(
show_id=show_id,
engine=engine,
branch_node=branch_node,
port_numbering=port_numbering,
splines=splines,
)
if fmt is not None:
dg.format = fmt
def critical_path_time(self) -> int:
"""Return the time of the critical path."""
# Import here needed to avoid circular imports
from b_asic.schedule import Schedule
return Schedule(self, algorithm="ASAP").schedule_time
def iteration_period_bound(self) -> int:
"""
Return the iteration period bound of the SFG.
If -1, the SFG does not have any loops and therefore no iteration period bound.
"""
raise NotImplementedError()
def edit(self) -> Dict[str, "SFG"]:
"""Edit SFG in GUI."""
from b_asic.GUI.main_window import start_editor
def unfold(self, factor: int) -> "SFG":
Unfold the SFG *factor* times. Return a new SFG without modifying the original.
Inputs and outputs are ordered with early inputs first. That is for an SFG
with n inputs, the first n inputs are the inputs at time t, the next n
inputs are the inputs at time t+1, the next n at t+2 and so on.
Parameters
----------
Number of times to unfold.
raise ValueError("Unfolding 0 times removes the SFG")
inputs = sfg.input_operations
outputs = sfg.output_operations
# Remove all delay elements in the SFG and replace each one
# with one input operation and one output operation
for delay in sfg.find_by_type_name(Delay.type_name()):
i = Input(name="input_" + delay.graph_id)
o = Output(
src0=delay.input(0).signals[0].source, name="output_" + delay.graph_id
)
inputs.append(i)
outputs.append(o)
# move all outgoing signals from the delay to the new input operation
while len(delay.output(0).signals) > 0:
signal = delay.output(0).signals[0]
destination = signal.destination
destination.remove_signal(signal)
signal.remove_source()
destination.connect(i.output(0))
delay.input(0).signals[0].remove_source()
delay.input(0).clear()
new_sfg = SFG(inputs, outputs) # The new sfg without the delays
sfgs = [new_sfg() for _ in range(factor)] # Copy the SFG factor times
# Add suffixes to all graphIDs and names in order to keep them separated
for i in range(factor):
for operation in sfgs[i].operations:
suffix = f'_{i}'
operation.graph_id = operation.graph_id + suffix
if operation.name[:7] not in ['', 'input_t', 'output_']:
operation.name = operation.name + suffix
input_name_to_idx = {} # save the input port indices for future reference
new_inputs = []
# For each copy of the SFG, create new input operations for every "original"
# input operation and connect them to begin creating the unfolded SFG
for i in range(factor):
for port, operation in zip(sfgs[i].inputs, sfgs[i].input_operations):
if not operation.name.startswith("input_t"):
i = Input()
new_inputs.append(i)
port.connect(i)
# If the input was created earlier when removing the delays
# then just save the index
input_name_to_idx[operation.name] = port.index
# Connect the original outputs in the same way as the inputs
# Also connect the copies of the SFG together according to a formula
# from the TSTE87 course material, and save the number of delays for
# each interconnection
new_outputs = []
delay_placements = {}
for i in range(factor):
for port, operation in zip(sfgs[i].outputs, sfgs[i].output_operations):
if not operation.name.startswith("output_t"):
new_outputs.append(Output(port))
else:
index = operation.name[8:] # Remove the "output_t" prefix
input_port = sfgs[j].input(input_name_to_idx["input_t" + index])
input_port.connect(port)
delay_placements[port] = [i, number_of_delays_between]
sfgs[i].graph_id = (
f'sfg{i}' # deterministically set the graphID of the sfgs
)
sfg = SFG(new_inputs, new_outputs) # create a new SFG to remove floating nodes
# Insert the interconnect delays according to what is saved in delay_placements
i, no_of_delays = val
for _ in range(no_of_delays):
sfg = sfg.insert_operation_after(f'sfg{i}.{port.index}', Delay())
# Flatten all the copies of the original SFG
for i in range(factor):
sfg.find_by_id(f'sfg{i}').connect_external_signals_to_components()
sfg = sfg()
@property
def is_linear(self) -> bool:
return all(op.is_linear for op in self.split())
@property
def is_constant(self) -> bool:
return all(output.is_constant for output in self._output_operations)
def get_used_type_names(self) -> List[TypeName]:
"""Get a list of all TypeNames used in the SFG."""
ret = list({op.type_name() for op in self.operations})
ret.sort()
return ret