Newer
Older
Angus Lothian
committed
Contains the base for operations that are used by B-ASIC.
Angus Lothian
committed
import collections
from abc import abstractmethod
from numbers import Number
Dict,
Iterable,
List,
Mapping,
MutableMapping,
NewType,
Optional,
Sequence,
Tuple,
Union,
from b_asic.graph_component import AbstractGraphComponent, GraphComponent, GraphID, Name
from b_asic.port import InputPort, OutputPort, SignalSourceProvider
from b_asic.signal import Signal
if TYPE_CHECKING:
from b_asic.signal_flow_graph import SFG
Angus Lothian
committed
ResultKey = NewType("ResultKey", str)
ResultMap = Mapping[ResultKey, Optional[Num]]
MutableResultMap = MutableMapping[ResultKey, Optional[Num]]
DelayMap = Mapping[ResultKey, Num]
MutableDelayMap = MutableMapping[ResultKey, Num]
Angus Lothian
committed
class Operation(GraphComponent, SignalSourceProvider):
Angus Lothian
committed
Operations are graph components that perform a certain function.
They are connected to each other by signals through their input/output
Angus Lothian
committed
ports.
Operations can be evaluated independently using evaluate_output().
Operations may specify how to quantize inputs through quantize_input().
Angus Lothian
committed
@abstractmethod
def __ilshift__(self, src: SignalSourceProvider) -> "Operation":
Overload the inline left shift operator to make it connect the provided signal
source to this operation's input, assuming it has exactly one input port.
Angus Lothian
committed
"""
raise NotImplementedError
@property
@abstractmethod
def input_count(self) -> int:
    """
    Return the number of input ports of the operation.

    Abstract: this declaration always raises ``NotImplementedError``.
    """
    raise NotImplementedError
Angus Lothian
committed
@property
@abstractmethod
def output_count(self) -> int:
    """
    Return the number of output ports of the operation.

    Abstract: this declaration always raises ``NotImplementedError``.
    """
    raise NotImplementedError
@abstractmethod
def input(self, index: int) -> InputPort:
    """
    Return the input port located at position *index*.

    Abstract: this declaration always raises ``NotImplementedError``.
    """
    raise NotImplementedError
@abstractmethod
def output(self, index: int) -> OutputPort:
    """
    Get the output port at the given index.

    Abstract: this declaration always raises ``NotImplementedError``, in line
    with the other abstract members of the interface (e.g. ``input``).
    """
    # Without this the stub would silently return None when reached via super().
    raise NotImplementedError
Angus Lothian
committed
@property
@abstractmethod
def inputs(self) -> Sequence[InputPort]:
    """
    Get all input ports.

    Abstract: this declaration always raises ``NotImplementedError``, mirroring
    the ``outputs`` property of the same interface.
    """
    raise NotImplementedError
Angus Lothian
committed
@property
@abstractmethod
def outputs(self) -> Sequence[OutputPort]:
    """
    Return every output port of the operation.

    Abstract: this declaration always raises ``NotImplementedError``.
    """
    raise NotImplementedError
@property
@abstractmethod
Get all the signals that are connected to this operation's input ports.
The signals are not ordered.
Angus Lothian
committed
"""
raise NotImplementedError
@property
Get all the signals that are connected to this operation's output ports.
The signals are not ordered.
"""
raise NotImplementedError
@abstractmethod
Angus Lothian
committed
def key(self, index: int, prefix: str = "") -> ResultKey:
Get the key used to access the output of a certain index.
This key can be used to access the simulation results and used as
the *output* parameter passed to current_output(s) or evaluate_output(s).
"""
raise NotImplementedError
@abstractmethod
def current_output(
self, index: int, delays: Optional[DelayMap] = None, prefix: str = ""
"""
Get the current output at the given index of this operation, if available.
The *delays* parameter will be used for lookup.
The *prefix* parameter will be used as a prefix for the key string when looking
for delays.
current_outputs, evaluate_output, evaluate_outputs
"""
raise NotImplementedError
@abstractmethod
results: Optional[MutableResultMap] = None,
delays: Optional[MutableDelayMap] = None,
prefix: str = "",
bits_override: Optional[int] = None,
Evaluate the output at the given index with the given input values.
Parameters
----------
index : int
Which output to return the value for.
input_values : array of float or complex
The input values.
results : MutableResultMap, optional
Used to store any results (including intermediate results)
for caching.
delays : MutableDelayMap, optional
Used to get the current value of any intermediate delay elements
that are encountered, and be updated with their new values.
prefix : str, optional
Used as a prefix for the key string when storing results/delays.
Specifies a word length override when truncating inputs
which ignores the word length specified by the input signal.
Specifies whether input truncation should be enabled in the first
place. If set to False, input values will be used directly without any
bit truncation.
evaluate_outputs, current_output, current_outputs
Angus Lothian
committed
"""
raise NotImplementedError
@abstractmethod
def current_outputs(
self, delays: Optional[DelayMap] = None, prefix: str = ""
"""
Get all current outputs of this operation, if available.
Angus Lothian
committed
"""
raise NotImplementedError
@abstractmethod
results: Optional[MutableResultMap] = None,
delays: Optional[MutableDelayMap] = None,
prefix: str = "",
bits_override: Optional[int] = None,
"""
Evaluate all outputs of this operation given the input values.
See Also
--------
evaluate_output
Angus Lothian
committed
"""
raise NotImplementedError
@abstractmethod
def split(self) -> Iterable["Operation"]:
"""
Split the operation into multiple operations.
If splitting is not possible, this may return a list containing only the
operation itself.
Angus Lothian
committed
@abstractmethod
"""
Convert the operation into its corresponding SFG.
If the operation is composed by multiple operations, the operation will be
split.
Angus Lothian
committed
"""
raise NotImplementedError
@abstractmethod
def inputs_required_for_output(self, output_index: int) -> Iterable[int]:
"""
Get the input indices of all inputs in this operation whose values are
required in order to evaluate the output at the given output index.
Angus Lothian
committed
raise NotImplementedError
@abstractmethod
def quantize_input(self, index: int, value: Num, bits: int) -> Num:
Quantize the value to be used as input at the given index to a certain bit
Angus Lothian
committed
raise NotImplementedError
Angus Lothian
committed
def latency(self) -> int:
Get the latency of the operation.
This is the longest time it takes from one of the input ports to one of the
output ports.
Angus Lothian
committed
"""
raise NotImplementedError
@property
@abstractmethod
def latency_offsets(self) -> Dict[str, Optional[int]]:
    """
    Return a dictionary with the latency offsets of all ports of the operation.

    Abstract: this declaration always raises ``NotImplementedError``.
    """
    raise NotImplementedError
@abstractmethod
def set_latency(self, latency: int) -> None:
Set the latency of the operation to the specified integer value.
This is done by setting the latency-offsets of operations input ports to 0
and the latency-offsets of the operations output ports to the specified value.
The latency is the time it takes to produce an output from the corresponding
input of the underlying operator.
The latency cannot be a negative integer.
Parameters
----------
latency : int
Non-negative int corresponding to the latency of the operation.
Angus Lothian
committed
"""
raise NotImplementedError
@abstractmethod
def set_latency_offsets(self, latency_offsets: Dict[str, int]) -> None:
Set the latency-offsets for the operations port.
The latency offsets dictionary should be {'in0': 2, 'out1': 4} if you want to
set the latency offset for the input port with index 0 to 2, and the latency
offset of the output port with index 1 to 4.
Get the execution time of the operation.
The execution time is the time between executing two operations on the
underlying operator. This is also called initiation interval.
"""
raise NotImplementedError
@execution_time.setter
@abstractmethod
def execution_time(self, execution_time: Optional[int]) -> None:
Set the execution time of the operation.
The execution time is the time between executing two operations on the
underlying operator. This is also called initiation interval.
The execution time cannot be a negative integer.
Parameters
----------
execution_time : int or None
Non-negative integer corresponding to the execution time of the operation.
Unset execution time by passing ``None``.
"""
raise NotImplementedError
@abstractmethod
) -> Tuple[Tuple[Tuple[float, float], ...], Tuple[Tuple[float, float], ...]]:
Return coordinates for the latency and execution time polygons.
This returns a tuple containing coordinates for the two polygons outlining
the latency and execution time of the operation.
The polygons are corresponding to a start time of 0 and are of height 1.
"""
raise NotImplementedError
@abstractmethod
def get_input_coordinates(self) -> Tuple[Tuple[float, float], ...]:
    """
    Return coordinates for inputs.

    The coordinates map to the polygons and correspond to a start time of 0
    and a height of 1.

    See Also
    --------
    get_output_coordinates
    """
    raise NotImplementedError
@abstractmethod
def get_output_coordinates(self) -> Tuple[Tuple[float, float], ...]:
    """
    Return coordinates for outputs.

    The coordinates map to the polygons and correspond to a start time of 0
    and a height of 1.

    See Also
    --------
    get_input_coordinates
    """
    raise NotImplementedError
@property
@abstractmethod
def source(self) -> OutputPort:
    """
    Return the single OutputPort of the operation.

    Implementations are expected to raise a TypeError when the operation does
    not have exactly one output port. This abstract declaration itself always
    raises ``NotImplementedError``.
    """
    raise NotImplementedError
@property
@abstractmethod
def destination(self) -> InputPort:
    """
    Return the single InputPort of the operation.

    Implementations are expected to raise a TypeError when the operation does
    not have exactly one input port. This abstract declaration itself always
    raises ``NotImplementedError``.
    """
    raise NotImplementedError
@abstractmethod
def _increase_time_resolution(self, factor: int) -> None:
    """
    Increase the time resolution of the operation by *factor*.

    Abstract: this declaration always raises ``NotImplementedError``.
    """
    raise NotImplementedError
@abstractmethod
def _decrease_time_resolution(self, factor: int) -> None:
    """
    Decrease the time resolution of the operation by *factor*.

    Abstract: this declaration always raises ``NotImplementedError``.
    """
    raise NotImplementedError
@abstractmethod
def _check_all_latencies_set(self) -> None:
    """
    Check that every port of the operation has a latency offset.

    Abstract: this declaration always raises ``NotImplementedError``.
    """
    raise NotImplementedError
@property
@abstractmethod
def is_linear(self) -> bool:
    """
    Tell whether the operation is linear.

    Abstract: this declaration always raises ``NotImplementedError``.
    """
    raise NotImplementedError
@property
@abstractmethod
def is_constant(self) -> bool:
    """
    Tell whether the output or outputs of the operation are constant.

    Abstract: this declaration always raises ``NotImplementedError``.
    """
    raise NotImplementedError
@property
@abstractmethod
def is_commutative(self) -> bool:
    """
    Tell whether the operation is commutative.

    Abstract: this declaration always raises ``NotImplementedError``.
    """
    raise NotImplementedError
@property
@abstractmethod
def is_distributive(self) -> bool:
    """
    Tell whether the operation is distributive.

    Abstract: this declaration always raises ``NotImplementedError``.
    """
    raise NotImplementedError
@property
@abstractmethod
def is_swappable(self) -> bool:
    """
    Tell whether the inputs (and outputs) of the operation can be swapped.

    Swapping requires that the operation retains the same function, but it is
    allowed to modify values to do so.

    Abstract: this declaration always raises ``NotImplementedError``.
    """
    raise NotImplementedError
@abstractmethod
def swap_io(self) -> None:
    """
    Swap the inputs (and outputs) of the operation.

    Errors if :meth:`is_swappable` is False.

    Abstract: this declaration always raises ``NotImplementedError``.
    """
    raise NotImplementedError
class AbstractOperation(Operation, AbstractGraphComponent):
"""
Generic abstract operation base class.
Angus Lothian
committed
Concrete operations should normally derive from this to get the default
behavior.
__slots__ = ("_input_ports", "_output_ports", "_execution_time")
Angus Lothian
committed
_input_ports: List[InputPort]
_output_ports: List[OutputPort]
def __init__(
self,
input_count: int,
output_count: int,
input_sources: Optional[Sequence[Optional[SignalSourceProvider]]] = None,
latency: Optional[int] = None,
latency_offsets: Optional[Dict[str, int]] = None,
execution_time: Optional[int] = None,
"""
Construct an operation with the given input/output count.
Angus Lothian
committed
A list of input sources may be specified to automatically connect
to the input ports.
If provided, the number of sources must match the number of inputs.
The latency offsets may also be specified to be initialized.
"""
Angus Lothian
committed
self._input_ports = [InputPort(self, i) for i in range(input_count)]
self._output_ports = [OutputPort(self, i) for i in range(output_count)]
# Connect given input sources, if any.
if input_sources is not None:
source_count = len(input_sources)
if source_count != input_count:
raise ValueError(
"Wrong number of input sources supplied to Operation"
f" (expected {input_count}, got {source_count})"
)
Angus Lothian
committed
for i, src in enumerate(input_sources):
if src is not None:
if isinstance(src, Signal):
# Already existing signal
src.set_destination(self._input_ports[i])
else:
self._input_ports[i].connect(src.source)
Angus Lothian
committed
# Set specific latency_offsets
if latency_offsets is not None:
self.set_latency_offsets(latency_offsets)
Angus Lothian
committed
if latency is not None:
if latency < 0:
raise ValueError("Latency cannot be negative")
Angus Lothian
committed
for inp in self.inputs:
if inp.latency_offset is None:
inp.latency_offset = 0
for output in self.outputs:
if output.latency_offset is None:
output.latency_offset = latency
def evaluate(
self, *inputs: Operation
) -> List[Operation]: # pylint: disable=arguments-differ
...
@overload
@abstractmethod
def evaluate(self, *inputs: Num) -> List[Num]: # pylint: disable=arguments-differ
...
@abstractmethod
def evaluate(self, *inputs): # pylint: disable=arguments-differ
Evaluate the operation and generate a list of output values.
Parameters
----------
*inputs
List of input values.
def __ilshift__(self, src: SignalSourceProvider) -> "Operation":
    """
    Connect *src* to the only input port of this operation (``op <<= src``).

    Returns
    -------
    The operation itself, as required for an in-place operator.

    Raises
    ------
    TypeError
        If the operation does not have exactly one input port.
    """
    if self.input_count == 1:
        self.input(0).connect(src)
        return self

    # Not exactly one input: report whether there are too many or too few.
    diff = "more" if self.input_count > 1 else "less"
    raise TypeError(
        f"{self.__class__.__name__} cannot be used as a destination"
        f" because it has {diff} than 1 input"
    )
Angus Lothian
committed
def __str__(self) -> str:
"""Get a string representation of this operation."""
inputs_dict: Dict[int, Union[List[GraphID], str]] = {}
for i, current_input in enumerate(self.inputs):
if current_input.signal_count == 0:
Angus Lothian
committed
break
dict_ele = []
Angus Lothian
committed
if signal.source:
if signal.source_operation.graph_id:
dict_ele.append(signal.source_operation.graph_id)
Angus Lothian
committed
else:
Angus Lothian
committed
else:
if signal.graph_id:
dict_ele.append(signal.graph_id)
else:
Angus Lothian
committed
inputs_dict[i] = dict_ele
outputs_dict: Dict[int, Union[List[GraphID], str]] = {}
for i, outport in enumerate(self.outputs):
if outport.signal_count == 0:
Angus Lothian
committed
break
dict_ele = []
Angus Lothian
committed
if signal.destination:
if signal.destination_operation.graph_id:
dict_ele.append(signal.destination_operation.graph_id)
Angus Lothian
committed
else:
Angus Lothian
committed
else:
if signal.graph_id:
dict_ele.append(signal.graph_id)
else:
Angus Lothian
committed
outputs_dict[i] = dict_ele
return (
super().__str__()
+ f", \tinputs: {str(inputs_dict)}, \toutputs: {str(outputs_dict)}"
)
Angus Lothian
committed
@property
def input_count(self) -> int:
    """Return the length of the stored list of input ports."""
    input_ports = self._input_ports
    return len(input_ports)
Angus Lothian
committed
@property
def output_count(self) -> int:
    """Return the length of the stored list of output ports."""
    output_ports = self._output_ports
    return len(output_ports)
Angus Lothian
committed
def input(self, index: int) -> InputPort:
    """Return the input port stored at position *index*."""
    input_ports = self._input_ports
    return input_ports[index]
def output(self, index: int) -> OutputPort:
    """Return the output port stored at position *index*."""
    output_ports = self._output_ports
    return output_ports[index]
Angus Lothian
committed
@property
def inputs(self) -> Sequence[InputPort]:
    """
    Return all input ports of the operation.

    Exposed as a property: the interface declares ``inputs`` as a property and
    the rest of this class reads it as an attribute (``for ... in self.inputs``).
    """
    return self._input_ports
Angus Lothian
committed
@property
def outputs(self) -> Sequence[OutputPort]:
    """Return the stored list of output ports (the list itself, not a copy)."""
    output_ports = self._output_ports
    return output_ports
@property
Angus Lothian
committed
result = []
for p in self.inputs:
for s in p.signals:
result.append(s)
return result
Angus Lothian
committed
@property
Angus Lothian
committed
result = []
for p in self.outputs:
for s in p.signals:
result.append(s)
return result
def key(self, index: int, prefix: str = "") -> ResultKey:
    """
    Build the key used to access the output at the given index.

    With more (or fewer) than one output the key is ``prefix`` followed by the
    index, separated by ``"."`` when the prefix is non-empty. With exactly one
    output the key is the prefix itself, or the index when the prefix is empty.

    Parameters
    ----------
    index : int
        Index of the output.
    prefix : str, optional
        Prefix for the key string.

    Returns
    -------
    The resulting key.
    """
    key = prefix
    if self.output_count != 1:
        if key:
            key += "."
        key += str(index)
    elif not key:
        key = str(index)
    # The constructed key has to be handed back to the caller.
    return ResultKey(key)
Angus Lothian
committed
def current_output(
self, index: int, delays: Optional[DelayMap] = None, prefix: str = ""
Angus Lothian
committed
return None
results: Optional[MutableResultMap] = None,
delays: Optional[MutableDelayMap] = None,
prefix: str = "",
bits_override: Optional[int] = None,
Angus Lothian
committed
if index < 0 or index >= self.output_count:
raise IndexError(
"Output index out of range (expected"
f" 0-{self.output_count - 1}, got {index})"
)
Angus Lothian
committed
if len(input_values) != self.input_count:
raise ValueError(
"Wrong number of input values supplied to operation (expected"
f" {self.input_count}, got {len(input_values)})"
)
Angus Lothian
committed
values = self.evaluate(
self.quantize_inputs(input_values, bits_override)
if quantize
Angus Lothian
committed
if isinstance(values, collections.abc.Sequence):
if len(values) != self.output_count:
raise RuntimeError(
"Operation evaluated to incorrect number of outputs"
f" (expected {self.output_count}, got {len(values)})"
)
Angus Lothian
committed
elif isinstance(values, Number):
if self.output_count != 1:
raise RuntimeError(
"Operation evaluated to incorrect number of outputs"
f" (expected {self.output_count}, got 1)"
)
Angus Lothian
committed
values = (values,)
Angus Lothian
committed
raise RuntimeError(
"Operation evaluated to invalid type (expected"
f" Sequence/Number, got {values.__class__.__name__})"
)
Angus Lothian
committed
if results is not None:
for i in range(self.output_count):
results[self.key(i, prefix)] = values[i]
return values[index]
def current_outputs(
self, delays: Optional[DelayMap] = None, prefix: str = ""
self.current_output(i, delays, prefix) for i in range(self.output_count)
results: Optional[MutableResultMap] = None,
delays: Optional[MutableDelayMap] = None,
prefix: str = "",
bits_override: Optional[int] = None,
return [
self.evaluate_output(
i,
input_values,
results,
delays,
prefix,
bits_override,
Angus Lothian
committed
def split(self) -> Iterable[Operation]:
    """
    Split the operation into multiple operations.

    The operation is evaluated with ``Input`` operations as arguments. If that
    yields a sequence consisting only of operations, that sequence is returned;
    otherwise a list containing only the operation itself is returned.
    """
    # Deferred import: a module-level import would be circular.
    from b_asic.special_operations import Input

    # NOTE: one and the same Input instance is repeated for every argument.
    placeholders = [Input()] * self.input_count
    evaluated = self.evaluate(*placeholders)

    if not isinstance(evaluated, collections.abc.Sequence):
        return [self]
    if not all(isinstance(element, Operation) for element in evaluated):
        return [self]
    return cast(List[Operation], evaluated)
Angus Lothian
committed
from b_asic.signal_flow_graph import SFG
Angus Lothian
committed
inputs = [Input() for _ in range(self.input_count)]
Angus Lothian
committed
try:
last_operations = self.evaluate(*inputs)
if isinstance(last_operations, Operation):
last_operations = [last_operations]
outputs = [Output(o) for o in last_operations]
except TypeError:
operation_copy: Operation = cast(Operation, self.copy())
Angus Lothian
committed
inputs = []
for i in range(self.input_count):
input_ = Input()
operation_copy.input(i).connect(input_)
inputs.append(input_)
Angus Lothian
committed
outputs = [Output(operation_copy)]
return SFG(inputs=inputs, outputs=outputs)
def copy(self, *args, **kwargs) -> GraphComponent:
    """
    Copy the operation, including latency offsets and execution time.

    The copy is created by the parent class; afterwards the latency offset of
    every input and output port and the execution time are carried over.
    """
    duplicate: Operation = cast(Operation, super().copy(*args, **kwargs))

    for position, original_port in enumerate(self.inputs):
        duplicate.input(position).latency_offset = original_port.latency_offset
    for position, original_port in enumerate(self.outputs):
        duplicate.output(position).latency_offset = original_port.latency_offset

    duplicate.execution_time = self._execution_time
    return duplicate
def inputs_required_for_output(self, output_index: int) -> Iterable[int]:
    """
    Return the indices of the inputs needed to evaluate the given output.

    Parameters
    ----------
    output_index : int
        Index of the output.

    Raises
    ------
    IndexError
        If *output_index* is outside ``0 .. output_count - 1``.
    """
    if output_index < 0 or output_index >= self.output_count:
        raise IndexError(
            "Output index out of range (expected"
            f" 0-{self.output_count - 1}, got {output_index})"
        )
    # By default, assume each output depends on all inputs.
    return list(range(self.input_count))
Angus Lothian
committed
@property
def neighbors(self) -> Iterable[GraphComponent]:
    """Return a new list with all input signals followed by all output signals."""
    return [*self.input_signals, *self.output_signals]
Angus Lothian
committed
@property
def preceding_operations(self) -> Iterable[Operation]:
"""
Return an Iterable of all Operations that are connected to this
Operations input ports.
signal.source_operation for signal in self.input_signals if signal.source
Angus Lothian
committed
@property
def subsequent_operations(self) -> Iterable[Operation]:
"""
Return an Iterable of all Operations that are connected to this
Operations output ports.
return [
signal.destination.operation
for signal in self.output_signals
if signal.destination
]
Angus Lothian
committed
@property
def source(self) -> OutputPort:
    """
    Return the output port of an operation that has exactly one output.

    Raises
    ------
    TypeError
        If the operation has more or fewer than one output port.
    """
    if self.output_count != 1:
        diff = "more" if self.output_count > 1 else "less"
        raise TypeError(
            f"{self.__class__.__name__} cannot be used as an input source"
            f" because it has {diff} than one output"
        )
    return self.output(0)
@property
def destination(self) -> InputPort:
    """
    Return the input port of an operation that has exactly one input.

    Exposed as a property, like ``source`` and like the declaration in the
    interface.

    Raises
    ------
    TypeError
        If the operation has more or fewer than one input port.
    """
    if self.input_count != 1:
        diff = "more" if self.input_count > 1 else "less"
        raise TypeError(
            f"{self.__class__.__name__} cannot be used as an output"
            f" destination because it has {diff} than one input"
        )
    return self.input(0)
def quantize_input(self, index: int, value: Num, bits: int) -> Num:
    """
    Quantize *value* to a grid with step ``2**-bits``.

    The value is shifted by one, scaled by ``2**bits``, reduced modulo
    ``2 * 2**bits``, shifted back, rounded and finally rescaled, so values
    outside the representable range wrap around (e.g. ``1.0`` becomes ``-1.0``).

    Parameters
    ----------
    index : int
        Index of the input; not used by this default implementation.
    value : Num
        Value to quantize.
    bits : int
        Exponent of the scaling factor ``2**bits``.
    """
    scale = 2**bits
    wrapped = (value + 1) * scale % (2 * scale) - scale
    return round(wrapped) / scale
Angus Lothian
committed
Quantize the values to be used as inputs.
The bit lengths are specified
by the respective signals connected to each input.
Angus Lothian
committed
args = []
for i, input_port in enumerate(self.inputs):
value = input_values[i]
if bits_override is None and input_port.signal_count >= 1:
Angus Lothian
committed
if bits_override is not None:
if isinstance(value, complex):
raise TypeError(
"Complex value cannot be quantized to {bits} bits as"
" requested by the signal connected to input #{i}"
)
value = self.quantize_input(i, value, bits_override)
Angus Lothian
committed
args.append(value)
return args
@property
def latency(self) -> int:
if None in [inp.latency_offset for inp in self.inputs] or None in [
output.latency_offset for output in self.outputs
Angus Lothian
committed
raise ValueError(
"All native offsets have to set to a non-negative value to"
" calculate the latency."
)
Angus Lothian
committed
(cast(int, output.latency_offset) - cast(int, input_.latency_offset))
for output, input_ in it.product(self.outputs, self.inputs)
Angus Lothian
committed
@property
def latency_offsets(self) -> Dict[str, Optional[int]]:
    """
    Return the latency offsets of all ports as a dictionary.

    Input ports are stored under ``"in<i>"`` and output ports under
    ``"out<i>"``, where ``<i>`` is the port's position. Unset offsets appear
    as ``None``.
    """
    # The dictionary must exist before the loops below can fill it.
    latency_offsets: Dict[str, Optional[int]] = {}

    for i, input_ in enumerate(self.inputs):
        latency_offsets[f"in{i}"] = input_.latency_offset

    for i, output in enumerate(self.outputs):
        latency_offsets[f"out{i}"] = output.latency_offset

    return latency_offsets
"""
Raises an exception if an input or output does not have a latency offset.
"""
self.input_latency_offsets()
self.output_latency_offsets()
def input_latency_offsets(self) -> List[int]:
    """
    Return the latency offsets of all input ports, in port order.

    Raises
    ------
    ValueError
        If any input port has no latency offset; the message lists the
        indices of the affected ports.
    """
    offsets = [port.latency_offset for port in self.inputs]
    missing = [position for position, offset in enumerate(offsets) if offset is None]
    if missing:
        raise ValueError(f"Missing latencies for input(s) {missing}")
    return cast(List[int], offsets)
def output_latency_offsets(self) -> List[int]:
    """
    Return the latency offsets of all output ports, in port order.

    Raises
    ------
    ValueError
        If any output port has no latency offset; the message lists the
        indices of the affected ports.
    """
    latency_offsets = [i.latency_offset for i in self.outputs]
    if any(val is None for val in latency_offsets):
        missing = [
            i for (i, latency) in enumerate(latency_offsets) if latency is None
        ]
        raise ValueError(f"Missing latencies for output(s) {missing}")
    # Hand the validated offsets back, as the input counterpart does.
    return cast(List[int], latency_offsets)
Angus Lothian
committed
def set_latency(self, latency: int) -> None:
    """
    Set the latency of the operation.

    Every input port gets latency offset 0 and every output port gets latency
    offset *latency*, overwriting any previous offsets.

    Raises
    ------
    ValueError
        If *latency* is negative; no port is modified in that case.
    """
    if latency < 0:
        raise ValueError("Latency cannot be negative")

    for in_port in self.inputs:
        in_port.latency_offset = 0
    for out_port in self.outputs:
        out_port.latency_offset = latency
def set_latency_offsets(self, latency_offsets: Dict[str, int]) -> None:
    """
    Set latency offsets of individual ports.

    Keys are matched case-insensitively and have the form ``"in<i>"`` or
    ``"out<i>"``, e.g. ``{'in0': 2, 'out1': 4}``. Entries are applied in
    iteration order, so entries preceding an invalid key are already applied
    when the error is raised.

    Raises
    ------
    ValueError
        If a key does not start with ``in``/``out`` or its index part is not
        made up of digits.
    """
    for raw_key, latency_offset in latency_offsets.items():
        port_str = raw_key.lower()

        if port_str.startswith("in"):
            index_str = port_str[2:]
            if not index_str.isdigit():
                raise ValueError(
                    "Incorrectly formatted index in string, expected 'in'"
                    f" + index, got: {port_str!r}"
                )
            self.input(int(index_str)).latency_offset = latency_offset
            continue

        if port_str.startswith("out"):
            index_str = port_str[3:]
            if not index_str.isdigit():
                raise ValueError(
                    "Incorrectly formatted index in string, expected"
                    f" 'out' + index, got: {port_str!r}"
                )
            self.output(int(index_str)).latency_offset = latency_offset
            continue

        raise ValueError(
            "Incorrectly formatted string, expected 'in' + index or"
            f" 'out' + index, got: {port_str!r}"
        )
"""Execution time of operation."""
return self._execution_time
@execution_time.setter
def execution_time(self, execution_time: Optional[int]) -> None:
    """
    Set the execution time of the operation.

    Parameters
    ----------
    execution_time : int or None
        Non-negative execution time; ``None`` is stored as is and is not
        range-checked.

    Raises
    ------
    ValueError
        If *execution_time* is a negative number.
    """
    if execution_time is not None and execution_time < 0:
        raise ValueError("Execution time cannot be negative")
    self._execution_time = execution_time
def _increase_time_resolution(self, factor: int) -> None:
    """
    Multiply the execution time and all port latency offsets by *factor*.

    Values that are ``None`` (unset) are left untouched.
    """
    current_time = self._execution_time
    if current_time is not None:
        self._execution_time = current_time * factor

    for port in (*self.inputs, *self.outputs):
        offset = port.latency_offset
        if offset is None:
            continue
        port.latency_offset = offset * factor
def _decrease_time_resolution(self, factor: int) -> None:
    """
    Floor-divide the execution time and all port latency offsets by *factor*.

    Values that are ``None`` (unset) are left untouched.
    """
    current_time = self._execution_time
    if current_time is not None:
        self._execution_time = current_time // factor

    for port in (*self.inputs, *self.outputs):
        offset = port.latency_offset
        if offset is None:
            continue
        port.latency_offset = offset // factor
) -> Tuple[Tuple[Tuple[float, float], ...], Tuple[Tuple[float, float], ...]]: