diff --git a/b_asic/signal_flow_graph.py b/b_asic/signal_flow_graph.py index befbd67014d22f2790fbdc726250b6d7a11c0bf8..b9716c3f66364b3cf0e09c85d9a59db594e14bb0 100644 --- a/b_asic/signal_flow_graph.py +++ b/b_asic/signal_flow_graph.py @@ -436,13 +436,14 @@ class SFG(AbstractOperation): if other_destination is None: raise ValueError("Missing destination in signal.") other_destination.clear() - other_destination.add_signal(Signal(destination.signals[0])) + other_destination.add_signal(Signal(destination.signals[0].source)) + input_operation.output(0).clear() # For each output_signal, connect it to the corresponding operation for output_port, output_operation in zip(self.outputs, self.output_operations): src = output_operation.input(0).signals[0].source if src is None: raise ValueError("Missing source in signal.") - src.clear() + src.remove_signal(output_operation.input(0).signals[0]) output_port.signals[0].set_source(src) return True @@ -1695,141 +1696,93 @@ class SFG(AbstractOperation): if factor == 0: raise ValueError("Unfolding 0 times removes the SFG") - # Make `factor` copies of the sfg - new_ops = [ - [cast(Operation, op.copy()) for op in self.operations] - for _ in range(factor) - ] - - id_idx_map = {op.graph_id: idx for (idx, op) in enumerate(self.operations)} - - # The rest of the process is easier if we clear the connections of the inputs - # and outputs of all operations - for layer, op_list in enumerate(new_ops): - for op_idx, op in enumerate(op_list): - for input_ in op.inputs: - input_.clear() - for output in op.outputs: - output.clear() - - suffix = layer - - new_ops[layer][op_idx].name = f"{new_ops[layer][op_idx].name}_{suffix}" - # NOTE: Since these IDs are what show up when printing the graph, it - # is helpful to set them. However, this can cause name collisions when - # names in a graph are already suffixed with _n - new_ops[layer][op_idx].graph_id = GraphID( - f"{new_ops[layer][op_idx].graph_id}_{suffix}" - ) + sfg = self() # copy the sfg - # Walk through the operations, replacing delay nodes with connections - for layer in range(factor): - for op_idx, op in enumerate(self.operations): - if isinstance(op, Delay): - # Port of the operation feeding into this delay - source_port = op.inputs[0].connected_source - if source_port is None: - raise ValueError("Dangling delay input port in sfg") - - source_op_idx = id_idx_map[source_port.operation.graph_id] - source_op_output_index = source_port.index - new_source_op = new_ops[layer][source_op_idx] - source_op_output = new_source_op.outputs[source_op_output_index] - - # If this is the last layer, we need to create a new delay element - # and connect it instead of the copied port - if layer == factor - 1: - delay = Delay(name=op.name) - delay.graph_id = op.graph_id - - # Since we're adding a new operation instead of bypassing as in - # the common case, we also need to hook up the inputs to the - # delay. - delay.inputs[0].connect(source_op_output) - - new_source_op = delay - new_source_port = new_source_op.outputs[0] - else: - # The new output port we should connect to - new_source_port = source_op_output + inputs = sfg.input_operations + outputs = sfg.output_operations - for out_signal in op.outputs[0].signals: - sink_port = out_signal.destination - if sink_port is None: - # It would be weird if we found a signal that wasn't - # connected anywhere - raise ValueError("Dangling output port in sfg") - - sink_op_idx = id_idx_map[sink_port.operation.graph_id] - sink_op_output_index = sink_port.index + # Remove all delay elements in the SFG and replace each one + # with one input operation and one output operation + for delay in sfg.find_by_type_name(Delay.type_name()): + i = Input(name="input_" + delay.graph_id) + o = Output( + src0=delay.input(0).signals[0].source, + name="output_" + delay.graph_id + ) - target_layer = 0 if layer == factor - 1 else layer + 1 + inputs.append(i) + outputs.append(o) - new_dest_op = new_ops[target_layer][sink_op_idx] - new_destination = new_dest_op.inputs[sink_op_output_index] - new_destination.connect(new_source_port) + # move all outgoing signals from the delay to the new input operation + while len(delay.output(0).signals) > 0: + signal = delay.output(0).signals[0] + destination = signal.destination + destination.remove_signal(signal) + signal.remove_source() + destination.connect(i.output(0)) + + delay.input(0).signals[0].remove_source() + delay.input(0).clear() + + new_sfg = SFG(inputs, outputs) # The new sfg without the delays + + sfgs = [new_sfg() for _ in range(factor)] # Copy the SFG factor times + + # Add suffixes to all graphIDs in the SFGs in order to keep them separated + for i in range(factor): + for operation in sfgs[i].operations: + suffix = f'_{i}' + operation.graph_id = operation.graph_id + suffix + + input_name_to_idx = {} # save the input port indices for future reference + new_inputs = [] + # For each copy of the SFG, create new input operations for every "original" + # input operation and connect them to begin creating the unfolded SFG + for i in range(factor): + for port,operation in zip(sfgs[i].inputs, sfgs[i].input_operations): + if not operation.name.startswith("input_t"): + i = Input() + new_inputs.append(i) + port.connect(i) else: - # Other operations need to be re-targeted to the corresponding - # output in the current layer, as long as that output is not a - # delay, as that has been solved above. - # To avoid double connections, we'll only re-connect inputs - for input_num, original_input in enumerate(op.inputs): - original_source = original_input.connected_source - # We may not always have something connected to the input, if we - # don't we can abort - if original_source is None: - continue - - # delay connections are handled elsewhere - if not isinstance(original_source.operation, Delay): - source_op_idx = id_idx_map[ - original_source.operation.graph_id - ] - source_op_output_idx = original_source.index - - target_output = new_ops[layer][source_op_idx].outputs[ - source_op_output_idx - ] - - new_ops[layer][op_idx].inputs[input_num].connect( - target_output - ) + # If the input was created earlier when removing the delays + # then just save the index + input_name_to_idx[operation.name] = port.index + + # Connect the original outputs in the same way as the inputs + # Also connect the copies of the SFG together according to a formula + # from the TSTE87 course material, and save the number of delays for + # each interconnection + new_outputs = [] + delay_placements = {} + for i in range(factor): + for port,operation in zip(sfgs[i].outputs, sfgs[i].output_operations): + if not operation.name.startswith("output_t"): + new_outputs.append(Output(port)) + else: + index = operation.name[8:] # Remove the "output_t" prefix + j = (i + 1) % factor + number_of_delays_between = (i + 1)//factor + input_port = sfgs[j].input(input_name_to_idx["input_t" + index]) + input_port.connect(port) + delay_placements[port] = [i,number_of_delays_between] + sfgs[i].graph_id = f'sfg{i}' # deterministically set the graphID of the sfgs - all_ops = [op for op_list in new_ops for op in op_list] - - # To get the input order correct, we need to know the input order in the - # original sfg and which operations they correspond to - input_ids = [op.graph_id for op in self.input_operations] - output_ids = [op.graph_id for op in self.output_operations] - - # Re-order the inputs to the correct order. Internal order of the inputs should - # be preserved, i.e. for a graph with 2 inputs (in1, in2), in1 must occur before - # in2, but the "time" order should be reversed. I.e. the input from layer - # `factor-1` is the first input - all_inputs = list( - itertools.chain.from_iterable( - [ - [ops[id_idx_map[input_id]] for input_id in input_ids] - for ops in new_ops - ] - ) - ) + sfg = SFG(new_inputs, new_outputs) # create a new SFG to remove floating nodes - # Outputs are not reversed, but need the same treatment - all_outputs = list( - itertools.chain.from_iterable( - [ - [ops[id_idx_map[output_id]] for output_id in output_ids] - for ops in new_ops - ] - ) - ) + # Insert the interconnect delays according to what is saved in delay_placements + for port,val in delay_placements.items(): + i, no_of_delays = val + for _ in range(no_of_delays): + sfg = sfg.insert_operation_after(f'sfg{i}.{port.index}', Delay()) + + # Flatten all the copies of the original SFG + for i in range(factor): + sfg.find_by_id(f'sfg{i}').connect_external_signals_to_components() + sfg = sfg() - # Sanity check to ensure that no duplicate graph IDs have been created - ids = [op.graph_id for op in all_ops] - assert len(ids) == len(set(ids)) + return sfg - return SFG(inputs=all_inputs, outputs=all_outputs) @property def is_linear(self) -> bool: diff --git a/test/test_sfg.py b/test/test_sfg.py index 1bd6d9fddab6f66a721182d89fad642546616342..6643813956144c38864ee8c9f8949717909dc7c6 100644 --- a/test/test_sfg.py +++ b/test/test_sfg.py @@ -1480,6 +1480,11 @@ class TestUnfold: ): self.do_tests(sfg_two_inputs_two_outputs_independent) + def test_threetapiir( + self, sfg_direct_form_iir_lp_filter: SFG + ): + self.do_tests(sfg_direct_form_iir_lp_filter) + def do_tests(self, sfg: SFG): for factor in range(2, 4): # Ensure that the correct number of operations get created