# Skip to content
# Snippets Groups Projects
# test_list_schedulers.py 60.3 KiB
# Newer Older
import sys

import pytest

from b_asic.core_operations import (
    MADS,
    Addition,
    Butterfly,
    ConstantMultiplication,
    Reciprocal,
)
from b_asic.list_schedulers import (
    EarliestDeadlineScheduler,
    HybridScheduler,
    LeastSlackTimeScheduler,
    MaxFanOutScheduler,
)
from b_asic.schedule import Schedule
from b_asic.sfg_generators import (
    direct_form_1_iir,
    ldlt_matrix_inverse,
    radix_2_dif_fft,
)
from b_asic.signal_flow_graph import SFG
from b_asic.signal_generator import Constant, Impulse
from b_asic.simulation import Simulation
from b_asic.special_operations import Input, Output


class TestEarliestDeadlineScheduler:
    def test_empty_sfg(self, sfg_empty):
        """Check that scheduling ``sfg_empty`` raises ``ValueError``.

        ``sfg_empty`` is presumably a pytest fixture defined outside this
        file -- confirm against the test suite's conftest.
        """
        expected_message = "Empty signal flow graph cannot be scheduled."
        # The scheduler is created inside the context so that any error it
        # raises is checked against the same expectation as before.
        with pytest.raises(ValueError, match=expected_message):
            Schedule(sfg_empty, scheduler=EarliestDeadlineScheduler())

    def test_direct_form_1_iir(self):
        """Schedule a ``direct_form_1_iir`` SFG with one adder and one multiplier.

        Input and output operations are capped at ``sys.maxsize``. The test
        pins the resulting start times and schedule time, then hands the SFG
        and schedule to ``_validate_recreated_sfg_filter``.
        """
        sfg = direct_form_1_iir([1, 2, 3], [1, 2, 3])

        # (operation type, latency, execution time); applied in this order.
        timings = (
            (ConstantMultiplication.type_name(), 2, 1),
            (Addition.type_name(), 3, 1),
        )
        for op_type, latency, execution_time in timings:
            sfg.set_latency_of_type(op_type, latency)
            sfg.set_execution_time_of_type(op_type, execution_time)

        unit_limits = {
            Addition.type_name(): 1,
            ConstantMultiplication.type_name(): 1,
            Input.type_name(): sys.maxsize,
            Output.type_name(): sys.maxsize,
        }
        scheduler = EarliestDeadlineScheduler(max_resources=unit_limits)
        schedule = Schedule(sfg, scheduler=scheduler)

        # NOTE(review): only six operations are listed here; confirm against
        # the upstream file that this expected table is complete.
        expected_start_times = {
            "cmul0": 2,
            "add1": 3,
            "cmul1": 3,
            "cmul2": 4,
            "add2": 10,
            "out0": 13,
        }
        assert schedule.start_times == expected_start_times
        assert schedule.schedule_time == 13
        _validate_recreated_sfg_filter(sfg, schedule)
    def test_direct_form_2_iir_1_add_1_mul(self, sfg_direct_form_iir_lp_filter):
        """Schedule the IIR low-pass filter SFG with one adder and one multiplier.

        Constant multiplications get latency 3 and additions latency 2, both
        with execution time 1. Input and output operations are capped at
        ``sys.maxsize``. The test pins the resulting start times and schedule
        time, then hands the SFG and schedule to
        ``_validate_recreated_sfg_filter``.

        ``sfg_direct_form_iir_lp_filter`` is presumably a pytest fixture
        defined outside this file -- confirm against the suite's conftest.
        """
        sfg_direct_form_iir_lp_filter.set_latency_of_type(
            ConstantMultiplication.type_name(), 3
        )
        sfg_direct_form_iir_lp_filter.set_latency_of_type(Addition.type_name(), 2)
        sfg_direct_form_iir_lp_filter.set_execution_time_of_type(
            ConstantMultiplication.type_name(), 1
        )
        sfg_direct_form_iir_lp_filter.set_execution_time_of_type(
            Addition.type_name(), 1
        )

        resources = {
            Addition.type_name(): 1,
            ConstantMultiplication.type_name(): 1,
            Input.type_name(): sys.maxsize,
            Output.type_name(): sys.maxsize,
        }

        # The call must be closed before the assertions; without the closing
        # parenthesis the module does not parse.
        schedule = Schedule(
            sfg_direct_form_iir_lp_filter,
            scheduler=EarliestDeadlineScheduler(resources),
        )

        # NOTE(review): confirm against the upstream file that this expected
        # table is complete; some entries may be missing from this copy.
        assert schedule.start_times == {
            "cmul1": 2,
            "cmul2": 3,
            "add1": 4,
            "add0": 6,
            "add3": 7,
            "cmul0": 8,
            "add2": 11,
            "out0": 13,
        }

        assert schedule.schedule_time == 13
        _validate_recreated_sfg_filter(sfg_direct_form_iir_lp_filter, schedule)
    def test_direct_form_2_iir_2_add_3_mul(self, sfg_direct_form_iir_lp_filter):
        """Schedule the IIR low-pass filter SFG with two adders and three multipliers.

        Constant multiplications get latency 3 and additions latency 2, both
        with execution time 1. Input and output operations are capped at
        ``sys.maxsize``. The test pins the resulting start times and schedule
        time, then hands the SFG and schedule to
        ``_validate_recreated_sfg_filter``.

        ``sfg_direct_form_iir_lp_filter`` is presumably a pytest fixture
        defined outside this file -- confirm against the suite's conftest.
        """
        sfg_direct_form_iir_lp_filter.set_latency_of_type(
            ConstantMultiplication.type_name(), 3
        )
        sfg_direct_form_iir_lp_filter.set_latency_of_type(Addition.type_name(), 2)
        sfg_direct_form_iir_lp_filter.set_execution_time_of_type(
            ConstantMultiplication.type_name(), 1
        )
        sfg_direct_form_iir_lp_filter.set_execution_time_of_type(
            Addition.type_name(), 1
        )

        resources = {
            Addition.type_name(): 2,
            ConstantMultiplication.type_name(): 3,
            Input.type_name(): sys.maxsize,
            Output.type_name(): sys.maxsize,
        }

        # The call must be closed before the assertions; without the closing
        # parenthesis the module does not parse.
        schedule = Schedule(
            sfg_direct_form_iir_lp_filter,
            scheduler=EarliestDeadlineScheduler(resources),
        )

        # NOTE(review): confirm against the upstream file that this expected
        # table is complete; some entries may be missing from this copy.
        assert schedule.start_times == {
            "cmul1": 0,
            "cmul4": 0,
            "cmul3": 0,
            "cmul2": 1,
            "add1": 3,
            "add3": 4,
            "add0": 5,
            "cmul0": 7,
            "add2": 10,
            "out0": 12,
        }

        assert schedule.schedule_time == 12
        _validate_recreated_sfg_filter(sfg_direct_form_iir_lp_filter, schedule)

    def test_radix_2_fft_8_points(self):
        sfg = radix_2_dif_fft(points=8)

        sfg.set_latency_of_type(ConstantMultiplication.type_name(), 2)
        sfg.set_execution_time_of_type(ConstantMultiplication.type_name(), 1)
        sfg.set_latency_of_type(Butterfly.type_name(), 1)
        sfg.set_execution_time_of_type(Butterfly.type_name(), 1)

        resources = {
            Butterfly.type_name(): 2,
            ConstantMultiplication.type_name(): 2,
            Input.type_name(): sys.maxsize,
            Output.type_name(): sys.maxsize,
        }
        schedule = Schedule(
            sfg, scheduler=EarliestDeadlineScheduler(max_resources=resources)
        )

        assert schedule.start_times == {
            "in7": 0,
            "bfly6": 0,
            "bfly8": 0,
            "cmul2": 1,
            "cmul3": 1,
            "bfly11": 1,
            "bfly7": 1,
            "cmul0": 2,
            "bfly0": 2,
            "cmul4": 2,
            "bfly5": 3,
            "bfly1": 3,
            "cmul1": 4,
            "bfly2": 4,
            "bfly9": 4,
            "bfly10": 5,
            "bfly3": 5,
            "out0": 5,
            "out4": 5,
            "bfly4": 6,
            "out1": 6,
            "out2": 6,
            "out5": 6,
            # Loading
            # Loading full blame...