Skip to content
Snippets Groups Projects

Resolve "Generate Schedule"

Merged Angus Lothian requested to merge 30-generate-schedule into develop
1 file
+ 0
1
Compare changes
  • Side-by-side
  • Inline
+ 93
6
@@ -3,16 +3,17 @@ B-ASIC Operation Module.
TODO: More info.
"""
from b_asic.signal import Signal
from b_asic.port import SignalSourceProvider, InputPort, OutputPort
from b_asic.graph_component import GraphComponent, AbstractGraphComponent, Name
import itertools as it
from math import trunc
import collections
from abc import abstractmethod
from numbers import Number
from typing import NewType, List, Sequence, Iterable, Mapping, MutableMapping, Optional, Any, Set, Union
from math import trunc
from typing import NewType, List, Dict, Sequence, Iterable, Mapping, MutableMapping, Optional, Any, Set, Union
from b_asic.graph_component import GraphComponent, AbstractGraphComponent, Name
from b_asic.port import SignalSourceProvider, InputPort, OutputPort
from b_asic.signal import Signal
OutputKey = NewType("OutputKey", str)
OutputMap = Mapping[OutputKey, Optional[Number]]
@@ -193,6 +194,39 @@ class Operation(GraphComponent, SignalSourceProvider):
"""
raise NotImplementedError
@property
@abstractmethod
def latency(self) -> int:
"""Get the latency of the operation, which is the longest time it takes from one of
the operations inputport to one of the operations outputport.
"""
raise NotImplementedError
@property
@abstractmethod
def latency_offsets(self) -> Sequence[Sequence[int]]:
"""Get a nested list with all the operations ports latency-offsets, the first list contains the
latency-offsets of the operations input ports, the second list contains the latency-offsets of
the operations output ports.
"""
raise NotImplementedError
@abstractmethod
def set_latency(self, latency: int) -> None:
"""Sets the latency of the operation to the specified integer value by setting the
latency-offsets of operations input ports to 0 and the latency-offsets of the operations
output ports to the specified value. The latency cannot be a negative integers.
"""
raise NotImplementedError
@abstractmethod
def set_latency_offsets(self, latency_offsets: Dict[str, int]) -> None:
"""Sets the latency-offsets for the operations ports specified in the latency_offsets dictionary.
The latency offsets dictionary should be {'in0': 2, 'out1': 4} if you want to set the latency offset
for the inport port with index 0 to 2, and the latency offset of the output port with index 1 to 4.
"""
raise NotImplementedError
class AbstractOperation(Operation, AbstractGraphComponent):
"""Generic abstract operation class which most implementations will derive from.
@@ -202,7 +236,7 @@ class AbstractOperation(Operation, AbstractGraphComponent):
_input_ports: List[InputPort]
_output_ports: List[OutputPort]
def __init__(self, input_count: int, output_count: int, name: Name = "", input_sources: Optional[Sequence[Optional[SignalSourceProvider]]] = None):
def __init__(self, input_count: int, output_count: int, name: Name = "", input_sources: Optional[Sequence[Optional[SignalSourceProvider]]] = None, latency: int = None, latency_offsets: Dict[str, int] = None):
super().__init__(name)
self._input_ports = [InputPort(self, i) for i in range(input_count)]
@@ -218,6 +252,22 @@ class AbstractOperation(Operation, AbstractGraphComponent):
if src is not None:
self._input_ports[i].connect(src.source)
ports_without_latency_offset = set(([f"in{i}" for i in range(self.input_count)] +
[f"out{i}" for i in range(self.output_count)]))
if latency_offsets is not None:
self.set_latency_offsets(latency_offsets)
if latency is not None:
# Set the latency of the rest of ports with no latency_offset.
assert latency >= 0, "Negative latency entered"
for inp in self.inputs:
if inp.latency_offset is None:
inp.latency_offset = 0
for outp in self.outputs:
if outp.latency_offset is None:
outp.latency_offset = latency
@abstractmethod
def evaluate(self, *inputs) -> Any: # pylint: disable=arguments-differ
"""Evaluate the operation and generate a list of output values given a list of input values."""
@@ -433,6 +483,14 @@ class AbstractOperation(Operation, AbstractGraphComponent):
return SFG(inputs=inputs, outputs=outputs)
def copy_component(self, *args, **kwargs) -> Operation:
new_component = super().copy_component(*args, **kwargs)
for i, inp in enumerate(self.inputs):
new_component.input(i).latency_offset = inp.latency_offset
for i, outp in enumerate(self.outputs):
new_component.output(i).latency_offset = outp.latency_offset
return new_component
def inputs_required_for_output(self, output_index: int) -> Iterable[int]:
if output_index < 0 or output_index >= self.output_count:
raise IndexError(f"Output index out of range (expected 0-{self.output_count - 1}, got {output_index})")
@@ -483,3 +541,32 @@ class AbstractOperation(Operation, AbstractGraphComponent):
else:
args.append(input_values[i])
return args
@property
def latency(self) -> int:
return max(((outp.latency_offset - inp.latency_offset) for outp, inp in it.product(self.outputs, self.inputs)))
def set_latency(self, latency: int) -> None:
assert latency >= 0, "Negative latency entered."
for inport in self.inputs:
inport.latency_offset = 0
for outport in self.outputs:
outport.latency_offset = latency
@property
def latency_offsets(self) -> Sequence[Sequence[int]]:
return ([inp.latency_offset for inp in self.inputs], [outp.latency_offset for outp in self.outputs])
def set_latency_offsets(self, latency_offsets: Dict[str, int]) -> None:
for port_str, latency_offset in latency_offsets.items():
port_str = port_str.lower()
if port_str.startswith("in"):
index_str = port_str[2:]
assert index_str.isdigit(), "Incorrectly formatted index in string, expected 'in' + index"
self.input(int(index_str)).latency_offset = latency_offset
elif port_str.startswith("out"):
index_str = port_str[3:]
assert index_str.isdigit(), "INcorrectly formatted index in string, expected 'out' + index"
self.output(int(index_str)).latency_offset = latency_offset
else:
raise ValueError("Incorrectly formatted string, expected 'in' + index or 'out' + index")
Loading