diff --git a/b_asic/signal_flow_graph.py b/b_asic/signal_flow_graph.py index cbd491d78ccee41bd79f50d444c1721ff983da60..d3600a68d418db28de3b28aafe634a6f4a99fef4 100644 --- a/b_asic/signal_flow_graph.py +++ b/b_asic/signal_flow_graph.py @@ -1824,40 +1824,59 @@ class SFG(AbstractOperation): inputs_used = [] output_index_used = [] outputs_used = [] - for used_input in self._used_ids: - if 'in' in str(used_input): - inputs_used.append(used_input) - input_index_used.append(int(used_input.replace("in", ""))) - for used_output in self._used_ids: - if 'out' in str(used_output): - outputs_used.append(used_output) - output_index_used.append(int(used_output.replace("out", ""))) + for used_inout in self._used_ids: + if 'in' in str(used_inout): + inputs_used.append(used_inout) + input_index_used.append(int(used_inout.replace("in", ""))) + elif 'out' in str(used_inout): + outputs_used.append(used_inout) + output_index_used.append(int(used_inout.replace("out", ""))) if input_index_used == []: raise ValueError("No input(s) to sfg") if output_index_used == []: raise ValueError("No output(s) to sfg") + dict_of_sfg = {} + for output in output_index_used: + output_op = self._output_operations[output] + queue: Deque[Operation] = deque([output_op]) + visited: Set[Operation] = {output_op} + while queue: + op = queue.popleft() + for input_port in op.inputs: + for signal in input_port.signals: + if signal.source is not None: + new_op = signal.source.operation + dict_of_sfg[new_op.graph_id] = [op.graph_id] + if new_op not in visited: + queue.append(new_op) + visited.add(new_op) + else: + raise ValueError("Source does not exist") for input in input_index_used: input_op = self._input_operations[input] - queue: Deque[Operation] = deque([input_op]) - visited: Set[Operation] = {input_op} - dict_of_sfg = {} - while queue: - op = queue.popleft() - if isinstance(op, Output): - dict_of_sfg[op.graph_id] = [] - for output_port in op.outputs: - dict_of_sfg[op.graph_id] = [] - for signal in output_port.signals: - if signal.destination is not None: - new_op = signal.destination.operation - dict_of_sfg[op.graph_id] += [new_op.graph_id] - if new_op not in visited: - queue.append(new_op) - visited.add(new_op) - else: - raise ValueError("Destination does not exist") + queue: Deque[Operation] = deque([input_op]) + visited: Set[Operation] = {input_op} + while queue: + op = queue.popleft() + if isinstance(op, Output): + dict_of_sfg[op.graph_id] = [] + for output_port in op.outputs: + dict_of_sfg[op.graph_id] = [] + for signal in output_port.signals: + if signal.destination is not None: + new_op = signal.destination.operation + dict_of_sfg[op.graph_id] += [new_op.graph_id] + if new_op not in visited: + queue.append(new_op) + visited.add(new_op) + else: + raise ValueError("Destination does not exist") if not dict_of_sfg: raise ValueError("Empty SFG") + addition_with_constant = {} + for key, item in dict_of_sfg.items(): + if ''.join([i for i in key if not i.isdigit()]) == 'c': + addition_with_constant[item[0]] = self.find_by_id(key).value cycles = [ [node] + path for node in dict_of_sfg @@ -1870,7 +1889,7 @@ class SFG(AbstractOperation): temp_list = [] for element in lista: temp_list.append(element) - if element[0] == 't' and len(temp_list) > 2: + if element[0] == 't' and len(temp_list) >= 2: delay_loop_list.append(temp_list) temp_list = [element] state_space_lista = [] @@ -1879,7 +1898,6 @@ class SFG(AbstractOperation): for x in delay_loop_list if x not in state_space_lista ] - mat_row = len(delay_element_used) + len(output_index_used) mat_col = len(delay_element_used) + len(input_index_used) mat_content = np.zeros((mat_row, mat_col)) @@ -1933,6 +1951,9 @@ class SFG(AbstractOperation): for element in lista: if "cmul" in element: temp_value *= self.find_by_id(element).value + for key, value in addition_with_constant.items(): + if key == element: + temp_value += int(value) mat_content[row, column] += temp_value elif "in" in lista[0] and "out" in lista[-1]: row = len(delay_element_used) + int(lista[-1].replace("out", "")) @@ -1941,6 +1962,9 @@ class SFG(AbstractOperation): for element in lista: if "cmul" in element: temp_value *= self.find_by_id(element).value + for key, value in addition_with_constant.items(): + if key == element: + temp_value += int(value) mat_content[row, column] += temp_value elif lista[0][0] == 't' and lista[-1][0] == 't': row = int(lista[-1].replace("t", "")) @@ -1949,6 +1973,9 @@ class SFG(AbstractOperation): for element in lista: if "cmul" in element: temp_value *= self.find_by_id(element).value + for key, value in addition_with_constant.items(): + if key == element: + temp_value += int(value) mat_content[row, column] += temp_value elif lista[0][0] == 't' and "out" in lista[-1]: row = len(delay_element_used) + int(lista[-1].replace("out", "")) @@ -1957,6 +1984,9 @@ class SFG(AbstractOperation): for element in lista: if "cmul" in element: temp_value *= self.find_by_id(element).value + for key, value in addition_with_constant.items(): + if key == element: + temp_value += int(value) mat_content[row, column] += temp_value return matrix_answer, mat_content, matrix_in diff --git a/test/test_sfg.py b/test/test_sfg.py index 6ece6b74ab738baace869143a3f2dd0a800d8f6d..bc4e93a967c9f2dd5ad4a45a53cc7bda15108fa7 100644 --- a/test/test_sfg.py +++ b/test/test_sfg.py @@ -1843,14 +1843,35 @@ class TestStateSpace: ss = precedence_sfg_delays.state_space_representation() assert ss[0] == ['v0', 'v1', 'y0'] - mat = np.array([[5.0, 2.0, 5.0], [1.0, 0.0, 0.0], [4.0, 6.0, 35.0]]) + mat = np.array([[3.0, 2.0, 5.0], [1.0, 0.0, 0.0], [4.0, 6.0, 35.0]]) assert (ss[1] == mat).all() assert ss[2] == ['v0', 'v1', 'x0'] - @pytest.mark.xfail() - def test_no_delays(self, sfg_two_inputs_two_outputs): + # @pytest.mark.xfail() + def test_sfg_two_inputs_two_outputs(self, sfg_two_inputs_two_outputs): ss = sfg_two_inputs_two_outputs.state_space_representation() assert ss[0] == ['y0', 'y1'] assert (ss[1] == np.array([[1.0, 1.0], [1.0, 2.0]])).all() assert ss[2] == ['x0', 'x1'] + + def test_sfg_two_inputs_two_outputs_independent( + self, sfg_two_inputs_two_outputs_independent + ): + # assert sfg_two_inputs_two_outputs_independent.state_space_representation() == 1 + ss = sfg_two_inputs_two_outputs_independent.state_space_representation() + + assert ss[0] == ['y0', 'y1'] + assert (ss[1] == np.array([[1.0, 0.0], [0.0, 4.0]])).all() + assert ss[2] == ['x0', 'x1'] + + def test_sfg_two_inputs_two_outputs_independent_with_cmul( + self, sfg_two_inputs_two_outputs_independent_with_cmul + ): + ss = ( + sfg_two_inputs_two_outputs_independent_with_cmul.state_space_representation() + ) + + assert ss[0] == ['y0', 'y1'] + assert (ss[1] == np.array([[20.0, 0.0], [0.0, 8.0]])).all() + assert ss[2] == ['x0', 'x1']