Skip to content

Commit

Permalink
FIX btree tests
Browse files Browse the repository at this point in the history
  • Loading branch information
shosseinimotlagh committed Jan 19, 2024
1 parent 77171ce commit 6ce479f
Show file tree
Hide file tree
Showing 7 changed files with 127 additions and 88 deletions.
4 changes: 2 additions & 2 deletions src/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ if (${build_nonio_tests})
set(TEST_MEMBTREE_SOURCE_FILES test_mem_btree.cpp)
add_executable(test_mem_btree ${TEST_MEMBTREE_SOURCE_FILES})
target_link_libraries(test_mem_btree ${COMMON_TEST_DEPS} GTest::gtest)
# add_test(NAME MemBtree COMMAND test_mem_btree)
# set_tests_properties(MemBtree PROPERTIES TIMEOUT 180)
add_test(NAME MemBtree COMMAND test_mem_btree)
set_tests_properties(MemBtree PROPERTIES TIMEOUT 600)

add_executable(test_blk_read_tracker)
target_sources(test_blk_read_tracker PRIVATE test_blk_read_tracker.cpp ../lib/blkdata_svc/blk_read_tracker.cpp ../lib/blkalloc/blk.cpp)
Expand Down
122 changes: 79 additions & 43 deletions src/tests/btree_helpers/btree_test_helper.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ struct BtreeTestHelper {
if (m_is_multi_threaded) {
std::mutex mtx;
m_run_time = SISL_OPTIONS["run_time"].as< uint32_t >();
iomanager.run_on_wait(iomgr::reactor_regex::all_io, [this, &mtx]() {
iomanager.run_on_wait(iomgr::reactor_regex::all_worker, [this, &mtx]() {
auto fv = iomanager.sync_io_capable_fibers();
std::unique_lock lg(mtx);
m_fibers.insert(m_fibers.end(), fv.begin(), fv.end());
Expand Down Expand Up @@ -84,27 +84,30 @@ struct BtreeTestHelper {

public:
void preload(uint32_t preload_size) {
const auto chunk_size = preload_size / m_fibers.size();
const auto last_chunk_size = preload_size % chunk_size ?: chunk_size;
auto test_count = m_fibers.size();

for (std::size_t i = 0; i < m_fibers.size(); ++i) {
const auto start_range = i * chunk_size;
const auto end_range = start_range + ((i == m_fibers.size() - 1) ? last_chunk_size : chunk_size);
iomanager.run_on_forget(m_fibers[i], [this, start_range, end_range, &test_count]() {
for (uint32_t i = start_range; i < end_range; i++) {
put(i, btree_put_type::INSERT);
}
{
std::unique_lock lg(m_test_done_mtx);
if (--test_count == 0) { m_test_done_cv.notify_one(); }
}
});
}
if (preload_size) {
const auto n_fibers = std::min(preload_size, (uint32_t)m_fibers.size());
const auto chunk_size = preload_size / n_fibers;
const auto last_chunk_size = preload_size % chunk_size ?: chunk_size;
auto test_count = n_fibers;

for (std::size_t i = 0; i < n_fibers; ++i) {
const auto start_range = i * chunk_size;
const auto end_range = start_range + ((i == n_fibers - 1) ? last_chunk_size : chunk_size);
iomanager.run_on_forget(m_fibers[i], [this, start_range, end_range, &test_count]() {
for (uint32_t i = start_range; i < end_range; i++) {
put(i, btree_put_type::INSERT);
}
{
std::unique_lock lg(m_test_done_mtx);
if (--test_count == 0) { m_test_done_cv.notify_one(); }
}
});
}

{
std::unique_lock< std::mutex > lk(m_test_done_mtx);
m_test_done_cv.wait(lk, [&]() { return test_count == 0; });
{
std::unique_lock< std::mutex > lk(m_test_done_mtx);
m_test_done_cv.wait(lk, [&]() { return test_count == 0; });
}
}
LOGINFO("Preload Done");
}
Expand Down Expand Up @@ -157,15 +160,13 @@ struct BtreeTestHelper {
auto pk = std::make_unique< K >(k);

auto rreq = BtreeSingleRemoveRequest{pk.get(), existing_v.get()};
rreq.enable_route_tracing();
bool removed = (m_bt->remove(rreq) == btree_status_t::success);

ASSERT_EQ(removed, m_shadow_map.exists(*pk))
<< "Removal of key " << pk->key() << " status doesn't match with shadow";

if (removed) {
m_shadow_map.validate_data(rreq.key(), (const V&)rreq.value());
m_shadow_map.erase(rreq.key());
}
if (removed) { m_shadow_map.remove_and_check(*pk, *existing_v); }
}

void remove_random() {
Expand Down Expand Up @@ -213,14 +214,17 @@ struct BtreeTestHelper {
auto const expected_count = std::min(remaining, batch_size);

ASSERT_EQ(out_vector.size(), expected_count) << "Received incorrect value on query pagination";
remaining -= expected_count;

if (remaining == 0) {
if (remaining < batch_size) {
ASSERT_EQ(ret, btree_status_t::success) << "Expected success on query";
} else {
} else if (remaining > batch_size) {
ASSERT_EQ(ret, btree_status_t::has_more) << "Expected query to return has_more";
} else if (remaining == batch_size) {
// we don't know, go to the next round
}

remaining -= expected_count;

for (size_t idx{0}; idx < out_vector.size(); ++idx) {
ASSERT_EQ(out_vector[idx].second, it->second)
<< "Range get doesn't return correct data for key=" << it->first << " idx=" << idx;
Expand Down Expand Up @@ -253,7 +257,7 @@ struct BtreeTestHelper {
*copy_key = key;
auto out_v = std::make_unique< V >();
auto req = BtreeSingleGetRequest{copy_key.get(), out_v.get()};

req.enable_route_tracing();
const auto ret = m_bt->get(req);
ASSERT_EQ(ret, btree_status_t::success) << "Missing key " << key << " in btree but present in shadow map";
ASSERT_EQ((const V&)req.value(), value)
Expand All @@ -265,7 +269,7 @@ struct BtreeTestHelper {
auto pk = std::make_unique< K >(k);
auto out_v = std::make_unique< V >();
auto req = BtreeSingleGetRequest{pk.get(), out_v.get()};

req.enable_route_tracing();
const auto status = m_bt->get(req);
if (status == btree_status_t::success) {
m_shadow_map.validate_data(req.key(), (const V&)req.value());
Expand All @@ -279,6 +283,7 @@ struct BtreeTestHelper {
auto out_v = std::make_unique< V >();
auto req =
BtreeGetAnyRequest< K >{BtreeKeyRange< K >{K{start_k}, true, K{end_k}, true}, out_k.get(), out_v.get()};
req.enable_route_tracing();
const auto status = m_bt->get(req);

if (status == btree_status_t::success) {
Expand Down Expand Up @@ -335,6 +340,7 @@ struct BtreeTestHelper {
auto existing_v = std::make_unique< V >();
K key = K{k};
auto sreq = BtreeSinglePutRequest{&key, &value, put_type, existing_v.get()};
sreq.enable_route_tracing();
bool done = (m_bt->put(sreq) == btree_status_t::success);

if (put_type == btree_put_type::INSERT) {
Expand All @@ -351,43 +357,73 @@ struct BtreeTestHelper {
K end_key = K{end_k};

auto rreq = BtreeRangeRemoveRequest< K >{BtreeKeyRange< K >{start_key, true, end_key, true}};
rreq.enable_route_tracing();
auto const ret = m_bt->remove(rreq);

m_shadow_map.range_erase(start_key, end_key);

if (all_existing) {
m_shadow_map.range_erase(start_key, end_key);
ASSERT_EQ((ret == btree_status_t::success), true)
<< "not a successful remove op for range " << start_k << "-" << end_k;
}

if (start_k < m_max_range_input) {
m_shadow_map.remove_keys(start_k, std::min(end_k, uint64_cast(m_max_range_input - 1)));
} else if (start_k < m_max_range_input) {
K end_range{std::min(end_k, uint64_cast(m_max_range_input - 1))};
m_shadow_map.range_erase(start_key, end_range);
}
}

protected:
void run_in_parallel(const std::vector< std::pair< std::string, int > >& op_list) {
auto test_count = m_fibers.size();
for (auto it = m_fibers.begin(); it < m_fibers.end(); ++it) {
iomanager.run_on_forget(*it, [this, &test_count, op_list]() {
const auto total_iters = SISL_OPTIONS["num_iters"].as< uint32_t >();
const auto num_iters_per_thread = total_iters / m_fibers.size();
const auto extra_iters = total_iters % num_iters_per_thread;
LOGINFO("number of fibers {} num_iters_per_thread {} extra_iters {} ", m_fibers.size(), num_iters_per_thread,
extra_iters);

for (uint32_t fiber_id = 0; fiber_id < m_fibers.size(); ++fiber_id) {
auto num_iters_this_fiber = num_iters_per_thread + (fiber_id < extra_iters ? 1 : 0);
iomanager.run_on_forget(m_fibers[fiber_id], [this, fiber_id, &test_count, op_list, num_iters_this_fiber]() {
std::random_device g_rd{};
std::default_random_engine re{g_rd()};
const auto num_iters_per_thread =
sisl::round_up(SISL_OPTIONS["num_iters"].as< uint32_t >() / m_fibers.size(), m_fibers.size());
std::vector< uint32_t > weights;
std::transform(op_list.begin(), op_list.end(), std::back_inserter(weights),
[](const auto& pair) { return pair.second; });

double progress_interval = (double)num_iters_this_fiber / 20; // 5% of the total number of iterations
double progress_thresh = progress_interval; // threshold for progress interval
double elapsed_time, progress_percent, last_progress_time = 0;

// Construct a weighted distribution based on the input frequencies
std::discrete_distribution< uint32_t > s_rand_op_generator(weights.begin(), weights.end());
auto m_start_time = Clock::now();
auto time_to_stop = [this, m_start_time]() {
return (get_elapsed_time_sec(m_start_time) > m_run_time);
};

auto time_to_stop = [this, m_start_time]() {return (get_elapsed_time_sec(m_start_time) > m_run_time);};

for (uint32_t i = 0; i < num_iters_per_thread && !time_to_stop(); i++) {
for (uint32_t i = 0; i < num_iters_this_fiber && !time_to_stop(); i++) {
uint32_t op_idx = s_rand_op_generator(re);
(this->m_operations[op_list[op_idx].first])();
m_num_ops.fetch_add(1);

if (fiber_id == 0) {
elapsed_time = get_elapsed_time_sec(m_start_time);
progress_percent = (double)i / num_iters_this_fiber * 100;

// check progress every 5% of the total number of iterations or every 30 seconds
bool print_time = false;
if (i >= progress_thresh) {
progress_thresh += progress_interval;
print_time = true;
}
if (elapsed_time - last_progress_time > 30) {
last_progress_time = elapsed_time;
print_time = true;
}
if (print_time) {
LOGINFO("Progress: iterations completed ({:.2f}%)- Elapsed time: {:.0f} seconds of total "
"{} - total entries: {}",
progress_percent, elapsed_time, m_run_time, m_shadow_map.size());
}
}
}
{
std::unique_lock lg(m_test_done_mtx);
Expand Down
44 changes: 31 additions & 13 deletions src/tests/btree_helpers/shadow_map.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ class ShadowMap {
}

std::pair< K, K > pick_existing_range(const K& start_key, uint32_t max_count) const {
std::shared_lock lock{m_mutex};
std::lock_guard lock{m_mutex};
auto const start_it = m_map.lower_bound(start_key);
auto it = start_it;
uint32_t count = 0;
Expand All @@ -59,12 +59,12 @@ class ShadowMap {
}

bool exists(const K& key) const {
std::shared_lock lock{m_mutex};
std::lock_guard lock{m_mutex};
return m_map.find(key) != m_map.end();
}

bool exists_in_range(const K& key, uint64_t start_k, uint64_t end_k) const {
std::shared_lock lock{m_mutex};
std::lock_guard lock{m_mutex};
const auto itlower = m_map.lower_bound(K{start_k});
const auto itupper = m_map.upper_bound(K{end_k});
auto it = itlower;
Expand All @@ -76,7 +76,7 @@ class ShadowMap {
}

uint64_t size() const {
std::shared_lock lock{m_mutex};
std::lock_guard lock{m_mutex};
return m_map.size();
}

Expand All @@ -87,12 +87,21 @@ class ShadowMap {
}

void validate_data(const K& key, const V& btree_val) const {
std::shared_lock lock{m_mutex};
std::lock_guard lock{m_mutex};
const auto r = m_map.find(key);
ASSERT_NE(r, m_map.end()) << "Key " << key.to_string() << " is not present in shadow map";
ASSERT_EQ(btree_val, r->second) << "Found value in btree doesn't return correct data for key=" << r->first;
}

void remove_and_check(const K& key, const V& btree_val) {
std::lock_guard lock{m_mutex};
const auto r = m_map.find(key);
ASSERT_NE(r, m_map.end()) << "Key " << key.to_string() << " is not present in shadow map";
ASSERT_EQ(btree_val, r->second) << "Found value in btree doesn't return correct data for key=" << r->first;
m_map.erase(key);
m_range_scheduler.remove_key(key.key());
}

void erase(const K& key) {
std::lock_guard lock{m_mutex};
m_map.erase(key);
Expand All @@ -101,7 +110,7 @@ class ShadowMap {

void range_erase(const K& start_key, uint32_t count) {
std::lock_guard lock{m_mutex};
auto const it = m_map.lower_bound(start_key);
auto it = m_map.lower_bound(start_key);
uint32_t i{0};
while ((it != m_map.cend()) && (i++ < count)) {
it = m_map.erase(it);
Expand All @@ -124,25 +133,34 @@ class ShadowMap {
const std::map< K, V >& map_const() const { return m_map; }

void foreach (std::function< void(K, V) > func) const {
std::shared_lock lock{m_mutex};
std::lock_guard lock{m_mutex};
for (const auto& [key, value] : m_map) {
func(key, value);
}
}

std::pair< uint32_t, uint32_t > pick_random_non_existing_keys(uint32_t max_keys) {
std::shared_lock lock{m_mutex};
return m_range_scheduler.pick_random_non_existing_keys(max_keys);
do {
std::lock_guard lock{m_mutex};
auto ret = m_range_scheduler.pick_random_non_existing_keys(max_keys);
if (ret.first != UINT32_MAX) { return ret; }
} while (true);
}

std::pair< uint32_t, uint32_t > pick_random_existing_keys(uint32_t max_keys) {
std::shared_lock lock{m_mutex};
return m_range_scheduler.pick_random_existing_keys(max_keys);
do {
std::lock_guard lock{m_mutex};
auto ret = m_range_scheduler.pick_random_existing_keys(max_keys);
if (ret.first != UINT32_MAX) { return ret; }
} while (true);
}

std::pair< uint32_t, uint32_t > pick_random_non_working_keys(uint32_t max_keys) {
std::shared_lock lock{m_mutex};
return m_range_scheduler.pick_random_non_working_keys(max_keys);
do {
std::lock_guard lock{m_mutex};
auto ret = m_range_scheduler.pick_random_non_working_keys(max_keys);
if (ret.first != UINT32_MAX) { return ret; }
} while (true);
}

void remove_keys_from_working(uint32_t s, uint32_t e) {
Expand Down
2 changes: 1 addition & 1 deletion src/tests/test_common/homestore_test_common.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ class HSTestHelper {

LOGINFO("Starting iomgr with {} threads, spdk: {}", num_threads, is_spdk);
ioenvironment.with_iomgr(
iomgr::iomgr_params{.num_threads = num_threads, .is_spdk = is_spdk, .num_fibers = num_fibers});
iomgr::iomgr_params{.num_threads = num_threads, .is_spdk = is_spdk, .num_fibers = 1 + num_fibers});

auto const http_port = SISL_OPTIONS["http_port"].as< int >();
if (http_port != 0) {
Expand Down
Loading

0 comments on commit 6ce479f

Please sign in to comment.