Skip to content
Snippets Groups Projects
test_list_schedulers.py 65.2 KiB
Newer Older
  • Learn to ignore specific revisions
  •         ):
                Schedule(
                    sfg,
                    scheduler=HybridScheduler(max_concurrent_reads=max_concurrent_reads),
                )
    
        def test_invalid_input_times(self):
            sfg = ldlt_matrix_inverse(N=2)
    
    
            sfg.set_latency_of_type_name(MADS.type_name(), 3)
            sfg.set_latency_of_type_name(Reciprocal.type_name(), 2)
            sfg.set_execution_time_of_type_name(MADS.type_name(), 1)
            sfg.set_execution_time_of_type_name(Reciprocal.type_name(), 1)
    
    
            input_times = 5
            with pytest.raises(
                ValueError, match="Provided input_times must be a dictionary."
            ):
                Schedule(sfg, scheduler=HybridScheduler(input_times=input_times))
    
            input_times = "test1"
            with pytest.raises(
                ValueError, match="Provided input_times must be a dictionary."
            ):
                Schedule(sfg, scheduler=HybridScheduler(input_times=input_times))
    
            input_times = []
            with pytest.raises(
                ValueError, match="Provided input_times must be a dictionary."
            ):
                Schedule(sfg, scheduler=HybridScheduler(input_times=input_times))
    
            input_times = {3: 3}
            with pytest.raises(
                ValueError, match="Provided input_times keys must be strings."
            ):
                Schedule(sfg, scheduler=HybridScheduler(input_times=input_times))
    
            input_times = {"in0": "foo"}
            with pytest.raises(
                ValueError, match="Provided input_times values must be integers."
            ):
                Schedule(sfg, scheduler=HybridScheduler(input_times=input_times))
    
            input_times = {"in0": -1}
            with pytest.raises(
                ValueError, match="Provided input_times values must be non-negative."
            ):
                Schedule(sfg, scheduler=HybridScheduler(input_times=input_times))
    
        def test_invalid_output_delta_times(self):
            sfg = ldlt_matrix_inverse(N=2)
    
    
            sfg.set_latency_of_type_name(MADS.type_name(), 3)
            sfg.set_latency_of_type_name(Reciprocal.type_name(), 2)
            sfg.set_execution_time_of_type_name(MADS.type_name(), 1)
            sfg.set_execution_time_of_type_name(Reciprocal.type_name(), 1)
    
    
            output_delta_times = 10
            with pytest.raises(
                ValueError, match="Provided output_delta_times must be a dictionary."
            ):
                Schedule(
                    sfg, scheduler=HybridScheduler(output_delta_times=output_delta_times)
                )
    
            output_delta_times = "test2"
            with pytest.raises(
                ValueError, match="Provided output_delta_times must be a dictionary."
            ):
                Schedule(
                    sfg, scheduler=HybridScheduler(output_delta_times=output_delta_times)
                )
    
            output_delta_times = []
            with pytest.raises(
                ValueError, match="Provided output_delta_times must be a dictionary."
            ):
                Schedule(
                    sfg, scheduler=HybridScheduler(output_delta_times=output_delta_times)
                )
    
            output_delta_times = {4: 4}
            with pytest.raises(
                ValueError, match="Provided output_delta_times keys must be strings."
            ):
                Schedule(
                    sfg, scheduler=HybridScheduler(output_delta_times=output_delta_times)
                )
    
            output_delta_times = {"out0": "foo"}
            with pytest.raises(
                ValueError, match="Provided output_delta_times values must be integers."
            ):
                Schedule(
                    sfg, scheduler=HybridScheduler(output_delta_times=output_delta_times)
                )
    
            output_delta_times = {"out0": -1}
            with pytest.raises(
                ValueError, match="Provided output_delta_times values must be non-negative."
            ):
                Schedule(
                    sfg, scheduler=HybridScheduler(output_delta_times=output_delta_times)
                )
    
        def test_resource_not_in_sfg(self):
            sfg = ldlt_matrix_inverse(N=3)
    
    
            sfg.set_latency_of_type_name(MADS.type_name(), 3)
            sfg.set_latency_of_type_name(Reciprocal.type_name(), 2)
            sfg.set_execution_time_of_type_name(MADS.type_name(), 1)
            sfg.set_execution_time_of_type_name(Reciprocal.type_name(), 1)
    
    
            resources = {
                MADS.type_name(): 1,
                Reciprocal.type_name(): 1,
                Addition.type_name(): 2,
            }
            with pytest.raises(
                ValueError,
                match="Provided max resource of type add cannot be found in the provided SFG.",
            ):
                Schedule(sfg, scheduler=HybridScheduler(resources))
    
        def test_input_not_in_sfg(self):
            sfg = ldlt_matrix_inverse(N=2)
    
    
            sfg.set_latency_of_type_name(MADS.type_name(), 3)
            sfg.set_latency_of_type_name(Reciprocal.type_name(), 2)
            sfg.set_execution_time_of_type_name(MADS.type_name(), 1)
            sfg.set_execution_time_of_type_name(Reciprocal.type_name(), 1)
    
    
            input_times = {"in100": 4}
            with pytest.raises(
                ValueError,
                match="Provided input time with GraphID in100 cannot be found in the provided SFG.",
            ):
                Schedule(sfg, scheduler=HybridScheduler(input_times=input_times))
    
        def test_output_not_in_sfg(self):
            sfg = ldlt_matrix_inverse(N=2)
    
    
            sfg.set_latency_of_type_name(MADS.type_name(), 3)
            sfg.set_latency_of_type_name(Reciprocal.type_name(), 2)
            sfg.set_execution_time_of_type_name(MADS.type_name(), 1)
            sfg.set_execution_time_of_type_name(Reciprocal.type_name(), 1)
    
    
            output_delta_times = {"out90": 2}
            with pytest.raises(
                ValueError,
                match="Provided output delta time with GraphID out90 cannot be found in the provided SFG.",
            ):
                Schedule(
                    sfg, scheduler=HybridScheduler(output_delta_times=output_delta_times)
                )
    
    
        def test_ldlt_inverse_3x3_read_and_write_constrained(self):
            sfg = ldlt_matrix_inverse(N=3)
    
    
            sfg.set_latency_of_type_name(MADS.type_name(), 3)
            sfg.set_latency_of_type_name(Reciprocal.type_name(), 2)
            sfg.set_execution_time_of_type_name(MADS.type_name(), 1)
            sfg.set_execution_time_of_type_name(Reciprocal.type_name(), 1)
    
    
            resources = {MADS.type_name(): 1, Reciprocal.type_name(): 1}
    
            schedule = Schedule(
                sfg,
                scheduler=HybridScheduler(
                    max_resources=resources,
                    max_concurrent_reads=3,
                    max_concurrent_writes=1,
                ),
            )
    
            direct, mem_vars = schedule.get_memory_variables().split_on_length()
            assert mem_vars.read_ports_bound() == 3
            assert mem_vars.write_ports_bound() == 1
    
            _validate_recreated_sfg_ldlt_matrix_inverse(schedule, 3)
    
        def test_32_point_fft_custom_io_times(self):
            POINTS = 32
            sfg = radix_2_dif_fft(POINTS)
    
    
            sfg.set_latency_of_type_name(Butterfly.type_name(), 1)
            sfg.set_latency_of_type_name(ConstantMultiplication.type_name(), 3)
            sfg.set_execution_time_of_type_name(Butterfly.type_name(), 1)
            sfg.set_execution_time_of_type_name(ConstantMultiplication.type_name(), 1)
    
    
            resources = {Butterfly.type_name(): 1, ConstantMultiplication.type_name(): 1}
            input_times = {f"in{i}": i for i in range(POINTS)}
            output_delta_times = {f"out{i}": i for i in range(POINTS)}
            schedule = Schedule(
                sfg,
                scheduler=HybridScheduler(
                    resources,
                    input_times=input_times,
                    output_delta_times=output_delta_times,
                ),
            )
    
            for i in range(POINTS):
                assert schedule.start_times[f"in{i}"] == i
    
                assert schedule.start_times[f"out{i}"] == 95 + i
    
        # def test_64_point_fft_custom_io_times(self):
        #     POINTS = 64
        #     sfg = radix_2_dif_fft(POINTS)
    
    
        #     sfg.set_latency_of_type_name(Butterfly.type_name(), 1)
        #     sfg.set_latency_of_type_name(ConstantMultiplication.type_name(), 3)
        #     sfg.set_execution_time_of_type_name(Butterfly.type_name(), 1)
        #     sfg.set_execution_time_of_type_name(ConstantMultiplication.type_name(), 1)
    
    
        #     resources = {Butterfly.type_name(): 1, ConstantMultiplication.type_name(): 1}
        #     input_times = {f"in{i}": i for i in range(POINTS)}
        #     output_delta_times = {f"out{i}": i for i in range(POINTS)}
        #     schedule = Schedule(
        #         sfg,
        #         scheduler=HybridScheduler(
        #             resources,
        #             input_times=input_times,
        #             output_delta_times=output_delta_times,
        #         ),
        #     )
    
        #     for i in range(POINTS):
        #         assert schedule.start_times[f"in{i}"] == i
        #         assert (
        #             schedule.start_times[f"out{i}"]
    
        #             == schedule.get_max_non_io_end_time() - 1 + i
    
        #         )
    
        def test_32_point_fft_custom_io_times_cyclic(self):
            POINTS = 32
            sfg = radix_2_dif_fft(POINTS)
    
    
            sfg.set_latency_of_type_name(Butterfly.type_name(), 1)
            sfg.set_latency_of_type_name(ConstantMultiplication.type_name(), 3)
            sfg.set_execution_time_of_type_name(Butterfly.type_name(), 1)
            sfg.set_execution_time_of_type_name(ConstantMultiplication.type_name(), 1)
    
    
            resources = {Butterfly.type_name(): 1, ConstantMultiplication.type_name(): 1}
            input_times = {f"in{i}": i for i in range(POINTS)}
            output_delta_times = {f"out{i}": i for i in range(POINTS)}
            schedule = Schedule(
                sfg,
                scheduler=HybridScheduler(
                    resources,
                    input_times=input_times,
                    output_delta_times=output_delta_times,
                ),
                schedule_time=96,
                cyclic=True,
            )
    
            for i in range(POINTS):
                assert schedule.start_times[f"in{i}"] == i
    
                if i == 0:
                    expected_value = 95
                elif i == 1:
                    expected_value = 96
                else:
                    expected_value = i - 1
                assert schedule.start_times[f"out{i}"] == expected_value
    
    
        def test_cyclic_scheduling(self):
            sfg = radix_2_dif_fft(points=4)
    
    
            sfg.set_latency_of_type_name(Butterfly.type_name(), 1)
            sfg.set_latency_of_type_name(ConstantMultiplication.type_name(), 3)
            sfg.set_execution_time_of_type_name(Butterfly.type_name(), 1)
            sfg.set_execution_time_of_type_name(ConstantMultiplication.type_name(), 1)
    
    
            resources = {
                Butterfly.type_name(): 1,
                ConstantMultiplication.type_name(): 1,
            }
            schedule_1 = Schedule(sfg, scheduler=HybridScheduler(resources))
            schedule_2 = Schedule(
                sfg, scheduler=HybridScheduler(resources), schedule_time=6, cyclic=True
            )
            schedule_3 = Schedule(
                sfg, scheduler=HybridScheduler(resources), schedule_time=5, cyclic=True
            )
            schedule_4 = Schedule(
                sfg, scheduler=HybridScheduler(resources), schedule_time=4, cyclic=True
            )
    
            assert schedule_1.start_times == {
                "in1": 0,
                "in3": 1,
                "bfly3": 1,
                "cmul0": 2,
                "in0": 2,
                "in2": 3,
                "bfly0": 3,
                "bfly1": 4,
                "bfly2": 5,
                "out0": 5,
                "out1": 6,
                "out3": 7,
                "out2": 8,
            }
            assert schedule_1.laps == {
                "s4": 0,
                "s6": 0,
                "s5": 0,
                "s7": 0,
                "s8": 0,
                "s12": 0,
                "s10": 0,
                "s9": 0,
                "s0": 0,
                "s2": 0,
                "s11": 0,
                "s1": 0,
                "s3": 0,
            }
            assert schedule_1.schedule_time == 8
    
            assert schedule_2.start_times == {
                "in1": 0,
                "in3": 1,
                "bfly3": 1,
                "cmul0": 2,
                "in0": 2,
                "in2": 3,
                "bfly0": 3,
                "bfly1": 4,
                "bfly2": 5,
                "out0": 5,
                "out1": 6,
                "out3": 1,
                "out2": 2,
            }
            assert schedule_2.laps == {
                "s4": 0,
                "s6": 1,
                "s5": 0,
                "s7": 1,
                "s8": 0,
                "s12": 0,
                "s10": 0,
                "s9": 0,
                "s0": 0,
                "s2": 0,
                "s11": 0,
                "s1": 0,
                "s3": 0,
            }
            assert schedule_2.schedule_time == 6
    
            assert schedule_3.start_times == {
                "in1": 0,
                "in3": 1,
                "bfly3": 1,
                "cmul0": 2,
                "in0": 2,
                "in2": 3,
                "bfly0": 3,
                "bfly1": 4,
                "bfly2": 0,
                "out0": 5,
                "out1": 1,
                "out3": 2,
                "out2": 3,
            }
            assert schedule_3.laps == {
                "s4": 0,
                "s6": 1,
                "s5": 0,
                "s7": 0,
                "s8": 0,
                "s12": 0,
                "s10": 1,
                "s9": 1,
                "s0": 0,
                "s2": 0,
                "s11": 0,
                "s1": 0,
                "s3": 0,
            }
            assert schedule_3.schedule_time == 5
    
            assert schedule_4.start_times == {
                "in1": 0,
                "in3": 1,
                "bfly3": 1,
                "cmul0": 2,
                "in0": 2,
                "in2": 3,
                "bfly0": 3,
                "bfly1": 0,
                "out0": 1,
                "bfly2": 2,
                "out2": 2,
                "out1": 3,
                "out3": 4,
            }
            assert schedule_4.laps == {
                "s4": 0,
                "s6": 0,
                "s5": 0,
                "s7": 0,
                "s8": 1,
                "s12": 1,
                "s10": 0,
                "s9": 1,
                "s0": 0,
                "s2": 0,
                "s11": 0,
                "s1": 0,
                "s3": 0,
            }
            assert schedule_4.schedule_time == 4
    
        def test_resources_not_enough(self):
            sfg = ldlt_matrix_inverse(N=3)
    
    
            sfg.set_latency_of_type_name(MADS.type_name(), 3)
            sfg.set_latency_of_type_name(Reciprocal.type_name(), 2)
            sfg.set_execution_time_of_type_name(MADS.type_name(), 1)
            sfg.set_execution_time_of_type_name(Reciprocal.type_name(), 1)
    
    
            resources = {MADS.type_name(): 1, Reciprocal.type_name(): 1}
            with pytest.raises(
                ValueError,
                match="Amount of resource: mads is not enough to realize schedule for scheduling time: 5.",
            ):
                Schedule(
                    sfg,
                    scheduler=HybridScheduler(
                        max_resources=resources,
                    ),
                    schedule_time=5,
                )
    
    
            sfg.set_latency_of_type_name(Butterfly.type_name(), 1)
            sfg.set_latency_of_type_name(ConstantMultiplication.type_name(), 3)
            sfg.set_execution_time_of_type_name(Butterfly.type_name(), 1)
            sfg.set_execution_time_of_type_name(ConstantMultiplication.type_name(), 1)
    
    
            resources = {
                Butterfly.type_name(): 1,
                ConstantMultiplication.type_name(): 1,
            }
            with pytest.raises(
                ValueError,
                match="Amount of resource: bfly is not enough to realize schedule for scheduling time: 6.",
            ):
                Schedule(
                    sfg,
                    scheduler=HybridScheduler(
                        resources, max_concurrent_reads=2, max_concurrent_writes=2
                    ),
                    schedule_time=6,
                    cyclic=True,
                )
    
    
        def test_scheduling_time_not_enough(self):
            sfg = ldlt_matrix_inverse(N=3)
    
    
            sfg.set_latency_of_type_name(MADS.type_name(), 3)
            sfg.set_latency_of_type_name(Reciprocal.type_name(), 2)
            sfg.set_execution_time_of_type_name(MADS.type_name(), 1)
            sfg.set_execution_time_of_type_name(Reciprocal.type_name(), 1)
    
    
            resources = {MADS.type_name(): 10, Reciprocal.type_name(): 10}
            with pytest.raises(
                ValueError,
                match="Provided scheduling time 5 cannot be reached, try to enable the cyclic property or increase the time to at least 30.",
            ):
                Schedule(
                    sfg,
                    scheduler=HybridScheduler(
                        max_resources=resources,
                    ),
                    schedule_time=5,
                )
    
        def test_cyclic_scheduling_write_and_read_constrained(self):
            sfg = radix_2_dif_fft(points=4)
    
    
            sfg.set_latency_of_type_name(Butterfly.type_name(), 1)
            sfg.set_latency_of_type_name(ConstantMultiplication.type_name(), 3)
            sfg.set_execution_time_of_type_name(Butterfly.type_name(), 1)
            sfg.set_execution_time_of_type_name(ConstantMultiplication.type_name(), 1)
    
    
            resources = {
                Butterfly.type_name(): 1,
                ConstantMultiplication.type_name(): 1,
            }
            schedule = Schedule(
                sfg,
                scheduler=HybridScheduler(
    
                    resources, max_concurrent_reads=2, max_concurrent_writes=3
    
                ),
                schedule_time=6,
                cyclic=True,
            )
    
            assert schedule.start_times == {
                "in1": 0,
                "in3": 1,
                "bfly3": 1,
                "cmul0": 2,
    
                "in0": 2,
                "in2": 3,
                "bfly0": 3,
                "bfly1": 4,
                "bfly2": 5,
                "out0": 5,
                "out1": 6,
                "out3": 1,
                "out2": 2,
    
            }
            assert schedule.laps == {
                "s4": 0,
                "s6": 1,
                "s5": 0,
    
                "s0": 0,
                "s2": 0,
                "s11": 0,
                "s1": 0,
                "s3": 0,
            }
            assert schedule.schedule_time == 6
    
    
            _, mem_vars = schedule.get_memory_variables().split_on_length()
    
            assert mem_vars.read_ports_bound() <= 2
            assert mem_vars.write_ports_bound() <= 3
    
    
        def test_cyclic_scheduling_several_inputs_and_outputs(self):
            sfg = radix_2_dif_fft(points=4)
    
    
            sfg.set_latency_of_type_name(Butterfly.type_name(), 1)
            sfg.set_latency_of_type_name(ConstantMultiplication.type_name(), 3)
            sfg.set_execution_time_of_type_name(Butterfly.type_name(), 1)
            sfg.set_execution_time_of_type_name(ConstantMultiplication.type_name(), 1)
    
    
            resources = {
                Butterfly.type_name(): 1,
                ConstantMultiplication.type_name(): 1,
                Input.type_name(): 2,
                Output.type_name(): 2,
            }
            schedule = Schedule(
                sfg, scheduler=HybridScheduler(resources), schedule_time=4, cyclic=True
            )
    
            assert schedule.schedule_time == 4
    
            _validate_recreated_sfg_fft(schedule, points=4, delays=[0, 1, 0, 1])
    
    
        def test_invalid_output_delta_time(self):
            sfg = radix_2_dif_fft(points=4)
    
    
            sfg.set_latency_of_type_name(Butterfly.type_name(), 1)
            sfg.set_latency_of_type_name(ConstantMultiplication.type_name(), 3)
            sfg.set_execution_time_of_type_name(Butterfly.type_name(), 1)
            sfg.set_execution_time_of_type_name(ConstantMultiplication.type_name(), 1)
    
    
            resources = {
                Butterfly.type_name(): 1,
                ConstantMultiplication.type_name(): 1,
                Input.type_name(): 2,
                Output.type_name(): 2,
            }
            output_delta_times = {"out0": 0, "out1": 1, "out2": 2, "out3": 3}
    
            with pytest.raises(
                ValueError,
                match="Cannot place output out2 at time 6 for scheduling time 5. Try to relax the scheduling time, change the output delta times or enable cyclic.",
            ):
                Schedule(
                    sfg,
                    scheduler=HybridScheduler(
                        resources, output_delta_times=output_delta_times
                    ),
                    schedule_time=5,
                )
    
    
        def test_iteration_period_bound(self):
            sfg = direct_form_1_iir([1, 2, 3], [1, 2, 3])
    
    
            sfg.set_latency_of_type_name(ConstantMultiplication.type_name(), 2)
            sfg.set_execution_time_of_type_name(ConstantMultiplication.type_name(), 1)
            sfg.set_latency_of_type_name(Addition.type_name(), 3)
            sfg.set_execution_time_of_type_name(Addition.type_name(), 1)
    
    
            resources = {
                Addition.type_name(): 1,
                ConstantMultiplication.type_name(): 1,
            }
    
            with pytest.raises(
                ValueError,
                match="Provided scheduling time 5 must be larger or equal to the iteration period bound: 8.",
            ):
                Schedule(
                    sfg,
                    scheduler=EarliestDeadlineScheduler(max_resources=resources),
                    schedule_time=5,
                    cyclic=True,
                )
    
    
        def test_latency_offsets(self):
            sfg = ldlt_matrix_inverse(
                N=3,
                mads_properties={
                    "latency_offsets": {"in0": 3, "in1": 0, "in2": 0, "out0": 4},
                    "execution_time": 1,
                },
                reciprocal_properties={"latency": 10, "execution_time": 1},
            )
            schedule = Schedule(sfg, scheduler=HybridScheduler())
    
            assert schedule.start_times == {
                "dontcare0": 49,
                "dontcare1": 50,
                "dontcare2": 31,
                "dontcare3": 55,
                "dontcare4": 14,
                "dontcare5": 13,
                "in0": 0,
                "in1": 1,
                "in2": 3,
                "in3": 2,
                "in4": 4,
                "in5": 5,
                "mads0": 10,
                "mads1": 11,
                "mads10": 32,
                "mads11": 47,
                "mads12": 16,
                "mads13": 15,
                "mads14": 14,
                "mads2": 55,
                "mads3": 51,
                "mads4": 58,
                "mads5": 54,
                "mads6": 52,
                "mads7": 50,
                "mads8": 28,
                "mads9": 46,
                "out0": 62,
                "out1": 58,
                "out2": 55,
                "out3": 54,
                "out4": 50,
                "out5": 46,
                "rec0": 0,
                "rec1": 18,
                "rec2": 36,
            }
    
    
            assert all(val == 0 for val in schedule.laps.values())
    
            _validate_recreated_sfg_ldlt_matrix_inverse(schedule, 3)
    
    
        def test_latency_offsets_cyclic(self):
            sfg = ldlt_matrix_inverse(
                N=3,
                mads_properties={
                    "latency_offsets": {"in0": 3, "in1": 0, "in2": 0, "out0": 4},
                    "execution_time": 1,
                },
                reciprocal_properties={"latency": 10, "execution_time": 1},
            )
            schedule = Schedule(
                sfg,
                scheduler=HybridScheduler(),
                schedule_time=49,
                cyclic=True,
            )
    
    
            assert schedule.schedule_time == 49
            _validate_recreated_sfg_ldlt_matrix_inverse(
                schedule, N=3, delays=[1, 1, 1, 1, 1, 0]
            )
    
    
        def test_latency_offsets_cyclic_min_schedule_time(self):
            sfg = ldlt_matrix_inverse(
                N=3,
                mads_properties={
                    "latency_offsets": {"in0": 3, "in1": 0, "in2": 0, "out0": 4},
                    "execution_time": 1,
                },
                reciprocal_properties={"latency": 10, "execution_time": 1},
            )
            schedule = Schedule(
                sfg,
                scheduler=HybridScheduler(),
                schedule_time=15,
                cyclic=True,
            )
    
    
            assert schedule.schedule_time == 15
            _validate_recreated_sfg_ldlt_matrix_inverse(
                schedule, N=3, delays=[4, 4, 3, 3, 3, 3]
            )
    
    
    
    Simon Bjurek's avatar
    Simon Bjurek committed
    class TestListScheduler:
        def test_latencies_and_execution_times_not_set(self):
            N = 3
            Wc = 0.2
            b, a = signal.butter(N, Wc, btype="lowpass", output="ba")
            sfg = direct_form_1_iir(b, a)
    
    
            sfg.set_latency_of_type_name(ConstantMultiplication.type_name(), 2)
    
    Simon Bjurek's avatar
    Simon Bjurek committed
    
            resources = {
                Addition.type_name(): 1,
                ConstantMultiplication.type_name(): 1,
                Input.type_name(): 1,
                Output.type_name(): 1,
            }
    
            with pytest.raises(
                ValueError,
                match="Input port 0 of operation add4 has no latency-offset.",
            ):
                Schedule(
                    sfg,
                    scheduler=ListScheduler(
                        sort_order=((1, True), (3, False), (4, False)),
                        max_resources=resources,
                    ),
                )
    
    
            sfg.set_latency_offsets_of_type_name(Addition.type_name(), {"in0": 0, "in1": 0})
    
    Simon Bjurek's avatar
    Simon Bjurek committed
            with pytest.raises(
                ValueError,
                match="Output port 0 of operation add4 has no latency-offset.",
            ):
                Schedule(
                    sfg,
                    scheduler=ListScheduler(
                        sort_order=((1, True), (3, False), (4, False)),
                        max_resources=resources,
                    ),
                )
    
    
            sfg.set_latency_of_type_name(Addition.type_name(), 3)
            sfg.set_execution_time_of_type_name(ConstantMultiplication.type_name(), None)
            sfg.set_execution_time_of_type_name(Addition.type_name(), None)
    
    Simon Bjurek's avatar
    Simon Bjurek committed
    
            with pytest.raises(
                ValueError,
                match="All operations in the SFG must have a specified execution time. Missing operation: cmul0.",
            ):
                Schedule(
                    sfg,
                    scheduler=ListScheduler(
                        sort_order=((1, True), (3, False), (4, False)),
                        max_resources=resources,
                    ),
                )
    
    
            sfg.set_execution_time_of_type_name(ConstantMultiplication.type_name(), 1)
    
    Simon Bjurek's avatar
    Simon Bjurek committed
    
            with pytest.raises(
                ValueError,
                match="All operations in the SFG must have a specified execution time. Missing operation: add0.",
            ):
                Schedule(
                    sfg,
                    scheduler=ListScheduler(
                        sort_order=((1, True), (3, False), (4, False)),
                        max_resources=resources,
                    ),
                )
    
    
            sfg.set_execution_time_of_type_name(Addition.type_name(), 1)
    
    Simon Bjurek's avatar
    Simon Bjurek committed
    
            Schedule(
                sfg,
                scheduler=ListScheduler(
                    sort_order=((1, True), (3, False), (4, False)), max_resources=resources
                ),
            )
    
    
        def test_cyclic_and_recursive_loops(self):
            N = 3
            Wc = 0.2
            b, a = signal.butter(N, Wc, btype="lowpass", output="ba")
            sfg = direct_form_1_iir(b, a)
    
            sfg.set_latency_of_type_name(ConstantMultiplication.type_name(), 2)
            sfg.set_execution_time_of_type_name(ConstantMultiplication.type_name(), 1)
            sfg.set_latency_of_type_name(Addition.type_name(), 3)
            sfg.set_execution_time_of_type_name(Addition.type_name(), 1)
    
            resources = {
                Addition.type_name(): 1,
                ConstantMultiplication.type_name(): 1,
                Input.type_name(): 1,
                Output.type_name(): 1,
            }
    
            with pytest.raises(
                ValueError,
                match="ListScheduler does not support cyclic scheduling of recursive algorithms. Use RecursiveListScheduler instead.",
            ):
                Schedule(
                    sfg,
                    scheduler=ListScheduler(
                        sort_order=((1, True), (3, False), (4, False)),
                        max_resources=resources,
                    ),
                    cyclic=True,
                    schedule_time=sfg.iteration_period_bound(),
                )
    
    
    class TestRecursiveListScheduler:
        def test_empty_sfg(self, sfg_empty):
            with pytest.raises(
                ValueError, match="Empty signal flow graph cannot be scheduled."
            ):
                Schedule(
                    sfg_empty,
                    scheduler=RecursiveListScheduler(
                        sort_order=((1, True), (3, False), (4, False))
                    ),
                )
    
        def test_direct_form_1_iir(self):
            N = 3
            Wc = 0.2
            b, a = signal.butter(N, Wc, btype="lowpass", output="ba")
            sfg = direct_form_1_iir(b, a)
    
    
            sfg.set_latency_of_type_name(ConstantMultiplication.type_name(), 2)
            sfg.set_execution_time_of_type_name(ConstantMultiplication.type_name(), 1)
            sfg.set_latency_of_type_name(Addition.type_name(), 3)
            sfg.set_execution_time_of_type_name(Addition.type_name(), 1)
    
    
            resources = {
                Addition.type_name(): 1,
                ConstantMultiplication.type_name(): 1,
                Input.type_name(): 1,
                Output.type_name(): 1,
    
            schedule = Schedule(
                sfg,
                scheduler=RecursiveListScheduler(
                    sort_order=((1, True), (3, False), (4, False)), max_resources=resources
                ),
            )
            _validate_recreated_sfg_filter(sfg, schedule)
    
        def test_direct_form_2_iir(self):
            N = 3
            Wc = 0.2
            b, a = signal.butter(N, Wc, btype="lowpass", output="ba")
            sfg = direct_form_2_iir(b, a)
    
    
            sfg.set_latency_of_type_name(ConstantMultiplication.type_name(), 2)
            sfg.set_execution_time_of_type_name(ConstantMultiplication.type_name(), 1)
            sfg.set_latency_of_type_name(Addition.type_name(), 3)
            sfg.set_execution_time_of_type_name(Addition.type_name(), 1)
    
    
            resources = {
                Addition.type_name(): 1,
                ConstantMultiplication.type_name(): 1,
                Input.type_name(): 1,
                Output.type_name(): 1,
    
            schedule = Schedule(
                sfg,
                scheduler=RecursiveListScheduler(
                    sort_order=((1, True), (3, False), (4, False)), max_resources=resources
                ),
            )
            _validate_recreated_sfg_filter(sfg, schedule)
    
        def test_large_direct_form_2_iir(self):
    
            Wc = 0.2
            b, a = signal.butter(N, Wc, btype="lowpass", output="ba")
            sfg = direct_form_2_iir(b, a)
    
    
            sfg.set_latency_of_type_name(ConstantMultiplication.type_name(), 2)
            sfg.set_execution_time_of_type_name(ConstantMultiplication.type_name(), 1)
            sfg.set_latency_of_type_name(Addition.type_name(), 3)
            sfg.set_execution_time_of_type_name(Addition.type_name(), 1)
    
    
            resources = {
                Addition.type_name(): 1,
                ConstantMultiplication.type_name(): 1,
                Input.type_name(): 1,
                Output.type_name(): 1,
            }
            schedule = Schedule(
                sfg,
                scheduler=RecursiveListScheduler(
                    sort_order=((1, True), (3, False), (4, False)), max_resources=resources
                ),
            )
            _validate_recreated_sfg_filter(sfg, schedule)
    
        def test_custom_recursive_filter(self):
            # Create the SFG for a digital filter (seen in an exam question from TSTE87).
            x = Input()
            t0 = Delay()
            t1 = Delay(t0)
            b = ConstantMultiplication(0.5, x)
            d = ConstantMultiplication(0.5, t1)
            a1 = Addition(x, d)
            a = ConstantMultiplication(0.5, a1)
            t2 = Delay(a1)
            c = ConstantMultiplication(0.5, t2)
            a2 = Addition(b, c)
            a3 = Addition(a2, a)
            t0.input(0).connect(a3)
            y = Output(a2)
            sfg = SFG([x], [y])
    
    
            sfg.set_latency_of_type_name(Addition.type_name(), 1)
            sfg.set_latency_of_type_name(ConstantMultiplication.type_name(), 2)
            sfg.set_execution_time_of_type_name(Addition.type_name(), 1)
            sfg.set_execution_time_of_type_name(ConstantMultiplication.type_name(), 1)
    
    
            resources = {
                Addition.type_name(): 1,
                ConstantMultiplication.type_name(): 1,
                Input.type_name(): 1,
                Output.type_name(): 1,
            }
            schedule = Schedule(
                sfg,
                scheduler=RecursiveListScheduler(
                    sort_order=((1, True), (3, False), (4, False)), max_resources=resources
                ),
            )
            _validate_recreated_sfg_filter(sfg, schedule)
    
    
    def _validate_recreated_sfg_filter(sfg: SFG, schedule: Schedule) -> None:
        # compare the impulse response of the original sfg and recreated one
        sim1 = Simulation(sfg, [Impulse()])
    
        sim2 = Simulation(schedule.sfg, [Impulse()])
    
        spectrum_1 = abs(np.fft.fft(sim1.results["0"]))
        spectrum_2 = abs(np.fft.fft(sim2.results["0"]))
    
    def _validate_recreated_sfg_fft(
        schedule: Schedule, points: int, delays: list[int] | None = None
    ) -> None:
        if delays is None:
            delays = [0 for i in range(points)]
    
        sim = Simulation(schedule.sfg, [Constant()] + [0 for i in range(points - 1)])
        sim.run_for(128)
    
            assert np.all(np.isclose(sim.results[str(i)][delays[i] :], 1))
    
    
        # constant input -> impulse (with weight=points) output
    
        sim = Simulation(schedule.sfg, [Constant() for i in range(points)])
        sim.run_for(128)
    
        assert np.allclose(sim.results["0"], points)
        for i in range(1, points):
    
            assert np.all(np.isclose(sim.results[str(i)][delays[i] :], 0))
    
    
        # sine input -> compare with numpy fft
        n = np.linspace(0, 2 * np.pi, points)
        waveform = np.sin(n)
        input_samples = [Constant(waveform[i]) for i in range(points)]
        sim = Simulation(schedule.sfg, input_samples)
    
        sim.run_for(128)
        exp_res = np.fft.fft(waveform)
    
            assert np.all(np.isclose(a, b))
    
    
        # multi-tone input -> compare with numpy fft
        n = np.linspace(0, 2 * np.pi, points)
        waveform = (
            2 * np.sin(n)
            + 1.3 * np.sin(0.9 * n)
            + 0.9 * np.sin(0.6 * n)
            + 0.35 * np.sin(0.3 * n)
            + 2.4 * np.sin(0.1 * n)
        )