Newer
Older
Angus Lothian
committed
Contains the signal flow graph operation.
Angus Lothian
committed
from collections import defaultdict, deque
from io import StringIO
Angus Lothian
committed
from queue import PriorityQueue
Dict,
Iterable,
List,
MutableSet,
Optional,
Sequence,
from graphviz import Digraph
from b_asic.graph_component import (
GraphComponent,
GraphID,
GraphIDNumber,
Name,
TypeName,
)
from b_asic.operation import (
AbstractOperation,
MutableDelayMap,
MutableResultMap,
Operation,
ResultKey,
)
from b_asic.port import InputPort, OutputPort, SignalSourceProvider
from b_asic.special_operations import Delay, Input, Output
Angus Lothian
committed
# Type alias for a list of (str, ResultKey, OutputPort) triples.
# NOTE(review): this looks like the element layout of the deferred-delay
# lists that evaluate_output unpacks as (key_base, key, src) - confirm.
DelayQueue = List[Tuple[str, ResultKey, OutputPort]]
class GraphIDGenerator:
    """Generates Graph IDs for objects, numbered separately per type name."""

    # Last number handed out for each type name. A type name that has not
    # been seen yet starts from the offset given to the constructor.
    _next_id_number: DefaultDict[TypeName, GraphIDNumber]

    def __init__(self, id_number_offset: GraphIDNumber = GraphIDNumber(0)):
        """
        Construct a GraphIDGenerator.

        Every per-type counter starts at id_number_offset, so the first ID
        generated for a type name uses the number id_number_offset + 1.
        """
        self._next_id_number = defaultdict(lambda: id_number_offset)

    def next_id(
        self, type_name: TypeName, used_ids: Optional[MutableSet] = None
    ) -> GraphID:
        """
        Get the next graph id for a certain graph id type.

        The counter for type_name is advanced at least once, and then again
        for as long as the resulting ID is contained in used_ids, so the
        returned ID never collides with an ID in used_ids. Passing no
        used_ids means that no IDs are considered taken.
        """
        # None instead of a shared mutable default; an empty tuple supports
        # the membership test without allocating a set per call.
        taken = () if used_ids is None else used_ids
        while True:
            self._next_id_number[type_name] += 1
            new_id = type_name + str(self._next_id_number[type_name])
            if new_id not in taken:
                return GraphID(new_id)

    @property
    def id_number_offset(self) -> GraphIDNumber:
        """Get the graph id number offset of this generator."""
        # The offset is only stored inside the defaultdict's factory, so it
        # is read back by calling that factory.
        return (
            self._next_id_number.default_factory()
        )  # pylint: disable=not-callable
Angus Lothian
committed
Contains a set of connected operations, forming a new operation.
Used as a base for simulation, scheduling, etc.
Angus Lothian
committed
_components_by_id: Dict[GraphID, GraphComponent]
_components_by_name: DefaultDict[Name, List[GraphComponent]]
_components_dfs_order: List[GraphComponent]
_operations_dfs_order: List[Operation]
_operations_topological_order: List[Operation]
Angus Lothian
committed
_input_operations: List[Input]
_output_operations: List[Output]
_original_components_to_new: Dict[GraphComponent, GraphComponent]
Angus Lothian
committed
_original_input_signals_to_indices: Dict[Signal, int]
_original_output_signals_to_indices: Dict[Signal, int]
_precedence_list: Optional[List[List[OutputPort]]]
Angus Lothian
committed
def __init__(
self,
inputs: Optional[Sequence[Input]] = None,
outputs: Optional[Sequence[Output]] = None,
input_signals: Optional[Sequence[Signal]] = None,
output_signals: Optional[Sequence[Signal]] = None,
id_number_offset: GraphIDNumber = GraphIDNumber(0),
name: Name = Name(""),
input_sources: Optional[
Sequence[Optional[SignalSourceProvider]]
] = None,
):
"""
Construct an SFG given its inputs and outputs.
Angus Lothian
committed
Inputs/outputs may be specified using either Input/Output operations
directly with the inputs/outputs parameters, or using signals with the
input_signals/output_signals parameters. If signals are used, the
corresponding Input/Output operations will be created automatically.
The id_number_offset parameter specifies what number graph IDs will be
offset by for each new graph component type. IDs start at 1 by default,
so the default offset of 0 will result in IDs like "c1", "c2", etc.
while an offset of 3 will result in "c4", "c5", etc.
"""
input_signal_count = 0 if input_signals is None else len(input_signals)
input_operation_count = 0 if inputs is None else len(inputs)
output_signal_count = (
0 if output_signals is None else len(output_signals)
)
Angus Lothian
committed
output_operation_count = 0 if outputs is None else len(outputs)
super().__init__(
input_count=input_signal_count + input_operation_count,
output_count=output_signal_count + output_operation_count,
name=name,
input_sources=input_sources,
)
Angus Lothian
committed
Angus Lothian
committed
self._components_by_name = defaultdict(list)
self._components_dfs_order = []
self._operations_dfs_order = []
self._operations_topological_order = []
self._graph_id_generator = GraphIDGenerator(
GraphIDNumber(id_number_offset)
)
Angus Lothian
committed
self._input_operations = []
self._output_operations = []
self._original_components_to_new = {}
self._original_input_signals_to_indices = {}
self._original_output_signals_to_indices = {}
self._precedence_list = None
# Setup input signals.
if input_signals is not None:
for input_index, signal in enumerate(input_signals):
if signal in self._original_components_to_new:
raise ValueError(
f"Duplicate input signal {signal!r} in SFG"
)
new_input_op = cast(
Input, self._add_component_unconnected_copy(Input())
)
new_signal = cast(
Signal, self._add_component_unconnected_copy(signal)
)
Angus Lothian
committed
new_signal.set_source(new_input_op.output(0))
self._input_operations.append(new_input_op)
self._original_input_signals_to_indices[signal] = input_index
# Setup input operations, starting from indices after input signals.
Angus Lothian
committed
if inputs is not None:
for input_index, input_op in enumerate(inputs, input_signal_count):
if input_op in self._original_components_to_new:
raise ValueError(
f"Duplicate input operation {input_op!r} in SFG"
)
Angus Lothian
committed
new_input_op = self._add_component_unconnected_copy(input_op)
for signal in input_op.output(0).signals:
assert signal not in self._original_components_to_new, (
"Duplicate input signals connected to input ports"
Angus Lothian
committed
new_signal = self._add_component_unconnected_copy(signal)
new_signal.set_source(new_input_op.output(0))
self._original_input_signals_to_indices[
signal
] = input_index
Angus Lothian
committed
self._input_operations.append(new_input_op)
Angus Lothian
committed
# Setup output signals.
if output_signals is not None:
for output_index, signal in enumerate(output_signals):
new_output_op = cast(
Output, self._add_component_unconnected_copy(Output())
)
Angus Lothian
committed
if signal in self._original_components_to_new:
Angus Lothian
committed
# Signal was already added when setting up inputs.
new_signal = cast(
Signal, self._original_components_to_new[signal]
)
Angus Lothian
committed
new_signal.set_destination(new_output_op.input(0))
else:
# New signal has to be created.
new_signal = cast(
Signal, self._add_component_unconnected_copy(signal)
)
Angus Lothian
committed
new_signal.set_destination(new_output_op.input(0))
Angus Lothian
committed
self._output_operations.append(new_output_op)
self._original_output_signals_to_indices[signal] = output_index
Angus Lothian
committed
# Setup output operations, starting from indices after output signals.
if outputs is not None:
for output_index, output_op in enumerate(
outputs, output_signal_count
):
if output_op in self._original_components_to_new:
raise ValueError(
f"Duplicate output operation {output_op!r} in SFG"
)
new_output_op = cast(
Output, self._add_component_unconnected_copy(output_op)
)
Angus Lothian
committed
for signal in output_op.input(0).signals:
if signal in self._original_components_to_new:
# Signal was already added when setting up inputs.
new_signal = self._original_components_to_new[signal]
else:
# New signal has to be created.
new_signal = self._add_component_unconnected_copy(
Angus Lothian
committed
new_signal.set_destination(new_output_op.input(0))
self._original_output_signals_to_indices[
signal
] = output_index
Angus Lothian
committed
self._output_operations.append(new_output_op)
output_operations_set = set(self._output_operations)
# Search the graph inwards from each input signal.
for (
signal,
input_index,
) in self._original_input_signals_to_indices.items():
Angus Lothian
committed
# Check if already added destination.
new_signal = cast(Signal, self._original_components_to_new[signal])
Angus Lothian
committed
if new_signal.destination is None:
if signal.destination is None:
raise ValueError(
f"Input signal #{input_index} is missing destination"
" in SFG"
)
if (
signal.destination.operation
not in self._original_components_to_new
):
Angus Lothian
committed
self._add_operation_connected_tree_copy(
Angus Lothian
committed
elif new_signal.destination.operation in output_operations_set:
# Add directly connected input to output to ordered list.
self._components_dfs_order.extend(
[
new_signal.source.operation,
new_signal,
new_signal.destination.operation,
]
)
Angus Lothian
committed
self._operations_dfs_order.extend(
[
new_signal.source.operation,
new_signal.destination.operation,
]
)
Angus Lothian
committed
# Search the graph inwards from each output signal.
for (
signal,
output_index,
) in self._original_output_signals_to_indices.items():
Angus Lothian
committed
# Check if already added source.
new_signal = self._original_components_to_new[signal]
if new_signal.source is None:
if signal.source is None:
raise ValueError(
f"Output signal #{output_index} is missing source"
" in SFG"
)
if (
signal.source.operation
not in self._original_components_to_new
):
Angus Lothian
committed
self._add_operation_connected_tree_copy(
Angus Lothian
committed
def __str__(self) -> str:
"""Get a string representation of this SFG."""
string_io = StringIO()
string_io.write(super().__str__() + "\n")
string_io.write("Internal Operations:\n")
line = "-" * 100 + "\n"
string_io.write(line)
for operation in self.get_operations_topological_order():
Angus Lothian
committed
string_io.write(line)
return string_io.getvalue()
self, *src: Optional[SignalSourceProvider], name: Name = Name("")
"""
Get a new independent SFG instance that is identical to this SFG
except without any of its external connections.
return SFG(
inputs=self._input_operations,
outputs=self._output_operations,
id_number_offset=self.id_number_offset,
Angus Lothian
committed
@classmethod
def type_name(cls) -> TypeName:
Angus Lothian
committed
def evaluate(self, *args):
    """
    Evaluate all outputs for the given input values.

    The positional arguments are forwarded as one tuple to
    evaluate_outputs. Returns None when that yields no values, the single
    value itself when it yields exactly one, and the whole result otherwise.
    """
    outputs = self.evaluate_outputs(args)
    count = len(outputs)
    if count == 0:
        return None
    if count == 1:
        return outputs[0]
    return outputs
def evaluate_output(
    self,
    index: int,
    input_values: Sequence[Number],
    results: Optional[MutableResultMap] = None,
    delays: Optional[MutableDelayMap] = None,
    prefix: str = "",
    bits_override: Optional[int] = None,
    truncate: bool = True,
) -> Number:
    """
    Evaluate the output at the given index for the given input values.

    Parameters
    ==========

    index : int
        Index of the output to evaluate.
    input_values : Sequence[Number]
        One value per input of this SFG, in input order.
    results : MutableResultMap, optional
        Map that is handed to the source evaluation and that receives the
        value of this output under self.key(index, prefix). A new dict is
        used if omitted.
    delays : MutableDelayMap, optional
        Map that is handed to the source evaluation. A new dict is used if
        omitted.
    prefix : str
        Prefix used when building result keys.
    bits_override : int, optional
        Forwarded to truncate_inputs and to the source evaluation.
    truncate : bool
        If True, the input values are passed through truncate_inputs before
        they are assigned to the input operations.

    Raises IndexError if index is not a valid output index and ValueError
    if the number of input values differs from the number of inputs.
    """
    if index < 0 or index >= self.output_count:
        raise IndexError(
            "Output index out of range (expected"
            f" 0-{self.output_count - 1}, got {index})"
        )
    if len(input_values) != self.input_count:
        raise ValueError(
            "Wrong number of inputs supplied to SFG for evaluation"
            f" (expected {self.input_count}, got {len(input_values)})"
        )

    if results is None:
        results = {}
    if delays is None:
        delays = {}

    # Set the values of our input operations to the given input values.
    if truncate:
        values = self.truncate_inputs(input_values, bits_override)
    else:
        values = input_values
    for op, arg in zip(self._input_operations, values):
        op.value = arg

    deferred_delays = []
    value = self._evaluate_source(
        self._output_operations[index].input(0).signals[0].source,
        results,
        delays,
        prefix,
        bits_override,
        truncate,
        deferred_delays,
    )

    # Evaluating a source may defer further work. Handle it round by round
    # until a round defers nothing more.
    while deferred_delays:
        new_deferred_delays = []
        for key_base, key, src in deferred_delays:
            self._do_evaluate_source(
                key_base,
                key,
                src,
                results,
                delays,
                prefix,
                bits_override,
                truncate,
                new_deferred_delays,
            )
        deferred_delays = new_deferred_delays

    results[self.key(index, prefix)] = value
    return value
def connect_external_signals_to_components(self) -> bool:
    """
    Connect any external signals to this SFG's internal operations.

    This SFG becomes unconnected from the SFG it is a component of,
    causing it to become invalid afterwards. Returns True if successful,
    False otherwise (that is, when this SFG has no input signals or no
    output signals).

    Raises IndexError if the number of input/output ports differs from the
    number of internal input/output operations, and ValueError if an
    internal signal is missing its destination or source.
    """
    if len(self.inputs) != len(self.input_operations):
        raise IndexError(
            f"Number of inputs ({len(self.inputs)}) does not match the"
            f" number of input_operations ({len(self.input_operations)})"
            " in SFG."
        )
    if len(self.outputs) != len(self.output_operations):
        raise IndexError(
            f"Number of outputs ({len(self.outputs)}) does not match the"
            f" number of output_operations ({len(self.output_operations)})"
            " in SFG."
        )
    if len(self.input_signals) == 0 or len(self.output_signals) == 0:
        return False

    # For each input_signal, connect it to the corresponding operation
    for port, input_operation in zip(self.inputs, self.input_operations):
        dest = input_operation.output(0).signals[0].destination
        if dest is None:
            raise ValueError("Missing destination in signal.")
        # Detach the internal signal before attaching the external one.
        dest.clear()
        port.signals[0].set_destination(dest)

    # For each output_signal, connect it to the corresponding operation
    for port, output_operation in zip(
        self.outputs, self.output_operations
    ):
        src = output_operation.input(0).signals[0].source
        if src is None:
            raise ValueError("Missing source in signal.")
        src.clear()
        port.signals[0].set_source(src)
    return True
@property
def input_operations(self) -> Sequence[Operation]:
    """
    Get the internal input operations in the same order as their respective
    input ports.
    """
    return self._input_operations
@property
def output_operations(self) -> Sequence[Operation]:
    """
    Get the internal output operations in the same order as their
    respective output ports.
    """
    return self._output_operations
def split(self) -> Iterable[Operation]:
    """Return the operations of this SFG, as given by self.operations."""
    return self.operations
Angus Lothian
committed
return self
def inputs_required_for_output(self, output_index: int) -> Iterable[int]:
    """
    Get the indexes of the inputs that the given output depends on.

    The graph is searched breadth-first, backwards from the output
    operation at output_index through the sources of every input signal.
    Each Input operation of this SFG that is reached contributes its index
    once, in the order in which it is reached.

    Raises IndexError if output_index is not a valid output index.
    """
    if output_index < 0 or output_index >= self.output_count:
        raise IndexError(
            "Output index out of range (expected"
            f" 0-{self.output_count - 1}, got {output_index})"
        )

    input_indexes_required = []
    sfg_input_operations_to_indexes = {
        input_op: index
        for index, input_op in enumerate(self._input_operations)
    }
    output_op = self._output_operations[output_index]
    queue: Deque[Operation] = deque([output_op])
    visited: Set[Operation] = {output_op}
    while queue:
        op = queue.popleft()
        if isinstance(op, Input):
            if op in sfg_input_operations_to_indexes:
                # pop() both reads the index and removes the entry, so an
                # input can never be reported twice.
                input_indexes_required.append(
                    sfg_input_operations_to_indexes.pop(op)
                )
        for input_port in op.inputs:
            for signal in input_port.signals:
                if signal.source is not None:
                    new_op = signal.source.operation
                    if new_op not in visited:
                        queue.append(new_op)
                        visited.add(new_op)
    return input_indexes_required
def copy_component(self, *args, **kwargs) -> GraphComponent:
    """
    Copy this SFG through the parent class's copy_component.

    The caller's arguments are forwarded, together with this SFG's internal
    input operations, internal output operations, graph id number offset
    and name as keyword arguments.
    """
    return super().copy_component(
        *args,
        **kwargs,
        inputs=self._input_operations,
        outputs=self._output_operations,
        id_number_offset=self.id_number_offset,
        name=self.name,
    )
Angus Lothian
committed
@property
def id_number_offset(self) -> GraphIDNumber:
    """
    Get the graph id number offset of the graph id generator for this SFG.
    """
    return self._graph_id_generator.id_number_offset
@property
Angus Lothian
committed
"""Get all components of this graph in depth-first order."""
return self._components_dfs_order
@property
Angus Lothian
committed
"""Get all operations of this graph in depth-first order."""
return self._operations_dfs_order
def find_by_type_name(
self, type_name: TypeName
) -> Sequence[GraphComponent]:
"""
Find all components in this graph with the specified type name.
Angus Lothian
committed
Returns an empty sequence if no components were found.
Parameters
==========
type_name : TypeName
The TypeName of the desired components.
Loading
Loading full blame...