Newer
Older
import sys

import numpy as np
import pytest

from b_asic.core_operations import (
    MADS,
    Addition,
    Butterfly,
    ConstantMultiplication,
    Reciprocal,
)
from b_asic.list_schedulers import (
    EarliestDeadlineScheduler,
    HybridScheduler,
    LeastSlackTimeScheduler,
    MaxFanOutScheduler,
)
from b_asic.schedule import Schedule
from b_asic.sfg_generators import (
    direct_form_1_iir,
    ldlt_matrix_inverse,
    radix_2_dif_fft,
)
from b_asic.signal_flow_graph import SFG
from b_asic.signal_generator import Constant, Impulse
from b_asic.simulation import Simulation
from b_asic.special_operations import Input, Output
class TestEarliestDeadlineScheduler:
    """Regression tests pinning the schedules built with ``EarliestDeadlineScheduler``."""

    def test_empty_sfg(self, sfg_empty):
        """Expect a ``ValueError`` when an empty SFG is scheduled."""
        with pytest.raises(
            ValueError, match="Empty signal flow graph cannot be scheduled."
        ):
            Schedule(sfg_empty, scheduler=EarliestDeadlineScheduler())

    # NOTE(review): the ``def`` line of this test was missing from the recovered
    # text; the name mirrors the equivalent test in TestLeastSlackTimeScheduler
    # -- confirm against the upstream file.
    def test_direct_form_1_iir(self):
        """Pin start times for a direct-form-1 IIR with one adder and one multiplier."""
        sfg = direct_form_1_iir([1, 2, 3], [1, 2, 3])

        sfg.set_latency_of_type(ConstantMultiplication.type_name(), 2)
        sfg.set_execution_time_of_type(ConstantMultiplication.type_name(), 1)
        sfg.set_latency_of_type(Addition.type_name(), 3)
        sfg.set_execution_time_of_type(Addition.type_name(), 1)

        # Inputs and outputs are effectively unconstrained.
        resources = {
            Addition.type_name(): 1,
            ConstantMultiplication.type_name(): 1,
            Input.type_name(): sys.maxsize,
            Output.type_name(): sys.maxsize,
        }
        schedule = Schedule(
            sfg, scheduler=EarliestDeadlineScheduler(max_resources=resources)
        )

        assert schedule.start_times == {
            "in0": 0,
            "cmul3": 0,
            "cmul4": 1,
            "cmul0": 2,
            "add1": 3,
            "cmul1": 3,
            "cmul2": 4,
            "add0": 6,
            "add3": 7,
            "add2": 10,
            "out0": 13,
        }
        assert schedule.schedule_time == 13
        _validate_recreated_sfg_filter(sfg, schedule)

    def test_direct_form_2_iir_1_add_1_mul(self, sfg_direct_form_iir_lp_filter):
        """Pin start times for the IIR low-pass fixture with one adder and one multiplier."""
        sfg_direct_form_iir_lp_filter.set_latency_of_type(
            ConstantMultiplication.type_name(), 3
        )
        sfg_direct_form_iir_lp_filter.set_latency_of_type(Addition.type_name(), 2)
        sfg_direct_form_iir_lp_filter.set_execution_time_of_type(
            ConstantMultiplication.type_name(), 1
        )
        sfg_direct_form_iir_lp_filter.set_execution_time_of_type(
            Addition.type_name(), 1
        )

        resources = {
            Addition.type_name(): 1,
            ConstantMultiplication.type_name(): 1,
            Input.type_name(): sys.maxsize,
            Output.type_name(): sys.maxsize,
        }
        schedule = Schedule(
            sfg_direct_form_iir_lp_filter,
            scheduler=EarliestDeadlineScheduler(resources),
        )

        assert schedule.start_times == {
            "in0": 0,
            "cmul3": 0,
            "cmul4": 1,
            "cmul1": 2,
            "cmul2": 3,
            "add1": 4,
            "add0": 6,
            "add3": 7,
            "cmul0": 8,
            "add2": 11,
            "out0": 13,
        }
        assert schedule.schedule_time == 13
        _validate_recreated_sfg_filter(sfg_direct_form_iir_lp_filter, schedule)

    def test_direct_form_2_iir_2_add_3_mul(self, sfg_direct_form_iir_lp_filter):
        """Pin start times for the IIR low-pass fixture with two adders and three multipliers."""
        sfg_direct_form_iir_lp_filter.set_latency_of_type(
            ConstantMultiplication.type_name(), 3
        )
        sfg_direct_form_iir_lp_filter.set_latency_of_type(Addition.type_name(), 2)
        sfg_direct_form_iir_lp_filter.set_execution_time_of_type(
            ConstantMultiplication.type_name(), 1
        )
        sfg_direct_form_iir_lp_filter.set_execution_time_of_type(
            Addition.type_name(), 1
        )

        resources = {
            Addition.type_name(): 2,
            ConstantMultiplication.type_name(): 3,
            Input.type_name(): sys.maxsize,
            Output.type_name(): sys.maxsize,
        }
        schedule = Schedule(
            sfg_direct_form_iir_lp_filter,
            scheduler=EarliestDeadlineScheduler(resources),
        )

        assert schedule.start_times == {
            "in0": 0,
            "cmul1": 0,
            "cmul4": 0,
            "cmul3": 0,
            "cmul2": 1,
            "add1": 3,
            "add3": 4,
            "add0": 5,
            "cmul0": 7,
            "add2": 10,
            "out0": 12,
        }
        assert schedule.schedule_time == 12
        _validate_recreated_sfg_filter(sfg_direct_form_iir_lp_filter, schedule)

    def test_radix_2_fft_8_points(self):
        """Pin start times for an 8-point radix-2 DIF FFT with two butterflies and two multipliers."""
        sfg = radix_2_dif_fft(points=8)

        sfg.set_latency_of_type(ConstantMultiplication.type_name(), 2)
        sfg.set_execution_time_of_type(ConstantMultiplication.type_name(), 1)
        sfg.set_latency_of_type(Butterfly.type_name(), 1)
        sfg.set_execution_time_of_type(Butterfly.type_name(), 1)

        resources = {
            Butterfly.type_name(): 2,
            ConstantMultiplication.type_name(): 2,
            Input.type_name(): sys.maxsize,
            Output.type_name(): sys.maxsize,
        }
        schedule = Schedule(
            sfg, scheduler=EarliestDeadlineScheduler(max_resources=resources)
        )

        assert schedule.start_times == {
            "in0": 0,
            "in2": 0,
            "in4": 0,
            "in6": 0,
            # NOTE(review): the "in1", "in3" and "in5" entries were missing from
            # the recovered text; 0 is assumed because every other input starts
            # at 0 -- confirm against the upstream file.
            "in1": 0,
            "in3": 0,
            "in5": 0,
            "in7": 0,
            "bfly6": 0,
            "bfly8": 0,
            "cmul2": 1,
            "cmul3": 1,
            "bfly11": 1,
            "bfly7": 1,
            "cmul0": 2,
            "bfly0": 2,
            "cmul4": 2,
            "bfly5": 3,
            "bfly1": 3,
            "cmul1": 4,
            "bfly2": 4,
            "bfly9": 4,
            "bfly10": 5,
            "bfly3": 5,
            "out0": 5,
            "out4": 5,
            "bfly4": 6,
            "out1": 6,
            "out2": 6,
            "out5": 6,
            "out6": 6,
            "out7": 7,
            "out3": 7,
        }
        assert schedule.schedule_time == 7
        _validate_recreated_sfg_fft(schedule, 8)
class TestLeastSlackTimeScheduler:
    """Regression tests pinning the schedules built with ``LeastSlackTimeScheduler``."""

    def test_empty_sfg(self, sfg_empty):
        """Expect a ``ValueError`` when an empty SFG is scheduled."""
        with pytest.raises(
            ValueError, match="Empty signal flow graph cannot be scheduled."
        ):
            Schedule(sfg_empty, scheduler=LeastSlackTimeScheduler())

    def test_direct_form_1_iir(self):
        """Pin start times for a direct-form-1 IIR with one adder and one multiplier."""
        sfg = direct_form_1_iir([1, 2, 3], [1, 2, 3])

        sfg.set_latency_of_type(ConstantMultiplication.type_name(), 2)
        sfg.set_execution_time_of_type(ConstantMultiplication.type_name(), 1)
        sfg.set_latency_of_type(Addition.type_name(), 3)
        sfg.set_execution_time_of_type(Addition.type_name(), 1)

        # Inputs and outputs are effectively unconstrained.
        resources = {
            Addition.type_name(): 1,
            ConstantMultiplication.type_name(): 1,
            Input.type_name(): sys.maxsize,
            Output.type_name(): sys.maxsize,
        }
        schedule = Schedule(
            sfg, scheduler=LeastSlackTimeScheduler(max_resources=resources)
        )

        assert schedule.start_times == {
            "in0": 0,
            "cmul3": 0,
            "cmul4": 1,
            "cmul0": 2,
            "add1": 3,
            "cmul1": 3,
            "cmul2": 4,
            "add0": 6,
            "add3": 7,
            "add2": 10,
            "out0": 13,
        }
        assert schedule.schedule_time == 13
        _validate_recreated_sfg_filter(sfg, schedule)

    def test_direct_form_2_iir_1_add_1_mul(self, sfg_direct_form_iir_lp_filter):
        """Pin start times for the IIR low-pass fixture with one adder and one multiplier."""
        sfg_direct_form_iir_lp_filter.set_latency_of_type(
            ConstantMultiplication.type_name(), 3
        )
        sfg_direct_form_iir_lp_filter.set_latency_of_type(Addition.type_name(), 2)
        sfg_direct_form_iir_lp_filter.set_execution_time_of_type(
            ConstantMultiplication.type_name(), 1
        )
        sfg_direct_form_iir_lp_filter.set_execution_time_of_type(
            Addition.type_name(), 1
        )

        resources = {
            Addition.type_name(): 1,
            ConstantMultiplication.type_name(): 1,
            Input.type_name(): sys.maxsize,
            Output.type_name(): sys.maxsize,
        }
        schedule = Schedule(
            sfg_direct_form_iir_lp_filter,
            scheduler=LeastSlackTimeScheduler(resources),
        )

        assert schedule.start_times == {
            "in0": 0,
            "cmul3": 0,
            "cmul4": 1,
            "cmul1": 2,
            "cmul2": 3,
            "add1": 4,
            "add0": 6,
            "add3": 7,
            "cmul0": 8,
            "add2": 11,
            "out0": 13,
        }
        assert schedule.schedule_time == 13
        _validate_recreated_sfg_filter(sfg_direct_form_iir_lp_filter, schedule)

    def test_direct_form_2_iir_2_add_3_mul(self, sfg_direct_form_iir_lp_filter):
        """Pin start times for the IIR low-pass fixture with two adders and three multipliers."""
        sfg_direct_form_iir_lp_filter.set_latency_of_type(
            ConstantMultiplication.type_name(), 3
        )
        sfg_direct_form_iir_lp_filter.set_latency_of_type(Addition.type_name(), 2)
        sfg_direct_form_iir_lp_filter.set_execution_time_of_type(
            ConstantMultiplication.type_name(), 1
        )
        sfg_direct_form_iir_lp_filter.set_execution_time_of_type(
            Addition.type_name(), 1
        )

        resources = {
            Addition.type_name(): 2,
            ConstantMultiplication.type_name(): 3,
            Input.type_name(): sys.maxsize,
            Output.type_name(): sys.maxsize,
        }
        schedule = Schedule(
            sfg_direct_form_iir_lp_filter,
            scheduler=LeastSlackTimeScheduler(resources),
        )

        assert schedule.start_times == {
            "in0": 0,
            "cmul1": 0,
            "cmul4": 0,
            "cmul3": 0,
            "cmul2": 1,
            "add1": 3,
            "add3": 4,
            "add0": 5,
            "cmul0": 7,
            "add2": 10,
            "out0": 12,
        }
        assert schedule.schedule_time == 12
        _validate_recreated_sfg_filter(sfg_direct_form_iir_lp_filter, schedule)

    def test_radix_2_fft_8_points(self):
        """Pin start times for an 8-point radix-2 DIF FFT with two butterflies and two multipliers."""
        sfg = radix_2_dif_fft(points=8)

        sfg.set_latency_of_type(ConstantMultiplication.type_name(), 2)
        sfg.set_execution_time_of_type(ConstantMultiplication.type_name(), 1)
        sfg.set_latency_of_type(Butterfly.type_name(), 1)
        sfg.set_execution_time_of_type(Butterfly.type_name(), 1)

        resources = {
            Butterfly.type_name(): 2,
            ConstantMultiplication.type_name(): 2,
            Input.type_name(): sys.maxsize,
            Output.type_name(): sys.maxsize,
        }
        schedule = Schedule(
            sfg, scheduler=LeastSlackTimeScheduler(max_resources=resources)
        )

        assert schedule.start_times == {
            "in0": 0,
            "in2": 0,
            "in4": 0,
            "in6": 0,
            # NOTE(review): the "in1", "in3" and "in5" entries were missing from
            # the recovered text; 0 is assumed because every other input starts
            # at 0 -- confirm against the upstream file.
            "in1": 0,
            "in3": 0,
            "in5": 0,
            "in7": 0,
            "bfly6": 0,
            "bfly8": 0,
            "cmul2": 1,
            "cmul3": 1,
            "bfly11": 1,
            "bfly7": 1,
            "cmul0": 2,
            "bfly0": 2,
            "cmul4": 2,
            "bfly5": 3,
            "bfly1": 3,
            "cmul1": 4,
            "bfly2": 4,
            "bfly9": 4,
            "bfly10": 5,
            "bfly3": 5,
            "out0": 5,
            "out4": 5,
            "bfly4": 6,
            "out1": 6,
            "out2": 6,
            "out5": 6,
            "out6": 6,
            "out7": 7,
            "out3": 7,
        }
        assert schedule.schedule_time == 7
        _validate_recreated_sfg_fft(schedule, 8)
class TestMaxFanOutScheduler:
    """Regression tests pinning the schedules built with ``MaxFanOutScheduler``."""

    def test_empty_sfg(self, sfg_empty):
        """Expect a ``ValueError`` when an empty SFG is scheduled."""
        with pytest.raises(
            ValueError, match="Empty signal flow graph cannot be scheduled."
        ):
            Schedule(sfg_empty, scheduler=MaxFanOutScheduler())

    def test_direct_form_1_iir(self):
        """Pin start times for a direct-form-1 IIR with one adder and one multiplier."""
        sfg = direct_form_1_iir([1, 2, 3], [1, 2, 3])

        sfg.set_latency_of_type(ConstantMultiplication.type_name(), 2)
        sfg.set_execution_time_of_type(ConstantMultiplication.type_name(), 1)
        sfg.set_latency_of_type(Addition.type_name(), 3)
        sfg.set_execution_time_of_type(Addition.type_name(), 1)

        # Only the arithmetic units are limited; no entry is given for I/O.
        resources = {Addition.type_name(): 1, ConstantMultiplication.type_name(): 1}
        schedule = Schedule(sfg, scheduler=MaxFanOutScheduler(max_resources=resources))

        assert schedule.start_times == {
            "in0": 0,
            "cmul0": 0,
            "cmul1": 1,
            "cmul2": 2,
            "cmul3": 3,
            "cmul4": 4,
            "add3": 4,
            "add1": 6,
            "add0": 9,
            "add2": 12,
            "out0": 15,
        }
        assert schedule.schedule_time == 15
        _validate_recreated_sfg_filter(sfg, schedule)

    def test_ldlt_inverse_3x3(self):
        """Pin start times for a 3x3 LDLT matrix inverse with one MADS and one reciprocal unit."""
        sfg = ldlt_matrix_inverse(N=3)

        sfg.set_latency_of_type(MADS.type_name(), 3)
        sfg.set_latency_of_type(Reciprocal.type_name(), 2)
        sfg.set_execution_time_of_type(MADS.type_name(), 1)
        sfg.set_execution_time_of_type(Reciprocal.type_name(), 1)

        resources = {MADS.type_name(): 1, Reciprocal.type_name(): 1}
        schedule = Schedule(sfg, scheduler=MaxFanOutScheduler(resources))

        assert schedule.start_times == {
            "in1": 0,
            "in2": 1,
            "in0": 2,
            "rec0": 2,
            "in4": 3,
            "in5": 4,
            "mads1": 4,
            "dontcare4": 4,
            "dontcare5": 5,
            "in3": 5,
            "mads0": 5,
            "mads13": 7,
            "mads12": 8,
            "mads14": 9,
            "rec1": 12,
            "mads8": 14,
            "dontcare2": 14,
            "mads10": 17,
            "rec2": 20,
            "mads9": 22,
            "out5": 22,
            "dontcare0": 22,
            "dontcare1": 23,
            "mads11": 23,
            "mads7": 25,
            "out4": 25,
            "mads3": 26,
            "mads6": 27,
            "dontcare3": 27,
            "out3": 28,
            "mads2": 29,
            "out2": 29,
            "mads5": 30,
            "mads4": 33,
            "out1": 33,
            "out0": 36,
        }
        assert schedule.schedule_time == 36
        _validate_recreated_sfg_ldlt_matrix_inverse(schedule, 3)
Simon Bjurek
committed
class TestHybridScheduler:
def test_empty_sfg(self, sfg_empty):
    """Expect a ``ValueError`` when an empty SFG is scheduled with ``HybridScheduler``."""
    with pytest.raises(
        ValueError, match="Empty signal flow graph cannot be scheduled."
    ):
        Schedule(sfg_empty, scheduler=HybridScheduler())
def test_direct_form_1_iir(self):
    """Pin start times for a direct-form-1 IIR with one adder and one multiplier."""
    sfg = direct_form_1_iir([1, 2, 3], [1, 2, 3])

    sfg.set_latency_of_type(ConstantMultiplication.type_name(), 2)
    sfg.set_execution_time_of_type(ConstantMultiplication.type_name(), 1)
    sfg.set_latency_of_type(Addition.type_name(), 3)
    sfg.set_execution_time_of_type(Addition.type_name(), 1)

    # Only the arithmetic units are limited; no entry is given for I/O.
    resources = {Addition.type_name(): 1, ConstantMultiplication.type_name(): 1}
    schedule = Schedule(sfg, scheduler=HybridScheduler(max_resources=resources))

    assert schedule.start_times == {
        "in0": 0,
        "cmul3": 0,
        "cmul4": 1,
        "cmul0": 2,
        "add1": 3,
        "cmul1": 3,
        "cmul2": 4,
        "add0": 6,
        "add3": 7,
        "add2": 10,
        "out0": 13,
    }
    assert schedule.schedule_time == 13
    _validate_recreated_sfg_filter(sfg, schedule)
def test_radix_2_fft_8_points(self):
    """Pin start times for an 8-point radix-2 DIF FFT with two butterflies and two multipliers."""
    sfg = radix_2_dif_fft(points=8)

    sfg.set_latency_of_type(ConstantMultiplication.type_name(), 2)
    sfg.set_execution_time_of_type(ConstantMultiplication.type_name(), 1)
    sfg.set_latency_of_type(Butterfly.type_name(), 1)
    sfg.set_execution_time_of_type(Butterfly.type_name(), 1)

    resources = {
        Butterfly.type_name(): 2,
        ConstantMultiplication.type_name(): 2,
        Input.type_name(): sys.maxsize,
        Output.type_name(): sys.maxsize,
    }
    schedule = Schedule(sfg, scheduler=HybridScheduler(max_resources=resources))

    assert schedule.start_times == {
        "in0": 0,
        "in2": 0,
        "in4": 0,
        "in6": 0,
        # NOTE(review): the "in1", "in3" and "in5" entries were missing from
        # the recovered text; 0 is assumed because every other input starts
        # at 0 -- confirm against the upstream file.
        "in1": 0,
        "in3": 0,
        "in5": 0,
        "in7": 0,
        "bfly6": 0,
        "bfly8": 0,
        "cmul2": 1,
        "cmul3": 1,
        "bfly11": 1,
        "bfly7": 1,
        "cmul0": 2,
        "bfly0": 2,
        "cmul4": 2,
        "bfly5": 3,
        "bfly1": 3,
        "cmul1": 4,
        "bfly2": 4,
        "bfly9": 4,
        "bfly10": 5,
        "bfly3": 5,
        "out0": 5,
        "out4": 5,
        "bfly4": 6,
        "out1": 6,
        "out2": 6,
        "out5": 6,
        "out6": 6,
        "out7": 7,
        "out3": 7,
    }
    assert schedule.schedule_time == 7
    _validate_recreated_sfg_fft(schedule, 8)
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
def test_radix_2_fft_8_points_one_output(self):
    """Pin start times for an 8-point radix-2 DIF FFT when only one output unit is allowed."""
    sfg = radix_2_dif_fft(points=8)

    sfg.set_latency_of_type(ConstantMultiplication.type_name(), 2)
    sfg.set_execution_time_of_type(ConstantMultiplication.type_name(), 1)
    sfg.set_latency_of_type(Butterfly.type_name(), 1)
    sfg.set_execution_time_of_type(Butterfly.type_name(), 1)

    # Unlike the unconstrained FFT test, the output resource count is 1; the
    # expected output start times below are all distinct.
    resources = {
        Butterfly.type_name(): 2,
        ConstantMultiplication.type_name(): 2,
        Input.type_name(): sys.maxsize,
        Output.type_name(): 1,
    }
    schedule = Schedule(sfg, scheduler=HybridScheduler(max_resources=resources))

    assert schedule.start_times == {
        "in0": 0,
        "in1": 0,
        "in2": 0,
        "in3": 0,
        "in4": 0,
        "in5": 0,
        "in6": 0,
        "in7": 0,
        "bfly6": 0,
        "bfly8": 0,
        "cmul2": 1,
        "cmul3": 1,
        "bfly11": 1,
        "bfly7": 1,
        "cmul0": 2,
        "bfly0": 2,
        "cmul4": 2,
        "bfly5": 3,
        "bfly1": 3,
        "cmul1": 4,
        "bfly2": 4,
        "bfly9": 4,
        "bfly10": 5,
        "bfly3": 5,
        "out0": 5,
        "bfly4": 6,
        "out2": 6,
        "out3": 7,
        "out7": 8,
        "out6": 9,
        "out4": 10,
        "out1": 11,
        "out5": 12,
    }
    assert schedule.schedule_time == 12
    _validate_recreated_sfg_fft(schedule, 8)
# The schedule that this test checks against is faulty and will yield a non-working
# FFT implementation; however, it is kept commented out for reference.
def test_radix_2_fft_8_points_specified_IO_times_cyclic(self):
    """Pin a cyclic 8-point FFT schedule with fixed input times and output delta times.

    The schedule is built with one butterfly and one multiplier, then the SFG
    recreated from the schedule is simulated for two iterations.
    """
    sfg = radix_2_dif_fft(points=8)

    sfg.set_latency_of_type(Butterfly.type_name(), 3)
    sfg.set_latency_of_type(ConstantMultiplication.type_name(), 2)
    sfg.set_execution_time_of_type(Butterfly.type_name(), 1)
    sfg.set_execution_time_of_type(ConstantMultiplication.type_name(), 1)

    resources = {
        Butterfly.type_name(): 1,
        ConstantMultiplication.type_name(): 1,
        Input.type_name(): sys.maxsize,
        Output.type_name(): sys.maxsize,
    }
    input_times = {
        "in0": 0,
        "in1": 1,
        "in2": 2,
        "in3": 3,
        "in4": 4,
        "in5": 5,
        "in6": 6,
        "in7": 7,
    }
    output_times = {
        "out0": 0,
        "out1": 1,
        "out2": 2,
        "out3": 3,
        "out4": 4,
        "out5": 5,
        "out6": 6,
        "out7": 7,
    }
    schedule = Schedule(
        sfg,
        scheduler=HybridScheduler(
            resources, input_times=input_times, output_delta_times=output_times
        ),
        cyclic=True,
    )

    # NOTE(review): the recovered text lists only 11 butterflies and 4
    # multipliers here, while the other 8-point FFT tests list 12 and 5. The
    # "cmul1" and "bfly4" entries appear to have been lost and their start
    # times cannot be inferred -- restore them from the upstream file.
    assert schedule.start_times == {
        "in0": 0,
        "in1": 1,
        "in2": 2,
        "in3": 3,
        "in4": 4,
        "in5": 5,
        "in6": 6,
        "in7": 7,
        "bfly0": 4,
        "bfly8": 5,
        "bfly11": 6,
        "bfly6": 7,
        "cmul2": 8,
        "cmul0": 9,
        "bfly1": 9,
        "cmul3": 10,
        "bfly7": 10,
        "bfly2": 11,
        "bfly5": 12,
        "cmul4": 13,
        "bfly9": 13,
        "bfly3": 15,
        "bfly10": 16,
        "out0": 17,
        "out1": 18,
        "out2": 19,
        "out3": 20,
        # out4..out7 start earlier than out0..out3 in this cyclic schedule.
        "out4": 1,
        "out5": 2,
        "out6": 3,
        "out7": 4,
    }
    assert schedule.schedule_time == 20

    # impulse input -> constant output
    sim = Simulation(schedule.sfg, [Impulse()] + [0] * 7)
    sim.run_for(2)
    for index in range(4):
        assert np.allclose(sim.results[str(index)], [1, 0])
    for index in range(4, 8):
        assert np.allclose(sim.results[str(index)], [0, 1])

    # constant input -> impulse (with weight=points) output
    sim = Simulation(schedule.sfg, [Impulse() for _ in range(8)])
    sim.run_for(2)
    assert np.allclose(sim.results["0"], [8, 0])
    for index in range(1, 8):
        assert np.allclose(sim.results[str(index)], [0, 0])
def test_radix_2_fft_8_points_specified_IO_times_non_cyclic(self):
    """Pin a non-cyclic 8-point FFT schedule with fixed input times and output delta times."""
    sfg = radix_2_dif_fft(points=8)

    sfg.set_latency_of_type(Butterfly.type_name(), 3)
    sfg.set_latency_of_type(ConstantMultiplication.type_name(), 2)
    sfg.set_execution_time_of_type(Butterfly.type_name(), 1)
    sfg.set_execution_time_of_type(ConstantMultiplication.type_name(), 1)

    resources = {Butterfly.type_name(): 1, ConstantMultiplication.type_name(): 1}
    input_times = {
        "in0": 0,
        "in1": 1,
        "in2": 2,
        "in3": 3,
        "in4": 4,
        "in5": 5,
        "in6": 6,
        "in7": 7,
    }
    output_times = {
        "out0": 0,
        "out1": 1,
        "out2": 2,
        "out3": 3,
        "out4": 4,
        "out5": 5,
        "out6": 6,
        "out7": 7,
    }
    schedule = Schedule(
        sfg,
        scheduler=HybridScheduler(
            resources, input_times=input_times, output_delta_times=output_times
        ),
        cyclic=False,
    )

    # NOTE(review): the recovered text lists only 11 butterflies and 4
    # multipliers here, while the other 8-point FFT tests list 12 and 5. The
    # "cmul1" and "bfly4" entries appear to have been lost and their start
    # times cannot be inferred -- restore them from the upstream file.
    assert schedule.start_times == {
        "in0": 0,
        "in1": 1,
        "in2": 2,
        "in3": 3,
        "in4": 4,
        "in5": 5,
        "in6": 6,
        "in7": 7,
        "bfly0": 4,
        "bfly8": 5,
        "bfly11": 6,
        "bfly6": 7,
        "cmul2": 8,
        "cmul0": 9,
        "bfly1": 9,
        "cmul3": 10,
        "bfly7": 10,
        "bfly2": 11,
        "bfly5": 12,
        "cmul4": 13,
        "bfly9": 13,
        "bfly3": 15,
        "bfly10": 16,
        "out0": 17,
        "out1": 18,
        "out2": 19,
        "out3": 20,
        "out4": 21,
        "out5": 22,
        "out6": 23,
        "out7": 24,
    }
    assert schedule.schedule_time == 24
    _validate_recreated_sfg_fft(schedule, 8)
def test_ldlt_inverse_2x2(self):
    """Pin start times for a 2x2 LDLT matrix inverse with one MADS and one reciprocal unit."""
    sfg = ldlt_matrix_inverse(N=2)

    sfg.set_latency_of_type(MADS.type_name(), 3)
    sfg.set_latency_of_type(Reciprocal.type_name(), 2)
    sfg.set_execution_time_of_type(MADS.type_name(), 1)
    sfg.set_execution_time_of_type(Reciprocal.type_name(), 1)

    # Inputs and outputs are effectively unconstrained.
    resources = {
        MADS.type_name(): 1,
        Reciprocal.type_name(): 1,
        Input.type_name(): sys.maxsize,
        Output.type_name(): sys.maxsize,
    }
    schedule = Schedule(
        sfg,
        scheduler=HybridScheduler(resources),
    )

    assert schedule.start_times == {
        "in0": 0,
        "in1": 0,
        "in2": 0,
        "rec0": 0,
        "dontcare1": 2,
        "mads0": 2,
        "mads3": 5,
        "rec1": 8,
        "dontcare0": 10,
        "mads2": 10,
        "mads1": 13,
        "out2": 10,
        "out1": 13,
        "out0": 16,
    }
    assert schedule.schedule_time == 16
    _validate_recreated_sfg_ldlt_matrix_inverse(schedule, 2)
def test_ldlt_inverse_2x2_specified_IO_times_cyclic(self):
    """Pin a cyclic 2x2 LDLT inverse schedule with fixed I/O times and simulate it.

    After checking start times, the SFG recreated from the schedule is
    simulated for two iterations against ``np.linalg.inv`` on a random matrix.
    """
    sfg = ldlt_matrix_inverse(N=2)

    sfg.set_latency_of_type(MADS.type_name(), 3)
    sfg.set_latency_of_type(Reciprocal.type_name(), 2)
    sfg.set_execution_time_of_type(MADS.type_name(), 1)
    sfg.set_execution_time_of_type(Reciprocal.type_name(), 1)

    resources = {MADS.type_name(): 1, Reciprocal.type_name(): 1}
    input_times = {
        "in0": 0,
        "in1": 1,
        "in2": 2,
    }
    output_times = {
        "out0": 0,
        "out1": 1,
        "out2": 2,
    }
    schedule = Schedule(
        sfg,
        scheduler=HybridScheduler(
            resources, input_times=input_times, output_delta_times=output_times
        ),
        cyclic=True,
    )

    assert schedule.start_times == {
        "in0": 0,
        "in1": 1,
        "in2": 2,
        "rec0": 0,
        "dontcare1": 2,
        "mads0": 2,
        "mads3": 5,
        "rec1": 8,
        "dontcare0": 10,
        "mads2": 10,
        "mads1": 13,
        "out0": 16,
        "out1": 1,
        "out2": 2,
    }
    assert schedule.schedule_time == 16

    # validate regenerated sfg with random 2x2 real s.p.d. matrix
    A = np.random.rand(2, 2)
    A = np.dot(A, A.T)
    A_inv = np.linalg.inv(A)

    # Feed the upper triangle of A row by row, one constant signal per element.
    input_signals = [Constant(A[i, j]) for i in range(2) for j in range(i, 2)]

    sim = Simulation(schedule.sfg, input_signals)
    sim.run_for(2)

    # Outputs "1" and "2" are expected to be 0 in the first iteration and to
    # carry the inverse elements in the second.
    assert np.allclose(sim.results["0"], [A_inv[0, 0], A_inv[0, 0]])
    assert np.allclose(sim.results["1"], [0, A_inv[0, 1]])
    assert np.allclose(sim.results["2"], [0, A_inv[1, 1]])
Simon Bjurek
committed
def test_invalid_max_resources(self):
    """Expect a ``ValueError`` with a specific message for each malformed ``max_resources``."""
    sfg = ldlt_matrix_inverse(N=2)

    sfg.set_latency_of_type(MADS.type_name(), 3)
    sfg.set_latency_of_type(Reciprocal.type_name(), 2)
    sfg.set_execution_time_of_type(MADS.type_name(), 1)
    sfg.set_execution_time_of_type(Reciprocal.type_name(), 1)

    # (invalid max_resources value, expected error message), checked in the
    # same order as the original sequence of ``with`` blocks.
    invalid_cases = [
        (2, "Provided max_resources must be a dictionary."),
        ("test", "Provided max_resources must be a dictionary."),
        ([], "Provided max_resources must be a dictionary."),
        ({1: 1}, "Provided max_resources keys must be strings."),
        (
            {MADS.type_name(): "test"},
            "Provided max_resources values must be integers.",
        ),
    ]
    for resources, message in invalid_cases:
        with pytest.raises(ValueError, match=message):
            Schedule(sfg, scheduler=HybridScheduler(resources))
Simon Bjurek
committed
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
def test_invalid_max_concurrent_writes(self):
    """Expect a ``ValueError`` for a non-integer or non-positive ``max_concurrent_writes``."""
    sfg = ldlt_matrix_inverse(N=2)

    sfg.set_latency_of_type(MADS.type_name(), 3)
    sfg.set_latency_of_type(Reciprocal.type_name(), 2)
    sfg.set_execution_time_of_type(MADS.type_name(), 1)
    sfg.set_execution_time_of_type(Reciprocal.type_name(), 1)

    # (invalid value, expected error message), checked in the same order as
    # the original sequence of ``with`` blocks.
    invalid_cases = [
        ("5", "Provided max_concurrent_writes must be an integer."),
        (0, "Provided max_concurrent_writes must be larger than 0."),
        (-1, "Provided max_concurrent_writes must be larger than 0."),
    ]
    for max_concurrent_writes, message in invalid_cases:
        with pytest.raises(ValueError, match=message):
            Schedule(
                sfg,
                scheduler=HybridScheduler(
                    max_concurrent_writes=max_concurrent_writes
                ),
            )
def test_invalid_max_concurrent_reads(self):
sfg = ldlt_matrix_inverse(N=2)
sfg.set_latency_of_type(MADS.type_name(), 3)
sfg.set_latency_of_type(Reciprocal.type_name(), 2)
sfg.set_execution_time_of_type(MADS.type_name(), 1)
sfg.set_execution_time_of_type(Reciprocal.type_name(), 1)
max_concurrent_reads = "5"
with pytest.raises(
ValueError, match="Provided max_concurrent_reads must be an integer."
):
Schedule(
sfg,
scheduler=HybridScheduler(max_concurrent_reads=max_concurrent_reads),
)
max_concurrent_reads = 0
with pytest.raises(
ValueError, match="Provided max_concurrent_reads must be larger than 0."
):
Schedule(
sfg,
scheduler=HybridScheduler(max_concurrent_reads=max_concurrent_reads),
)
max_concurrent_reads = -1
with pytest.raises(
ValueError, match="Provided max_concurrent_reads must be larger than 0."
):
Schedule(
sfg,