Newer
Older
Angus Lothian
committed
Contains the signal flow graph operation.
Angus Lothian
committed
from collections import defaultdict, deque
from collections.abc import Iterable, MutableSet, Sequence
Angus Lothian
committed
from io import StringIO
Simon Bjurek
committed
from math import ceil
Angus Lothian
committed
from queue import PriorityQueue
from b_asic.graph_component import GraphComponent
from b_asic.operation import (
AbstractOperation,
MutableDelayMap,
MutableResultMap,
Operation,
ResultKey,
)
from b_asic.port import InputPort, OutputPort, SignalSourceProvider
from b_asic.special_operations import Delay, Input, Output
from b_asic.types import GraphID, GraphIDNumber, Name, Num, TypeName
Angus Lothian
committed
DelayQueue = list[tuple[str, ResultKey, OutputPort]]
Angus Lothian
committed
_OPERATION_SHAPE: DefaultDict[TypeName, str] = defaultdict(lambda: "ellipse")
_OPERATION_SHAPE.update(
{
Input.type_name(): "cds",
Output.type_name(): "cds",
Delay.type_name(): "square",
}
)
Angus Lothian
committed
class GraphIDGenerator:
"""Generates Graph IDs for objects."""
_next_id_number: DefaultDict[TypeName, GraphIDNumber]
def __init__(self, id_number_offset: GraphIDNumber = GraphIDNumber(0)):
Angus Lothian
committed
"""Construct a GraphIDGenerator."""
self._next_id_number = defaultdict(lambda: id_number_offset)
def next_id(self, type_name: TypeName, used_ids: MutableSet = set()) -> GraphID:
Angus Lothian
committed
"""Get the next graph id for a certain graph id type."""
new_id = type_name + str(self._next_id_number[type_name])
self._next_id_number[type_name] += 1
new_id = type_name + str(self._next_id_number[type_name])
Angus Lothian
committed
@property
def id_number_offset(self) -> GraphIDNumber:
"""Get the graph id number offset of this generator."""
self._next_id_number.default_factory()
) # pylint: disable=not-callable
Construct an SFG given its inputs and outputs.
Angus Lothian
committed
Contains a set of connected operations, forming a new operation.
Used as a base for simulation, scheduling, etc.
Inputs/outputs may be specified using either Input/Output operations
directly with the *inputs*/*outputs* parameters, or using signals with the
*input_signals*/*output_signals parameters*. If signals are used, the
corresponding Input/Output operations will be created automatically.
The *id_number_offset* parameter specifies what number graph IDs will be
offset by for each new graph component type. IDs start at 1 by default,
so the default offset of 0 will result in IDs like "c1", "c2", etc.
while an offset of 3 will result in "c4", "c5", etc.
Parameters
----------
inputs : array of Input, optional
outputs : array of Output, optional
input_signals : array of Signal, optional
output_signals : array of Signal, optional
id_number_offset : GraphIDNumber, optional
name : Name, optional
input_sources :
_components_by_id: dict[GraphID, GraphComponent]
_components_by_name: DefaultDict[Name, list[GraphComponent]]
_components_dfs_order: list[GraphComponent]
_operations_dfs_order: list[Operation]
_operations_topological_order: list[Operation]
_input_operations: list[Input]
_output_operations: list[Output]
_original_components_to_new: dict[GraphComponent, GraphComponent]
_original_input_signals_to_indices: dict[Signal, int]
_original_output_signals_to_indices: dict[Signal, int]
_precedence_list: list[list[OutputPort]] | None
_used_ids: set[GraphID] = set()
Angus Lothian
committed
inputs: Sequence[Input] | None = None,
outputs: Sequence[Output] | None = None,
input_signals: Sequence[Signal] | None = None,
output_signals: Sequence[Signal] | None = None,
id_number_offset: GraphIDNumber = GraphIDNumber(0),
name: Name = Name(""),
input_sources: Sequence[SignalSourceProvider | None] | None = None,
Angus Lothian
committed
input_signal_count = 0 if input_signals is None else len(input_signals)
input_operation_count = 0 if inputs is None else len(inputs)
output_signal_count = 0 if output_signals is None else len(output_signals)
Angus Lothian
committed
output_operation_count = 0 if outputs is None else len(outputs)
super().__init__(
input_count=input_signal_count + input_operation_count,
output_count=output_signal_count + output_operation_count,
name=name,
input_sources=input_sources,
)
Angus Lothian
committed
Angus Lothian
committed
self._components_by_name = defaultdict(list)
self._components_dfs_order = []
self._operations_dfs_order = []
self._operations_topological_order = []
self._graph_id_generator = GraphIDGenerator(GraphIDNumber(id_number_offset))
Angus Lothian
committed
self._input_operations = []
self._output_operations = []
self._original_components_to_new = {}
self._original_input_signals_to_indices = {}
self._original_output_signals_to_indices = {}
self._precedence_list = None
# Setup input signals.
if input_signals is not None:
for input_index, signal in enumerate(input_signals):
if signal in self._original_components_to_new:
raise ValueError(f"Duplicate input signal {signal!r} in SFG")
new_input_op = cast(
Input, self._add_component_unconnected_copy(Input())
)
new_signal = cast(Signal, self._add_component_unconnected_copy(signal))
Angus Lothian
committed
new_signal.set_source(new_input_op.output(0))
self._input_operations.append(new_input_op)
self._original_input_signals_to_indices[signal] = input_index
# Setup input operations, starting from indices after input signals.
Angus Lothian
committed
if inputs is not None:
for input_index, input_op in enumerate(inputs, input_signal_count):
if input_op in self._original_components_to_new:
raise ValueError(f"Duplicate input operation {input_op!r} in SFG")
new_input_op = cast(
Input, self._add_component_unconnected_copy(input_op)
)
Angus Lothian
committed
for signal in input_op.output(0).signals:
if signal in self._original_components_to_new:
raise ValueError(
"Duplicate input signals connected to input ports"
" supplied to SFG constructor."
)
new_signal = cast(
Signal, self._add_component_unconnected_copy(signal)
)
Angus Lothian
committed
new_signal.set_source(new_input_op.output(0))
self._original_input_signals_to_indices[signal] = input_index
Angus Lothian
committed
self._input_operations.append(new_input_op)
Angus Lothian
committed
# Setup output signals.
if output_signals is not None:
for output_index, signal in enumerate(output_signals):
new_output_op = cast(
Output, self._add_component_unconnected_copy(Output())
)
Angus Lothian
committed
if signal in self._original_components_to_new:
# Signal was already added when setting up inputs.
new_signal = cast(Signal, self._original_components_to_new[signal])
Angus Lothian
committed
new_signal.set_destination(new_output_op.input(0))
else:
# New signal has to be created.
new_signal = cast(
Signal, self._add_component_unconnected_copy(signal)
)
Angus Lothian
committed
new_signal.set_destination(new_output_op.input(0))
Angus Lothian
committed
self._output_operations.append(new_output_op)
self._original_output_signals_to_indices[signal] = output_index
Angus Lothian
committed
# Setup output operations, starting from indices after output signals.
if outputs is not None:
for output_index, output_op in enumerate(outputs, output_signal_count):
if output_op in self._original_components_to_new:
raise ValueError(f"Duplicate output operation {output_op!r} in SFG")
new_output_op = cast(
Output, self._add_component_unconnected_copy(output_op)
)
Angus Lothian
committed
for signal in output_op.input(0).signals:
if signal in self._original_components_to_new:
# Signal was already added when setting up inputs.
new_signal = cast(
Signal, self._original_components_to_new[signal]
)
Angus Lothian
committed
else:
# New signal has to be created.
new_signal = cast(
Signal,
self._add_component_unconnected_copy(signal),
Angus Lothian
committed
new_signal.set_destination(new_output_op.input(0))
self._original_output_signals_to_indices[signal] = output_index
Angus Lothian
committed
self._output_operations.append(new_output_op)
output_operations_set = set(self._output_operations)
# Search the graph inwards from each input signal.
for (
signal,
input_index,
) in self._original_input_signals_to_indices.items():
Angus Lothian
committed
# Check if already added destination.
new_signal = cast(Signal, self._original_components_to_new[signal])
Angus Lothian
committed
if new_signal.destination is None:
if signal.destination is None:
raise ValueError(
f"Input signal #{input_index} is missing destination in SFG"
if signal.destination.operation not in self._original_components_to_new:
Angus Lothian
committed
self._add_operation_connected_tree_copy(
Angus Lothian
committed
elif new_signal.destination.operation in output_operations_set:
# Add directly connected input to output to ordered list.
Angus Lothian
committed
self._components_dfs_order.extend(
new_signal,
new_signal.destination.operation,
]
)
Angus Lothian
committed
self._operations_dfs_order.extend(
Angus Lothian
committed
# Search the graph inwards from each output signal.
for (
signal,
output_index,
) in self._original_output_signals_to_indices.items():
Angus Lothian
committed
# Check if already added source.
new_signal = cast(Signal, self._original_components_to_new[signal])
if new_signal.source in output_sources:
warnings.warn("Two signals connected to the same output port")
output_sources.append(new_signal.source)
Angus Lothian
committed
if new_signal.source is None:
if signal.source is None:
raise ValueError(
f"Output signal #{output_index} is missing source in SFG"
if signal.source.operation not in self._original_components_to_new:
self._add_operation_connected_tree_copy(signal.source.operation)
Angus Lothian
committed
if len(output_sources) != (output_operation_count + output_signal_count):
raise ValueError(
"At least one output operation is not connected!, Tips: Check for output ports that are connected to the same signal"
)
Angus Lothian
committed
def __str__(self) -> str:
"""Return a string representation of this SFG."""
Angus Lothian
committed
string_io = StringIO()
string_io.write(super().__str__() + "\n")
string_io.write("Internal Operations:\n")
line = "-" * 100 + "\n"
string_io.write(line)
for operation in self.get_operations_topological_order():
Angus Lothian
committed
string_io.write(line)
return string_io.getvalue()
self, *src: SignalSourceProvider | None, name: Name = Name("")
Return a new independent SFG instance that is identical to this SFG
except without any of its external connections.
return SFG(
inputs=self._input_operations,
outputs=self._output_operations,
id_number_offset=self.id_number_offset,
Angus Lothian
committed
@classmethod
def type_name(cls) -> TypeName:
Angus Lothian
committed
def evaluate(self, *args):
result = self.evaluate_outputs(args)
n = len(result)
return None if n == 0 else result[0] if n == 1 else result
results: MutableResultMap | None = None,
delays: MutableDelayMap | None = None,
Angus Lothian
committed
if index < 0 or index >= self.output_count:
raise IndexError(
"Output index out of range (expected"
f" 0-{self.output_count - 1}, got {index})"
)
Angus Lothian
committed
if len(input_values) != self.input_count:
raise ValueError(
"Wrong number of inputs supplied to SFG for evaluation"
f" (expected {self.input_count}, got {len(input_values)})"
)
Angus Lothian
committed
if results is None:
results = {}
if delays is None:
delays = {}
# Set the values of our input operations to the given input values.
(
self.quantize_inputs(input_values, bits_override)
if quantize
else input_values
),
Angus Lothian
committed
op.value = arg
deferred_delays = []
value = self._evaluate_source(
self._output_operations[index].input(0).signals[0].source,
results,
delays,
prefix,
bits_override,
Angus Lothian
committed
while deferred_delays:
new_deferred_delays = []
for key_base, key, src in deferred_delays:
self._do_evaluate_source(
key_base,
key,
src,
results,
delays,
prefix,
bits_override,
Loading
Loading full blame...