From 1120d83b2e7e8bc59ed6afcd65740c4cd07ba05f Mon Sep 17 00:00:00 2001 From: TheZoq2 <frans.skarman@protonmail.com> Date: Wed, 15 Feb 2023 17:40:37 +0100 Subject: [PATCH 1/5] "Working" but untested unfolding --- b_asic/signal_flow_graph.py | 118 ++++++++++++++++++++++++++++++++++++ examples/twotapfirsfg.py | 10 +-- test/test_sfg.py | 6 ++ 3 files changed, 129 insertions(+), 5 deletions(-) diff --git a/b_asic/signal_flow_graph.py b/b_asic/signal_flow_graph.py index 417baa24..c5883722 100644 --- a/b_asic/signal_flow_graph.py +++ b/b_asic/signal_flow_graph.py @@ -1115,6 +1115,11 @@ class SFG(AbstractOperation): return new_component def _add_operation_connected_tree_copy(self, start_op: Operation) -> None: + print( + "Running _add_operation_connected_tree_copy with" + f" {self._operations_dfs_order}" + ) + print(f"Start op: {start_op}") op_stack = deque([start_op]) while op_stack: original_op = op_stack.pop() @@ -1486,3 +1491,116 @@ class SFG(AbstractOperation): from b_asic.schedule import Schedule return Schedule(self, scheduling_algorithm="ASAP").schedule_time + + def unfold(self, factor: int) -> "SFG": + if factor == 0: + raise ValueError("Unrollnig 0 times removes the SFG") + + # Make `factor` copies of the sfg + new_ops = [ + [cast(Operation, op.copy_component()) for op in self.operations] + for _ in range(factor) + ] + + id_idx_map = { + op.graph_id: idx for (idx, op) in enumerate(self.operations) + } + + # The rest of the process is easier if we clear the connections of the inputs + # and outputs of all operations + for list in new_ops: + for op in list: + for input in op.inputs: + input.clear() + for output in op.outputs: + output.clear() + + # Walk through the operations, replacing delay nodes with connections + for layer in range(factor): + for op_idx, op in enumerate(self.operations): + new_ops[layer][ + op_idx + ].name = f"{new_ops[layer][op_idx].name}_{factor-layer}" + # NOTE: These are overwritten later, but it's useful to debug with them + new_ops[layer][op_idx].graph_id = GraphID( + f"{new_ops[layer][op_idx].graph_id}_{factor-layer}" + ) + if isinstance(op, Delay): + # Port of the operation feeding into this delay + source_port = op.inputs[0].connected_source + if source_port is None: + raise ValueError("Dangling delay input port in sfg") + + source_op_idx = id_idx_map[source_port.operation.graph_id] + source_op_output_index = source_port.index + new_source_op = new_ops[layer][source_op_idx] + source_op_output = new_source_op.outputs[ + source_op_output_index + ] + + # If this is the last layer, we need to create a new delay element and connect it instead + # of the copied port + if layer == factor - 1: + delay = Delay(name=op.name) + delay.graph_id = op.graph_id + + # Since we're adding a new operation instead of bypassing as in the + # common case, we also need to hook up the inputs to the delay. + delay.inputs[0].connect(source_op_output) + + new_source_op = delay + new_source_port = new_source_op.outputs[0] + else: + # The new output port we should connect to + new_source_port = source_op_output + new_source_port.clear() + + for out_signal in op.outputs[0].signals: + sink_port = out_signal.destination + if sink_port is None: + # It would be weird if we found a signal but it wasn't connected anywere + raise ValueError("Dangling output port in sfg") + + sink_op_idx = id_idx_map[sink_port.operation.graph_id] + sink_op_output_index = sink_port.index + + target_layer = 0 if layer == factor - 1 else layer + 1 + + new_dest_op = new_ops[target_layer][sink_op_idx] + new_destination = new_dest_op.inputs[ + sink_op_output_index + ] + new_destination.clear() + new_destination.connect(new_source_port) + else: + # Other opreations need to be re-targeted to the corresponding output in the + # current layer, as long as that output is not a delay, as that has been solved + # above. + # To avoid double connections, we'll only re-connect inputs + for input_num, original_input in enumerate(op.inputs): + original_source = original_input.connected_source + # We may not always have something connected to the input, if we don't + # we can abort + if original_source is None: + continue + + # delay connections are handled elsewhere + if not isinstance(original_source.operation, Delay): + source_op_idx = id_idx_map[ + original_source.operation.graph_id + ] + source_op_output_idx = original_source.index + + target_output = new_ops[layer][ + source_op_idx + ].outputs[source_op_output_idx] + + new_ops[layer][op_idx].inputs[input_num].connect( + target_output + ) + + all_ops = [op for op_list in new_ops for op in op_list] + all_inputs = [op for op in all_ops if isinstance(op, Input)] + all_outputs = [op for op in all_ops if isinstance(op, Output)] + + return SFG(inputs=all_inputs, outputs=all_outputs) diff --git a/examples/twotapfirsfg.py b/examples/twotapfirsfg.py index e111e2a3..14377648 100644 --- a/examples/twotapfirsfg.py +++ b/examples/twotapfirsfg.py @@ -17,18 +17,18 @@ from b_asic import ( in1 = Input(name="in1") # Outputs: -out1 = Output(name="") +out1 = Output(name="out1") # Operations: -t1 = Delay(initial_value=0, name="") +t1 = Delay(initial_value=0, name="t1") cmul1 = ConstantMultiplication( - value=0.5, name="cmul2", latency_offsets={'in0': None, 'out0': None} + value=0.5, name="cmul1", latency_offsets={'in0': None, 'out0': None} ) add1 = Addition( - name="", latency_offsets={'in0': None, 'in1': None, 'out0': None} + name="add1", latency_offsets={'in0': None, 'in1': None, 'out0': None} ) cmul2 = ConstantMultiplication( - value=0.5, name="cmul", latency_offsets={'in0': None, 'out0': None} + value=0.5, name="cmul2", latency_offsets={'in0': None, 'out0': None} ) # Signals: diff --git a/test/test_sfg.py b/test/test_sfg.py index a4276fa6..02f4c492 100644 --- a/test/test_sfg.py +++ b/test/test_sfg.py @@ -1595,3 +1595,9 @@ class TestCriticalPath: sfg_simple_accumulator.set_latency_of_type(Addition.type_name(), 6) assert sfg_simple_accumulator.critical_path() == 6 + + +class TestUnroll: + def unrolling_by_factor_0_raises(self, sfg_simple_filter: SFG): + with pytest.raises(ValueError): + sfg_simple_filter.unfold(0) -- GitLab From 2dfa7a04ab3a68c787c21806e579a57917c8aa0a Mon Sep 17 00:00:00 2001 From: TheZoq2 <frans.skarman@protonmail.com> Date: Thu, 16 Feb 2023 14:50:03 +0100 Subject: [PATCH 2/5] Add rudiemntary test sfg unfolding --- test/test_sfg.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/test/test_sfg.py b/test/test_sfg.py index 02f4c492..d5ff3e2e 100644 --- a/test/test_sfg.py +++ b/test/test_sfg.py @@ -1597,7 +1597,10 @@ class TestCriticalPath: assert sfg_simple_accumulator.critical_path() == 6 -class TestUnroll: - def unrolling_by_factor_0_raises(self, sfg_simple_filter: SFG): +class TestUnfold: + # QUESTION: Is it possible to run a test on *all* fixtures? + def test_unfolding_by_factor_0_raises(self, sfg_simple_filter: SFG): with pytest.raises(ValueError): sfg_simple_filter.unfold(0) + + # TODO: Add more tests -- GitLab From df17cad40c855b2bd8ba34c0285642073891b1fa Mon Sep 17 00:00:00 2001 From: TheZoq2 <frans.skarman@protonmail.com> Date: Fri, 17 Feb 2023 16:45:50 +0100 Subject: [PATCH 3/5] Add more unfolding tests --- b_asic/signal_flow_graph.py | 104 +++++++++++++++++++---- examples/twotapfirsfg.py | 2 +- test/fixtures/signal_flow_graph.py | 32 ++++++++ test/test_sfg.py | 128 +++++++++++++++++++++++++++-- 4 files changed, 243 insertions(+), 23 deletions(-) diff --git a/b_asic/signal_flow_graph.py b/b_asic/signal_flow_graph.py index c5883722..c6156a3a 100644 --- a/b_asic/signal_flow_graph.py +++ b/b_asic/signal_flow_graph.py @@ -26,6 +26,7 @@ from typing import ( ) from graphviz import Digraph +from matplotlib.axes import itertools from b_asic.graph_component import GraphComponent from b_asic.operation import ( @@ -1115,11 +1116,6 @@ class SFG(AbstractOperation): return new_component def _add_operation_connected_tree_copy(self, start_op: Operation) -> None: - print( - "Running _add_operation_connected_tree_copy with" - f" {self._operations_dfs_order}" - ) - print(f"Start op: {start_op}") op_stack = deque([start_op]) while op_stack: original_op = op_stack.pop() @@ -1169,8 +1165,14 @@ class SFG(AbstractOperation): original_signal not in self._original_components_to_new ): if original_signal.source is None: + dest = ( + original_signal.destination.operation.name + if original_signal.destination is not None + else "None" + ) raise ValueError( "Dangling signal without source in SFG" + f" (destination: {dest})" ) new_signal = cast( @@ -1493,6 +1495,19 @@ class SFG(AbstractOperation): return Schedule(self, scheduling_algorithm="ASAP").schedule_time def unfold(self, factor: int) -> "SFG": + """ + Unfolds the SFG `factor` times. Returns a new SFG without modifying the original + + Inputs and outputs are ordered with early inputs first. I.e. for an sfg + with n inputs, the first n inputs are the inputs at time t, the next n + inputs are the inputs at time t+1, the next n at t+2 and so on. + + Parameters + ---------- + factor : string, optional + Number of times to unfold + """ + if factor == 0: raise ValueError("Unrollnig 0 times removes the SFG") @@ -1508,23 +1523,36 @@ class SFG(AbstractOperation): # The rest of the process is easier if we clear the connections of the inputs # and outputs of all operations - for list in new_ops: - for op in list: + for layer, op_list in enumerate(new_ops): + for op_idx, op in enumerate(op_list): for input in op.inputs: input.clear() for output in op.outputs: output.clear() - # Walk through the operations, replacing delay nodes with connections - for layer in range(factor): - for op_idx, op in enumerate(self.operations): + suffix = layer + new_ops[layer][ op_idx - ].name = f"{new_ops[layer][op_idx].name}_{factor-layer}" - # NOTE: These are overwritten later, but it's useful to debug with them + ].name = f"{new_ops[layer][op_idx].name}_{suffix}" + # NOTE: Since these IDs are what show up when printing the graph, it + # is helpful to set them. However, this can cause name collisions when + # names in a graph are already suffixed with _n new_ops[layer][op_idx].graph_id = GraphID( - f"{new_ops[layer][op_idx].graph_id}_{factor-layer}" + f"{new_ops[layer][op_idx].graph_id}_{suffix}" ) + + def sanity_check(): + all_ops = [op for op_list in new_ops for op in op_list] + cmul201 = [ + op for op in all_ops if op.graph_id == GraphID("cmul2_0_1") + ] + if cmul201: + print(f"NOW: {cmul201[0]}") + + # Walk through the operations, replacing delay nodes with connections + for layer in range(factor): + for op_idx, op in enumerate(self.operations): if isinstance(op, Delay): # Port of the operation feeding into this delay source_port = op.inputs[0].connected_source @@ -1553,7 +1581,6 @@ class SFG(AbstractOperation): else: # The new output port we should connect to new_source_port = source_op_output - new_source_port.clear() for out_signal in op.outputs[0].signals: sink_port = out_signal.destination @@ -1570,7 +1597,6 @@ class SFG(AbstractOperation): new_destination = new_dest_op.inputs[ sink_op_output_index ] - new_destination.clear() new_destination.connect(new_source_port) else: # Other opreations need to be re-targeted to the corresponding output in the @@ -1599,8 +1625,52 @@ class SFG(AbstractOperation): target_output ) + print( + f"Connecting {new_ops[layer][op_idx].name} <-" + f" {target_output.operation.name} ({target_output.operation.graph_id})" + ) + print(f" |>{new_ops[layer][op_idx]}") + print(f" |<{target_output.operation}") + sanity_check() + all_ops = [op for op_list in new_ops for op in op_list] - all_inputs = [op for op in all_ops if isinstance(op, Input)] - all_outputs = [op for op in all_ops if isinstance(op, Output)] + + # To get the input order correct, we need to know the input order in the original + # sfg and which operations they correspond to + input_ids = [op.graph_id for op in self.input_operations] + output_ids = [op.graph_id for op in self.output_operations] + + # Re-order the inputs to the correct order. Internal order of the inputs should + # be preserved, i.e. for a graph with 2 inputs (in1, in2), in1 must occur before in2, + # but the "time" order should be reversed. I.e. the input from layer `factor-1` is the + # first input + all_inputs = list( + itertools.chain.from_iterable( + [ + [ops[id_idx_map[input_id]] for input_id in input_ids] + for ops in new_ops + ] + ) + ) + + # Outputs are not reversed, but need the same treatment + all_outputs = list( + itertools.chain.from_iterable( + [ + [ops[id_idx_map[output_id]] for output_id in output_ids] + for ops in new_ops + ] + ) + ) + + print("All ops: ") + print(*all_ops, sep="\n") + + print("All outputs: ") + print(*all_outputs, sep="\n") + + # Sanity check to ensure that no duplicate graph IDs have been created + ids = [op.graph_id for op in all_ops] + assert len(ids) == len(set(ids)) return SFG(inputs=all_inputs, outputs=all_outputs) diff --git a/examples/twotapfirsfg.py b/examples/twotapfirsfg.py index 14377648..87a90e1c 100644 --- a/examples/twotapfirsfg.py +++ b/examples/twotapfirsfg.py @@ -14,7 +14,7 @@ from b_asic import ( ) # Inputs: -in1 = Input(name="in1") +in1 = Input(name="in_1") # Outputs: out1 = Output(name="out1") diff --git a/test/fixtures/signal_flow_graph.py b/test/fixtures/signal_flow_graph.py index eb876cec..321e7523 100644 --- a/test/fixtures/signal_flow_graph.py +++ b/test/fixtures/signal_flow_graph.py @@ -13,6 +13,7 @@ from b_asic import ( Input, Name, Output, + Signal, SignalSourceProvider, TypeName, ) @@ -274,3 +275,34 @@ def precedence_sfg_delays_and_constants(): Output(bfly1.output(1), "OUT2") return SFG(inputs=[in1], outputs=[out1], name="SFG") + + +@pytest.fixture +def sfg_two_tap_fir(): + # Inputs: + in1 = Input(name="in1") + + # Outputs: + out1 = Output(name="out1") + + # Operations: + t1 = Delay(initial_value=0, name="t1") + cmul1 = ConstantMultiplication( + value=0.5, name="cmul1", latency_offsets={'in0': None, 'out0': None} + ) + add1 = Addition( + name="add1", latency_offsets={'in0': None, 'in1': None, 'out0': None} + ) + cmul2 = ConstantMultiplication( + value=0.5, name="cmul2", latency_offsets={'in0': None, 'out0': None} + ) + + # Signals: + + Signal(source=t1.output(0), destination=cmul1.input(0)) + Signal(source=in1.output(0), destination=t1.input(0)) + Signal(source=in1.output(0), destination=cmul2.input(0)) + Signal(source=cmul1.output(0), destination=add1.input(0)) + Signal(source=add1.output(0), destination=out1.input(0)) + Signal(source=cmul2.output(0), destination=add1.input(1)) + return SFG(inputs=[in1], outputs=[out1], name='twotapfir') diff --git a/test/test_sfg.py b/test/test_sfg.py index d5ff3e2e..bdbbd7db 100644 --- a/test/test_sfg.py +++ b/test/test_sfg.py @@ -1,9 +1,11 @@ import io +import itertools import random import re import string import sys from os import path, remove +from typing import Counter, Dict, Type import pytest @@ -23,7 +25,9 @@ from b_asic.core_operations import ( Subtraction, SymmetricTwoportAdaptor, ) +from b_asic.operation import ResultKey from b_asic.save_load_structure import python_to_sfg, sfg_to_python +from b_asic.simulation import Simulation from b_asic.special_operations import Delay @@ -1598,9 +1602,123 @@ class TestCriticalPath: class TestUnfold: - # QUESTION: Is it possible to run a test on *all* fixtures? - def test_unfolding_by_factor_0_raises(self, sfg_simple_filter: SFG): - with pytest.raises(ValueError): - sfg_simple_filter.unfold(0) + def count_kinds(self, sfg: SFG) -> Dict[Type, int]: + return Counter([type(op) for op in sfg.operations]) + + # Checks that the number of each kind of operation in sfg2 is multiple*count + # of the same operation in sfg1. + # Filters out delay delays + def assert_counts_is_correct(self, sfg1: SFG, sfg2: SFG, multiple: int): + count1 = self.count_kinds(sfg1) + count2 = self.count_kinds(sfg2) + + # Delays should not be duplicated. Check that and then clear them + # Using get to avoid issues if there are no delays in the sfg + assert count1.get(Delay) == count2.get(Delay) + count1[Delay] = 0 + count2[Delay] = 0 + + # Ensure that we aren't missing any keys, or have any extras + assert count1.keys() == count2.keys() + + for k in count1.keys(): + assert count1[k] * multiple == count2[k] + + # This is horrifying, but I can't figure out a way to run the test on multiple fixtures, + # so this is an ugly hack until someone that knows pytest comes along + def test_two_inputs_two_outputs(self, sfg_two_inputs_two_outputs: SFG): + self.do_tests(sfg_two_inputs_two_outputs) + + def test_twotapfir(self, sfg_two_tap_fir: SFG): + self.do_tests(sfg_two_tap_fir) + + def test_delay(self, sfg_delay: SFG): + self.do_tests(sfg_delay) + + def test_sfg_two_inputs_two_outputs_independent( + self, sfg_two_inputs_two_outputs_independent: SFG + ): + self.do_tests(sfg_two_inputs_two_outputs_independent) + + def do_tests(self, sfg: SFG): + for factor in range(2, 4): + # Ensure that the correct number of operations get created + unfolded = sfg.unfold(factor) + + self.assert_counts_is_correct(sfg, unfolded, factor) - # TODO: Add more tests + double_unfolded = sfg.unfold(factor).unfold(factor) + + self.assert_counts_is_correct( + sfg, double_unfolded, factor * factor + ) + + NUM_TESTS = 5 + # Evaluate with some random values + # To avoid problems with missing inputs at the end of the sequence, + # we generate i*(some large enough) number + input_list = [ + [random.random() for _ in range(0, NUM_TESTS * factor)] + for _ in sfg.inputs + ] + + print("In: ") + print(input_list) + + sim = Simulation(sfg, input_list) + sim.run() + ref = sim.results + + print("out: ") + print(list(ref[ResultKey("0")])) + + # We have i copies of the inputs, each sourcing their input from the orig + unfolded_input_lists = [ + [] for _ in range(len(sfg.inputs) * factor) + ] + for t in range(0, NUM_TESTS): + for n in range(0, factor): + for k in range(0, len(sfg.inputs)): + unfolded_input_lists[k + n * len(sfg.inputs)].append( + input_list[k][t * factor + n] + ) + + sim = Simulation(unfolded, unfolded_input_lists) + sim.run() + unfolded_results = sim.results + + print("ref out: ") + print("0: ", unfolded_results[ResultKey("0")]) + print("1: ", unfolded_results[ResultKey("1")]) + + for n, _ in enumerate(sfg.outputs): + # Outputs for an original output + ref_values = list(ref[ResultKey(f"{n}")]) + + # Output n will be split into `factor` output ports, compute the + # indicies where we find the outputs + out_indices = [n + k * len(sfg.outputs) for k in range(factor)] + print("out indices: ", out_indices) + u_values = [ + [ + unfolded_results[ResultKey(f"{idx}")][k] + for idx in out_indices + ] + for k in range(int(NUM_TESTS)) + ] + + print("u_values: ", u_values) + + flat_u_values = list(itertools.chain.from_iterable(u_values)) + + print("ref_values: ", ref_values) + print("flat u_values: ", flat_u_values) + + assert flat_u_values == ref_values + + def test_value_error(self, sfg_two_inputs_two_outputs: SFG): + sfg = sfg_two_inputs_two_outputs + with pytest.raises( + ValueError, match="Unrollnig 0 times removes the SFG" + ): + sfg.unfold(0) -- GitLab From b7feba387ddadbc5a6a86b0bf6abb56a8accc6e9 Mon Sep 17 00:00:00 2001 From: TheZoq2 <frans.skarman@protonmail.com> Date: Fri, 17 Feb 2023 16:47:02 +0100 Subject: [PATCH 4/5] Remove debug prints --- b_asic/signal_flow_graph.py | 22 ---------------------- test/test_sfg.py | 16 ---------------- 2 files changed, 38 deletions(-) diff --git a/b_asic/signal_flow_graph.py b/b_asic/signal_flow_graph.py index c6156a3a..8f119022 100644 --- a/b_asic/signal_flow_graph.py +++ b/b_asic/signal_flow_graph.py @@ -1542,14 +1542,6 @@ class SFG(AbstractOperation): f"{new_ops[layer][op_idx].graph_id}_{suffix}" ) - def sanity_check(): - all_ops = [op for op_list in new_ops for op in op_list] - cmul201 = [ - op for op in all_ops if op.graph_id == GraphID("cmul2_0_1") - ] - if cmul201: - print(f"NOW: {cmul201[0]}") - # Walk through the operations, replacing delay nodes with connections for layer in range(factor): for op_idx, op in enumerate(self.operations): @@ -1625,14 +1617,6 @@ class SFG(AbstractOperation): target_output ) - print( - f"Connecting {new_ops[layer][op_idx].name} <-" - f" {target_output.operation.name} ({target_output.operation.graph_id})" - ) - print(f" |>{new_ops[layer][op_idx]}") - print(f" |<{target_output.operation}") - sanity_check() - all_ops = [op for op_list in new_ops for op in op_list] # To get the input order correct, we need to know the input order in the original @@ -1663,12 +1647,6 @@ class SFG(AbstractOperation): ) ) - print("All ops: ") - print(*all_ops, sep="\n") - - print("All outputs: ") - print(*all_outputs, sep="\n") - # Sanity check to ensure that no duplicate graph IDs have been created ids = [op.graph_id for op in all_ops] assert len(ids) == len(set(ids)) diff --git a/test/test_sfg.py b/test/test_sfg.py index bdbbd7db..59cda52a 100644 --- a/test/test_sfg.py +++ b/test/test_sfg.py @@ -1662,16 +1662,10 @@ class TestUnfold: for _ in sfg.inputs ] - print("In: ") - print(input_list) - sim = Simulation(sfg, input_list) sim.run() ref = sim.results - print("out: ") - print(list(ref[ResultKey("0")])) - # We have i copies of the inputs, each sourcing their input from the orig unfolded_input_lists = [ [] for _ in range(len(sfg.inputs) * factor) @@ -1687,10 +1681,6 @@ class TestUnfold: sim.run() unfolded_results = sim.results - print("ref out: ") - print("0: ", unfolded_results[ResultKey("0")]) - print("1: ", unfolded_results[ResultKey("1")]) - for n, _ in enumerate(sfg.outputs): # Outputs for an original output ref_values = list(ref[ResultKey(f"{n}")]) @@ -1698,7 +1688,6 @@ class TestUnfold: # Output n will be split into `factor` output ports, compute the # indicies where we find the outputs out_indices = [n + k * len(sfg.outputs) for k in range(factor)] - print("out indices: ", out_indices) u_values = [ [ unfolded_results[ResultKey(f"{idx}")][k] @@ -1707,13 +1696,8 @@ class TestUnfold: for k in range(int(NUM_TESTS)) ] - print("u_values: ", u_values) - flat_u_values = list(itertools.chain.from_iterable(u_values)) - print("ref_values: ", ref_values) - print("flat u_values: ", flat_u_values) - assert flat_u_values == ref_values def test_value_error(self, sfg_two_inputs_two_outputs: SFG): -- GitLab From 0c22caa50f8ed85cd15cf0dd324b7c9e38a6359f Mon Sep 17 00:00:00 2001 From: Oscar Gustafsson <oscar.gustafsson@liu.se> Date: Fri, 17 Feb 2023 21:30:14 +0000 Subject: [PATCH 5/5] Minor wording changes. --- b_asic/signal_flow_graph.py | 6 +++--- test/test_sfg.py | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/b_asic/signal_flow_graph.py b/b_asic/signal_flow_graph.py index 8f119022..43216d62 100644 --- a/b_asic/signal_flow_graph.py +++ b/b_asic/signal_flow_graph.py @@ -1496,9 +1496,9 @@ class SFG(AbstractOperation): def unfold(self, factor: int) -> "SFG": """ - Unfolds the SFG `factor` times. Returns a new SFG without modifying the original + Unfold the SFG *factor* times. Return a new SFG without modifying the original. - Inputs and outputs are ordered with early inputs first. I.e. for an sfg + Inputs and outputs are ordered with early inputs first. That is for an SFG with n inputs, the first n inputs are the inputs at time t, the next n inputs are the inputs at time t+1, the next n at t+2 and so on. @@ -1509,7 +1509,7 @@ class SFG(AbstractOperation): """ if factor == 0: - raise ValueError("Unrollnig 0 times removes the SFG") + raise ValueError("Unfolding 0 times removes the SFG") # Make `factor` copies of the sfg new_ops = [ diff --git a/test/test_sfg.py b/test/test_sfg.py index 59cda52a..601df109 100644 --- a/test/test_sfg.py +++ b/test/test_sfg.py @@ -1703,6 +1703,6 @@ class TestUnfold: def test_value_error(self, sfg_two_inputs_two_outputs: SFG): sfg = sfg_two_inputs_two_outputs with pytest.raises( - ValueError, match="Unrollnig 0 times removes the SFG" + ValueError, match="Unfolding 0 times removes the SFG" ): sfg.unfold(0) -- GitLab