Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • da/B-ASIC
  • lukja239/B-ASIC
  • robal695/B-ASIC
3 results
Show changes
Showing
with 2079 additions and 356 deletions
#include "simulation.h"
#include "simulation/simulation.h"
namespace py = pybind11;
namespace asic {
void define_simulation_class(pybind11::module& module) {
// clang-format off
py::class_<simulation>(module, "FastSimulation")
.def(py::init<py::handle>(),
py::arg("sfg"),
"SFG Constructor.")
.def(py::init<py::handle, std::optional<std::vector<std::optional<input_provider_t>>>>(),
py::arg("sfg"), py::arg("input_providers"),
"SFG Constructor.")
.def("set_input", &simulation::set_input,
py::arg("index"), py::arg("input_provider"),
"Set the input function used to get values for the specific input at the given index to the internal SFG.")
.def("set_inputs", &simulation::set_inputs,
py::arg("input_providers"),
"Set the input functions used to get values for the inputs to the internal SFG.")
.def("step", &simulation::step,
py::arg("save_results") = true, py::arg("bits_override") = py::none{}, py::arg("truncate") = true,
"Run one iteration of the simulation and return the resulting output values.")
.def("run_until", &simulation::run_until,
py::arg("iteration"), py::arg("save_results") = true, py::arg("bits_override") = py::none{}, py::arg("truncate") = true,
"Run the simulation until its iteration is greater than or equal to the given iteration\n"
"and return the output values of the last iteration.")
.def("run_for", &simulation::run_for,
py::arg("iterations"), py::arg("save_results") = true, py::arg("bits_override") = py::none{}, py::arg("truncate") = true,
"Run a given number of iterations of the simulation and return the output values of the last iteration.")
.def("run", &simulation::run,
py::arg("save_results") = true, py::arg("bits_override") = py::none{}, py::arg("truncate") = true,
"Run the simulation until the end of its input arrays and return the output values of the last iteration.")
.def_property_readonly("iteration", &simulation::iteration,
"Get the current iteration number of the simulation.")
.def_property_readonly("results", &simulation::results,
"Get a mapping from result keys to numpy arrays containing all results, including intermediate values,\n"
"calculated for each iteration up until now that was run with save_results enabled.\n"
"The mapping is indexed using the key() method of Operation with the appropriate output index.\n"
"Example result after 3 iterations: {\"c1\": [3, 6, 7], \"c2\": [4, 5, 5], \"bfly1.0\": [7, 0, 0], \"bfly1.1\": [-1, 0, 2], \"0\": [7, -2, -1]}")
.def("clear_results", &simulation::clear_results,
"Clear all results that were saved until now.")
.def("clear_state", &simulation::clear_state,
"Clear all current state of the simulation, except for the results and iteration.");
// clang-format on
}
} // namespace asic
\ No newline at end of file
#ifndef ASIC_SIMULATION_H
#define ASIC_SIMULATION_H
#include <pybind11/pybind11.h>
namespace asic {
void define_simulation_class(pybind11::module& module);
} // namespace asic
#endif // ASIC_SIMULATION_H
\ No newline at end of file
#define NOMINMAX
#include "compile.h"
#include "../algorithm.h"
#include "../debug.h"
#include "../span.h"
#include "format_code.h"
#include <Python.h>
#include <fmt/format.h>
#include <limits>
#include <optional>
#include <string_view>
#include <tuple>
#include <unordered_map>
#include <utility>
namespace py = pybind11;
namespace asic {
[[nodiscard]] static result_key key_base(py::handle op, std::string_view prefix) {
auto const graph_id = op.attr("graph_id").cast<std::string_view>();
return (prefix.empty()) ? result_key{graph_id} : fmt::format("{}.{}", prefix, graph_id);
}
[[nodiscard]] static result_key key_of_output(py::handle op, std::size_t output_index, std::string_view prefix) {
auto const base = key_base(op, prefix);
if (base.empty()) {
return fmt::to_string(output_index);
}
if (op.attr("output_count").cast<std::size_t>() == 1) {
return base;
}
return fmt::format("{}.{}", base, output_index);
}
class compiler final {
public:
simulation_code compile(py::handle sfg) {
ASIC_DEBUG_MSG("Compiling code...");
this->initialize_code(sfg.attr("input_count").cast<std::size_t>(), sfg.attr("output_count").cast<std::size_t>());
auto deferred_delays = delay_queue{};
this->add_outputs(sfg, deferred_delays);
this->add_deferred_delays(std::move(deferred_delays));
this->resolve_invalid_result_indices();
ASIC_DEBUG_MSG("Compiled code:\n{}\n", format_compiled_simulation_code(m_code));
return std::move(m_code);
}
private:
struct sfg_info final {
py::handle sfg;
std::size_t prefix_length;
sfg_info(py::handle sfg, std::size_t prefix_length)
: sfg(sfg)
, prefix_length(prefix_length) {}
[[nodiscard]] std::size_t find_input_operation_index(py::handle op) const {
for (auto const& [i, in] : enumerate(sfg.attr("input_operations"))) {
if (in.is(op)) {
return i;
}
}
throw py::value_error{"Stray Input operation in simulation SFG"};
}
};
using sfg_info_stack = std::vector<sfg_info>;
using delay_queue = std::vector<std::tuple<std::size_t, py::handle, std::string, sfg_info_stack>>;
using added_output_cache = std::unordered_set<PyObject const*>;
using added_result_cache = std::unordered_map<PyObject const*, result_index_t>;
using added_custom_operation_cache = std::unordered_map<PyObject const*, std::size_t>;
static constexpr auto no_result_index = std::numeric_limits<result_index_t>::max();
void initialize_code(std::size_t input_count, std::size_t output_count) {
m_code.required_stack_size = 0;
m_code.input_count = input_count;
m_code.output_count = output_count;
}
void add_outputs(py::handle sfg, delay_queue& deferred_delays) {
for (auto const i : range(m_code.output_count)) {
this->add_operation_output(sfg, i, std::string_view{}, sfg_info_stack{}, deferred_delays);
}
}
void add_deferred_delays(delay_queue&& deferred_delays) {
while (!deferred_delays.empty()) {
auto new_deferred_delays = delay_queue{};
for (auto const& [delay_index, op, prefix, sfg_stack] : deferred_delays) {
this->add_source(op, 0, prefix, sfg_stack, deferred_delays);
this->add_instruction(instruction_type::update_delay, no_result_index, -1).index = delay_index;
}
deferred_delays = new_deferred_delays;
}
}
void resolve_invalid_result_indices() {
for (auto& instruction : m_code.instructions) {
if (instruction.result_index == no_result_index) {
instruction.result_index = static_cast<result_index_t>(m_code.result_keys.size());
}
}
}
[[nodiscard]] static sfg_info_stack push_sfg(sfg_info_stack const& sfg_stack, py::handle sfg, std::size_t prefix_length) {
auto const new_size = static_cast<std::size_t>(sfg_stack.size() + 1);
auto new_sfg_stack = sfg_info_stack{};
new_sfg_stack.reserve(new_size);
for (auto const& info : sfg_stack) {
new_sfg_stack.push_back(info);
}
new_sfg_stack.emplace_back(sfg, prefix_length);
return new_sfg_stack;
}
[[nodiscard]] static sfg_info_stack pop_sfg(sfg_info_stack const& sfg_stack) {
ASIC_ASSERT(!sfg_stack.empty());
auto const new_size = static_cast<std::size_t>(sfg_stack.size() - 1);
auto new_sfg_stack = sfg_info_stack{};
new_sfg_stack.reserve(new_size);
for (auto const& info : span{sfg_stack}.first(new_size)) {
new_sfg_stack.push_back(info);
}
return new_sfg_stack;
}
instruction& add_instruction(instruction_type type, result_index_t result_index, std::ptrdiff_t stack_diff) {
m_stack_depth += stack_diff;
if (m_stack_depth < 0) {
throw py::value_error{"Detected input/output count mismatch in simulation SFG"};
}
if (auto const stack_size = static_cast<std::size_t>(m_stack_depth); stack_size > m_code.required_stack_size) {
m_code.required_stack_size = stack_size;
}
auto& instruction = m_code.instructions.emplace_back();
instruction.type = type;
instruction.result_index = result_index;
return instruction;
}
[[nodiscard]] std::optional<result_index_t> begin_operation_output(py::handle op, std::size_t output_index, std::string_view prefix) {
auto const pointer = op.attr("outputs")[py::int_{output_index}].ptr();
if (m_incomplete_outputs.count(pointer) != 0) {
// Make sure the output doesn't depend on its own value, unless it's a delay operation.
if (op.attr("type_name")().cast<std::string_view>() != "t") {
throw py::value_error{"Direct feedback loop detected in simulation SFG"};
}
}
// Try to add a new result.
auto const [it, inserted] = m_added_results.try_emplace(pointer, static_cast<result_index_t>(m_code.result_keys.size()));
if (inserted) {
if (m_code.result_keys.size() >= static_cast<std::size_t>(std::numeric_limits<result_index_t>::max())) {
throw py::value_error{fmt::format("Simulation SFG requires too many outputs to be stored (limit: {})",
std::numeric_limits<result_index_t>::max())};
}
m_code.result_keys.push_back(key_of_output(op, output_index, prefix));
m_incomplete_outputs.insert(pointer);
return it->second;
}
// If the result has already been added, we re-use the old result and
// return std::nullopt to indicate that we don't need to add all the required instructions again.
this->add_instruction(instruction_type::push_result, it->second, 1).index = static_cast<std::size_t>(it->second);
return std::nullopt;
}
void end_operation_output(py::handle op, std::size_t output_index) {
auto const pointer = op.attr("outputs")[py::int_{output_index}].ptr();
[[maybe_unused]] auto const erased = m_incomplete_outputs.erase(pointer);
ASIC_ASSERT(erased == 1);
}
[[nodiscard]] std::size_t try_add_custom_operation(py::handle op) {
auto const [it, inserted] = m_added_custom_operations.try_emplace(op.ptr(), m_added_custom_operations.size());
if (inserted) {
auto& custom_operation = m_code.custom_operations.emplace_back();
custom_operation.evaluate_output = op.attr("evaluate_output");
custom_operation.input_count = op.attr("input_count").cast<std::size_t>();
custom_operation.output_count = op.attr("output_count").cast<std::size_t>();
}
return it->second;
}
[[nodiscard]] std::size_t add_delay_info(number initial_value, result_index_t result_index) {
auto const delay_index = m_code.delays.size();
auto& delay = m_code.delays.emplace_back();
delay.initial_value = initial_value;
delay.result_index = result_index;
return delay_index;
}
void add_source(py::handle op, std::size_t input_index, std::string_view prefix, sfg_info_stack const& sfg_stack,
delay_queue& deferred_delays) {
auto const signal = py::object{op.attr("inputs")[py::int_{input_index}].attr("signals")[py::int_{0}]};
auto const src = py::handle{signal.attr("source")};
auto const operation = py::handle{src.attr("operation")};
auto const index = src.attr("index").cast<std::size_t>();
this->add_operation_output(operation, index, prefix, sfg_stack, deferred_delays);
if (!signal.attr("bits").is_none()) {
auto const bits = signal.attr("bits").cast<std::size_t>();
if (bits > 64) {
throw py::value_error{"Cannot truncate to more than 64 bits"};
}
this->add_instruction(instruction_type::truncate, no_result_index, 0).bit_mask = static_cast<std::int64_t>(std::int64_t{1}
<< bits);
}
}
void add_unary_operation_output(py::handle op, result_index_t result_index, std::string_view prefix, sfg_info_stack const& sfg_stack,
delay_queue& deferred_delays, instruction_type type) {
this->add_source(op, 0, prefix, sfg_stack, deferred_delays);
this->add_instruction(type, result_index, 0);
}
void add_binary_operation_output(py::handle op, result_index_t result_index, std::string_view prefix, sfg_info_stack const& sfg_stack,
delay_queue& deferred_delays, instruction_type type) {
this->add_source(op, 0, prefix, sfg_stack, deferred_delays);
this->add_source(op, 1, prefix, sfg_stack, deferred_delays);
this->add_instruction(type, result_index, -1);
}
void add_operation_output(py::handle op, std::size_t output_index, std::string_view prefix, sfg_info_stack const& sfg_stack,
delay_queue& deferred_delays) {
auto const type_name = op.attr("type_name")().cast<std::string_view>();
if (type_name == "out") {
this->add_source(op, 0, prefix, sfg_stack, deferred_delays);
} else if (auto const result_index = this->begin_operation_output(op, output_index, prefix)) {
if (type_name == "c") {
this->add_instruction(instruction_type::push_constant, *result_index, 1).value = op.attr("value").cast<number>();
} else if (type_name == "add") {
this->add_binary_operation_output(op, *result_index, prefix, sfg_stack, deferred_delays, instruction_type::addition);
} else if (type_name == "sub") {
this->add_binary_operation_output(op, *result_index, prefix, sfg_stack, deferred_delays, instruction_type::subtraction);
} else if (type_name == "mul") {
this->add_binary_operation_output(op, *result_index, prefix, sfg_stack, deferred_delays, instruction_type::multiplication);
} else if (type_name == "div") {
this->add_binary_operation_output(op, *result_index, prefix, sfg_stack, deferred_delays, instruction_type::division);
} else if (type_name == "min") {
this->add_binary_operation_output(op, *result_index, prefix, sfg_stack, deferred_delays, instruction_type::min);
} else if (type_name == "max") {
this->add_binary_operation_output(op, *result_index, prefix, sfg_stack, deferred_delays, instruction_type::max);
} else if (type_name == "sqrt") {
this->add_unary_operation_output(op, *result_index, prefix, sfg_stack, deferred_delays, instruction_type::square_root);
} else if (type_name == "conj") {
this->add_unary_operation_output(
op, *result_index, prefix, sfg_stack, deferred_delays, instruction_type::complex_conjugate);
} else if (type_name == "abs") {
this->add_unary_operation_output(op, *result_index, prefix, sfg_stack, deferred_delays, instruction_type::absolute);
} else if (type_name == "cmul") {
this->add_source(op, 0, prefix, sfg_stack, deferred_delays);
this->add_instruction(instruction_type::constant_multiplication, *result_index, 0).value = op.attr("value").cast<number>();
} else if (type_name == "bfly") {
if (output_index == 0) {
this->add_source(op, 0, prefix, sfg_stack, deferred_delays);
this->add_source(op, 1, prefix, sfg_stack, deferred_delays);
this->add_instruction(instruction_type::addition, *result_index, -1);
} else {
this->add_source(op, 0, prefix, sfg_stack, deferred_delays);
this->add_source(op, 1, prefix, sfg_stack, deferred_delays);
this->add_instruction(instruction_type::subtraction, *result_index, -1);
}
} else if (type_name == "in") {
if (sfg_stack.empty()) {
throw py::value_error{"Encountered Input operation outside SFG in simulation"};
}
auto const& info = sfg_stack.back();
auto const input_index = info.find_input_operation_index(op);
if (sfg_stack.size() == 1) {
this->add_instruction(instruction_type::push_input, *result_index, 1).index = input_index;
} else {
this->add_source(info.sfg, input_index, prefix.substr(0, info.prefix_length), pop_sfg(sfg_stack), deferred_delays);
this->add_instruction(instruction_type::forward_value, *result_index, 0);
}
} else if (type_name == "t") {
auto const delay_index = this->add_delay_info(op.attr("initial_value").cast<number>(), *result_index);
deferred_delays.emplace_back(delay_index, op, std::string{prefix}, sfg_stack);
this->add_instruction(instruction_type::push_delay, *result_index, 1).index = delay_index;
} else if (type_name == "sfg") {
auto const output_op = py::handle{op.attr("output_operations")[py::int_{output_index}]};
this->add_source(output_op, 0, key_base(op, prefix), push_sfg(sfg_stack, op, prefix.size()), deferred_delays);
this->add_instruction(instruction_type::forward_value, *result_index, 0);
} else {
auto const custom_operation_index = this->try_add_custom_operation(op);
auto const& custom_operation = m_code.custom_operations[custom_operation_index];
for (auto const i : range(custom_operation.input_count)) {
this->add_source(op, i, prefix, sfg_stack, deferred_delays);
}
auto const custom_source_index = m_code.custom_sources.size();
auto& custom_source = m_code.custom_sources.emplace_back();
custom_source.custom_operation_index = custom_operation_index;
custom_source.output_index = output_index;
auto const stack_diff = std::ptrdiff_t{1} - static_cast<std::ptrdiff_t>(custom_operation.input_count);
this->add_instruction(instruction_type::custom, *result_index, stack_diff).index = custom_source_index;
}
this->end_operation_output(op, output_index);
}
}
simulation_code m_code;
added_output_cache m_incomplete_outputs;
added_result_cache m_added_results;
added_custom_operation_cache m_added_custom_operations;
std::ptrdiff_t m_stack_depth = 0;
};
simulation_code compile_simulation(pybind11::handle sfg) {
return compiler{}.compile(sfg);
}
} // namespace asic
\ No newline at end of file
#ifndef ASIC_SIMULATION_COMPILE_H
#define ASIC_SIMULATION_COMPILE_H
#include "instruction.h"
#include <cstddef>
#include <pybind11/pybind11.h>
#include <string>
#include <vector>
namespace asic {
using result_key = std::string;
struct simulation_code final {
struct custom_operation final {
// Python function used to evaluate the custom operation.
pybind11::object evaluate_output;
// Number of inputs that the custom operation takes.
std::size_t input_count;
// Number of outputs that the custom operation gives.
std::size_t output_count;
};
struct custom_source final {
// Index into custom_operations where the custom_operation corresponding to this custom_source is located.
std::size_t custom_operation_index;
// Output index of the custom_operation that this source gets it value from.
std::size_t output_index;
};
struct delay_info final {
// Initial value to set at the start of the simulation.
number initial_value;
// The result index where the current value should be stored at the start of each iteration.
result_index_t result_index;
};
// Instructions to execute for one full iteration of the simulation.
std::vector<instruction> instructions;
// Custom operations used by the simulation.
std::vector<custom_operation> custom_operations;
// Signal sources that use custom operations.
std::vector<custom_source> custom_sources;
// Info about the delay operations used in the simulation.
std::vector<delay_info> delays;
// Keys for each result produced by the simulation. The index of the key matches the index of the result in the simulation state.
std::vector<result_key> result_keys;
// Number of values expected as input to the simulation.
std::size_t input_count;
// Number of values given as output from the simulation. This will be the number of values left on the stack after a full iteration of the simulation has been run.
std::size_t output_count;
// Maximum number of values that need to be able to fit on the stack in order to run a full iteration of the simulation.
std::size_t required_stack_size;
};
[[nodiscard]] simulation_code compile_simulation(pybind11::handle sfg);
} // namespace asic
#endif // ASIC_SIMULATION_COMPILE_H
\ No newline at end of file
#ifndef ASIC_SIMULATION_FORMAT_CODE_H
#define ASIC_SIMULATION_FORMAT_CODE_H
#include "../algorithm.h"
#include "../debug.h"
#include "../number.h"
#include "compile.h"
#include "instruction.h"
#include <fmt/format.h>
#include <string>
namespace asic {
[[nodiscard]] inline std::string format_number(number const& value) {
if (value.imag() == 0) {
return fmt::to_string(value.real());
}
if (value.real() == 0) {
return fmt::format("{}j", value.imag());
}
if (value.imag() < 0) {
return fmt::format("{}-{}j", value.real(), -value.imag());
}
return fmt::format("{}+{}j", value.real(), value.imag());
}
[[nodiscard]] inline std::string format_compiled_simulation_code_result_keys(simulation_code const& code) {
auto result = std::string{};
for (auto const& [i, result_key] : enumerate(code.result_keys)) {
result += fmt::format("{:>2}: \"{}\"\n", i, result_key);
}
return result;
}
[[nodiscard]] inline std::string format_compiled_simulation_code_delays(simulation_code const& code) {
auto result = std::string{};
for (auto const& [i, delay] : enumerate(code.delays)) {
ASIC_ASSERT(delay.result_index < code.result_keys.size());
result += fmt::format("{:>2}: Initial value: {}, Result: {}: \"{}\"\n",
i,
format_number(delay.initial_value),
delay.result_index,
code.result_keys[delay.result_index]);
}
return result;
}
[[nodiscard]] inline std::string format_compiled_simulation_code_instruction(instruction const& instruction) {
switch (instruction.type) {
// clang-format off
case instruction_type::push_input: return fmt::format("push_input inputs[{}]", instruction.index);
case instruction_type::push_result: return fmt::format("push_result results[{}]", instruction.index);
case instruction_type::push_delay: return fmt::format("push_delay delays[{}]", instruction.index);
case instruction_type::push_constant: return fmt::format("push_constant {}", format_number(instruction.value));
case instruction_type::truncate: return fmt::format("truncate {:#018x}", instruction.bit_mask);
case instruction_type::addition: return "addition";
case instruction_type::subtraction: return "subtraction";
case instruction_type::multiplication: return "multiplication";
case instruction_type::division: return "division";
case instruction_type::min: return "min";
case instruction_type::max: return "max";
case instruction_type::square_root: return "square_root";
case instruction_type::complex_conjugate: return "complex_conjugate";
case instruction_type::absolute: return "absolute";
case instruction_type::constant_multiplication: return fmt::format("constant_multiplication {}", format_number(instruction.value));
case instruction_type::update_delay: return fmt::format("update_delay delays[{}]", instruction.index);
case instruction_type::custom: return fmt::format("custom custom_sources[{}]", instruction.index);
case instruction_type::forward_value: return "forward_value";
// clang-format on
}
return std::string{};
}
[[nodiscard]] inline std::string format_compiled_simulation_code_instructions(simulation_code const& code) {
auto result = std::string{};
for (auto const& [i, instruction] : enumerate(code.instructions)) {
auto instruction_string = format_compiled_simulation_code_instruction(instruction);
if (instruction.result_index < code.result_keys.size()) {
instruction_string = fmt::format(
"{:<26} -> {}: \"{}\"", instruction_string, instruction.result_index, code.result_keys[instruction.result_index]);
}
result += fmt::format("{:>2}: {}\n", i, instruction_string);
}
return result;
}
[[nodiscard]] inline std::string format_compiled_simulation_code(simulation_code const& code) {
return fmt::format(
"==============================================\n"
"> Code stats\n"
"==============================================\n"
"Input count: {}\n"
"Output count: {}\n"
"Instruction count: {}\n"
"Required stack size: {}\n"
"Delay count: {}\n"
"Result count: {}\n"
"Custom operation count: {}\n"
"Custom source count: {}\n"
"==============================================\n"
"> Delays\n"
"==============================================\n"
"{}"
"==============================================\n"
"> Result keys\n"
"==============================================\n"
"{}"
"==============================================\n"
"> Instructions\n"
"==============================================\n"
"{}"
"==============================================",
code.input_count,
code.output_count,
code.instructions.size(),
code.required_stack_size,
code.delays.size(),
code.result_keys.size(),
code.custom_operations.size(),
code.custom_sources.size(),
format_compiled_simulation_code_delays(code),
format_compiled_simulation_code_result_keys(code),
format_compiled_simulation_code_instructions(code));
}
} // namespace asic
#endif // ASIC_SIMULATION_FORMAT_CODE
\ No newline at end of file
#ifndef ASIC_SIMULATION_INSTRUCTION_H
#define ASIC_SIMULATION_INSTRUCTION_H
#include "../number.h"
#include <cstddef>
#include <cstdint>
#include <optional>
namespace asic {
enum class instruction_type : std::uint8_t {
push_input, // push(inputs[index])
push_result, // push(results[index])
push_delay, // push(delays[index])
push_constant, // push(value)
truncate, // push(trunc(pop(), bit_mask))
addition, // push(pop() + pop())
subtraction, // push(pop() - pop())
multiplication, // push(pop() * pop())
division, // push(pop() / pop())
min, // push(min(pop(), pop()))
max, // push(max(pop(), pop()))
square_root, // push(sqrt(pop()))
complex_conjugate, // push(conj(pop()))
absolute, // push(abs(pop()))
constant_multiplication, // push(pop() * value)
update_delay, // delays[index] = pop()
custom, // Custom operation. Uses custom_source[index].
forward_value // Forward the current value on the stack (push(pop()), i.e. do nothing).
};
using result_index_t = std::uint16_t;
struct instruction final {
constexpr instruction() noexcept
: index(0)
, result_index(0)
, type(instruction_type::forward_value) {}
union {
// Index used by push_input, push_result, delay and custom.
std::size_t index;
// Bit mask used by truncate.
std::int64_t bit_mask;
// Constant value used by push_constant and constant_multiplication.
number value;
};
// Index into where the result of the instruction will be stored. If the result should be ignored, this index will be one past the last valid result index.
result_index_t result_index;
// Specifies what kind of operation the instruction should execute.
instruction_type type;
};
} // namespace asic
#endif // ASIC_SIMULATION_INSTRUCTION_H
\ No newline at end of file
#define NOMINMAX
#include "run.h"
#include "../algorithm.h"
#include "../debug.h"
#include "format_code.h"
#include <algorithm>
#include <complex>
#include <cstddef>
#include <fmt/format.h>
#include <iterator>
#include <pybind11/pybind11.h>
#include <pybind11/stl.h>
#include <stdexcept>
namespace py = pybind11;
namespace asic {
[[nodiscard]] static number truncate_value(number value, std::int64_t bit_mask) {
if (value.imag() != 0) {
throw py::type_error{"Complex value cannot be truncated"};
}
return number{static_cast<number::value_type>(static_cast<std::int64_t>(value.real()) & bit_mask)};
}
[[nodiscard]] static std::int64_t setup_truncation_parameters(bool& truncate, std::optional<std::uint8_t>& bits_override) {
if (truncate && bits_override) {
truncate = false; // Ignore truncate instructions, they will be truncated using bits_override instead.
if (*bits_override > 64) {
throw py::value_error{"Cannot truncate to more than 64 bits"};
}
return static_cast<std::int64_t>(std::int64_t{1} << *bits_override); // Return the bit mask override to use.
}
bits_override.reset(); // Don't use bits_override if truncate is false.
return std::int64_t{};
}
simulation_state run_simulation(simulation_code const& code, span<number const> inputs, span<number> delays,
std::optional<std::uint8_t> bits_override, bool truncate) {
ASIC_ASSERT(inputs.size() == code.input_count);
ASIC_ASSERT(delays.size() == code.delays.size());
ASIC_ASSERT(code.output_count <= code.required_stack_size);
auto state = simulation_state{};
// Setup results.
state.results.resize(code.result_keys.size() + 1); // Add one space to store ignored results.
// Initialize delay results to their current values.
for (auto const& [i, delay] : enumerate(code.delays)) {
state.results[delay.result_index] = delays[i];
}
// Setup stack.
state.stack.resize(code.required_stack_size);
auto stack_pointer = state.stack.data();
// Utility functions to make the stack manipulation code below more readable.
// Should hopefully be inlined by the compiler.
auto const push = [&](number value) -> void {
ASIC_ASSERT(std::distance(state.stack.data(), stack_pointer) < static_cast<std::ptrdiff_t>(state.stack.size()));
*stack_pointer++ = value;
};
auto const pop = [&]() -> number {
ASIC_ASSERT(std::distance(state.stack.data(), stack_pointer) > std::ptrdiff_t{0});
return *--stack_pointer;
};
auto const peek = [&]() -> number {
ASIC_ASSERT(std::distance(state.stack.data(), stack_pointer) > std::ptrdiff_t{0});
ASIC_ASSERT(std::distance(state.stack.data(), stack_pointer) <= static_cast<std::ptrdiff_t>(state.stack.size()));
return *(stack_pointer - 1);
};
// Check if results should be truncated.
auto const bit_mask_override = setup_truncation_parameters(truncate, bits_override);
// Hot instruction evaluation loop.
for (auto const& instruction : code.instructions) {
ASIC_DEBUG_MSG("Evaluating {}.", format_compiled_simulation_code_instruction(instruction));
// Execute the instruction.
switch (instruction.type) {
case instruction_type::push_input:
push(inputs[instruction.index]);
break;
case instruction_type::push_result:
push(state.results[instruction.index]);
break;
case instruction_type::push_delay:
push(delays[instruction.index]);
break;
case instruction_type::push_constant:
push(instruction.value);
break;
case instruction_type::truncate:
if (truncate) {
push(truncate_value(pop(), instruction.bit_mask));
}
break;
case instruction_type::addition:
push(pop() + pop());
break;
case instruction_type::subtraction:
push(pop() - pop());
break;
case instruction_type::multiplication:
push(pop() * pop());
break;
case instruction_type::division:
push(pop() / pop());
break;
case instruction_type::min: {
auto const lhs = pop();
auto const rhs = pop();
if (lhs.imag() != 0 || rhs.imag() != 0) {
throw std::runtime_error{"Min does not support complex numbers."};
}
push(std::min(lhs.real(), rhs.real()));
break;
}
case instruction_type::max: {
auto const lhs = pop();
auto const rhs = pop();
if (lhs.imag() != 0 || rhs.imag() != 0) {
throw std::runtime_error{"Max does not support complex numbers."};
}
push(std::max(lhs.real(), rhs.real()));
break;
}
case instruction_type::square_root:
push(std::sqrt(pop()));
break;
case instruction_type::complex_conjugate:
push(std::conj(pop()));
break;
case instruction_type::absolute:
push(number{std::abs(pop())});
break;
case instruction_type::constant_multiplication:
push(pop() * instruction.value);
break;
case instruction_type::update_delay:
delays[instruction.index] = pop();
break;
case instruction_type::custom: {
using namespace pybind11::literals;
auto const& src = code.custom_sources[instruction.index];
auto const& op = code.custom_operations[src.custom_operation_index];
auto input_values = std::vector<number>{};
input_values.reserve(op.input_count);
for (auto i = std::size_t{0}; i < op.input_count; ++i) {
input_values.push_back(pop());
}
push(op.evaluate_output(src.output_index, std::move(input_values), "truncate"_a = truncate).cast<number>());
break;
}
case instruction_type::forward_value:
// Do nothing, since doing push(pop()) would be pointless.
break;
}
// If we've been given a global override for how many bits to use, always truncate the result.
if (bits_override) {
push(truncate_value(pop(), bit_mask_override));
}
// Store the result.
state.results[instruction.result_index] = peek();
}
// Remove the space that we used for ignored results.
state.results.pop_back();
// Erase the portion of the stack that does not contain the output values.
state.stack.erase(state.stack.begin() + static_cast<std::ptrdiff_t>(code.output_count), state.stack.end());
return state;
}
} // namespace asic
\ No newline at end of file
#ifndef ASIC_SIMULATION_RUN_H
#define ASIC_SIMULATION_RUN_H
#include "../number.h"
#include "../span.h"
#include "compile.h"
#include <cstdint>
#include <vector>
namespace asic {
struct simulation_state final {
std::vector<number> stack;
std::vector<number> results;
};
simulation_state run_simulation(simulation_code const& code, span<number const> inputs, span<number> delays,
std::optional<std::uint8_t> bits_override, bool truncate);
} // namespace asic
#endif // ASIC_SIMULATION_RUN_H
\ No newline at end of file
#define NOMINMAX
#include "simulation.h"
#include "../algorithm.h"
#include "../debug.h"
#include "compile.h"
#include "run.h"
#include <fmt/format.h>
#include <limits>
#include <pybind11/numpy.h>
#include <utility>
namespace py = pybind11;
namespace asic {
simulation::simulation(pybind11::handle sfg, std::optional<std::vector<std::optional<input_provider_t>>> input_providers)
: m_code(compile_simulation(sfg))
, m_input_functions(sfg.attr("input_count").cast<std::size_t>(), [](iteration_t) -> number { return number{}; }) {
m_delays.reserve(m_code.delays.size());
for (auto const& delay : m_code.delays) {
m_delays.push_back(delay.initial_value);
}
if (input_providers) {
this->set_inputs(std::move(*input_providers));
}
}
void simulation::set_input(std::size_t index, input_provider_t input_provider) {
if (index >= m_input_functions.size()) {
throw py::index_error{fmt::format("Input index out of range (expected 0-{}, got {})", m_input_functions.size() - 1, index)};
}
if (auto* const callable = std::get_if<input_function_t>(&input_provider)) {
m_input_functions[index] = std::move(*callable);
} else if (auto* const numeric = std::get_if<number>(&input_provider)) {
m_input_functions[index] = [value = *numeric](iteration_t) -> number {
return value;
};
} else if (auto* const list = std::get_if<std::vector<number>>(&input_provider)) {
if (!m_input_length) {
m_input_length = static_cast<iteration_t>(list->size());
} else if (*m_input_length != static_cast<iteration_t>(list->size())) {
throw py::value_error{fmt::format("Inconsistent input length for simulation (was {}, got {})", *m_input_length, list->size())};
}
m_input_functions[index] = [values = std::move(*list)](iteration_t n) -> number {
return values.at(n);
};
}
}
void simulation::set_inputs(std::vector<std::optional<input_provider_t>> input_providers) {
if (input_providers.size() != m_input_functions.size()) {
throw py::value_error{fmt::format(
"Wrong number of inputs supplied to simulation (expected {}, got {})", m_input_functions.size(), input_providers.size())};
}
for (auto&& [i, input_provider] : enumerate(input_providers)) {
if (input_provider) {
this->set_input(i, std::move(*input_provider));
}
}
}
std::vector<number> simulation::step(bool save_results, std::optional<std::uint8_t> bits_override, bool truncate) {
return this->run_for(1, save_results, bits_override, truncate);
}
std::vector<number> simulation::run_until(iteration_t iteration, bool save_results, std::optional<std::uint8_t> bits_override,
bool truncate) {
auto result = std::vector<number>{};
while (m_iteration < iteration) {
ASIC_DEBUG_MSG("Running simulation iteration.");
auto inputs = std::vector<number>(m_code.input_count);
for (auto&& [input, function] : zip(inputs, m_input_functions)) {
input = function(m_iteration);
}
auto state = run_simulation(m_code, inputs, m_delays, bits_override, truncate);
result = std::move(state.stack);
if (save_results) {
m_results.push_back(std::move(state.results));
}
++m_iteration;
}
return result;
}
std::vector<number> simulation::run_for(iteration_t iterations, bool save_results, std::optional<std::uint8_t> bits_override,
bool truncate) {
if (iterations > std::numeric_limits<iteration_t>::max() - m_iteration) {
throw py::value_error("Simulation iteration type overflow!");
}
return this->run_until(m_iteration + iterations, save_results, bits_override, truncate);
}
std::vector<number> simulation::run(bool save_results, std::optional<std::uint8_t> bits_override, bool truncate) {
if (m_input_length) {
return this->run_until(*m_input_length, save_results, bits_override, truncate);
}
throw py::index_error{"Tried to run unlimited simulation"};
}
iteration_t simulation::iteration() const noexcept {
return m_iteration;
}
pybind11::dict simulation::results() const noexcept {
auto results = py::dict{};
if (!m_results.empty()) {
for (auto const& [i, key] : enumerate(m_code.result_keys)) {
auto values = std::vector<number>{};
values.reserve(m_results.size());
for (auto const& result : m_results) {
values.push_back(result[i]);
}
results[py::str{key}] = py::array{static_cast<py::ssize_t>(values.size()), values.data()};
}
}
return results;
}
void simulation::clear_results() noexcept {
m_results.clear();
}
void simulation::clear_state() noexcept {
m_delays.clear();
}
} // namespace asic
#ifndef ASIC_SIMULATION_DOD_H
#define ASIC_SIMULATION_DOD_H
#include "../number.h"
#include "compile.h"
#include <cstddef>
#include <cstdint>
#include <functional>
#include <optional>
#include <pybind11/functional.h>
#include <pybind11/pybind11.h>
#include <pybind11/stl.h>
#include <variant>
#include <vector>
namespace asic {
using iteration_t = std::uint32_t;
using input_function_t = std::function<number(iteration_t)>;
using input_provider_t = std::variant<number, std::vector<number>, input_function_t>;
class simulation final {
public:
simulation(pybind11::handle sfg, std::optional<std::vector<std::optional<input_provider_t>>> input_providers = std::nullopt);
void set_input(std::size_t index, input_provider_t input_provider);
void set_inputs(std::vector<std::optional<input_provider_t>> input_providers);
[[nodiscard]] std::vector<number> step(bool save_results, std::optional<std::uint8_t> bits_override, bool truncate);
[[nodiscard]] std::vector<number> run_until(iteration_t iteration, bool save_results, std::optional<std::uint8_t> bits_override,
bool truncate);
[[nodiscard]] std::vector<number> run_for(iteration_t iterations, bool save_results, std::optional<std::uint8_t> bits_override,
bool truncate);
[[nodiscard]] std::vector<number> run(bool save_results, std::optional<std::uint8_t> bits_override, bool truncate);
[[nodiscard]] iteration_t iteration() const noexcept;
[[nodiscard]] pybind11::dict results() const noexcept;
void clear_results() noexcept;
void clear_state() noexcept;
private:
simulation_code m_code;
std::vector<number> m_delays;
std::vector<input_function_t> m_input_functions;
std::optional<iteration_t> m_input_length;
iteration_t m_iteration = 0;
std::vector<std::vector<number>> m_results;
};
} // namespace asic
#endif // ASIC_SIMULATION_DOD_H
\ No newline at end of file
#ifndef ASIC_SPAN_H
#define ASIC_SPAN_H
#include <cstddef>
#include <type_traits>
#include <utility>
#include <iterator>
#include <limits>
#include <array>
#include <algorithm>
#include <cassert>
namespace asic {
constexpr auto dynamic_size = static_cast<std::size_t>(-1);
// C++17-compatible std::span substitute.
template <typename T, std::size_t Size = dynamic_size>
class span;
namespace detail {
template <typename T>
struct is_span_impl : std::false_type {};
template <typename T, std::size_t Size>
struct is_span_impl<span<T, Size>> : std::true_type {};
template <typename T>
struct is_span : is_span_impl<std::remove_cv_t<T>> {};
template <typename T>
constexpr auto is_span_v = is_span<T>::value;
template <typename T>
struct is_std_array_impl : std::false_type {};
template <typename T, std::size_t Size>
struct is_std_array_impl<std::array<T, Size>> : std::true_type {};
template <typename T>
struct is_std_array : is_std_array_impl<std::remove_cv_t<T>> {};
template <typename T>
constexpr auto is_std_array_v = is_std_array<T>::value;
template <std::size_t From, std::size_t To>
struct is_size_convertible : std::bool_constant<From == To || From == dynamic_size || To == dynamic_size> {};
template <std::size_t From, std::size_t To>
constexpr auto is_size_convertible_v = is_size_convertible<From, To>::value;
template <typename From, typename To>
struct is_element_type_convertible : std::bool_constant<std::is_convertible_v<From(*)[], To(*)[]>> {};
template <typename From, typename To>
constexpr auto is_element_type_convertible_v = is_element_type_convertible<From, To>::value;
template <typename T, std::size_t Size>
struct span_base {
using element_type = T;
using pointer = element_type*;
using size_type = std::size_t;
constexpr span_base() noexcept = default;
constexpr span_base(pointer data, [[maybe_unused]] size_type size) : m_data(data) { assert(size == Size); }
template <size_type N>
constexpr span_base(span_base<T, N> other) : m_data(other.data()) {
static_assert(N == Size || N == dynamic_size);
assert(other.size() == Size);
}
[[nodiscard]] constexpr pointer data() const noexcept { return m_data; }
[[nodiscard]] constexpr size_type size() const noexcept { return Size; }
private:
pointer m_data = nullptr;
};
template <typename T>
struct span_base<T, dynamic_size> {
using element_type = T;
using pointer = element_type*;
using size_type = std::size_t;
constexpr span_base() noexcept = default;
constexpr span_base(pointer data, size_type size) : m_data(data), m_size(size) {}
template <size_type N>
explicit constexpr span_base(span_base<T, N> other) : m_data(other.data()), m_size(other.size()) {}
[[nodiscard]] constexpr pointer data() const noexcept { return m_data; }
[[nodiscard]] constexpr size_type size() const noexcept { return m_size; }
private:
pointer m_data = nullptr;
size_type m_size = 0;
};
template <typename T, std::size_t Size, std::size_t Offset, std::size_t N>
struct subspan_type {
using type = span<
T,
(N != dynamic_size) ?
N :
(Size != dynamic_size) ?
Size - Offset :
Size
>;
};
template <typename T, std::size_t Size, std::size_t Offset, std::size_t Count>
using subspan_type_t = typename subspan_type<T, Size, Offset, Count>::type;
} // namespace detail
template <typename T, std::size_t Size>
class span final : public detail::span_base<T, Size> {
public:
using element_type = typename detail::span_base<T, Size>::element_type;
using pointer = typename detail::span_base<T, Size>::pointer;
using size_type = typename detail::span_base<T, Size>::size_type;
using value_type = std::remove_cv_t<element_type>;
using reference = element_type&;
using iterator = element_type*;
using const_iterator = const element_type*;
using reverse_iterator = std::reverse_iterator<iterator>;
using const_reverse_iterator = std::reverse_iterator<const_iterator>;
// Default constructor.
constexpr span() noexcept = default;
// Construct from pointer, size.
constexpr span(pointer data, size_type size) : detail::span_base<T, Size>(data, size) {}
// Copy constructor.
template <
typename U, std::size_t N,
typename = std::enable_if_t<detail::is_size_convertible_v<N, Size>>,
typename = std::enable_if_t<detail::is_element_type_convertible_v<U, T>>
>
constexpr span(span<U, N> const& other) : span(other.data(), other.size()) {}
// Copy assignment.
constexpr span& operator=(span const&) noexcept = default;
// Destructor.
~span() = default;
// Construct from begin, end.
constexpr span(pointer begin, pointer end) : span(begin, end - begin) {}
// Construct from C array.
template <std::size_t N>
constexpr span(element_type(&arr)[N]) noexcept : span(std::data(arr), N) {}
// Construct from std::array.
template <
std::size_t N,
typename = std::enable_if_t<N != 0>
>
constexpr span(std::array<value_type, N>& arr) noexcept : span(std::data(arr), N) {}
// Construct from empty std::array.
constexpr span(std::array<value_type, 0>&) noexcept : span() {}
// Construct from const std::array.
template <
std::size_t N,
typename = std::enable_if_t<N != 0>
>
constexpr span(std::array<value_type, N> const& arr) noexcept : span(std::data(arr), N) {}
// Construct from empty const std::array.
constexpr span(std::array<value_type, 0> const&) noexcept : span() {}
// Construct from other container.
template <
typename Container,
typename = std::enable_if_t<!detail::is_span_v<Container>>,
typename = std::enable_if_t<!detail::is_std_array_v<Container>>,
typename = decltype(std::data(std::declval<Container>())),
typename = decltype(std::size(std::declval<Container>())),
typename = std::enable_if_t<std::is_convertible_v<typename Container::pointer, pointer>>,
typename = std::enable_if_t<std::is_convertible_v<typename Container::pointer, decltype(std::data(std::declval<Container>()))>>
>
constexpr span(Container& container) : span(std::data(container), std::size(container)) {}
// Construct from other const container.
template <
typename Container,
typename Element = element_type,
typename = std::enable_if_t<std::is_const_v<Element>>,
typename = std::enable_if_t<!detail::is_span_v<Container>>,
typename = std::enable_if_t<!detail::is_std_array_v<Container>>,
typename = decltype(std::data(std::declval<Container>())),
typename = decltype(std::size(std::declval<Container>())),
typename = std::enable_if_t<std::is_convertible_v<typename Container::pointer, pointer>>,
typename = std::enable_if_t<std::is_convertible_v<typename Container::pointer, decltype(std::data(std::declval<Container>()))>>
>
constexpr span(Container const& container) : span(std::data(container), std::size(container)) {}
[[nodiscard]] constexpr iterator begin() const noexcept { return this->data(); }
[[nodiscard]] constexpr const_iterator cbegin() const noexcept { return this->data(); }
[[nodiscard]] constexpr iterator end() const noexcept { return this->data() + this->size(); }
[[nodiscard]] constexpr const_iterator cend() const noexcept { return this->data() + this->size(); }
[[nodiscard]] constexpr reverse_iterator rbegin() const noexcept { return std::make_reverse_iterator(this->end()); }
[[nodiscard]] constexpr const_reverse_iterator crbegin() const noexcept { return std::make_reverse_iterator(this->cend()); }
[[nodiscard]] constexpr reverse_iterator rend() const noexcept { return std::make_reverse_iterator(this->begin()); }
[[nodiscard]] constexpr const_reverse_iterator crend() const noexcept { return std::make_reverse_iterator(this->cbegin()); }
[[nodiscard]] constexpr reference operator[](size_type i) const noexcept { assert(i < this->size()); return this->data()[i]; }
[[nodiscard]] constexpr reference operator()(size_type i) const noexcept { assert(i < this->size()); return this->data()[i]; }
[[nodiscard]] constexpr size_type size_bytes() const noexcept { return this->size() * sizeof(element_type); }
[[nodiscard]] constexpr bool empty() const noexcept { return this->size() == 0; }
[[nodiscard]] constexpr reference front() const noexcept { assert(!this->empty()); return this->data()[0]; }
[[nodiscard]] constexpr reference back() const noexcept { assert(!this->empty()); return this->data()[this->size() - 1]; }
template <std::size_t N>
[[nodiscard]] constexpr span<T, N> first() const {
static_assert(N != dynamic_size && N <= Size);
return {this->data(), N};
}
template <std::size_t N>
[[nodiscard]] constexpr span<T, N> last() const {
static_assert(N != dynamic_size && N <= Size);
return {this->data() + (Size - N), N};
}
template <std::size_t Offset, std::size_t N = dynamic_size>
[[nodiscard]] constexpr auto subspan() const -> detail::subspan_type_t<T, Size, Offset, N> {
static_assert(Offset <= Size);
return {this->data() + Offset, (N == dynamic_size) ? this->size() - Offset : N};
}
[[nodiscard]] constexpr span<T, dynamic_size> first(size_type n) const {
assert(n <= this->size());
return { this->data(), n };
}
[[nodiscard]] constexpr span<T, dynamic_size> last(size_type n) const {
return this->subspan(this->size() - n);
}
[[nodiscard]] constexpr span<T, dynamic_size> subspan(size_type offset, size_type n = dynamic_size) const {
if constexpr (Size == dynamic_size) {
assert(offset <= this->size());
if (n == dynamic_size) {
return { this->data() + offset, this->size() - offset };
}
assert(n <= this->size());
assert(offset + n <= this->size());
return {this->data() + offset, n};
} else {
return span<T, dynamic_size>{*this}.subspan(offset, n);
}
}
};
template <typename T, std::size_t LhsSize, std::size_t RhsSize>
[[nodiscard]] constexpr bool operator==(span<T, LhsSize> lhs, span<T, RhsSize> rhs) {
return std::equal(lhs.begin(), lhs.end(), rhs.begin(), rhs.end());
}
template <typename T, std::size_t LhsSize, std::size_t RhsSize>
[[nodiscard]] constexpr bool operator!=(span<T, LhsSize> lhs, span<T, RhsSize> rhs) {
return !(lhs == rhs);
}
template <typename T, std::size_t LhsSize, std::size_t RhsSize>
[[nodiscard]] constexpr bool operator<(span<T, LhsSize> lhs, span<T, RhsSize> rhs) {
return std::lexicographical_compare(lhs.begin(), lhs.end(), rhs.begin(), rhs.end());
}
template <typename T, std::size_t LhsSize, std::size_t RhsSize>
[[nodiscard]] constexpr bool operator<=(span<T, LhsSize> lhs, span<T, RhsSize> rhs) {
return !(lhs > rhs);
}
template <typename T, std::size_t LhsSize, std::size_t RhsSize>
[[nodiscard]] constexpr bool operator>(span<T, LhsSize> lhs, span<T, RhsSize> rhs) {
return rhs < lhs;
}
template <typename T, std::size_t LhsSize, std::size_t RhsSize>
[[nodiscard]] constexpr bool operator>=(span<T, LhsSize> lhs, span<T, RhsSize> rhs) {
return !(lhs < rhs);
}
template <typename Container>
span(Container&) -> span<typename Container::value_type>;
template <typename Container>
span(Container const&) -> span<typename Container::value_type const>;
template <typename T, std::size_t N>
span(T(&)[N]) -> span<T, N>;
template <typename T, std::size_t N>
span(std::array<T, N>&) -> span<T, N>;
template <typename T, std::size_t N>
span(std::array<T, N> const&) -> span<T const, N>;
template <typename T, typename Dummy>
span(T, Dummy&&) -> span<std::remove_reference_t<decltype(std::declval<T>()[0])>>;
} // namespace asic
#endif // ASIC_SPAN_H
\ No newline at end of file
from test.fixtures.signal import signal, signals
from test.fixtures.operation_tree import *
from test.fixtures.port import *
import pytest
from test.fixtures.signal_flow_graph import *
import pytest
\ No newline at end of file
from b_asic.core_operations import Addition, Constant
from b_asic.signal import Signal
import pytest
from b_asic import Addition, Constant, Signal, Butterfly
@pytest.fixture
def operation():
return Constant(2)
def create_operation(_type, dest_oper, index, **kwargs):
oper = _type(**kwargs)
oper_signal = Signal()
oper._output_ports[0].add_signal(oper_signal)
dest_oper._input_ports[index].add_signal(oper_signal)
return oper
@pytest.fixture
def operation_tree():
"""Return a addition operation connected with 2 constants.
---C---+
---A
---C---+
"""Valid addition operation connected with 2 constants.
2---+
|
v
add = 2 + 3 = 5
^
|
3---+
"""
add_oper = Addition()
create_operation(Constant, add_oper, 0, value=2)
create_operation(Constant, add_oper, 1, value=3)
return add_oper
return Addition(Constant(2), Constant(3))
@pytest.fixture
def large_operation_tree():
"""Return a constant operation connected with a large operation tree with 3 other constants and 3 additions.
---C---+
---A---+
---C---+ |
+---A
---C---+ |
---A---+
---C---+
"""Valid addition operation connected with a large operation tree with 2 other additions and 4 constants.
2---+
|
v
add---+
^ |
| |
3---+ v
add = (2 + 3) + (4 + 5) = 14
4---+ ^
| |
v |
add---+
^
|
5---+
"""
add_oper = Addition()
add_oper_2 = Addition()
const_oper = create_operation(Constant, add_oper, 0, value=2)
create_operation(Constant, add_oper, 1, value=3)
return Addition(Addition(Constant(2), Constant(3)), Addition(Constant(4), Constant(5)))
create_operation(Constant, add_oper_2, 0, value=4)
create_operation(Constant, add_oper_2, 1, value=5)
@pytest.fixture
def large_operation_tree_names():
"""Valid addition operation connected with a large operation tree with 2 other additions and 4 constants.
With names.
2---+
|
v
add---+
^ |
| |
3---+ v
add = (2 + 3) + (4 + 5) = 14
4---+ ^
| |
v |
add---+
^
|
5---+
"""
return Addition(Addition(Constant(2, name="constant2"), Constant(3, name="constant3")), Addition(Constant(4, name="constant4"), Constant(5, name="constant5")))
add_oper_3 = Addition()
add_oper_signal = Signal(add_oper.output(0), add_oper_3.output(0))
add_oper._output_ports[0].add_signal(add_oper_signal)
add_oper_3._input_ports[0].add_signal(add_oper_signal)
@pytest.fixture
def butterfly_operation_tree():
"""Valid butterfly operations connected to eachother with 3 butterfly operations and 2 constants as inputs and 2 outputs.
2 ---+ +--- (2 + 4) ---+ +--- (6 + (-2)) ---+ +--- (4 + 8) ---> out1 = 12
| | | | | |
v ^ v ^ v ^
butterfly butterfly butterfly
^ v ^ v ^ v
| | | | | |
4 ---+ +--- (2 - 4) ---+ +--- (6 - (-2)) ---+ +--- (4 - 8) ---> out2 = -4
"""
return Butterfly(*(Butterfly(*(Butterfly(Constant(2), Constant(4), name="bfly3").outputs), name="bfly2").outputs), name="bfly1")
add_oper_2_signal = Signal(add_oper_2.output(0), add_oper_3.output(0))
add_oper_2._output_ports[0].add_signal(add_oper_2_signal)
add_oper_3._input_ports[1].add_signal(add_oper_2_signal)
return const_oper
@pytest.fixture
def operation_graph_with_cycle():
"""Invalid addition operation connected with an operation graph containing a cycle.
+-+
| |
v |
add+---+
^ |
| v
7 add = (? + 7) + 6 = ?
^
|
6
"""
add1 = Addition(None, Constant(7))
add1.input(0).connect(add1)
return Addition(add1, Constant(6))
import pytest
from b_asic.port import InputPort, OutputPort
from b_asic import InputPort, OutputPort
@pytest.fixture
def input_port():
return InputPort(0, None)
return InputPort(None, 0)
@pytest.fixture
def output_port():
return OutputPort(0, None)
return OutputPort(None, 0)
@pytest.fixture
def list_of_input_ports():
return [InputPort(None, i) for i in range(0, 3)]
@pytest.fixture
def list_of_output_ports():
return [OutputPort(None, i) for i in range(0, 3)]
import pytest
from b_asic import Signal
@pytest.fixture
def signal():
"""Return a signal with no connections."""
......@@ -9,4 +11,4 @@ def signal():
@pytest.fixture
def signals():
"""Return 3 signals with no connections."""
return [Signal() for _ in range(0,3)]
return [Signal() for _ in range(0, 3)]
import pytest
from b_asic import SFG, Input, Output, Constant, Delay, Addition, ConstantMultiplication, Butterfly, AbstractOperation, Name, TypeName, SignalSourceProvider
from typing import Optional
@pytest.fixture
def sfg_two_inputs_two_outputs():
"""Valid SFG with two inputs and two outputs.
. .
in1-------+ +--------->out1
. | | .
. v | .
. add1+--+ .
. ^ | .
. | v .
in2+------+ add2---->out2
| . ^ .
| . | .
+------------+ .
. .
out1 = in1 + in2
out2 = in1 + 2 * in2
"""
in1 = Input("IN1")
in2 = Input("IN2")
add1 = Addition(in1, in2, "ADD1")
add2 = Addition(add1, in2, "ADD2")
out1 = Output(add1, "OUT1")
out2 = Output(add2, "OUT2")
return SFG(inputs=[in1, in2], outputs=[out1, out2])
@pytest.fixture
def sfg_two_inputs_two_outputs_independent():
"""Valid SFG with two inputs and two outputs, where the first output only depends
on the first input and the second output only depends on the second input.
. .
in1-------------------->out1
. .
. .
. c1--+ .
. | .
. v .
in2------+ add1---->out2
. | ^ .
. | | .
. +------+ .
. .
out1 = in1
out2 = in2 + 3
"""
in1 = Input("IN1")
in2 = Input("IN2")
c1 = Constant(3, "C1")
add1 = Addition(in2, c1, "ADD1")
out1 = Output(in1, "OUT1")
out2 = Output(add1, "OUT2")
return SFG(inputs=[in1, in2], outputs=[out1, out2])
@pytest.fixture
def sfg_two_inputs_two_outputs_independent_with_cmul():
"""Valid SFG with two inputs and two outputs, where the first output only depends
on the first input and the second output only depends on the second input.
. .
in1--->cmul1--->cmul2--->out1
. .
. .
. c1 .
. | .
. v .
in2--->add1---->cmul3--->out2
. .
"""
in1 = Input("IN1")
in2 = Input("IN2")
c1 = Constant(3, "C1")
add1 = Addition(in2, c1, "ADD1", 7)
cmul3 = ConstantMultiplication(2, add1, "CMUL3", 3)
cmul1 = ConstantMultiplication(5, in1, "CMUL1", 5)
cmul2 = ConstantMultiplication(4, cmul1, "CMUL2", 4)
out1 = Output(cmul2, "OUT1")
out2 = Output(cmul3, "OUT2")
return SFG(inputs=[in1, in2], outputs=[out1, out2])
@pytest.fixture
def sfg_nested():
"""Valid SFG with two inputs and one output.
out1 = in1 + (in1 + in1 * in2) * (in1 + in2 * (in1 + in1 * in2))
"""
mac_in1 = Input()
mac_in2 = Input()
mac_in3 = Input()
mac_out1 = Output(mac_in1 + mac_in2 * mac_in3)
MAC = SFG(inputs=[mac_in1, mac_in2, mac_in3], outputs=[mac_out1])
in1 = Input()
in2 = Input()
mac1 = MAC(in1, in1, in2)
mac2 = MAC(in1, in2, mac1)
mac3 = MAC(in1, mac1, mac2)
out1 = Output(mac3)
return SFG(inputs=[in1, in2], outputs=[out1])
@pytest.fixture
def sfg_delay():
"""Valid SFG with one input and one output.
out1 = in1'
"""
in1 = Input()
t1 = Delay(in1)
out1 = Output(t1)
return SFG(inputs = [in1], outputs = [out1])
@pytest.fixture
def sfg_accumulator():
"""Valid SFG with two inputs and one output.
data_out = (data_in' + data_in) * (1 - reset)
"""
data_in = Input()
reset = Input()
t = Delay()
t << (t + data_in) * (1 - reset)
data_out = Output(t)
return SFG(inputs = [data_in, reset], outputs = [data_out])
@pytest.fixture
def sfg_simple_accumulator():
"""Valid SFG with two inputs and one output.
. .
in1----->add1-----+----->out1
. ^ | .
. | | .
. +--t1<--+ .
. .
"""
in1 = Input()
t1 = Delay()
add1 = in1 + t1
t1 << add1
out1 = Output(add1)
return SFG(inputs = [in1], outputs = [out1])
@pytest.fixture
def sfg_simple_filter():
"""A valid SFG that is used as a filter in the first lab for TSTE87.
. .
. +--cmul1<--+ .
. | | .
. v | .
in1---->add1----->t1+---->out1
. .
"""
in1 = Input("IN1")
cmul1 = ConstantMultiplication(0.5, name="CMUL1")
add1 = Addition(in1, cmul1, "ADD1")
add1.input(1).signals[0].name = "S2"
t1 = Delay(add1, name="T1")
cmul1.input(0).connect(t1, "S1")
out1 = Output(t1, "OUT1")
return SFG(inputs=[in1], outputs=[out1], name="simple_filter")
@pytest.fixture
def sfg_custom_operation():
"""A valid SFG containing a custom operation."""
class CustomOperation(AbstractOperation):
def __init__(self, src0: Optional[SignalSourceProvider] = None, name: Name = ""):
super().__init__(input_count = 1, output_count = 2, name = name, input_sources = [src0])
@classmethod
def type_name(self) -> TypeName:
return "custom"
def evaluate(self, a):
return a * 2, 2 ** a
in1 = Input()
custom1 = CustomOperation(in1)
out1 = Output(custom1.output(0))
out2 = Output(custom1.output(1))
return SFG(inputs=[in1], outputs=[out1, out2])
@pytest.fixture
def precedence_sfg_delays():
"""A sfg with delays and interesting layout for precednce list generation.
. .
IN1>--->C0>--->ADD1>--->Q1>---+--->A0>--->ADD4>--->OUT1
. ^ | ^ .
. | T1 | .
. | | | .
. ADD2<---<B1<---+--->A1>--->ADD3 .
. ^ | ^ .
. | T2 | .
. | | | .
. +-----<B2<---+--->A2>-----+ .
"""
in1 = Input("IN1")
c0 = ConstantMultiplication(5, in1, "C0")
add1 = Addition(c0, None, "ADD1")
# Not sure what operation "Q" is supposed to be in the example
Q1 = ConstantMultiplication(1, add1, "Q1")
T1 = Delay(Q1, 0, "T1")
T2 = Delay(T1, 0, "T2")
b2 = ConstantMultiplication(2, T2, "B2")
b1 = ConstantMultiplication(3, T1, "B1")
add2 = Addition(b1, b2, "ADD2")
add1.input(1).connect(add2)
a1 = ConstantMultiplication(4, T1, "A1")
a2 = ConstantMultiplication(6, T2, "A2")
add3 = Addition(a1, a2, "ADD3")
a0 = ConstantMultiplication(7, Q1, "A0")
add4 = Addition(a0, add3, "ADD4")
out1 = Output(add4, "OUT1")
return SFG(inputs=[in1], outputs=[out1], name="SFG")
@pytest.fixture
def precedence_sfg_delays_and_constants():
in1 = Input("IN1")
c0 = ConstantMultiplication(5, in1, "C0")
add1 = Addition(c0, None, "ADD1")
# Not sure what operation "Q" is supposed to be in the example
Q1 = ConstantMultiplication(1, add1, "Q1")
T1 = Delay(Q1, 0, "T1")
const1 = Constant(10, "CONST1") # Replace T2 delay with a constant
b2 = ConstantMultiplication(2, const1, "B2")
b1 = ConstantMultiplication(3, T1, "B1")
add2 = Addition(b1, b2, "ADD2")
add1.input(1).connect(add2)
a1 = ConstantMultiplication(4, T1, "A1")
a2 = ConstantMultiplication(10, const1, "A2")
add3 = Addition(a1, a2, "ADD3")
a0 = ConstantMultiplication(7, Q1, "A0")
# Replace ADD4 with a butterfly to test multiple output ports
bfly1 = Butterfly(a0, add3, "BFLY1")
out1 = Output(bfly1.output(0), "OUT1")
Output(bfly1.output(1), "OUT2")
return SFG(inputs=[in1], outputs=[out1], name="SFG")
"""
B-ASIC test suite for the AbstractOperation class.
"""
from b_asic.core_operations import Addition, ConstantAddition, Subtraction, ConstantSubtraction, \
Multiplication, ConstantMultiplication, Division, ConstantDivision
import pytest
def test_addition_overload():
"""Tests addition overloading for both operation and number argument."""
add1 = Addition(None, None, "add1")
add2 = Addition(None, None, "add2")
add3 = add1 + add2
assert isinstance(add3, Addition)
assert add3.input(0).signals == add1.output(0).signals
assert add3.input(1).signals == add2.output(0).signals
add4 = add3 + 5
assert isinstance(add4, ConstantAddition)
assert add4.input(0).signals == add3.output(0).signals
def test_subtraction_overload():
"""Tests subtraction overloading for both operation and number argument."""
add1 = Addition(None, None, "add1")
add2 = Addition(None, None, "add2")
sub1 = add1 - add2
assert isinstance(sub1, Subtraction)
assert sub1.input(0).signals == add1.output(0).signals
assert sub1.input(1).signals == add2.output(0).signals
sub2 = sub1 - 5
assert isinstance(sub2, ConstantSubtraction)
assert sub2.input(0).signals == sub1.output(0).signals
def test_multiplication_overload():
"""Tests multiplication overloading for both operation and number argument."""
add1 = Addition(None, None, "add1")
add2 = Addition(None, None, "add2")
mul1 = add1 * add2
assert isinstance(mul1, Multiplication)
assert mul1.input(0).signals == add1.output(0).signals
assert mul1.input(1).signals == add2.output(0).signals
mul2 = mul1 * 5
assert isinstance(mul2, ConstantMultiplication)
assert mul2.input(0).signals == mul1.output(0).signals
def test_division_overload():
"""Tests division overloading for both operation and number argument."""
add1 = Addition(None, None, "add1")
add2 = Addition(None, None, "add2")
div1 = add1 / add2
assert isinstance(div1, Division)
assert div1.input(0).signals == add1.output(0).signals
assert div1.input(1).signals == add2.output(0).signals
div2 = div1 / 5
assert isinstance(div2, ConstantDivision)
assert div2.input(0).signals == div1.output(0).signals
......@@ -2,226 +2,175 @@
B-ASIC test suite for the core operations.
"""
from b_asic.core_operations import Constant, Addition, Subtraction, Multiplication, Division, SquareRoot, ComplexConjugate, Max, Min, Absolute, ConstantMultiplication, ConstantAddition, ConstantSubtraction, ConstantDivision
# Constant tests.
def test_constant():
constant_operation = Constant(3)
assert constant_operation.evaluate() == 3
def test_constant_negative():
constant_operation = Constant(-3)
assert constant_operation.evaluate() == -3
def test_constant_complex():
constant_operation = Constant(3+4j)
assert constant_operation.evaluate() == 3+4j
# Addition tests.
def test_addition():
test_operation = Addition()
constant_operation = Constant(3)
constant_operation_2 = Constant(5)
assert test_operation.evaluate(constant_operation.evaluate(), constant_operation_2.evaluate()) == 8
def test_addition_negative():
test_operation = Addition()
constant_operation = Constant(-3)
constant_operation_2 = Constant(-5)
assert test_operation.evaluate(constant_operation.evaluate(), constant_operation_2.evaluate()) == -8
def test_addition_complex():
test_operation = Addition()
constant_operation = Constant((3+5j))
constant_operation_2 = Constant((4+6j))
assert test_operation.evaluate(constant_operation.evaluate(), constant_operation_2.evaluate()) == (7+11j)
# Subtraction tests.
def test_subtraction():
test_operation = Subtraction()
constant_operation = Constant(5)
constant_operation_2 = Constant(3)
assert test_operation.evaluate(constant_operation.evaluate(), constant_operation_2.evaluate()) == 2
def test_subtraction_negative():
test_operation = Subtraction()
constant_operation = Constant(-5)
constant_operation_2 = Constant(-3)
assert test_operation.evaluate(constant_operation.evaluate(), constant_operation_2.evaluate()) == -2
def test_subtraction_complex():
test_operation = Subtraction()
constant_operation = Constant((3+5j))
constant_operation_2 = Constant((4+6j))
assert test_operation.evaluate(constant_operation.evaluate(), constant_operation_2.evaluate()) == (-1-1j)
# Multiplication tests.
def test_multiplication():
test_operation = Multiplication()
constant_operation = Constant(5)
constant_operation_2 = Constant(3)
assert test_operation.evaluate(constant_operation.evaluate(), constant_operation_2.evaluate()) == 15
def test_multiplication_negative():
test_operation = Multiplication()
constant_operation = Constant(-5)
constant_operation_2 = Constant(-3)
assert test_operation.evaluate(constant_operation.evaluate(), constant_operation_2.evaluate()) == 15
def test_multiplication_complex():
test_operation = Multiplication()
constant_operation = Constant((3+5j))
constant_operation_2 = Constant((4+6j))
assert test_operation.evaluate(constant_operation.evaluate(), constant_operation_2.evaluate()) == (-18+38j)
# Division tests.
def test_division():
test_operation = Division()
constant_operation = Constant(30)
constant_operation_2 = Constant(5)
assert test_operation.evaluate(constant_operation.evaluate(), constant_operation_2.evaluate()) == 6
def test_division_negative():
test_operation = Division()
constant_operation = Constant(-30)
constant_operation_2 = Constant(-5)
assert test_operation.evaluate(constant_operation.evaluate(), constant_operation_2.evaluate()) == 6
def test_division_complex():
test_operation = Division()
constant_operation = Constant((60+40j))
constant_operation_2 = Constant((10+20j))
assert test_operation.evaluate(constant_operation.evaluate(), constant_operation_2.evaluate()) == (2.8-1.6j)
# SquareRoot tests.
def test_squareroot():
test_operation = SquareRoot()
constant_operation = Constant(36)
assert test_operation.evaluate(constant_operation.evaluate()) == 6
def test_squareroot_negative():
test_operation = SquareRoot()
constant_operation = Constant(-36)
assert test_operation.evaluate(constant_operation.evaluate()) == 6j
def test_squareroot_complex():
test_operation = SquareRoot()
constant_operation = Constant((48+64j))
assert test_operation.evaluate(constant_operation.evaluate()) == (8+4j)
# ComplexConjugate tests.
def test_complexconjugate():
test_operation = ComplexConjugate()
constant_operation = Constant(3+4j)
assert test_operation.evaluate(constant_operation.evaluate()) == (3-4j)
def test_test_complexconjugate_negative():
test_operation = ComplexConjugate()
constant_operation = Constant(-3-4j)
assert test_operation.evaluate(constant_operation.evaluate()) == (-3+4j)
# Max tests.
def test_max():
test_operation = Max()
constant_operation = Constant(30)
constant_operation_2 = Constant(5)
assert test_operation.evaluate(constant_operation.evaluate(), constant_operation_2.evaluate()) == 30
def test_max_negative():
test_operation = Max()
constant_operation = Constant(-30)
constant_operation_2 = Constant(-5)
assert test_operation.evaluate(constant_operation.evaluate(), constant_operation_2.evaluate()) == -5
# Min tests.
def test_min():
test_operation = Min()
constant_operation = Constant(30)
constant_operation_2 = Constant(5)
assert test_operation.evaluate(constant_operation.evaluate(), constant_operation_2.evaluate()) == 5
def test_min_negative():
test_operation = Min()
constant_operation = Constant(-30)
constant_operation_2 = Constant(-5)
assert test_operation.evaluate(constant_operation.evaluate(), constant_operation_2.evaluate()) == -30
# Absolute tests.
def test_absolute():
test_operation = Absolute()
constant_operation = Constant(30)
assert test_operation.evaluate(constant_operation.evaluate()) == 30
def test_absolute_negative():
test_operation = Absolute()
constant_operation = Constant(-5)
assert test_operation.evaluate(constant_operation.evaluate()) == 5
def test_absolute_complex():
test_operation = Absolute()
constant_operation = Constant((3+4j))
assert test_operation.evaluate(constant_operation.evaluate()) == 5.0
# ConstantMultiplication tests.
def test_constantmultiplication():
test_operation = ConstantMultiplication(5)
constant_operation = Constant(20)
assert test_operation.evaluate(constant_operation.evaluate()) == 100
def test_constantmultiplication_negative():
test_operation = ConstantMultiplication(5)
constant_operation = Constant(-5)
assert test_operation.evaluate(constant_operation.evaluate()) == -25
def test_constantmultiplication_complex():
test_operation = ConstantMultiplication(3+2j)
constant_operation = Constant((3+4j))
assert test_operation.evaluate(constant_operation.evaluate()) == (1+18j)
# ConstantAddition tests.
def test_constantaddition():
test_operation = ConstantAddition(5)
constant_operation = Constant(20)
assert test_operation.evaluate(constant_operation.evaluate()) == 25
def test_constantaddition_negative():
test_operation = ConstantAddition(4)
constant_operation = Constant(-5)
assert test_operation.evaluate(constant_operation.evaluate()) == -1
def test_constantaddition_complex():
test_operation = ConstantAddition(3+2j)
constant_operation = Constant((3+4j))
assert test_operation.evaluate(constant_operation.evaluate()) == (6+6j)
# ConstantSubtraction tests.
def test_constantsubtraction():
test_operation = ConstantSubtraction(5)
constant_operation = Constant(20)
assert test_operation.evaluate(constant_operation.evaluate()) == 15
def test_constantsubtraction_negative():
test_operation = ConstantSubtraction(4)
constant_operation = Constant(-5)
assert test_operation.evaluate(constant_operation.evaluate()) == -9
def test_constantsubtraction_complex():
test_operation = ConstantSubtraction(4+6j)
constant_operation = Constant((3+4j))
assert test_operation.evaluate(constant_operation.evaluate()) == (-1-2j)
# ConstantDivision tests.
def test_constantdivision():
test_operation = ConstantDivision(5)
constant_operation = Constant(20)
assert test_operation.evaluate(constant_operation.evaluate()) == 4
def test_constantdivision_negative():
test_operation = ConstantDivision(4)
constant_operation = Constant(-20)
assert test_operation.evaluate(constant_operation.evaluate()) == -5
def test_constantdivision_complex():
test_operation = ConstantDivision(2+2j)
constant_operation = Constant((10+10j))
assert test_operation.evaluate(constant_operation.evaluate()) == (5+0j)
from b_asic import \
Constant, Addition, Subtraction, Multiplication, ConstantMultiplication, Division, \
SquareRoot, ComplexConjugate, Max, Min, Absolute, Butterfly
class TestConstant:
def test_constant_positive(self):
test_operation = Constant(3)
assert test_operation.evaluate_output(0, []) == 3
def test_constant_negative(self):
test_operation = Constant(-3)
assert test_operation.evaluate_output(0, []) == -3
def test_constant_complex(self):
test_operation = Constant(3+4j)
assert test_operation.evaluate_output(0, []) == 3+4j
class TestAddition:
def test_addition_positive(self):
test_operation = Addition()
assert test_operation.evaluate_output(0, [3, 5]) == 8
def test_addition_negative(self):
test_operation = Addition()
assert test_operation.evaluate_output(0, [-3, -5]) == -8
def test_addition_complex(self):
test_operation = Addition()
assert test_operation.evaluate_output(0, [3+5j, 4+6j]) == 7+11j
class TestSubtraction:
def test_subtraction_positive(self):
test_operation = Subtraction()
assert test_operation.evaluate_output(0, [5, 3]) == 2
def test_subtraction_negative(self):
test_operation = Subtraction()
assert test_operation.evaluate_output(0, [-5, -3]) == -2
def test_subtraction_complex(self):
test_operation = Subtraction()
assert test_operation.evaluate_output(0, [3+5j, 4+6j]) == -1-1j
class TestMultiplication:
def test_multiplication_positive(self):
test_operation = Multiplication()
assert test_operation.evaluate_output(0, [5, 3]) == 15
def test_multiplication_negative(self):
test_operation = Multiplication()
assert test_operation.evaluate_output(0, [-5, -3]) == 15
def test_multiplication_complex(self):
test_operation = Multiplication()
assert test_operation.evaluate_output(0, [3+5j, 4+6j]) == -18+38j
class TestDivision:
def test_division_positive(self):
test_operation = Division()
assert test_operation.evaluate_output(0, [30, 5]) == 6
def test_division_negative(self):
test_operation = Division()
assert test_operation.evaluate_output(0, [-30, -5]) == 6
def test_division_complex(self):
test_operation = Division()
assert test_operation.evaluate_output(0, [60+40j, 10+20j]) == 2.8-1.6j
class TestSquareRoot:
def test_squareroot_positive(self):
test_operation = SquareRoot()
assert test_operation.evaluate_output(0, [36]) == 6
def test_squareroot_negative(self):
test_operation = SquareRoot()
assert test_operation.evaluate_output(0, [-36]) == 6j
def test_squareroot_complex(self):
test_operation = SquareRoot()
assert test_operation.evaluate_output(0, [48+64j]) == 8+4j
class TestComplexConjugate:
def test_complexconjugate_positive(self):
test_operation = ComplexConjugate()
assert test_operation.evaluate_output(0, [3+4j]) == 3-4j
def test_test_complexconjugate_negative(self):
test_operation = ComplexConjugate()
assert test_operation.evaluate_output(0, [-3-4j]) == -3+4j
class TestMax:
def test_max_positive(self):
test_operation = Max()
assert test_operation.evaluate_output(0, [30, 5]) == 30
def test_max_negative(self):
test_operation = Max()
assert test_operation.evaluate_output(0, [-30, -5]) == -5
class TestMin:
def test_min_positive(self):
test_operation = Min()
assert test_operation.evaluate_output(0, [30, 5]) == 5
def test_min_negative(self):
test_operation = Min()
assert test_operation.evaluate_output(0, [-30, -5]) == -30
class TestAbsolute:
def test_absolute_positive(self):
test_operation = Absolute()
assert test_operation.evaluate_output(0, [30]) == 30
def test_absolute_negative(self):
test_operation = Absolute()
assert test_operation.evaluate_output(0, [-5]) == 5
def test_absolute_complex(self):
test_operation = Absolute()
assert test_operation.evaluate_output(0, [3+4j]) == 5.0
class TestConstantMultiplication:
def test_constantmultiplication_positive(self):
test_operation = ConstantMultiplication(5)
assert test_operation.evaluate_output(0, [20]) == 100
def test_constantmultiplication_negative(self):
test_operation = ConstantMultiplication(5)
assert test_operation.evaluate_output(0, [-5]) == -25
def test_constantmultiplication_complex(self):
test_operation = ConstantMultiplication(3+2j)
assert test_operation.evaluate_output(0, [3+4j]) == 1+18j
class TestButterfly:
def test_butterfly_positive(self):
test_operation = Butterfly()
assert test_operation.evaluate_output(0, [2, 3]) == 5
assert test_operation.evaluate_output(1, [2, 3]) == -1
def test_butterfly_negative(self):
test_operation = Butterfly()
assert test_operation.evaluate_output(0, [-2, -3]) == -5
assert test_operation.evaluate_output(1, [-2, -3]) == 1
def test_buttefly_complex(self):
test_operation = Butterfly()
assert test_operation.evaluate_output(0, [2+1j, 3-2j]) == 5-1j
assert test_operation.evaluate_output(1, [2+1j, 3-2j]) == -1+3j
class TestDepends:
def test_depends_addition(self):
add1 = Addition()
assert set(add1.inputs_required_for_output(0)) == {0, 1}
def test_depends_butterfly(self):
bfly1 = Butterfly()
assert set(bfly1.inputs_required_for_output(0)) == {0, 1}
assert set(bfly1.inputs_required_for_output(1)) == {0, 1}
import pytest
import numpy as np
from b_asic import SFG, Output, FastSimulation, Addition, Subtraction, Constant, Butterfly
class TestRunFor:
def test_with_lambdas_as_input(self, sfg_two_inputs_two_outputs):
simulation = FastSimulation(sfg_two_inputs_two_outputs, [lambda n: n + 3, lambda n: 1 + n * 2])
output = simulation.run_for(101, save_results = True)
assert output[0] == 304
assert output[1] == 505
assert simulation.results["0"][100] == 304
assert simulation.results["1"][100] == 505
assert simulation.results["in1"][0] == 3
assert simulation.results["in2"][0] == 1
assert simulation.results["add1"][0] == 4
assert simulation.results["add2"][0] == 5
assert simulation.results["0"][0] == 4
assert simulation.results["1"][0] == 5
assert simulation.results["in1"][1] == 4
assert simulation.results["in2"][1] == 3
assert simulation.results["add1"][1] == 7
assert simulation.results["add2"][1] == 10
assert simulation.results["0"][1] == 7
assert simulation.results["1"][1] == 10
assert simulation.results["in1"][2] == 5
assert simulation.results["in2"][2] == 5
assert simulation.results["add1"][2] == 10
assert simulation.results["add2"][2] == 15
assert simulation.results["0"][2] == 10
assert simulation.results["1"][2] == 15
assert simulation.results["in1"][3] == 6
assert simulation.results["in2"][3] == 7
assert simulation.results["add1"][3] == 13
assert simulation.results["add2"][3] == 20
assert simulation.results["0"][3] == 13
assert simulation.results["1"][3] == 20
def test_with_numpy_arrays_as_input(self, sfg_two_inputs_two_outputs):
input0 = np.array([5, 9, 25, -5, 7])
input1 = np.array([7, 3, 3, 54, 2])
simulation = FastSimulation(sfg_two_inputs_two_outputs, [input0, input1])
output = simulation.run_for(5, save_results = True)
assert output[0] == 9
assert output[1] == 11
assert isinstance(simulation.results["in1"], np.ndarray)
assert isinstance(simulation.results["in2"], np.ndarray)
assert isinstance(simulation.results["add1"], np.ndarray)
assert isinstance(simulation.results["add2"], np.ndarray)
assert isinstance(simulation.results["0"], np.ndarray)
assert isinstance(simulation.results["1"], np.ndarray)
assert simulation.results["in1"][0] == 5
assert simulation.results["in2"][0] == 7
assert simulation.results["add1"][0] == 12
assert simulation.results["add2"][0] == 19
assert simulation.results["0"][0] == 12
assert simulation.results["1"][0] == 19
assert simulation.results["in1"][1] == 9
assert simulation.results["in2"][1] == 3
assert simulation.results["add1"][1] == 12
assert simulation.results["add2"][1] == 15
assert simulation.results["0"][1] == 12
assert simulation.results["1"][1] == 15
assert simulation.results["in1"][2] == 25
assert simulation.results["in2"][2] == 3
assert simulation.results["add1"][2] == 28
assert simulation.results["add2"][2] == 31
assert simulation.results["0"][2] == 28
assert simulation.results["1"][2] == 31
assert simulation.results["in1"][3] == -5
assert simulation.results["in2"][3] == 54
assert simulation.results["add1"][3] == 49
assert simulation.results["add2"][3] == 103
assert simulation.results["0"][3] == 49
assert simulation.results["1"][3] == 103
assert simulation.results["0"][4] == 9
assert simulation.results["1"][4] == 11
def test_with_numpy_array_overflow(self, sfg_two_inputs_two_outputs):
input0 = np.array([5, 9, 25, -5, 7])
input1 = np.array([7, 3, 3, 54, 2])
simulation = FastSimulation(sfg_two_inputs_two_outputs, [input0, input1])
simulation.run_for(5)
with pytest.raises(IndexError):
simulation.step()
def test_run_whole_numpy_array(self, sfg_two_inputs_two_outputs):
input0 = np.array([5, 9, 25, -5, 7])
input1 = np.array([7, 3, 3, 54, 2])
simulation = FastSimulation(sfg_two_inputs_two_outputs, [input0, input1])
simulation.run()
assert len(simulation.results["0"]) == 5
assert len(simulation.results["1"]) == 5
with pytest.raises(IndexError):
simulation.step()
def test_delay(self, sfg_delay):
simulation = FastSimulation(sfg_delay)
simulation.set_input(0, [5, -2, 25, -6, 7, 0])
simulation.run_for(6, save_results = True)
assert simulation.results["0"][0] == 0
assert simulation.results["0"][1] == 5
assert simulation.results["0"][2] == -2
assert simulation.results["0"][3] == 25
assert simulation.results["0"][4] == -6
assert simulation.results["0"][5] == 7
class TestRun:
def test_save_results(self, sfg_two_inputs_two_outputs):
simulation = FastSimulation(sfg_two_inputs_two_outputs, [2, 3])
assert not simulation.results
simulation.run_for(10, save_results = False)
assert not simulation.results
simulation.run_for(10)
assert len(simulation.results["0"]) == 10
assert len(simulation.results["1"]) == 10
simulation.run_for(10, save_results = True)
assert len(simulation.results["0"]) == 20
assert len(simulation.results["1"]) == 20
simulation.run_for(10, save_results = False)
assert len(simulation.results["0"]) == 20
assert len(simulation.results["1"]) == 20
simulation.run_for(13, save_results = True)
assert len(simulation.results["0"]) == 33
assert len(simulation.results["1"]) == 33
simulation.step(save_results = False)
assert len(simulation.results["0"]) == 33
assert len(simulation.results["1"]) == 33
simulation.step()
assert len(simulation.results["0"]) == 34
assert len(simulation.results["1"]) == 34
simulation.clear_results()
assert not simulation.results
def test_nested(self, sfg_nested):
input0 = np.array([5, 9])
input1 = np.array([7, 3])
simulation = FastSimulation(sfg_nested, [input0, input1])
output0 = simulation.step()
output1 = simulation.step()
assert output0[0] == 11405
assert output1[0] == 4221
def test_accumulator(self, sfg_accumulator):
data_in = np.array([5, -2, 25, -6, 7, 0])
reset = np.array([0, 0, 0, 1, 0, 0])
simulation = FastSimulation(sfg_accumulator, [data_in, reset])
output0 = simulation.step()
output1 = simulation.step()
output2 = simulation.step()
output3 = simulation.step()
output4 = simulation.step()
output5 = simulation.step()
assert output0[0] == 0
assert output1[0] == 5
assert output2[0] == 3
assert output3[0] == 28
assert output4[0] == 0
assert output5[0] == 7
def test_simple_accumulator(self, sfg_simple_accumulator):
data_in = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
simulation = FastSimulation(sfg_simple_accumulator, [data_in])
simulation.run()
assert list(simulation.results["0"]) == [0, 1, 3, 6, 10, 15, 21, 28, 36, 45]
def test_simple_filter(self, sfg_simple_filter):
input0 = np.array([1, 2, 3, 4, 5])
simulation = FastSimulation(sfg_simple_filter, [input0])
simulation.run_for(len(input0), save_results = True)
assert all(simulation.results["0"] == np.array([0, 1.0, 2.5, 4.25, 6.125]))
def test_custom_operation(self, sfg_custom_operation):
simulation = FastSimulation(sfg_custom_operation, [lambda n: n + 1])
simulation.run_for(5)
assert all(simulation.results["0"] == np.array([2, 4, 6, 8, 10]))
assert all(simulation.results["1"] == np.array([2, 4, 8, 16, 32]))
class TestLarge:
def test_1k_additions(self):
prev_op = Addition(Constant(1), Constant(1))
for _ in range(999):
prev_op = Addition(prev_op, Constant(2))
sfg = SFG(outputs=[Output(prev_op)])
simulation = FastSimulation(sfg, [])
assert simulation.step()[0] == 2000
def test_1k_subtractions(self):
prev_op = Subtraction(Constant(0), Constant(2))
for _ in range(999):
prev_op = Subtraction(prev_op, Constant(2))
sfg = SFG(outputs=[Output(prev_op)])
simulation = FastSimulation(sfg, [])
assert simulation.step()[0] == -2000
def test_1k_butterfly(self):
prev_op_add = Addition(Constant(1), Constant(1))
prev_op_sub = Subtraction(Constant(-1), Constant(1))
for _ in range(499):
prev_op_add = Addition(prev_op_add, Constant(2))
for _ in range(499):
prev_op_sub = Subtraction(prev_op_sub, Constant(2))
butterfly = Butterfly(prev_op_add, prev_op_sub)
sfg = SFG(outputs=[Output(butterfly.output(0)), Output(butterfly.output(1))])
simulation = FastSimulation(sfg, [])
assert list(simulation.step()) == [0, 2000]
\ No newline at end of file
......@@ -2,9 +2,10 @@
B-ASIC test suite for graph id generator.
"""
from b_asic.graph_id import GraphIDGenerator, GraphID
import pytest
from b_asic import GraphIDGenerator, GraphID
@pytest.fixture
def graph_id_generator():
return GraphIDGenerator()
......@@ -12,17 +13,17 @@ def graph_id_generator():
class TestGetNextId:
def test_empty_string_generator(self, graph_id_generator):
"""Test the graph id generator for an empty string type."""
assert graph_id_generator.get_next_id("") == "1"
assert graph_id_generator.get_next_id("") == "2"
assert graph_id_generator.next_id("") == "1"
assert graph_id_generator.next_id("") == "2"
def test_normal_string_generator(self, graph_id_generator):
""""Test the graph id generator for a normal string type."""
assert graph_id_generator.get_next_id("add") == "add1"
assert graph_id_generator.get_next_id("add") == "add2"
assert graph_id_generator.next_id("add") == "add1"
assert graph_id_generator.next_id("add") == "add2"
def test_different_strings_generator(self, graph_id_generator):
"""Test the graph id generator for different strings."""
assert graph_id_generator.get_next_id("sub") == "sub1"
assert graph_id_generator.get_next_id("mul") == "mul1"
assert graph_id_generator.get_next_id("sub") == "sub2"
assert graph_id_generator.get_next_id("mul") == "mul2"
assert graph_id_generator.next_id("sub") == "sub1"
assert graph_id_generator.next_id("mul") == "mul1"
assert graph_id_generator.next_id("sub") == "sub2"
assert graph_id_generator.next_id("mul") == "mul2"