Skip to content
Snippets Groups Projects

Refactored ListScheduler, made it non-abstract and added better checks to...

Merged Simon Bjurek requested to merge refactor-list-scheduler into master
2 unresolved threads
import sys
import numpy as np
import pytest
from b_asic.core_operations import (
@@ -21,6 +22,9 @@ from b_asic.sfg_generators import (
ldlt_matrix_inverse,
radix_2_dif_fft,
)
from b_asic.signal_flow_graph import SFG
from b_asic.signal_generator import Constant, Impulse
from b_asic.simulation import Simulation
from b_asic.special_operations import Input, Output
@@ -63,6 +67,7 @@ class TestEarliestDeadlineScheduler:
"out0": 13,
}
assert schedule.schedule_time == 13
_validate_recreated_sfg_filter(sfg, schedule)
def test_direct_form_2_iir_1_add_1_mul(self, sfg_direct_form_iir_lp_filter):
sfg_direct_form_iir_lp_filter.set_latency_of_type(
@@ -102,6 +107,7 @@ class TestEarliestDeadlineScheduler:
}
assert schedule.schedule_time == 13
_validate_recreated_sfg_filter(sfg_direct_form_iir_lp_filter, schedule)
def test_direct_form_2_iir_2_add_3_mul(self, sfg_direct_form_iir_lp_filter):
sfg_direct_form_iir_lp_filter.set_latency_of_type(
@@ -141,6 +147,7 @@ class TestEarliestDeadlineScheduler:
}
assert schedule.schedule_time == 12
_validate_recreated_sfg_filter(sfg_direct_form_iir_lp_filter, schedule)
def test_radix_2_fft_8_points(self):
sfg = radix_2_dif_fft(points=8)
@@ -196,6 +203,7 @@ class TestEarliestDeadlineScheduler:
"out3": 7,
}
assert schedule.schedule_time == 7
_validate_recreated_sfg_fft(schedule, 8)
class TestLeastSlackTimeScheduler:
@@ -237,6 +245,7 @@ class TestLeastSlackTimeScheduler:
"out0": 13,
}
assert schedule.schedule_time == 13
_validate_recreated_sfg_filter(sfg, schedule)
def test_direct_form_2_iir_1_add_1_mul(self, sfg_direct_form_iir_lp_filter):
sfg_direct_form_iir_lp_filter.set_latency_of_type(
@@ -276,6 +285,7 @@ class TestLeastSlackTimeScheduler:
}
assert schedule.schedule_time == 13
_validate_recreated_sfg_filter(sfg_direct_form_iir_lp_filter, schedule)
def test_direct_form_2_iir_2_add_3_mul(self, sfg_direct_form_iir_lp_filter):
sfg_direct_form_iir_lp_filter.set_latency_of_type(
@@ -315,6 +325,7 @@ class TestLeastSlackTimeScheduler:
}
assert schedule.schedule_time == 12
_validate_recreated_sfg_filter(sfg_direct_form_iir_lp_filter, schedule)
def test_radix_2_fft_8_points(self):
sfg = radix_2_dif_fft(points=8)
@@ -370,6 +381,7 @@ class TestLeastSlackTimeScheduler:
"out3": 7,
}
assert schedule.schedule_time == 7
_validate_recreated_sfg_fft(schedule, 8)
class TestMaxFanOutScheduler:
@@ -404,6 +416,7 @@ class TestMaxFanOutScheduler:
"out0": 15,
}
assert schedule.schedule_time == 15
_validate_recreated_sfg_filter(sfg, schedule)
def test_ldlt_inverse_3x3(self):
sfg = ldlt_matrix_inverse(N=3)
@@ -489,6 +502,7 @@ class TestHybridScheduler:
"out0": 13,
}
assert schedule.schedule_time == 13
_validate_recreated_sfg_filter(sfg, schedule)
def test_radix_2_fft_8_points(self):
sfg = radix_2_dif_fft(points=8)
@@ -542,6 +556,7 @@ class TestHybridScheduler:
"out3": 7,
}
assert schedule.schedule_time == 7
_validate_recreated_sfg_fft(schedule, 8)
def test_radix_2_fft_8_points_one_output(self):
sfg = radix_2_dif_fft(points=8)
@@ -595,7 +610,10 @@ class TestHybridScheduler:
"out5": 12,
}
assert schedule.schedule_time == 12
_validate_recreated_sfg_fft(schedule, 8)
# This schedule that this test is checking against is faulty and will yield a non-working
# fft implementation, however, it is kept commented out for reference
def test_radix_2_fft_8_points_specified_IO_times_cyclic(self):
sfg = radix_2_dif_fft(points=8)
@@ -676,6 +694,30 @@ class TestHybridScheduler:
}
assert schedule.schedule_time == 20
# impulse input -> constant output
sim = Simulation(schedule.sfg, [Impulse()] + [0 for i in range(7)])
sim.run_for(2)
assert np.allclose(sim.results["0"], [1, 0])
assert np.allclose(sim.results["1"], [1, 0])
assert np.allclose(sim.results["2"], [1, 0])
assert np.allclose(sim.results["3"], [1, 0])
assert np.allclose(sim.results["4"], [0, 1])
assert np.allclose(sim.results["5"], [0, 1])
assert np.allclose(sim.results["6"], [0, 1])
assert np.allclose(sim.results["7"], [0, 1])
# constant input -> impulse (with weight=points) output
sim = Simulation(schedule.sfg, [Impulse() for i in range(8)])
sim.run_for(2)
assert np.allclose(sim.results["0"], [8, 0])
assert np.allclose(sim.results["1"], [0, 0])
assert np.allclose(sim.results["2"], [0, 0])
assert np.allclose(sim.results["3"], [0, 0])
assert np.allclose(sim.results["4"], [0, 0])
assert np.allclose(sim.results["5"], [0, 0])
assert np.allclose(sim.results["6"], [0, 0])
assert np.allclose(sim.results["7"], [0, 0])
def test_radix_2_fft_8_points_specified_IO_times_non_cyclic(self):
sfg = radix_2_dif_fft(points=8)
@@ -749,6 +791,7 @@ class TestHybridScheduler:
"out7": 24,
}
assert schedule.schedule_time == 24
_validate_recreated_sfg_fft(schedule, 8)
def test_ldlt_inverse_2x2(self):
sfg = ldlt_matrix_inverse(N=2)
@@ -1140,7 +1183,7 @@ class TestHybridScheduler:
assert schedule.start_times[f"in{i}"] == i
assert schedule.start_times[f"out{i}"] == 95 + i
# Too slow for pipeline right now
# too slow for pipeline timeout
# def test_64_point_fft_custom_io_times(self):
# POINTS = 64
# sfg = radix_2_dif_fft(POINTS)
@@ -1166,7 +1209,7 @@ class TestHybridScheduler:
# assert schedule.start_times[f"in{i}"] == i
# assert (
# schedule.start_times[f"out{i}"]
# == schedule.get_max_non_io_end_time() + i
# == schedule.get_max_non_io_end_time() - 1 + i
# )
def test_32_point_fft_custom_io_times_cyclic(self):
@@ -1353,27 +1396,6 @@ class TestHybridScheduler:
}
assert schedule_4.schedule_time == 4
def test_cyclic_scheduling_time_not_provided(self):
sfg = ldlt_matrix_inverse(N=2)
sfg.set_latency_of_type(MADS.type_name(), 3)
sfg.set_latency_of_type(Reciprocal.type_name(), 2)
sfg.set_execution_time_of_type(MADS.type_name(), 1)
sfg.set_execution_time_of_type(Reciprocal.type_name(), 1)
resources = {MADS.type_name(): 1, Reciprocal.type_name(): 1}
with pytest.raises(
ValueError,
match="Scheduling time must be provided when cyclic = True.",
):
Schedule(
sfg,
scheduler=HybridScheduler(
max_resources=resources,
),
cyclic=True,
)
def test_resources_not_enough(self):
sfg = ldlt_matrix_inverse(N=3)
@@ -1873,10 +1895,61 @@ class TestHybridScheduler:
"s53": 0,
}
#
# schedule = Schedule(
# sfg,
# scheduler=HybridScheduler(max_concurrent_writes=2, max_concurrent_reads=2),
# schedule_time=30,
# cyclic=True,
# )
def _validate_recreated_sfg_filter(sfg: SFG, schedule: Schedule) -> None:
# compare the impulse response of the original sfg and recreated one
sim1 = Simulation(sfg, [Impulse()])
sim1.run_for(1000)
sim2 = Simulation(schedule.sfg, [Impulse()])
sim2.run_for(1000)
spectrum_1 = abs(np.fft.fft(sim1.results['0']))
spectrum_2 = abs(np.fft.fft(sim2.results['0']))
assert np.allclose(spectrum_1, spectrum_2)
def _validate_recreated_sfg_fft(schedule: Schedule, points: int) -> None:
# impulse input -> constant output
sim = Simulation(schedule.sfg, [Impulse()] + [0 for i in range(points - 1)])
sim.run_for(1)
for i in range(points):
assert np.allclose(sim.results[str(i)], 1)
# constant input -> impulse (with weight=points) output
sim = Simulation(schedule.sfg, [Impulse() for i in range(points)])
sim.run_for(1)
assert np.allclose(sim.results["0"], points)
for i in range(1, points):
assert np.allclose(sim.results[str(i)], 0)
# sine input -> compare with numpy fft
n = np.linspace(0, 2 * np.pi, points)
waveform = np.sin(n)
input_samples = [Constant(waveform[i]) for i in range(points)]
sim = Simulation(schedule.sfg, input_samples)
sim.run_for(1)
exp_res = abs(np.fft.fft(waveform))
res = sim.results
for i in range(points):
a = abs(res[str(i)])
b = exp_res[i]
assert np.isclose(a, b)
# multi-tone input -> compare with numpy fft
n = np.linspace(0, 2 * np.pi, points)
waveform = (
2 * np.sin(n)
+ 1.3 * np.sin(0.9 * n)
+ 0.9 * np.sin(0.6 * n)
+ 0.35 * np.sin(0.3 * n)
+ 2.4 * np.sin(0.1 * n)
)
input_samples = [Constant(waveform[i]) for i in range(points)]
sim = Simulation(schedule.sfg, input_samples)
sim.run_for(1)
exp_res = np.fft.fft(waveform)
res = sim.results
for i in range(points):
a = res[str(i)]
b = exp_res[i]
assert np.isclose(a, b)
Loading