Skip to content
Snippets Groups Projects
Commit d16867dc authored by Mikael Henriksson's avatar Mikael Henriksson :runner:
Browse files

resources.py: fix maximum life-time left-edge algorithm bug (closes #250)

parent c17c055f
No related branches found
No related tags found
No related merge requests found
Pipeline #97103 passed
......@@ -1066,40 +1066,64 @@ class ProcessCollection:
Two or more processes can share a single resource if, and only if, they have no
overlaping execution time.
Raises :class:`ValueError` if any process in this collection has an execution
time which is greater than the collection schedule time.
Returns
-------
List[ProcessCollection]
"""
next_empty_cell = 0
cell_assignment: Dict[int, ProcessCollection] = dict()
assignment: List[ProcessCollection] = []
for next_process in sorted(self):
insert_to_new_cell = True
for cell in cell_assignment:
insert_to_this_cell = True
for process in cell_assignment[cell]:
next_process_stop_time = (
next_process.start_time + next_process.execution_time
) % self._schedule_time
if (
next_process.start_time
< process.start_time + process.execution_time
or next_process.start_time
> next_process_stop_time
> process.start_time
):
insert_to_this_cell = False
break
if insert_to_this_cell:
cell_assignment[cell].add_process(next_process)
insert_to_new_cell = False
break
if insert_to_new_cell:
cell_assignment[next_empty_cell] = ProcessCollection(
collection=[], schedule_time=self._schedule_time
if next_process.execution_time > self.schedule_time:
# Can not assign process to any cell
raise ValueError(
f"{next_process} has execution time greater than the schedule time"
)
cell_assignment[next_empty_cell].add_process(next_process)
next_empty_cell += 1
return [pc for pc in cell_assignment.values()]
elif next_process.execution_time == self.schedule_time:
# Always assign maximum lifetime process to new cell
assignment.append(
ProcessCollection(
(next_process,),
schedule_time=self.schedule_time,
cyclic=self._cyclic,
)
)
continue # Continue assigning next process
else:
next_process_stop_time = (
next_process.start_time + next_process.execution_time
) % self._schedule_time
insert_to_new_cell = True
for cell_assignment in assignment:
insert_to_this_cell = True
for process in cell_assignment:
# The next_process start_time is always greater than or equal to
# the start time of all other assigned processes
process_end_time = process.start_time + process.execution_time
if next_process.start_time < process_end_time:
insert_to_this_cell = False
break
if (
next_process.start_time
> next_process_stop_time
> process.start_time
):
insert_to_this_cell = False
break
if insert_to_this_cell:
cell_assignment.add_process(next_process)
insert_to_new_cell = False
break
if insert_to_new_cell:
assignment.append(
ProcessCollection(
(next_process,),
schedule_time=self.schedule_time,
cyclic=self._cyclic,
)
)
return assignment
def generate_memory_based_storage_vhdl(
self,
......
......@@ -210,3 +210,13 @@ class TestProcessCollectionPlainMemoryVariable:
assert exclusion_graph.degree(p1) == 3
assert exclusion_graph.degree(p2) == 1
assert exclusion_graph.degree(p3) == 3
def test_left_edge_maximum_lifetime(self):
a = PlainMemoryVariable(2, 0, {0: 1}, "cmul1.0")
b = PlainMemoryVariable(4, 0, {0: 7}, "cmul4.0")
c = PlainMemoryVariable(5, 0, {0: 4}, "cmul5.0")
collection = ProcessCollection([a, b, c], schedule_time=7, cyclic=True)
assignment = collection.split_on_execution_time(heuristic="left_edge")
assert a in assignment[0]
assert b in assignment[1]
assert c in assignment[0]
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment