Skip to content
Snippets Groups Projects
simulation.py 7.54 KiB
Newer Older
  • Learn to ignore specific revisions
  • """
    B-ASIC Simulation Module.

    Contains a class for simulating the result of an SFG given a set of input values.
    """

    from collections import defaultdict
    from numbers import Number
    from typing import (
        Callable,
        List,
        Mapping,
        MutableMapping,
        MutableSequence,
        Optional,
        Sequence,
        Union,
    )

    import numpy as np

    from b_asic.operation import MutableDelayMap, ResultKey
    from b_asic.signal_flow_graph import SFG
    from b_asic.types import Num
    
    # Read-only view of saved results: each result key maps to the sequence of
    # values recorded for it.
    ResultArrayMap = Mapping[ResultKey, Sequence[Num]]
    # Mutable counterpart, used for the storage that results are appended to.
    MutableResultArrayMap = MutableMapping[ResultKey, MutableSequence[Num]]
    # Callable that takes an integer index (the iteration) and returns a number.
    InputFunction = Callable[[int], Num]
    # Anything accepted as an input source: a constant, a sequence, or a callable.
    InputProvider = Union[Num, Sequence[Num], InputFunction]
    
        """
        Simulation of an SFG.

        Use FastSimulation (from the C++ extension module) for a more efficient
        simulation when running many iterations.

        Parameters
        ----------
        sfg : SFG
            The signal flow graph to simulate.
        input_providers : list, optional
            Input values, one list item per input. Each list item can be an array of
            values, a callable taking a time index and returning the value, or a
            number (constant input). If a value is not provided for an input, it will
            be 0.
        """

        # Private copy of the simulated graph (created in __init__ via ``sfg()``).
        _sfg: SFG
        # Saved values per result key; one value is appended per saved iteration.
        _results: MutableResultArrayMap
        # Delay state handed to ``SFG.evaluate_outputs`` on every iteration.
        _delays: MutableDelayMap
        # Index of the next iteration to run; starts at 0.
        _iteration: int
    
        # One callable per SFG input, mapping an iteration index to a value.
        _input_functions: List[InputFunction]
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
        def __init__(
            self,
            sfg: SFG,
            input_providers: Optional[Sequence[Optional[InputProvider]]] = None,
        ):
            """
            Create a simulation of the given SFG.

            Parameters
            ----------
            sfg : SFG
                The signal flow graph to simulate.
            input_providers : list, optional
                One provider per SFG input. Entries that are ``None`` keep the
                default constant-zero input.

            Raises
            ------
            TypeError
                If *sfg* is not an SFG.
            ValueError
                If *input_providers* is given but its length does not match the
                number of SFG inputs, or its sequences have differing lengths.
            """
            if not isinstance(sfg, SFG):
                raise TypeError("An SFG must be provided")

            # Copy the SFG to make sure it's not modified from the outside.
            self._sfg = sfg()

            self._results = defaultdict(list)
            self._delays = {}
            self._iteration = 0

            # Every input yields the constant 0 until a provider is set for it.
            self._input_functions = [lambda _: 0 for _ in range(self._sfg.input_count)]

            # Common length of all sequence inputs; None while no sequence is set.
            self._input_length = None
            if input_providers is not None:
                self.set_inputs(input_providers)
    
        def set_input(self, index: int, input_provider: InputProvider) -> None:
            """
            Set the provider used to get values for the input at the given index.

            Parameters
            ----------
            index : int
                The input index.
            input_provider : list, callable, or number
                Can be an array of values, a callable taking a time index and
                returning the value, or a number (constant input).

            Raises
            ------
            IndexError
                If *index* is outside the range of the SFG inputs.
            ValueError
                If *input_provider* is a sequence whose length differs from the
                length of a previously set sequence input.
            """
            if index < 0 or index >= len(self._input_functions):
                raise IndexError(
                    "Input index out of range (expected"
                    f" 0-{len(self._input_functions) - 1}, got {index})"
                )

            if callable(input_provider):
                self._input_functions[index] = input_provider
            elif isinstance(input_provider, Number):
                # Constant input: the iteration index is ignored.
                self._input_functions[index] = lambda _: input_provider
            else:
                # Sequence input: all sequences must share a single length, which
                # is recorded the first time a sequence is supplied.
                if self._input_length is None:
                    self._input_length = len(input_provider)
                elif self._input_length != len(input_provider):
                    raise ValueError(
                        "Inconsistent input length for simulation (was"
                        f" {self._input_length}, got {len(input_provider)})"
                    )

                self._input_functions[index] = lambda n: input_provider[n]
    
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
        def set_inputs(self, input_providers: Sequence[Optional[InputProvider]]) -> None:
            """
            Set the input functions used to get values for the inputs to the internal SFG.

            Parameters
            ----------
            input_providers : sequence
                One entry per SFG input. Entries that are ``None`` are skipped, so
                the corresponding input keeps its current provider.

            Raises
            ------
            ValueError
                If the number of entries differs from the number of SFG inputs.
            """
            if len(input_providers) != self._sfg.input_count:
                raise ValueError(
                    "Wrong number of inputs supplied to simulation (expected"
                    f" {self._sfg.input_count}, got {len(input_providers)})"
                )

            for index, input_provider in enumerate(input_providers):
                if input_provider is not None:
                    self.set_input(index, input_provider)
    
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
        def step(
            self,
            save_results: bool = True,
            bits_override: Optional[int] = None,
            truncate: bool = True,
        ) -> Sequence[Num]:
            """
            Run one iteration of the simulation and return the resulting output values.

            All arguments are forwarded unchanged to :meth:`run_for` with an
            iteration count of 1.
            """
            return self.run_for(1, save_results, bits_override, truncate)
    
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
        def run_until(
            self,
            iteration: int,
            save_results: bool = True,
            bits_override: Optional[int] = None,
            truncate: bool = True,
        ) -> Sequence[Num]:
            """
            Run the simulation until its iteration is greater than or equal to the
            given iteration and return the output values of the last iteration.

            Parameters
            ----------
            iteration : int
                The iteration number to stop at (exclusive).
            save_results : bool, default: True
                Whether the values produced in each iteration are appended to the
                stored results.
            bits_override : int, optional
                Forwarded unchanged to ``SFG.evaluate_outputs``.
            truncate : bool, default: True
                Forwarded unchanged to ``SFG.evaluate_outputs``.

            Returns
            -------
            The output values of the last iteration that was run, or an empty list
            if the simulation had already reached *iteration*.
            """
            result: Sequence[Num] = []
            while self._iteration < iteration:
                # Sample every input at the current iteration index.
                input_values = [
                    self._input_functions[i](self._iteration)
                    for i in range(self._sfg.input_count)
                ]
                # Filled in by evaluate_outputs with this iteration's values.
                results = {}
                result = self._sfg.evaluate_outputs(
                    input_values,
                    results,
                    self._delays,
                    "",
                    bits_override,
                    truncate,
                )
                if save_results:
                    for key, value in results.items():
                        self._results[key].append(value)
                self._iteration += 1
            return result
    
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
        def run_for(
            self,
            iterations: int,
            save_results: bool = True,
            bits_override: Optional[int] = None,
            truncate: bool = True,
        ) -> Sequence[Num]:
            """
            Run a given number of iterations of the simulation and return the output
            values of the last iteration.

            The target iteration is the current iteration plus *iterations*; the
            remaining arguments are forwarded unchanged to :meth:`run_until`.
            """
            return self.run_until(
                self._iteration + iterations, save_results, bits_override, truncate
            )
    
        def run(
            self,
            save_results: bool = True,
            bits_override: Optional[int] = None,
            truncate: bool = True,
        ) -> Sequence[Num]:
            """
            Run the simulation until the end of its input arrays and return the output
            values of the last iteration.

            All arguments are forwarded unchanged to :meth:`run_until`.

            Raises
            ------
            IndexError
                If no sequence input has been set, so there is no input length to
                run until.
            """
            if self._input_length is None:
                raise IndexError("Tried to run unlimited simulation")
            return self.run_until(self._input_length, save_results, bits_override, truncate)
    
    
        @property
        def iteration(self) -> int:
            """Return the current iteration number, i.e. how many iterations have run."""
            current = self._iteration
            return current
    
        @property
        def results(self) -> ResultArrayMap:
            """
            Get a mapping from result keys to numpy arrays containing all results.

            This includes intermediate values, calculated for each iteration up until now
            that was run with *save_results* enabled.
            The mapping is indexed using the ``key()`` method of Operation with the
            appropriate output index.

            Example result after 3 iterations::

                {"c1": [3, 6, 7], "c2": [4, 5, 5], "bfly1.0": [7, 0, 0], "bfly1.1": [-1, 0, 2], "0": [7, -2, -1]}

            """
            # A new dict of new arrays is built on every access.
            return {key: np.array(value) for key, value in self._results.items()}
    
        def clear_results(self) -> None:
            """Discard every result saved so far, emptying the result storage in place."""
            saved = self._results
            saved.clear()
    
            """
            Clear all current state of the simulation, except for the results and iteration.
    
    
        def show(self) -> None:
            """
            Show the simulation results.

            Passes the current results and the name of the internal SFG to
            ``start_simulation_dialog``.
            """
            # Deferred to call time: a module-level import would be cyclic.
            from b_asic.gui_utils.plot_window import start_simulation_dialog

            collected = self.results
            sfg_name = self._sfg.name
            start_simulation_dialog(collected, sfg_name)