Newer
Older
import sys
import numpy as np
from scipy import signal
from b_asic.core_operations import (
MADS,
Addition,
Butterfly,
ConstantMultiplication,
Reciprocal,
)
Simon Bjurek
committed
from b_asic.list_schedulers import (
HybridScheduler,
LeastSlackTimeScheduler,
MaxFanOutScheduler,
from b_asic.scheduler import ListScheduler, RecursiveListScheduler
from b_asic.sfg_generators import (
direct_form_1_iir,
direct_form_2_iir,
ldlt_matrix_inverse,
radix_2_dif_fft,
)
from b_asic.signal_flow_graph import SFG
from b_asic.signal_generator import Constant, Impulse
from b_asic.simulation import Simulation
from b_asic.special_operations import Delay, Input, Output
class TestEarliestDeadlineScheduler:
def test_empty_sfg(self, sfg_empty):
    """Building a Schedule from the ``sfg_empty`` fixture must raise ValueError."""
    expected_message = "Empty signal flow graph cannot be scheduled."
    with pytest.raises(ValueError, match=expected_message):
        Schedule(sfg_empty, scheduler=EarliestDeadlineScheduler())
sfg = direct_form_1_iir([1, 2, 3], [1, 2, 3])
sfg.set_latency_of_type_name(ConstantMultiplication.type_name(), 2)
sfg.set_execution_time_of_type_name(ConstantMultiplication.type_name(), 1)
sfg.set_latency_of_type_name(Addition.type_name(), 3)
sfg.set_execution_time_of_type_name(Addition.type_name(), 1)
resources = {
Addition.type_name(): 1,
ConstantMultiplication.type_name(): 1,
Input.type_name(): sys.maxsize,
Output.type_name(): sys.maxsize,
}
schedule = Schedule(
sfg, scheduler=EarliestDeadlineScheduler(max_resources=resources)
)
assert schedule.start_times == {
"in0": 0,
Simon Bjurek
committed
"cmul3": 0,
"cmul4": 1,
"cmul0": 2,
"add1": 3,
"cmul1": 3,
"cmul2": 4,
Simon Bjurek
committed
"add0": 6,
"add3": 7,
"add2": 10,
"out0": 13,
}
assert schedule.schedule_time == 13
_validate_recreated_sfg_filter(sfg, schedule)
def test_direct_form_2_iir_1_add_1_mul(self, sfg_direct_form_iir_lp_filter):
sfg_direct_form_iir_lp_filter.set_latency_of_type_name(
ConstantMultiplication.type_name(), 3
)
sfg_direct_form_iir_lp_filter.set_latency_of_type_name(Addition.type_name(), 2)
sfg_direct_form_iir_lp_filter.set_execution_time_of_type_name(
ConstantMultiplication.type_name(), 1
)
sfg_direct_form_iir_lp_filter.set_execution_time_of_type_name(
resources = {
Addition.type_name(): 1,
ConstantMultiplication.type_name(): 1,
Input.type_name(): sys.maxsize,
Output.type_name(): sys.maxsize,
}
schedule = Schedule(
sfg_direct_form_iir_lp_filter,
scheduler=EarliestDeadlineScheduler(resources),
"in0": 0,
Simon Bjurek
committed
"cmul3": 0,
"cmul4": 1,
"cmul1": 2,
"cmul2": 3,
"add1": 4,
"add0": 6,
"add3": 7,
"cmul0": 8,
"add2": 11,
"out0": 13,
}
assert schedule.schedule_time == 13
_validate_recreated_sfg_filter(sfg_direct_form_iir_lp_filter, schedule)
def test_direct_form_2_iir_2_add_3_mul(self, sfg_direct_form_iir_lp_filter):
sfg_direct_form_iir_lp_filter.set_latency_of_type_name(
ConstantMultiplication.type_name(), 3
)
sfg_direct_form_iir_lp_filter.set_latency_of_type_name(Addition.type_name(), 2)
sfg_direct_form_iir_lp_filter.set_execution_time_of_type_name(
ConstantMultiplication.type_name(), 1
)
sfg_direct_form_iir_lp_filter.set_execution_time_of_type_name(
resources = {
Addition.type_name(): 2,
ConstantMultiplication.type_name(): 3,
Input.type_name(): sys.maxsize,
Output.type_name(): sys.maxsize,
}
schedule = Schedule(
sfg_direct_form_iir_lp_filter,
scheduler=EarliestDeadlineScheduler(resources),
"in0": 0,
"cmul1": 0,
"cmul4": 0,
"cmul3": 0,
"cmul2": 1,
"add1": 3,
"add3": 4,
"add0": 5,
"cmul0": 7,
"add2": 10,
"out0": 12,
}
assert schedule.schedule_time == 12
_validate_recreated_sfg_filter(sfg_direct_form_iir_lp_filter, schedule)
def test_radix_2_fft_8_points(self):
sfg = radix_2_dif_fft(points=8)
sfg.set_latency_of_type_name(ConstantMultiplication.type_name(), 2)
sfg.set_execution_time_of_type_name(ConstantMultiplication.type_name(), 1)
sfg.set_latency_of_type_name(Butterfly.type_name(), 1)
sfg.set_execution_time_of_type_name(Butterfly.type_name(), 1)
resources = {
Butterfly.type_name(): 2,
ConstantMultiplication.type_name(): 2,
Input.type_name(): sys.maxsize,
Output.type_name(): sys.maxsize,
}
schedule = Schedule(
sfg, scheduler=EarliestDeadlineScheduler(max_resources=resources)
)
assert schedule.start_times == {
"in0": 0,
"in2": 0,
"in4": 0,
"in6": 0,
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
"in7": 0,
"bfly6": 0,
"bfly8": 0,
"cmul2": 1,
"cmul3": 1,
"bfly11": 1,
"bfly7": 1,
"cmul0": 2,
"bfly0": 2,
"cmul4": 2,
"bfly5": 3,
"bfly1": 3,
"cmul1": 4,
"bfly2": 4,
"bfly9": 4,
"bfly10": 5,
"bfly3": 5,
"out0": 5,
"out4": 5,
"bfly4": 6,
"out1": 6,
"out2": 6,
"out5": 6,
"out6": 6,
"out7": 7,
"out3": 7,
}
assert schedule.schedule_time == 7
_validate_recreated_sfg_fft(schedule, 8)
class TestLeastSlackTimeScheduler:
def test_empty_sfg(self, sfg_empty):
    """Building a Schedule from the ``sfg_empty`` fixture must raise ValueError."""
    expected_message = "Empty signal flow graph cannot be scheduled."
    with pytest.raises(ValueError, match=expected_message):
        Schedule(sfg_empty, scheduler=LeastSlackTimeScheduler())
def test_direct_form_1_iir(self):
    """Schedule a direct-form-1 IIR filter with one adder and one multiplier.

    Pins the start time of every operation and the total schedule time
    produced by ``LeastSlackTimeScheduler``, then passes the result to
    ``_validate_recreated_sfg_filter``.
    """
    sfg = direct_form_1_iir([1, 2, 3], [1, 2, 3])

    sfg.set_latency_of_type_name(ConstantMultiplication.type_name(), 2)
    sfg.set_execution_time_of_type_name(ConstantMultiplication.type_name(), 1)
    sfg.set_latency_of_type_name(Addition.type_name(), 3)
    sfg.set_execution_time_of_type_name(Addition.type_name(), 1)

    # sys.maxsize as the limit leaves inputs and outputs effectively unconstrained.
    resources = {
        Addition.type_name(): 1,
        ConstantMultiplication.type_name(): 1,
        Input.type_name(): sys.maxsize,
        Output.type_name(): sys.maxsize,
    }
    schedule = Schedule(
        sfg, scheduler=LeastSlackTimeScheduler(max_resources=resources)
    )

    assert schedule.start_times == {
        "in0": 0,
        "cmul3": 0,
        "cmul4": 1,
        "cmul0": 2,
        "add1": 3,
        "cmul1": 3,
        "cmul2": 4,
        "add0": 6,
        "add3": 7,
        "add2": 10,
        "out0": 13,
    }
    assert schedule.schedule_time == 13

    # Helper defined elsewhere in this module; presumably checks that the
    # SFG recreated from the schedule still behaves like the filter.
    _validate_recreated_sfg_filter(sfg, schedule)
def test_direct_form_2_iir_1_add_1_mul(self, sfg_direct_form_iir_lp_filter):
sfg_direct_form_iir_lp_filter.set_latency_of_type_name(
ConstantMultiplication.type_name(), 3
)
sfg_direct_form_iir_lp_filter.set_latency_of_type_name(Addition.type_name(), 2)
sfg_direct_form_iir_lp_filter.set_execution_time_of_type_name(
ConstantMultiplication.type_name(), 1
)
sfg_direct_form_iir_lp_filter.set_execution_time_of_type_name(
resources = {
Addition.type_name(): 1,
ConstantMultiplication.type_name(): 1,
Input.type_name(): sys.maxsize,
Output.type_name(): sys.maxsize,
}
schedule = Schedule(
sfg_direct_form_iir_lp_filter,
scheduler=LeastSlackTimeScheduler(resources),
"in0": 0,
Simon Bjurek
committed
"cmul3": 0,
"cmul4": 1,
"cmul1": 2,
"cmul2": 3,
"add1": 4,
"add0": 6,
"add3": 7,
"cmul0": 8,
"add2": 11,
"out0": 13,
}
assert schedule.schedule_time == 13
_validate_recreated_sfg_filter(sfg_direct_form_iir_lp_filter, schedule)
def test_direct_form_2_iir_2_add_3_mul(self, sfg_direct_form_iir_lp_filter):
sfg_direct_form_iir_lp_filter.set_latency_of_type_name(
ConstantMultiplication.type_name(), 3
)
sfg_direct_form_iir_lp_filter.set_latency_of_type_name(Addition.type_name(), 2)
sfg_direct_form_iir_lp_filter.set_execution_time_of_type_name(
ConstantMultiplication.type_name(), 1
)
sfg_direct_form_iir_lp_filter.set_execution_time_of_type_name(
resources = {
Addition.type_name(): 2,
ConstantMultiplication.type_name(): 3,
Input.type_name(): sys.maxsize,
Output.type_name(): sys.maxsize,
}
schedule = Schedule(
sfg_direct_form_iir_lp_filter,
scheduler=LeastSlackTimeScheduler(resources),
"in0": 0,
"cmul1": 0,
"cmul4": 0,
"cmul3": 0,
"cmul2": 1,
"add1": 3,
"add3": 4,
"add0": 5,
"cmul0": 7,
"add2": 10,
"out0": 12,
}
assert schedule.schedule_time == 12
_validate_recreated_sfg_filter(sfg_direct_form_iir_lp_filter, schedule)
def test_radix_2_fft_8_points(self):
sfg = radix_2_dif_fft(points=8)
sfg.set_latency_of_type_name(ConstantMultiplication.type_name(), 2)
sfg.set_execution_time_of_type_name(ConstantMultiplication.type_name(), 1)
sfg.set_latency_of_type_name(Butterfly.type_name(), 1)
sfg.set_execution_time_of_type_name(Butterfly.type_name(), 1)
resources = {
Butterfly.type_name(): 2,
ConstantMultiplication.type_name(): 2,
Input.type_name(): sys.maxsize,
Output.type_name(): sys.maxsize,
}
schedule = Schedule(
sfg, scheduler=LeastSlackTimeScheduler(max_resources=resources)
)
assert schedule.start_times == {
"in0": 0,
"in2": 0,
"in4": 0,
"in6": 0,
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
"in7": 0,
"bfly6": 0,
"bfly8": 0,
"cmul2": 1,
"cmul3": 1,
"bfly11": 1,
"bfly7": 1,
"cmul0": 2,
"bfly0": 2,
"cmul4": 2,
"bfly5": 3,
"bfly1": 3,
"cmul1": 4,
"bfly2": 4,
"bfly9": 4,
"bfly10": 5,
"bfly3": 5,
"out0": 5,
"out4": 5,
"bfly4": 6,
"out1": 6,
"out2": 6,
"out5": 6,
"out6": 6,
"out7": 7,
"out3": 7,
}
assert schedule.schedule_time == 7
_validate_recreated_sfg_fft(schedule, 8)
class TestMaxFanOutScheduler:
def test_empty_sfg(self, sfg_empty):
    """Building a Schedule from the ``sfg_empty`` fixture must raise ValueError."""
    expected_message = "Empty signal flow graph cannot be scheduled."
    with pytest.raises(ValueError, match=expected_message):
        Schedule(sfg_empty, scheduler=MaxFanOutScheduler())
def test_direct_form_1_iir(self):
    """Schedule a direct-form-1 IIR filter with one adder and one multiplier.

    Pins the start time of every operation and the total schedule time
    produced by ``MaxFanOutScheduler``, then passes the result to
    ``_validate_recreated_sfg_filter``.
    """
    sfg = direct_form_1_iir([1, 2, 3], [1, 2, 3])

    sfg.set_latency_of_type_name(ConstantMultiplication.type_name(), 2)
    sfg.set_execution_time_of_type_name(ConstantMultiplication.type_name(), 1)
    sfg.set_latency_of_type_name(Addition.type_name(), 3)
    sfg.set_execution_time_of_type_name(Addition.type_name(), 1)

    # Only the arithmetic units are limited; no entry is given for inputs/outputs.
    resources = {Addition.type_name(): 1, ConstantMultiplication.type_name(): 1}
    schedule = Schedule(sfg, scheduler=MaxFanOutScheduler(max_resources=resources))

    assert schedule.start_times == {
        "in0": 0,
        "cmul0": 0,
        "cmul1": 1,
        "cmul2": 2,
        "cmul3": 3,
        "cmul4": 4,
        "add3": 4,
        "add1": 6,
        "add0": 9,
        "add2": 12,
        "out0": 15,
    }
    assert schedule.schedule_time == 15

    # Helper defined elsewhere in this module; presumably checks that the
    # SFG recreated from the schedule still behaves like the filter.
    _validate_recreated_sfg_filter(sfg, schedule)
Simon Bjurek
committed
def test_ldlt_inverse_3x3(self):
    """Schedule ``ldlt_matrix_inverse(N=3)`` with one MADS and one Reciprocal unit.

    Pins the start time of every operation and the total schedule time
    produced by ``MaxFanOutScheduler``, then passes the schedule to
    ``_validate_recreated_sfg_ldlt_matrix_inverse``.
    """
    sfg = ldlt_matrix_inverse(N=3)

    sfg.set_latency_of_type_name(MADS.type_name(), 3)
    sfg.set_latency_of_type_name(Reciprocal.type_name(), 2)
    sfg.set_execution_time_of_type_name(MADS.type_name(), 1)
    sfg.set_execution_time_of_type_name(Reciprocal.type_name(), 1)

    resources = {MADS.type_name(): 1, Reciprocal.type_name(): 1}
    schedule = Schedule(sfg, scheduler=MaxFanOutScheduler(resources))

    assert schedule.start_times == {
        "in1": 0,
        "in2": 1,
        "in0": 2,
        "rec0": 2,
        "in4": 3,
        "in5": 4,
        "mads1": 4,
        "dontcare4": 4,
        "dontcare5": 5,
        "in3": 5,
        "mads0": 5,
        "mads13": 7,
        "mads12": 8,
        "mads14": 9,
        "rec1": 12,
        "mads8": 14,
        "dontcare2": 14,
        "mads10": 17,
        "rec2": 20,
        "mads9": 22,
        "out5": 22,
        "dontcare0": 22,
        "dontcare1": 23,
        "mads11": 23,
        "mads7": 25,
        "out4": 25,
        "mads3": 26,
        "mads6": 27,
        "dontcare3": 27,
        "out3": 28,
        "mads2": 29,
        "out2": 29,
        "mads5": 30,
        "mads4": 33,
        "out1": 33,
        "out0": 36,
    }
    assert schedule.schedule_time == 36

    # Helper defined elsewhere in this module; presumably simulates the
    # recreated SFG against a reference 3x3 inverse.
    _validate_recreated_sfg_ldlt_matrix_inverse(schedule, 3)
Simon Bjurek
committed
class TestHybridScheduler:
def test_empty_sfg(self, sfg_empty):
    """Building a Schedule from the ``sfg_empty`` fixture must raise ValueError."""
    expected_message = "Empty signal flow graph cannot be scheduled."
    with pytest.raises(ValueError, match=expected_message):
        Schedule(sfg_empty, scheduler=HybridScheduler())
def test_direct_form_1_iir(self):
    """Schedule a direct-form-1 IIR filter with one adder and one multiplier.

    Pins the start time of every operation and the total schedule time
    produced by ``HybridScheduler``, then passes the result to
    ``_validate_recreated_sfg_filter``.
    """
    sfg = direct_form_1_iir([1, 2, 3], [1, 2, 3])

    sfg.set_latency_of_type_name(ConstantMultiplication.type_name(), 2)
    sfg.set_execution_time_of_type_name(ConstantMultiplication.type_name(), 1)
    sfg.set_latency_of_type_name(Addition.type_name(), 3)
    sfg.set_execution_time_of_type_name(Addition.type_name(), 1)

    # Only the arithmetic units are limited; no entry is given for inputs/outputs.
    resources = {Addition.type_name(): 1, ConstantMultiplication.type_name(): 1}
    schedule = Schedule(sfg, scheduler=HybridScheduler(max_resources=resources))

    assert schedule.start_times == {
        "in0": 0,
        "cmul3": 0,
        "cmul4": 1,
        "cmul0": 2,
        "add1": 3,
        "cmul1": 3,
        "cmul2": 4,
        "add0": 6,
        "add3": 7,
        "add2": 10,
        "out0": 13,
    }
    assert schedule.schedule_time == 13

    # Helper defined elsewhere in this module; presumably checks that the
    # SFG recreated from the schedule still behaves like the filter.
    _validate_recreated_sfg_filter(sfg, schedule)
def test_radix_2_fft_8_points(self):
sfg = radix_2_dif_fft(points=8)
sfg.set_latency_of_type_name(ConstantMultiplication.type_name(), 2)
sfg.set_execution_time_of_type_name(ConstantMultiplication.type_name(), 1)
sfg.set_latency_of_type_name(Butterfly.type_name(), 1)
sfg.set_execution_time_of_type_name(Butterfly.type_name(), 1)
resources = {
Butterfly.type_name(): 2,
ConstantMultiplication.type_name(): 2,
Input.type_name(): sys.maxsize,
Output.type_name(): sys.maxsize,
}
schedule = Schedule(sfg, scheduler=HybridScheduler(max_resources=resources))
assert schedule.start_times == {
"in0": 0,
"in2": 0,
"in4": 0,
"in6": 0,
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
"in7": 0,
"bfly6": 0,
"bfly8": 0,
"cmul2": 1,
"cmul3": 1,
"bfly11": 1,
"bfly7": 1,
"cmul0": 2,
"bfly0": 2,
"cmul4": 2,
"bfly5": 3,
"bfly1": 3,
"cmul1": 4,
"bfly2": 4,
"bfly9": 4,
"bfly10": 5,
"bfly3": 5,
"out0": 5,
"out4": 5,
"bfly4": 6,
"out1": 6,
"out2": 6,
"out5": 6,
"out6": 6,
"out7": 7,
"out3": 7,
}
assert schedule.schedule_time == 7
_validate_recreated_sfg_fft(schedule, 8)
def test_radix_2_fft_8_points_one_output(self):
    """Schedule an 8-point radix-2 DIF FFT with the Output resource limited to 1.

    Uses two butterflies and two constant multipliers. Pins the start
    time of every operation and the total schedule time produced by
    ``HybridScheduler``, then passes the schedule to
    ``_validate_recreated_sfg_fft``.
    """
    sfg = radix_2_dif_fft(points=8)

    sfg.set_latency_of_type_name(ConstantMultiplication.type_name(), 2)
    sfg.set_execution_time_of_type_name(ConstantMultiplication.type_name(), 1)
    sfg.set_latency_of_type_name(Butterfly.type_name(), 1)
    sfg.set_execution_time_of_type_name(Butterfly.type_name(), 1)

    # Inputs stay effectively unlimited (sys.maxsize); outputs are capped at one.
    resources = {
        Butterfly.type_name(): 2,
        ConstantMultiplication.type_name(): 2,
        Input.type_name(): sys.maxsize,
        Output.type_name(): 1,
    }
    schedule = Schedule(sfg, scheduler=HybridScheduler(max_resources=resources))

    # In the expected schedule every output has a distinct start time.
    assert schedule.start_times == {
        "in0": 0,
        "in1": 0,
        "in2": 0,
        "in3": 0,
        "in4": 0,
        "in5": 0,
        "in6": 0,
        "in7": 0,
        "bfly6": 0,
        "bfly8": 0,
        "cmul2": 1,
        "cmul3": 1,
        "bfly11": 1,
        "bfly7": 1,
        "cmul0": 2,
        "bfly0": 2,
        "cmul4": 2,
        "bfly5": 3,
        "bfly1": 3,
        "cmul1": 4,
        "bfly2": 4,
        "bfly9": 4,
        "bfly10": 5,
        "bfly3": 5,
        "out0": 5,
        "bfly4": 6,
        "out2": 6,
        "out3": 7,
        "out7": 8,
        "out6": 9,
        "out4": 10,
        "out1": 11,
        "out5": 12,
    }
    assert schedule.schedule_time == 12

    # Helper defined elsewhere in this module; presumably simulates the
    # recreated SFG as an 8-point FFT.
    _validate_recreated_sfg_fft(schedule, 8)
# The schedule that this test checks against is faulty and will yield a non-working
# FFT implementation; however, it is kept commented out for reference.
def test_radix_2_fft_8_points_specified_IO_times_cyclic(self):
sfg = radix_2_dif_fft(points=8)
sfg.set_latency_of_type_name(Butterfly.type_name(), 3)
sfg.set_latency_of_type_name(ConstantMultiplication.type_name(), 2)
sfg.set_execution_time_of_type_name(Butterfly.type_name(), 1)
sfg.set_execution_time_of_type_name(ConstantMultiplication.type_name(), 1)
resources = {
Butterfly.type_name(): 1,
ConstantMultiplication.type_name(): 1,
Input.type_name(): sys.maxsize,
Output.type_name(): sys.maxsize,
}
input_times = {
"in0": 0,
"in1": 1,
"in2": 2,
"in3": 3,
"in4": 4,
"in5": 5,
"in6": 6,
"in7": 7,
}
output_times = {
Simon Bjurek
committed
"out0": 0,
"out1": 1,
"out2": 2,
"out3": 3,
"out4": 4,
"out5": 5,
"out6": 6,
"out7": 7,
scheduler=HybridScheduler(
resources, input_times=input_times, output_delta_times=output_times
),
cyclic=True,
)
assert schedule.start_times == {
"in0": 0,
"in1": 1,
"in2": 2,
"in3": 3,
"in4": 4,
"in5": 5,
"in6": 6,
"in7": 7,
"bfly0": 4,
"bfly8": 5,
"bfly11": 6,
"bfly6": 7,
"cmul2": 8,
"cmul0": 9,
"bfly1": 9,
"cmul3": 10,
"bfly7": 10,
"bfly2": 11,
"bfly5": 12,
"cmul4": 13,
"bfly9": 13,
Simon Bjurek
committed
"bfly3": 15,
Simon Bjurek
committed
"bfly10": 16,
Simon Bjurek
committed
"out0": 17,
"out1": 18,
"out2": 19,
"out3": 20,
"out4": 1,
"out5": 2,
"out6": 3,
"out7": 4,
}
assert schedule.schedule_time == 20
# impulse input -> constant output
sim = Simulation(schedule.sfg, [Impulse()] + [0 for i in range(7)])
sim.run_for(2)
assert np.allclose(sim.results["0"], [1, 0])
assert np.allclose(sim.results["1"], [1, 0])
assert np.allclose(sim.results["2"], [1, 0])
assert np.allclose(sim.results["3"], [1, 0])
assert np.allclose(sim.results["4"], [0, 1])
assert np.allclose(sim.results["5"], [0, 1])
assert np.allclose(sim.results["6"], [0, 1])
assert np.allclose(sim.results["7"], [0, 1])
# constant input -> impulse (with weight=points) output
sim = Simulation(schedule.sfg, [Impulse() for i in range(8)])
sim.run_for(2)
assert np.allclose(sim.results["0"], [8, 0])
assert np.allclose(sim.results["1"], [0, 0])
assert np.allclose(sim.results["2"], [0, 0])
assert np.allclose(sim.results["3"], [0, 0])
assert np.allclose(sim.results["4"], [0, 0])
assert np.allclose(sim.results["5"], [0, 0])
assert np.allclose(sim.results["6"], [0, 0])
assert np.allclose(sim.results["7"], [0, 0])
def test_radix_2_fft_8_points_specified_IO_times_non_cyclic(self):
sfg = radix_2_dif_fft(points=8)
sfg.set_latency_of_type_name(Butterfly.type_name(), 3)
sfg.set_latency_of_type_name(ConstantMultiplication.type_name(), 2)
sfg.set_execution_time_of_type_name(Butterfly.type_name(), 1)
sfg.set_execution_time_of_type_name(ConstantMultiplication.type_name(), 1)
resources = {Butterfly.type_name(): 1, ConstantMultiplication.type_name(): 1}
input_times = {
"in0": 0,
"in1": 1,
"in2": 2,
"in3": 3,
"in4": 4,
"in5": 5,
"in6": 6,
"in7": 7,
}
output_times = {
Simon Bjurek
committed
"out0": 0,
"out1": 1,
"out2": 2,
"out3": 3,
"out4": 4,
"out5": 5,
"out6": 6,
"out7": 7,
scheduler=HybridScheduler(
resources, input_times=input_times, output_delta_times=output_times
),
cyclic=False,
)
assert schedule.start_times == {
"in0": 0,
"in1": 1,
"in2": 2,
"in3": 3,
"in4": 4,
"in5": 5,
"in6": 6,
"in7": 7,
"bfly0": 4,
"bfly8": 5,
"bfly11": 6,
"bfly6": 7,
"cmul2": 8,
"cmul0": 9,
"bfly1": 9,
"cmul3": 10,
"bfly7": 10,
"bfly2": 11,
"bfly5": 12,
"cmul4": 13,
"bfly9": 13,
Simon Bjurek
committed
"bfly3": 15,
Simon Bjurek
committed
"bfly10": 16,
Simon Bjurek
committed
"out0": 17,
"out1": 18,
"out2": 19,
"out3": 20,
"out4": 21,
"out5": 22,
"out6": 23,
"out7": 24,
Simon Bjurek
committed
assert schedule.schedule_time == 24
_validate_recreated_sfg_fft(schedule, 8)
def test_ldlt_inverse_2x2(self):
    """Schedule ``ldlt_matrix_inverse(N=2)`` with one MADS and one Reciprocal unit.

    Pins the start time of every operation and the total schedule time
    produced by ``HybridScheduler``, then passes the schedule to
    ``_validate_recreated_sfg_ldlt_matrix_inverse``.
    """
    sfg = ldlt_matrix_inverse(N=2)

    sfg.set_latency_of_type_name(MADS.type_name(), 3)
    sfg.set_latency_of_type_name(Reciprocal.type_name(), 2)
    sfg.set_execution_time_of_type_name(MADS.type_name(), 1)
    sfg.set_execution_time_of_type_name(Reciprocal.type_name(), 1)

    # sys.maxsize as the limit leaves inputs and outputs effectively unconstrained.
    resources = {
        MADS.type_name(): 1,
        Reciprocal.type_name(): 1,
        Input.type_name(): sys.maxsize,
        Output.type_name(): sys.maxsize,
    }
    schedule = Schedule(
        sfg,
        scheduler=HybridScheduler(resources),
    )

    assert schedule.start_times == {
        "in0": 0,
        "in1": 0,
        "in2": 0,
        "rec0": 0,
        "dontcare1": 2,
        "mads0": 2,
        "mads3": 5,
        "rec1": 8,
        "dontcare0": 10,
        "mads2": 10,
        "mads1": 13,
        "out2": 10,
        "out1": 13,
        "out0": 16,
    }
    assert schedule.schedule_time == 16

    # Helper defined elsewhere in this module; presumably simulates the
    # recreated SFG against a reference 2x2 inverse.
    _validate_recreated_sfg_ldlt_matrix_inverse(schedule, 2)
def test_ldlt_inverse_2x2_specified_IO_times_cyclic(self):
    """Schedule ``ldlt_matrix_inverse(N=2)`` cyclically with fixed I/O times.

    Inputs are given explicit start times and outputs explicit delta
    times. The test pins the resulting start times and schedule time,
    then simulates ``schedule.sfg`` for two iterations on a random matrix
    and compares the outputs with ``numpy.linalg.inv``.
    """
    sfg = ldlt_matrix_inverse(N=2)

    sfg.set_latency_of_type_name(MADS.type_name(), 3)
    sfg.set_latency_of_type_name(Reciprocal.type_name(), 2)
    sfg.set_execution_time_of_type_name(MADS.type_name(), 1)
    sfg.set_execution_time_of_type_name(Reciprocal.type_name(), 1)

    resources = {MADS.type_name(): 1, Reciprocal.type_name(): 1}
    input_times = {
        "in0": 0,
        "in1": 1,
        "in2": 2,
    }
    output_times = {
        "out0": 0,
        "out1": 1,
        "out2": 2,
    }
    schedule = Schedule(
        sfg,
        scheduler=HybridScheduler(
            resources, input_times=input_times, output_delta_times=output_times
        ),
        cyclic=True,
    )

    assert schedule.start_times == {
        "in0": 0,
        "in1": 1,
        "in2": 2,
        "rec0": 0,
        "dontcare1": 2,
        "mads0": 2,
        "mads3": 5,
        "rec1": 8,
        "dontcare0": 10,
        "mads2": 10,
        "mads1": 13,
        "out0": 16,
        "out1": 1,
        "out2": 2,
    }
    assert schedule.schedule_time == 16

    # validate regenerated sfg with random 2x2 real s.p.d. matrix
    A = np.random.default_rng().random((2, 2))
    A = np.dot(A, A.T)  # A @ A.T is symmetric
    A_inv = np.linalg.inv(A)

    # Feed the upper-triangular entries row by row as constant signals.
    input_signals = [Constant(A[i, j]) for i in range(2) for j in range(i, 2)]

    sim = Simulation(schedule.sfg, input_signals)
    sim.run_for(2)

    # NOTE(review): outputs 1 and 2 are expected to be 0 in the first
    # iteration, presumably because their start times (1 and 2) wrap into
    # the next period of the cyclic schedule; confirm.
    assert np.allclose(sim.results["0"], [A_inv[0, 0], A_inv[0, 0]])
    assert np.allclose(sim.results["1"], [0, A_inv[0, 1]])
    assert np.allclose(sim.results["2"], [0, A_inv[1, 1]])
Simon Bjurek
committed
def test_invalid_max_resources(self):
sfg.set_latency_of_type_name(MADS.type_name(), 3)
sfg.set_latency_of_type_name(Reciprocal.type_name(), 2)
sfg.set_execution_time_of_type_name(MADS.type_name(), 1)
sfg.set_execution_time_of_type_name(Reciprocal.type_name(), 1)
Simon Bjurek
committed
with pytest.raises(
ValueError, match="Provided max_resources must be a dictionary."
):
Schedule(sfg, scheduler=HybridScheduler(resources))
resources = "test"
Simon Bjurek
committed
with pytest.raises(
ValueError, match="Provided max_resources must be a dictionary."
):
Schedule(sfg, scheduler=HybridScheduler(resources))
resources = []
Simon Bjurek
committed
with pytest.raises(
ValueError, match="Provided max_resources must be a dictionary."
):
Schedule(sfg, scheduler=HybridScheduler(resources))
resources = {1: 1}
with pytest.raises(
Simon Bjurek
committed
ValueError, match="Provided max_resources keys must be strings."
):
Schedule(sfg, scheduler=HybridScheduler(resources))
resources = {MADS.type_name(): "test"}
Simon Bjurek
committed
with pytest.raises(
ValueError, match="Provided max_resources values must be integers."
):
Schedule(sfg, scheduler=HybridScheduler(resources))
Simon Bjurek
committed
def test_invalid_max_concurrent_writes(self):
    """Scheduling with an invalid ``max_concurrent_writes`` must raise ValueError.

    Covers a non-integer value (the string ``"5"``), zero, and a negative
    value, each with its expected error message.
    """
    sfg = ldlt_matrix_inverse(N=2)

    sfg.set_latency_of_type_name(MADS.type_name(), 3)
    sfg.set_latency_of_type_name(Reciprocal.type_name(), 2)
    sfg.set_execution_time_of_type_name(MADS.type_name(), 1)
    sfg.set_execution_time_of_type_name(Reciprocal.type_name(), 1)

    # (invalid value, expected error message), checked in the original order.
    invalid_cases = [
        ("5", "Provided max_concurrent_writes must be an integer."),
        (0, "Provided max_concurrent_writes must be larger than 0."),
        (-1, "Provided max_concurrent_writes must be larger than 0."),
    ]
    for max_concurrent_writes, expected_message in invalid_cases:
        with pytest.raises(ValueError, match=expected_message):
            Schedule(
                sfg,
                scheduler=HybridScheduler(
                    max_concurrent_writes=max_concurrent_writes
                ),
            )
def test_invalid_max_concurrent_reads(self):
sfg = ldlt_matrix_inverse(N=2)
sfg.set_latency_of_type_name(MADS.type_name(), 3)
sfg.set_latency_of_type_name(Reciprocal.type_name(), 2)
sfg.set_execution_time_of_type_name(MADS.type_name(), 1)
sfg.set_execution_time_of_type_name(Reciprocal.type_name(), 1)
Simon Bjurek
committed
max_concurrent_reads = "5"
with pytest.raises(
ValueError, match="Provided max_concurrent_reads must be an integer."
):
Schedule(
sfg,
scheduler=HybridScheduler(max_concurrent_reads=max_concurrent_reads),
)
max_concurrent_reads = 0
with pytest.raises(
ValueError, match="Provided max_concurrent_reads must be larger than 0."
):
Schedule(
sfg,
scheduler=HybridScheduler(max_concurrent_reads=max_concurrent_reads),
)
max_concurrent_reads = -1
with pytest.raises(
ValueError, match="Provided max_concurrent_reads must be larger than 0."