Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • da/B-ASIC
  • lukja239/B-ASIC
  • robal695/B-ASIC
3 results
Show changes
Commits on Source (2)
Showing
with 418 additions and 397 deletions
......@@ -382,7 +382,7 @@ class SFGMainWindow(QMainWindow):
self.update()
def _create_recent_file_actions_and_menus(self):
for i in range(self._max_recent_files):
for _ in range(self._max_recent_files):
recent_file_action = QAction(self._ui.recent_sfg)
recent_file_action.setVisible(False)
recent_file_action.triggered.connect(
......@@ -510,24 +510,24 @@ class SFGMainWindow(QMainWindow):
and hasattr(source_operation2, "value")
and hasattr(dest_operation, "value")
and hasattr(dest_operation2, "value")
):
if not (
and not (
source_operation.value == source_operation2.value
and dest_operation.value == dest_operation2.value
):
return False
)
):
return False
if (
hasattr(source_operation, "name")
and hasattr(source_operation2, "name")
and hasattr(dest_operation, "name")
and hasattr(dest_operation2, "name")
):
if not (
and not (
source_operation.name == source_operation2.name
and dest_operation.name == dest_operation2.name
):
return False
)
):
return False
try:
signal_source_index = [
......@@ -744,9 +744,8 @@ class SFGMainWindow(QMainWindow):
operation_label.moveBy(10, -20)
attr_button.add_label(operation_label)
if isinstance(is_flipped, bool):
if is_flipped:
attr_button._flip()
if isinstance(is_flipped, bool) and is_flipped:
attr_button._flip()
self._drag_buttons[op] = attr_button
self._drag_operation_scenes[attr_button] = attr_button_scene
......
......@@ -558,6 +558,8 @@ class Memory(Resource):
if self._memory_type == "RAM":
plural_s = 's' if len(self._assignment) >= 2 else ''
return f": (RAM, {len(self._assignment)} cell{plural_s})"
else:
pass
return ""
def assign(self, heuristic: str = "left_edge") -> None:
......@@ -1036,7 +1038,7 @@ of :class:`~b_asic.architecture.ProcessingElement`
fontname='Times New Roman',
)
else:
for i, mem in enumerate(self._memories):
for mem in self._memories:
dg.node(
mem.entity_name,
mem._struct_def(),
......@@ -1044,7 +1046,7 @@ of :class:`~b_asic.architecture.ProcessingElement`
fillcolor=memory_color,
fontname='Times New Roman',
)
for i, pe in enumerate(self._processing_elements):
for pe in self._processing_elements:
dg.node(
pe.entity_name, pe._struct_def(), style='filled', fillcolor=pe_color
)
......
......@@ -57,7 +57,7 @@ def memory_based_storage(
read_ports: set[Port] = {
read_port for mv in collection for read_port in mv.read_ports
} # type: ignore
for idx, read_port in enumerate(read_ports):
for read_port in read_ports:
port_name = read_port if isinstance(read_port, int) else read_port.name
port_name = 'p_' + str(port_name) + '_in'
f.write(f'{2*VHDL_TAB}{port_name} : in std_logic_vector(WL-1 downto 0);\n')
......
......@@ -52,8 +52,8 @@ class AboutWindow(QDialog):
" Toolbox__\n\n*Construct, simulate and analyze signal processing"
" algorithms aimed at implementation on an ASIC or"
" FPGA.*\n\nB-ASIC is developed by the <a"
" href=\"https://liu.se/en/organisation/liu/isy/da\">Division of"
" Computer Engineering</a> at <a"
" href=\"https://liu.se/en/organisation/liu/isy/elda\">Division of"
" Electronics and Computer Engineering</a> at <a"
" href=\"https://liu.se/?l=en\">Linköping University</a>,"
" Sweden.\n\nB-ASIC is released under the <a"
" href=\"https://gitlab.liu.se/da/B-ASIC/-/blob/master/LICENSE\">"
......
......@@ -1046,9 +1046,7 @@ class AbstractOperation(Operation, AbstractGraphComponent):
@property
def is_linear(self) -> bool:
# doc-string inherited
if self.is_constant:
return True
return False
return self.is_constant
@property
def is_constant(self) -> bool:
......
......@@ -359,7 +359,7 @@ class InputPort(AbstractPort):
tmp_signal = self.signals[0]
tmp_signal.remove_destination()
current = self
for i in range(number):
for _ in range(number):
d = Delay()
current.connect(d)
current = d.input(0)
......
......@@ -319,7 +319,7 @@ class MemoryVariable(MemoryProcess):
return self._write_port
def __repr__(self) -> str:
reads = {k: v for k, v in zip(self._read_ports, self._life_times)}
reads = {k: v for k, v in zip(self._read_ports, self._life_times, strict=True)}
return (
f"MemoryVariable({self.start_time}, {self.write_port},"
f" {reads!r}, {self.name!r})"
......@@ -413,7 +413,7 @@ class PlainMemoryVariable(MemoryProcess):
return self._write_port
def __repr__(self) -> str:
reads = {k: v for k, v in zip(self._read_ports, self._life_times)}
reads = {k: v for k, v in zip(self._read_ports, self._life_times, strict=True)}
return (
f"PlainMemoryVariable({self.start_time}, {self.write_port},"
f" {reads!r}, {self.name!r})"
......
......@@ -124,10 +124,7 @@ def quantize(
elif quantization is Quantization.ROUNDING:
v = math.floor(v + 0.5)
elif quantization is Quantization.MAGNITUDE_TRUNCATION:
if v >= 0:
v = math.floor(v)
else:
v = math.ceil(v)
v = math.floor(v) if v >= 0 else math.ceil(v)
elif quantization is Quantization.JAMMING:
v = math.floor(v) | 1
elif quantization is Quantization.UNBIASED_ROUNDING:
......
......@@ -20,10 +20,10 @@ def _insert_delays(
maxdiff = min(outputorder[i][0] - inputorder[i][0] for i in range(size))
outputorder = [(o[0] - maxdiff + min_lifetime, o[1]) for o in outputorder]
maxdelay = max(outputorder[i][0] - inputorder[i][0] for i in range(size))
if cyclic:
if maxdelay >= time:
inputorder = inputorder + [(i[0] + time, i[1]) for i in inputorder]
outputorder = outputorder + [(o[0] + time, o[1]) for o in outputorder]
if cyclic and maxdelay >= time:
inputorder = inputorder + [(i[0] + time, i[1]) for i in inputorder]
outputorder = outputorder + [(o[0] + time, o[1]) for o in outputorder]
return inputorder, outputorder
......
......@@ -1340,29 +1340,18 @@ class ProcessCollection:
raise ValueError(f'{mv!r} is not part of {self!r}.')
# Make sure that concurrent reads/writes do not surpass the port setting
for mv in self:
def filter_write(p):
return p.start_time == mv.start_time
def filter_read(p):
return (
(p.start_time + p.execution_time) % self._schedule_time
== mv.start_time + mv.execution_time % self._schedule_time
)
needed_write_ports = len(list(filter(filter_write, self)))
needed_read_ports = len(list(filter(filter_read, self)))
if needed_write_ports > write_ports + 1:
raise ValueError(
f'More than {write_ports} write ports needed ({needed_write_ports})'
' to generate HDL for this ProcessCollection'
)
if needed_read_ports > read_ports + 1:
raise ValueError(
f'More than {read_ports} read ports needed ({needed_read_ports}) to'
' generate HDL for this ProcessCollection'
)
needed_write_ports = self.read_ports_bound()
needed_read_ports = self.write_ports_bound()
if needed_write_ports > write_ports + 1:
raise ValueError(
f'More than {write_ports} write ports needed ({needed_write_ports})'
' to generate HDL for this ProcessCollection'
)
if needed_read_ports > read_ports + 1:
raise ValueError(
f'More than {read_ports} read ports needed ({needed_read_ports}) to'
' generate HDL for this ProcessCollection'
)
# Sanitize the address logic pipeline settings
if adr_mux_size is not None and adr_pipe_depth is not None:
......@@ -1648,11 +1637,11 @@ class ProcessCollection:
axes : list of three :class:`matplotlib.axes.Axes`
Three Axes to plot in.
"""
axes[0].bar(*zip(*self.read_port_accesses().items()))
axes[0].bar(*zip(*self.read_port_accesses().items(), strict=True))
axes[0].set_title("Read port accesses")
axes[1].bar(*zip(*self.write_port_accesses().items()))
axes[1].bar(*zip(*self.write_port_accesses().items(), strict=True))
axes[1].set_title("Write port accesses")
axes[2].bar(*zip(*self.total_port_accesses().items()))
axes[2].bar(*zip(*self.total_port_accesses().items(), strict=True))
axes[2].set_title("Total port accesses")
for ax in axes:
ax.xaxis.set_major_locator(MaxNLocator(integer=True, min_n_ticks=1))
......
......@@ -462,7 +462,7 @@ class Schedule:
# if updating the scheduling time -> update laps due to operations
# reading and writing in different iterations (across the edge)
if self._schedule_time is not None:
for signal_id in self._laps.keys():
for signal_id in self._laps:
port = self._sfg.find_by_id(signal_id).destination
source_port = port.signals[0].source
......@@ -701,7 +701,7 @@ class Schedule:
else:
offset += 1
for gid, y_location in self._y_locations.items():
for gid in self._y_locations:
self._y_locations[gid] = remapping[self._y_locations[gid]]
def get_y_location(self, graph_id: GraphID) -> int:
......@@ -992,7 +992,7 @@ class Schedule:
destination_laps.append((port.operation.graph_id, port.index, lap))
for op, port, lap in destination_laps:
for delays in range(lap):
for _ in range(lap):
new_sfg = new_sfg.insert_operation_before(op, Delay(), port)
return new_sfg()
......@@ -1234,7 +1234,7 @@ class Schedule:
latency_coordinates,
execution_time_coordinates,
) = operation.get_plot_coordinates()
_x, _y = zip(*latency_coordinates)
_x, _y = zip(*latency_coordinates, strict=True)
x = np.array(_x)
y = np.array(_y)
xvalues = x + op_start_time
......@@ -1258,7 +1258,7 @@ class Schedule:
size=10 - (0.05 * len(self._start_times)),
)
if execution_time_coordinates:
_x, _y = zip(*execution_time_coordinates)
_x, _y = zip(*execution_time_coordinates, strict=True)
x = np.array(_x)
y = np.array(_y)
xvalues = x + op_start_time
......
......@@ -164,7 +164,7 @@ class ALAPScheduler(Scheduler):
# adjust the scheduling time if empty time slots have appeared in the start
slack = min(schedule.start_times.values())
for op_id in schedule.start_times.keys():
for op_id in schedule.start_times:
schedule.move_operation(op_id, -slack)
schedule.set_schedule_time(schedule._schedule_time - slack)
......@@ -349,7 +349,7 @@ class ListScheduler(Scheduler):
def _calculate_fan_outs(self) -> dict["GraphID", int]:
return {
op_id: len(self._sfg.find_by_id(op_id).output_signals)
for op_id in self._alap_start_times.keys()
for op_id in self._alap_start_times
}
def _calculate_memory_reads(
......@@ -379,13 +379,13 @@ class ListScheduler(Scheduler):
if other_op_id != op._graph_id:
if self._schedule._schedule_time is not None:
start_time = start_time % self._schedule._schedule_time
if time >= start_time:
if time < start_time + max(
self._cached_execution_times[other_op_id], 1
):
if isinstance(self._sfg.find_by_id(other_op_id), type(op)):
count += 1
if (
time >= start_time
and time
< start_time + max(self._cached_execution_times[other_op_id], 1)
and isinstance(self._sfg.find_by_id(other_op_id), type(op))
):
count += 1
return count
def _op_satisfies_resource_constraints(self, op: "Operation") -> bool:
......@@ -446,7 +446,7 @@ class ListScheduler(Scheduler):
tmp_used_reads = {}
for i, op_input in enumerate(op.inputs):
source_op = op_input.signals[0].source.operation
if isinstance(source_op, Delay) or isinstance(source_op, DontCare):
if isinstance(source_op, (Delay, DontCare)):
continue
if (
self._schedule.start_times[source_op.graph_id]
......@@ -477,7 +477,7 @@ class ListScheduler(Scheduler):
source_port = op_input.signals[0].source
source_op = source_port.operation
if isinstance(source_op, Delay) or isinstance(source_op, DontCare):
if isinstance(source_op, (Delay, DontCare)):
continue
if source_op.graph_id in self._remaining_ops:
......@@ -519,7 +519,7 @@ class ListScheduler(Scheduler):
self._schedule = schedule
self._sfg = schedule._sfg
for resource_type in self._max_resources.keys():
for resource_type in self._max_resources:
if not self._sfg.find_by_type_name(resource_type):
raise ValueError(
f"Provided max resource of type {resource_type} cannot be found in the provided SFG."
......@@ -528,7 +528,7 @@ class ListScheduler(Scheduler):
differing_elems = [
resource
for resource in self._sfg.get_used_type_names()
if resource not in self._max_resources.keys()
if resource not in self._max_resources
and resource != Delay.type_name()
and resource != DontCare.type_name()
and resource != Sink.type_name()
......@@ -536,13 +536,13 @@ class ListScheduler(Scheduler):
for type_name in differing_elems:
self._max_resources[type_name] = 1
for key in self._input_times.keys():
for key in self._input_times:
if self._sfg.find_by_id(key) is None:
raise ValueError(
f"Provided input time with GraphID {key} cannot be found in the provided SFG."
)
for key in self._output_delta_times.keys():
for key in self._output_delta_times:
if self._sfg.find_by_id(key) is None:
raise ValueError(
f"Provided output delta time with GraphID {key} cannot be found in the provided SFG."
......@@ -574,16 +574,19 @@ class ListScheduler(Scheduler):
self._alap_op_laps = alap_scheduler.op_laps
self._alap_schedule_time = alap_schedule._schedule_time
self._schedule.start_times = {}
for key in self._schedule._laps.keys():
for key in self._schedule._laps:
self._schedule._laps[key] = 0
if not self._schedule._cyclic and self._schedule._schedule_time:
if alap_schedule._schedule_time > self._schedule._schedule_time:
raise ValueError(
f"Provided scheduling time {schedule._schedule_time} cannot be reached, "
"try to enable the cyclic property or increase the time to at least "
f"{alap_schedule._schedule_time}."
)
if (
not self._schedule._cyclic
and self._schedule.schedule_time
and alap_schedule.schedule_time > self._schedule.schedule_time
):
raise ValueError(
f"Provided scheduling time {schedule.schedule_time} cannot be reached, "
"try to enable the cyclic property or increase the time to at least "
f"{alap_schedule.schedule_time}."
)
self._remaining_resources = self._max_resources.copy()
......@@ -753,13 +756,13 @@ class ListScheduler(Scheduler):
if (
not self._schedule._cyclic
and self._schedule._schedule_time is not None
and new_time > self._schedule._schedule_time
):
if new_time > self._schedule._schedule_time:
raise ValueError(
f"Cannot place output {output.graph_id} at time {new_time} "
f"for scheduling time {self._schedule._schedule_time}. "
"Try to relax the scheduling time, change the output delta times or enable cyclic."
)
raise ValueError(
f"Cannot place output {output.graph_id} at time {new_time} "
f"for scheduling time {self._schedule._schedule_time}. "
"Try to relax the scheduling time, change the output delta times or enable cyclic."
)
self._logger.debug(
f" {output.graph_id} moved {min_slack} time steps backwards to new time {new_time}"
)
......@@ -818,20 +821,19 @@ class RecursiveListScheduler(ListScheduler):
def _get_recursive_ops(self, loops: list[list["GraphID"]]) -> list["GraphID"]:
recursive_ops = []
seen = []
for loop in loops:
for op_id in loop:
if op_id not in seen:
if not isinstance(self._sfg.find_by_id(op_id), Delay):
recursive_ops.append(op_id)
seen.append(op_id)
if op_id not in recursive_ops and not isinstance(
self._sfg.find_by_id(op_id), Delay
):
recursive_ops.append(op_id)
return recursive_ops
def _recursive_op_satisfies_data_dependencies(self, op: "Operation") -> bool:
for input_port_index, op_input in enumerate(op.inputs):
for op_input in op.inputs:
source_port = source_op = op_input.signals[0].source
source_op = source_port.operation
if isinstance(source_op, Delay) or isinstance(source_op, DontCare):
if isinstance(source_op, (Delay, DontCare)):
continue
if (
source_op.graph_id in self._recursive_ops
......@@ -937,7 +939,7 @@ class RecursiveListScheduler(ListScheduler):
for op_input in op.inputs:
source_port = op_input.signals[0].source
source_op = source_port.operation
if isinstance(source_op, Delay) or isinstance(source_op, DontCare):
if isinstance(source_op, (Delay, DontCare)):
continue
if source_op.graph_id in self._remaining_ops:
return False
......
......@@ -36,23 +36,23 @@ class ColorDataType:
self.name = name
Latency_Color = ColorDataType(
LATENCY_COLOR_TYPE = ColorDataType(
current_color=OPERATION_LATENCY_INACTIVE,
DEFAULT=OPERATION_LATENCY_INACTIVE,
name='Latency Color',
)
Execution_Time_Color = ColorDataType(
EXECUTION_TIME_COLOR_TYPE = ColorDataType(
current_color=OPERATION_EXECUTION_TIME_ACTIVE,
DEFAULT=OPERATION_EXECUTION_TIME_ACTIVE,
name='Execution Time Color',
)
Signal_Warning_Color = ColorDataType(
SIGNAL_WARNING_COLOR_TYPE = ColorDataType(
current_color=SIGNAL_WARNING, DEFAULT=SIGNAL_WARNING, name='Warning Color'
)
Signal_Color = ColorDataType(
SIGNAL_COLOR_TYPE = ColorDataType(
current_color=SIGNAL_INACTIVE, DEFAULT=SIGNAL_INACTIVE, name='Signal Color'
)
Active_Color = ColorDataType(
ACTIVE_COLOR_TYPE = ColorDataType(
current_color=SIGNAL_ACTIVE, DEFAULT=SIGNAL_ACTIVE, name='Active Color'
)
......@@ -79,6 +79,6 @@ class FontDataType:
self.changed = changed
Font = FontDataType(
FONT = FontDataType(
current_font=DEFAULT_FONT, DEFAULT=DEFAULT_FONT, DEFAULT_COLOR=DEFAULT_FONT_COLOR
)
......@@ -177,11 +177,7 @@ def compile_ui(*filenames: str) -> None:
)
os_ = sys.platform
if os_.startswith("linux"): # Linux
cmd = f"{uic_} {arguments}"
subprocess.call(cmd.split())
elif os_.startswith("win32"): # Windows
if os_.startswith("linux") or os_.startswith("win32"):
cmd = f"{uic_} {arguments}"
subprocess.call(cmd.split())
......@@ -190,7 +186,7 @@ def compile_ui(*filenames: str) -> None:
log.error("macOS UI compiler not implemented")
raise NotImplementedError
else: # other OS
else:
log.error(f"{os_} UI compiler not supported")
raise NotImplementedError
......
This diff is collapsed.
......@@ -26,12 +26,12 @@ from b_asic.graph_component import GraphID
from b_asic.gui_utils.icons import get_icon
from b_asic.operation import Operation
from b_asic.scheduler_gui._preferences import (
ACTIVE_COLOR_TYPE,
EXECUTION_TIME_COLOR_TYPE,
LATENCY_COLOR_TYPE,
OPERATION_HEIGHT,
Active_Color,
Execution_Time_Color,
Latency_Color,
Signal_Color,
Signal_Warning_Color,
SIGNAL_COLOR_TYPE,
SIGNAL_WARNING_COLOR_TYPE,
)
if TYPE_CHECKING:
......@@ -63,7 +63,7 @@ class OperationItem(QGraphicsItemGroup):
_label_item: QGraphicsSimpleTextItem
_port_items: list[QGraphicsEllipseItem]
_port_number_items: list[QGraphicsSimpleTextItem]
_inactive_color: QColor = Latency_Color.DEFAULT
_inactive_color: QColor = LATENCY_COLOR_TYPE.DEFAULT
def __init__(
self,
......@@ -97,30 +97,32 @@ class OperationItem(QGraphicsItemGroup):
QCursor(Qt.CursorShape.OpenHandCursor)
) # default cursor when hovering over object
if Signal_Color.changed:
self._port_filling_brush = QBrush(Signal_Color.current_color)
self._port_outline_pen = QPen(Signal_Color.current_color)
if SIGNAL_COLOR_TYPE.changed:
self._port_filling_brush = QBrush(SIGNAL_COLOR_TYPE.current_color)
self._port_outline_pen = QPen(SIGNAL_COLOR_TYPE.current_color)
else:
self._port_filling_brush = QBrush(Signal_Color.DEFAULT)
self._port_outline_pen = QPen(Signal_Color.DEFAULT)
self._port_filling_brush = QBrush(SIGNAL_COLOR_TYPE.DEFAULT)
self._port_outline_pen = QPen(SIGNAL_COLOR_TYPE.DEFAULT)
self._port_outline_pen.setWidthF(0)
if Active_Color.changed:
self._port_filling_brush_active = QBrush(Active_Color.current_color)
self._port_outline_pen_active = QPen(Active_Color.current_color)
if ACTIVE_COLOR_TYPE.changed:
self._port_filling_brush_active = QBrush(ACTIVE_COLOR_TYPE.current_color)
self._port_outline_pen_active = QPen(ACTIVE_COLOR_TYPE.current_color)
else:
self._port_filling_brush_active = QBrush(Active_Color.DEFAULT)
self._port_outline_pen_active = QPen(Active_Color.DEFAULT)
self._port_filling_brush_active = QBrush(ACTIVE_COLOR_TYPE.DEFAULT)
self._port_outline_pen_active = QPen(ACTIVE_COLOR_TYPE.DEFAULT)
self._port_outline_pen_active.setWidthF(0)
if Signal_Warning_Color.changed:
if SIGNAL_WARNING_COLOR_TYPE.changed:
self._port_filling_brush_warning = QBrush(
Signal_Warning_Color.current_color
SIGNAL_WARNING_COLOR_TYPE.current_color
)
self._port_outline_pen_warning = QPen(
SIGNAL_WARNING_COLOR_TYPE.current_color
)
self._port_outline_pen_warning = QPen(Signal_Warning_Color.current_color)
else:
self._port_filling_brush_warning = QBrush(Signal_Warning_Color.DEFAULT)
self._port_outline_pen_warning = QPen(Signal_Warning_Color.DEFAULT)
self._port_filling_brush_warning = QBrush(SIGNAL_WARNING_COLOR_TYPE.DEFAULT)
self._port_outline_pen_warning = QPen(SIGNAL_WARNING_COLOR_TYPE.DEFAULT)
self._port_outline_pen_warning.setWidthF(0)
self._make_component()
......@@ -198,18 +200,18 @@ class OperationItem(QGraphicsItemGroup):
def set_active(self) -> None:
"""Set the item as active, i.e., draw it in special colors."""
if Active_Color.changed:
self._set_background(Active_Color.current_color)
if ACTIVE_COLOR_TYPE.changed:
self._set_background(ACTIVE_COLOR_TYPE.current_color)
else:
self._set_background(Active_Color.DEFAULT)
self._set_background(ACTIVE_COLOR_TYPE.DEFAULT)
self.setCursor(QCursor(Qt.CursorShape.ClosedHandCursor))
def set_inactive(self) -> None:
"""Set the item as inactive, i.e., draw it in standard colors."""
if Latency_Color.changed:
if LATENCY_COLOR_TYPE.changed:
self._set_background(self._inactive_color)
else:
self._set_background(Latency_Color.DEFAULT)
self._set_background(LATENCY_COLOR_TYPE.DEFAULT)
self.setCursor(QCursor(Qt.CursorShape.OpenHandCursor))
def Set_font(self, font: QFont) -> None:
......@@ -231,12 +233,12 @@ class OperationItem(QGraphicsItemGroup):
def set_port_active(self, key: str):
item = self._ports[key]["item"]
if Active_Color.changed:
self._port_filling_brush_active = QBrush(Active_Color.current_color)
self._port_outline_pen_active = QPen(Active_Color.current_color)
if ACTIVE_COLOR_TYPE.changed:
self._port_filling_brush_active = QBrush(ACTIVE_COLOR_TYPE.current_color)
self._port_outline_pen_active = QPen(ACTIVE_COLOR_TYPE.current_color)
else:
self._port_filling_brush_active = QBrush(Active_Color.DEFAULT)
self._port_outline_pen_active = QPen(Active_Color.DEFAULT)
self._port_filling_brush_active = QBrush(ACTIVE_COLOR_TYPE.DEFAULT)
self._port_outline_pen_active = QPen(ACTIVE_COLOR_TYPE.DEFAULT)
self._port_outline_pen_active.setWidthF(0)
item.setBrush(self._port_filling_brush_active)
......@@ -267,10 +269,10 @@ class OperationItem(QGraphicsItemGroup):
port_size = 7 / self._scale # the diameter of a port
if Execution_Time_Color.changed:
execution_time_color = QColor(Execution_Time_Color.current_color)
if EXECUTION_TIME_COLOR_TYPE.changed:
execution_time_color = QColor(EXECUTION_TIME_COLOR_TYPE.current_color)
else:
execution_time_color = QColor(Execution_Time_Color.DEFAULT)
execution_time_color = QColor(EXECUTION_TIME_COLOR_TYPE.DEFAULT)
execution_time_color.setAlpha(200) # 0-255
execution_time_pen = QPen() # used by execution time outline
execution_time_pen.setColor(execution_time_color)
......@@ -298,7 +300,7 @@ class OperationItem(QGraphicsItemGroup):
self._execution_time_item.setPen(execution_time_pen)
# component item
self._set_background(Latency_Color.DEFAULT) # used by component filling
self._set_background(LATENCY_COLOR_TYPE.DEFAULT) # used by component filling
def create_ports(io_coordinates, prefix):
for i, (x, y) in enumerate(io_coordinates):
......
......@@ -210,14 +210,16 @@ class SchedulerEvent:
if pos_x < 0:
pos_x += self._schedule.schedule_time
redraw = True
if pos_x > self._schedule.schedule_time:
if (
pos_x > self._schedule.schedule_time
# If zero execution time, keep operation at the edge
if (
and (
pos_x > self._schedule.schedule_time + 1
or item.operation.execution_time
):
pos_x = pos_x % self._schedule.schedule_time
redraw = True
)
):
pos_x = pos_x % self._schedule.schedule_time
redraw = True
pos_y = self._schedule.get_y_location(item.operation.graph_id)
# Check move in y-direction
if pos_y != self._old_op_position:
......
......@@ -149,10 +149,7 @@ class SchedulerItem(SchedulerEvent, QGraphicsItemGroup):
def _color_change(self, color: QColor, name: str) -> None:
"""Change inactive color of operation item *."""
for op in self.components:
if name == "all operations":
op._set_background(color)
op._inactive_color = color
elif name == op.operation.type_name():
if name in ("all operations", op.operation.type_name()):
op._set_background(color)
op._inactive_color = color
......@@ -327,7 +324,7 @@ class SchedulerItem(SchedulerEvent, QGraphicsItemGroup):
def _redraw_from_start(self) -> None:
self.schedule.reset_y_locations()
self.schedule.sort_y_locations_on_start_times()
for graph_id in self.schedule.start_times.keys():
for graph_id in self.schedule.start_times:
self._set_position(graph_id)
self._redraw_all_lines()
self._update_axes()
......@@ -355,7 +352,7 @@ class SchedulerItem(SchedulerEvent, QGraphicsItemGroup):
def _make_graph(self) -> None:
"""Make a new graph out of the stored attributes."""
# build components
for graph_id in self.schedule.start_times.keys():
for graph_id in self.schedule.start_times:
operation = cast(Operation, self.schedule._sfg.find_by_id(graph_id))
component = OperationItem(operation, height=OPERATION_HEIGHT, parent=self)
self._operation_items[graph_id] = component
......
......@@ -13,13 +13,13 @@ from qtpy.QtWidgets import QGraphicsPathItem
# B-ASIC
from b_asic.scheduler_gui._preferences import (
ACTIVE_COLOR_TYPE,
SCHEDULE_INDENT,
SIGNAL_COLOR_TYPE,
SIGNAL_WARNING_COLOR_TYPE,
SIGNAL_WIDTH,
SIGNAL_WIDTH_ACTIVE,
SIGNAL_WIDTH_WARNING,
Active_Color,
Signal_Color,
Signal_Warning_Color,
)
from b_asic.scheduler_gui.operation_item import OperationItem
from b_asic.signal import Signal
......@@ -100,24 +100,24 @@ class SignalItem(QGraphicsPathItem):
def _refresh_pens(self) -> None:
"""Create pens."""
if Active_Color.changed:
pen = QPen(Active_Color.current_color)
if ACTIVE_COLOR_TYPE.changed:
pen = QPen(ACTIVE_COLOR_TYPE.current_color)
else:
pen = QPen(Active_Color.DEFAULT)
pen = QPen(ACTIVE_COLOR_TYPE.DEFAULT)
pen.setWidthF(SIGNAL_WIDTH_ACTIVE)
self._active_pen = pen
if Signal_Color.changed:
pen = QPen(Signal_Color.current_color)
if SIGNAL_COLOR_TYPE.changed:
pen = QPen(SIGNAL_COLOR_TYPE.current_color)
else:
pen = QPen(Signal_Color.DEFAULT)
pen = QPen(SIGNAL_COLOR_TYPE.DEFAULT)
pen.setWidthF(SIGNAL_WIDTH)
self._inactive_pen = pen
if Signal_Warning_Color.changed:
pen = QPen(Signal_Warning_Color.current_color)
if SIGNAL_WARNING_COLOR_TYPE.changed:
pen = QPen(SIGNAL_WARNING_COLOR_TYPE.current_color)
else:
pen = QPen(Signal_Warning_Color.DEFAULT)
pen = QPen(SIGNAL_WARNING_COLOR_TYPE.DEFAULT)
pen.setWidthF(SIGNAL_WIDTH_WARNING)
self._warning_pen = pen
......
......@@ -415,7 +415,7 @@ def radix_2_dif_fft(points: int) -> SFG:
raise ValueError("Points must be a power of two.")
inputs = []
for i in range(points):
for _ in range(points):
inputs.append(Input())
ports = inputs
......@@ -430,7 +430,7 @@ def radix_2_dif_fft(points: int) -> SFG:
ports = _get_bit_reversed_ports(ports)
outputs = []
for i, port in enumerate(ports):
for port in ports:
outputs.append(Output(port))
return SFG(inputs=inputs, outputs=outputs)
......