Skip to content
Snippets Groups Projects
signal_flow_graph.py 87.7 KiB
Newer Older
  • Learn to ignore specific revisions
  •         Unfold the SFG *factor* times. Return a new SFG without modifying the original.
    
            Inputs and outputs are ordered with early inputs first. That is for an SFG
    
    Frans Skarman's avatar
    Frans Skarman committed
            with n inputs, the first n inputs are the inputs at time t, the next n
            inputs are the inputs at time t+1, the next n at t+2 and so on.
    
            Parameters
            ----------
    
            if factor == 0:
    
                raise ValueError("Unfolding 0 times removes the SFG")
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
            sfg = self()  # copy the sfg
    
            inputs = sfg.input_operations
            outputs = sfg.output_operations
    
            # Remove all delay elements in the SFG and replace each one
            # with one input operation and one output operation
            for delay in sfg.find_by_type_name(Delay.type_name()):
                i = Input(name="input_" + delay.graph_id)
                o = Output(
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                    src0=delay.input(0).signals[0].source, name="output_" + delay.graph_id
                )
    
                inputs.append(i)
                outputs.append(o)
    
                # move all outgoing signals from the delay to the new input operation
                while len(delay.output(0).signals) > 0:
                    signal = delay.output(0).signals[0]
                    destination = signal.destination
                    destination.remove_signal(signal)
                    signal.remove_source()
                    destination.connect(i.output(0))
    
                delay.input(0).signals[0].remove_source()
                delay.input(0).clear()
    
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
            new_sfg = SFG(inputs, outputs)  # The new sfg without the delays
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
            sfgs = [new_sfg() for _ in range(factor)]  # Copy the SFG factor times
    
            # Add suffixes to all graphIDs and names in order to keep them separated
    
            for i in range(factor):
                for operation in sfgs[i].operations:
                    suffix = f'_{i}'
                    operation.graph_id = operation.graph_id + suffix
    
                    if operation.name[:7] not in ['', 'input_t', 'output_']:
                        operation.name = operation.name + suffix
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
            input_name_to_idx = {}  # save the input port indices for future reference
    
            new_inputs = []
            # For each copy of the SFG, create new input operations for every "original"
            # input operation and connect them to begin creating the unfolded SFG
            for i in range(factor):
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                for port, operation in zip(sfgs[i].inputs, sfgs[i].input_operations):
    
                    if not operation.name.startswith("input_t"):
                        i = Input()
                        new_inputs.append(i)
                        port.connect(i)
    
                        # If the input was created earlier when removing the delays
                        # then just save the index
                        input_name_to_idx[operation.name] = port.index
    
            # Connect the original outputs in the same way as the inputs
            # Also connect the copies of the SFG together according to a formula
            # from the TSTE87 course material, and save the number of delays for
            # each interconnection
            new_outputs = []
            delay_placements = {}
            for i in range(factor):
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                for port, operation in zip(sfgs[i].outputs, sfgs[i].output_operations):
    
                    if not operation.name.startswith("output_t"):
                        new_outputs.append(Output(port))
                    else:
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                        index = operation.name[8:]  # Remove the "output_t" prefix
    
                        j = (i + 1) % factor
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                        number_of_delays_between = (i + 1) // factor
    
                        input_port = sfgs[j].input(input_name_to_idx["input_t" + index])
                        input_port.connect(port)
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
                        delay_placements[port] = [i, number_of_delays_between]
                sfgs[i].graph_id = (
                    f'sfg{i}'  # deterministically set the graphID of the sfgs
                )
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
            sfg = SFG(new_inputs, new_outputs)  # create a new SFG to remove floating nodes
    
            # Insert the interconnect delays according to what is saved in delay_placements
    
    Oscar Gustafsson's avatar
    Oscar Gustafsson committed
            for port, val in delay_placements.items():
    
                i, no_of_delays = val
                for _ in range(no_of_delays):
                    sfg = sfg.insert_operation_after(f'sfg{i}.{port.index}', Delay())
    
            # Flatten all the copies of the original SFG
            for i in range(factor):
                sfg.find_by_id(f'sfg{i}').connect_external_signals_to_components()
                sfg = sfg()
    
        @property
        def is_linear(self) -> bool:
            return all(op.is_linear for op in self.split())
    
        @property
        def is_constant(self) -> bool:
            return all(output.is_constant for output in self._output_operations)
    
        @property
        def is_commutative(self) -> bool:
            """
            Checks if all operations in the SFG are commutative.
    
            An operation is considered commutative if it is not in B-ASIC Special Operations Module,
            and its `is_commutative` property is `True`.
    
            Returns
            -------
            bool: `True` if all operations are commutative, `False` otherwise.
            """
            return all(
                (
                    op.is_commutative
                    if op.type_name() not in ["in", "out", "t", "c"]
                    else True
                )
                for op in self.split()
            )
    
        @property
        def is_distributive(self) -> bool:
            """
            Checks if the SFG is distributive.
    
            An operation is considered distributive if it can be applied to each element of a set separately.
            For example, multiplication is distributive over addition, meaning that `a * (b + c)` is equivalent to `a * b + a * c`.
    
            Returns
            -------
            bool: True if the SFG is distributive, False otherwise.
    
            Examples
            --------
                >>> Mad_op = MAD(Input(), Input(), Input()) # Creates an instance of the Mad operation, MAD is defined in b_asic.core_operations
                >>> Mad_sfg = Mad_op.to_sfg()  # The operation is turned into a sfg
                >>> Mad_sfg.is_distributive    # True  # if the distributive property holds, False otherwise
    
            """
            structures = []
            operations = self.get_operations_topological_order()
            for op in operations:
                if not (
                    op.type_name() == "in"
                    or op.type_name() == "out"
                    or op.type_name() == "c"
                    or op.type_name() == "t"
                ):
                    structures.append(op)
            return (
                all(self.has_distributive_structure(op) for op in structures)
                if len(structures) > 1
                else False
            )
    
        def has_distributive_structure(self, op: Operation) -> bool:
            """
            Checks if the SFG contains distributive structures.
            NOTE: a*b + c = a*(b + c/a) is considered distributive. Meaning that an algorithm transformation would require an additionat operation.
    
            Parameters:
            ----------
            op : Operation
                The operation that is the start of the structure to check for distributivity.
    
            Returns:
            -------
                bool: True if a distributive structures is found, False otherwise.
            """
            # TODO Butterfly and SymmetricTwoportAdaptor needs to be converted to a SF using to_sfg() in order to be evaluated
            if op.type_name() == 'mac':
                return True
            elif op.type_name() in ['mul', 'div']:
                for subsequent_op in op.subsequent_operations:
                    if subsequent_op.type_name() in [
                        'add',
                        'sub',
                        'addsub',
                        'min',
                        'max',
                        'sqrt',
                        'abs',
                        'rec',
                        'out',
                        't',
                    ]:
                        return True
                    elif subsequent_op.type_name() in ['mul', 'div']:
                        for subsequent_op in subsequent_op.subsequent_operations:
                            return self.has_distributive_structure(subsequent_op)
                    else:
                        return False
            elif op.type_name() in ['cmul', 'shift', 'rshift', 'lshift']:
                for subsequent_op in op.subsequent_operations:
                    if subsequent_op.type_name() in [
                        'add',
                        'sub',
                        'addsub',
                        'min',
                        'max',
                        'out',
                        't',
                    ]:
                        return True
                    elif subsequent_op.type_name() in ['cmul', 'shift', 'rshift', 'lshift']:
                        for subsequent_op in subsequent_op.subsequent_operations:
                            return self.has_distributive_structure(subsequent_op)
                    else:
                        return False
            elif op.type_name() in ['add', 'sub', 'addsub']:
                for subsequent_op in op.subsequent_operations:
                    if subsequent_op.type_name() in [
                        'mul',
                        'div',
                        'min',
                        'max',
                        'out',
                        'cmul',
                        't',
                    ]:
                        return True
                    elif subsequent_op.type_name() in ['add', 'sub', 'addsub']:
                        for subsequent_op in subsequent_op.subsequent_operations:
                            return self.has_distributive_structure(subsequent_op)
                    else:
                        return False
            elif op.type_name() in ['min', 'max']:
                for subsequent_op in op.subsequent_operations:
                    if subsequent_op.type_name() in [
                        'add',
                        'sub',
                        'addsub',
                        'mul',
                        'div',
                        'cmul',
                        'out',
                        't',
                    ]:
                        return True
                    elif subsequent_op.type_name() in ['min', 'max']:
                        for subsequent_op in subsequent_op.subsequent_operations:
                            return self.has_distributive_structure(subsequent_op)
                    else:
                        return False
            return False
    
    
        def get_used_type_names(self) -> List[TypeName]:
            """Get a list of all TypeNames used in the SFG."""
            ret = list({op.type_name() for op in self.operations})
            ret.sort()
            return ret
    
    
        def get_used_graph_ids(self) -> Set[GraphID]:
            """Get a list of all GraphID:s used in the SFG."""
            ret = set({op.graph_id for op in self.operations})
            sorted(ret)
            return ret