Skip to content
Snippets Groups Projects

Change return type for coordinates to tuples and split port methods

Merged Oscar Gustafsson requested to merge operation_port_coordinates into master
Files
5
+ 93
36
@@ -407,7 +407,9 @@ class Operation(GraphComponent, SignalSourceProvider):
@abstractmethod
def get_plot_coordinates(
self,
) -> Tuple[List[List[float]], List[List[float]]]:
) -> Tuple[
Tuple[Tuple[float, float], ...], Tuple[Tuple[float, float], ...]
]:
"""
Return a tuple containing coordinates for the two polygons outlining
the latency and execution time of the operation.
@@ -418,11 +420,51 @@ class Operation(GraphComponent, SignalSourceProvider):
@abstractmethod
def get_io_coordinates(
self,
) -> Tuple[List[List[float]], List[List[float]]]:
) -> Tuple[
Tuple[Tuple[float, float], ...], Tuple[Tuple[float, float], ...]
]:
"""
Return a tuple containing coordinates for inputs and outputs, respectively.
These maps to the polygons and are corresponding to a start time of 0
and height 1.
See also
========
get_input_coordinates
get_output_coordinates
"""
raise NotImplementedError
@abstractmethod
def get_input_coordinates(
self,
) -> Tuple[Tuple[float, float], ...]:
"""
Return coordinates for inputs.
These maps to the polygons and are corresponding to a start time of 0
and height 1.
See also
========
get_io_coordinates
get_output_coordinates
"""
raise NotImplementedError
@abstractmethod
def get_output_coordinates(
self,
) -> Tuple[Tuple[float, float], ...]:
"""
Return coordinates for outputs.
These maps to the polygons and are corresponding to a start time of 0
and height 1.
See also
========
get_input_coordinates
get_io_coordinates
"""
raise NotImplementedError
@@ -915,7 +957,7 @@ class AbstractOperation(Operation, AbstractGraphComponent):
return self.output(0)
@property
def destination(self) -> OutputPort:
def destination(self) -> InputPort:
if self.input_count != 1:
diff = "more" if self.input_count > 1 else "less"
raise TypeError(
@@ -1044,25 +1086,29 @@ class AbstractOperation(Operation, AbstractGraphComponent):
def get_plot_coordinates(
self,
) -> Tuple[List[List[float]], List[List[float]]]:
) -> Tuple[
Tuple[Tuple[float, float], ...], Tuple[Tuple[float, float], ...]
]:
# Doc-string inherited
return (
self._get_plot_coordinates_for_latency(),
self._get_plot_coordinates_for_execution_time(),
)
def _get_plot_coordinates_for_execution_time(self) -> List[List[float]]:
def _get_plot_coordinates_for_execution_time(
self,
) -> Tuple[Tuple[float, float], ...]:
# Always a rectangle, but easier if coordinates are returned
execution_time = self._execution_time # Copy for type checking
if execution_time is None:
return []
return [
[0, 0],
[0, 1],
[execution_time, 1],
[execution_time, 0],
[0, 0],
]
return tuple()
return (
(0, 0),
(0, 1),
(execution_time, 1),
(execution_time, 0),
(0, 0),
)
def _check_all_latencies_set(self):
if any(val is None for val in self.latency_offsets.values()):
@@ -1070,47 +1116,58 @@ class AbstractOperation(Operation, AbstractGraphComponent):
f"All latencies must be set: {self.latency_offsets}"
)
def _get_plot_coordinates_for_latency(self) -> List[List[float]]:
def _get_plot_coordinates_for_latency(
self,
) -> Tuple[Tuple[float, float], ...]:
self._check_all_latencies_set()
# Points for latency polygon
latency = []
# Remember starting point
start_point = [self.inputs[0].latency_offset, 0.0]
start_point = (self.inputs[0].latency_offset, 0.0)
num_in = self.input_count
latency.append(start_point)
for k in range(1, num_in):
latency.append([self.inputs[k - 1].latency_offset, k / num_in])
latency.append([self.inputs[k].latency_offset, k / num_in])
latency.append([self.inputs[num_in - 1].latency_offset, 1])
latency.append((self.inputs[k - 1].latency_offset, k / num_in))
latency.append((self.inputs[k].latency_offset, k / num_in))
latency.append((self.inputs[num_in - 1].latency_offset, 1))
num_out = self.output_count
latency.append([self.outputs[num_out - 1].latency_offset, 1])
latency.append((self.outputs[num_out - 1].latency_offset, 1))
for k in reversed(range(1, num_out)):
latency.append([self.outputs[k].latency_offset, k / num_out])
latency.append([self.outputs[k - 1].latency_offset, k / num_out])
latency.append([self.outputs[0].latency_offset, 0.0])
latency.append((self.outputs[k].latency_offset, k / num_out))
latency.append((self.outputs[k - 1].latency_offset, k / num_out))
latency.append((self.outputs[0].latency_offset, 0.0))
# Close the polygon
latency.append(start_point)
return latency
return tuple(latency)
def get_io_coordinates(
self,
) -> Tuple[List[List[float]], List[List[float]]]:
# Doc-string inherited
def get_input_coordinates(self) -> Tuple[Tuple[float, float], ...]:
# doc-string inherited
self._check_all_latencies_set()
input_coordinates = [
[
return tuple(
(
self.inputs[k].latency_offset,
(1 + 2 * k) / (2 * len(self.inputs)),
]
)
for k in range(len(self.inputs))
]
output_coordinates = [
[
)
def get_output_coordinates(self) -> Tuple[Tuple[float, float], ...]:
# doc-string inherited
self._check_all_latencies_set()
return tuple(
(
self.outputs[k].latency_offset,
(1 + 2 * k) / (2 * len(self.outputs)),
]
)
for k in range(len(self.outputs))
]
return input_coordinates, output_coordinates
)
def get_io_coordinates(
self,
) -> Tuple[
Tuple[Tuple[float, float], ...], Tuple[Tuple[float, float], ...]
]:
# Doc-string inherited
return self.get_input_coordinates(), self.get_output_coordinates()
Loading