Skip to content

Commit

Permalink
issue #184: threaded test for readblktracker (#190)
Browse files Browse the repository at this point in the history
* threaded test for readblktracker
* add new concurrent test case
  • Loading branch information
yamingk authored Oct 9, 2023
1 parent 169d97e commit 6ca1be3
Show file tree
Hide file tree
Showing 2 changed files with 231 additions and 30 deletions.
13 changes: 8 additions & 5 deletions src/lib/blkdata_svc/blk_read_tracker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,18 +47,21 @@ void BlkReadTracker::merge(const BlkId& blkid, int64_t new_ref_count,
});
} else if (new_ref_count < 0) {
// This is a remove operation
m_pending_reads_map.upsert_or_delete(base_blkid, [new_ref_count](BlkTrackRecord& rec, bool existing) {
HS_DBG_ASSERT_EQ(existing, true, "Decrement a ref count which does not exist in map");
rec.m_ref_cnt += new_ref_count;
return (rec.m_ref_cnt == 0);
});
m_pending_reads_map.upsert_or_delete(
base_blkid, [new_ref_count, &base_blkid](BlkTrackRecord& rec, bool existing) {
HS_DBG_ASSERT_EQ(existing, true, "Decrement a ref count (blk: {}) which does not exist in map",
base_blkid.to_string());
rec.m_ref_cnt += new_ref_count;
return (rec.m_ref_cnt == 0);
});
} else {
// this is wait_on operation
m_pending_reads_map.update(base_blkid, [&waiter_rescheduled, &waiter](BlkTrackRecord& rec) {
rec.m_waiters.push_back(waiter);
waiter_rescheduled = true;
});
}

cur_base_blk_num += entries_per_record();
}

Expand Down
248 changes: 223 additions & 25 deletions src/tests/test_blk_read_tracker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,12 @@
* specific language governing permissions and limitations under the License.
*
*********************************************************************************/

#include <memory>
#include <mutex>
#include <random>
#include <string>
#include <vector>
#include <list>
#include <gtest/gtest.h>

#include "blkdata_svc/blk_read_tracker.hpp"
Expand All @@ -23,11 +28,35 @@ using namespace homestore;
SISL_LOGGING_INIT(test_blk_read_tracker, iomgr, flip, io_wd)
SISL_OPTIONS_ENABLE(logging, test_blk_read_tracker)

VENUM(op_type_t, uint8_t, insert = 0, remove = 1, wait_on = 2, max_op = 3);
class BlkReadTrackerTest : public testing::Test {
public:
virtual void SetUp() override {
LOGINFO("Step 0: initialize BlkReadTracker instance. ");
init();
}

void init() { m_blk_read_tracker = std::make_unique< BlkReadTracker >(); }
std::shared_ptr< BlkReadTracker > get_inst() { return m_blk_read_tracker; }

op_type_t get_rand_op_type() {
return static_cast< op_type_t >(rand() % static_cast< uint8_t >(op_type_t::max_op));
}

void gen_random_blkids(std::vector< BlkId >& out_bids, blk_count_t nblks) {
for (auto i = 0ul; i < nblks; ++i) {
out_bids.emplace_back(gen_random_blkid());
}
}

BlkId gen_random_blkid() {
static thread_local std::random_device rd;
static thread_local std::default_random_engine re{rd()};
std::uniform_int_distribution< blk_num_t > blk_num{0, 1000};
std::uniform_int_distribution< blk_count_t > nblks{0, 64};
return BlkId{blk_num(re), nblks(re), static_cast< chunk_num_t >(0ul) /* chunk_num */};
}

private:
std::shared_ptr< BlkReadTracker > m_blk_read_tracker;
};
Expand All @@ -37,9 +66,6 @@ class BlkReadTrackerTest : public testing::Test {
* 2. no overlap insert and remove without any waiter,
* */
TEST_F(BlkReadTrackerTest, TestBaiscInsertRemoveWithNoWaiter) {
LOGINFO("Step 0: initialize BlkReadTracker instance. ");
init();

LOGINFO("Step 1: set entries per record to 16");
get_inst()->set_entries_per_record(16);

Expand Down Expand Up @@ -69,9 +95,6 @@ TEST_F(BlkReadTrackerTest, TestBaiscInsertRemoveWithNoWaiter) {
* alignment: 16
* */
TEST_F(BlkReadTrackerTest, TestOverlapInsertThenRemoveWithNoWaiter) {
LOGINFO("Step 0: initialize BlkReadTracker instance. ");
init();

LOGINFO("Step 1: set entries per record to 16");
get_inst()->set_entries_per_record(16);

Expand Down Expand Up @@ -109,8 +132,6 @@ TEST_F(BlkReadTrackerTest, TestOverlapInsertThenRemoveWithNoWaiter) {
* waiter overlap with read, but there is no read completes, waiter's cb should NOT be triggered;
* */
TEST_F(BlkReadTrackerTest, TestInsertWithWaiter) {
LOGINFO("Step 0: initialize BlkReadTracker instance. ");
init();

BlkId b{16, 20, 0};
get_inst()->insert(b);
Expand All @@ -135,9 +156,6 @@ TEST_F(BlkReadTrackerTest, TestInsertWithWaiter) {
* free bid callback should be called after read completes
* */
TEST_F(BlkReadTrackerTest, TestInsRmWithWaiterOnSameBid) {
LOGINFO("Step 0: initialize BlkReadTracker instance. ");
init();

BlkId b{16, 20, 0};
LOGINFO("Step 1: read blkid: {} into hash map.", b.to_string());
get_inst()->insert(b);
Expand Down Expand Up @@ -169,9 +187,6 @@ TEST_F(BlkReadTrackerTest, TestInsRmWithWaiterOnSameBid) {
* free cb1 should be called only after read-1 completes;
* */
TEST_F(BlkReadTrackerTest, TestInsRmeWithWaiterOverlapOneRead) {
LOGINFO("Step 0: initialize BlkReadTracker instance. ");
init();

auto align = 16ul;
LOGINFO("Step 1: set entries per record to {}.", align);
get_inst()->set_entries_per_record(align);
Expand Down Expand Up @@ -217,9 +232,6 @@ TEST_F(BlkReadTrackerTest, TestInsRmeWithWaiterOverlapOneRead) {
* 5. Read-1 completes // <<< free cb should be triggered
* */
TEST_F(BlkReadTrackerTest, TestInsRmWithWaiterOverlapMultiReads0) {
LOGINFO("Step 0: initialize BlkReadTracker instance. ");
init();

auto align = 16ul;
LOGINFO("Step 1: set entries per record to {}.", align);
get_inst()->set_entries_per_record(align);
Expand Down Expand Up @@ -265,9 +277,6 @@ TEST_F(BlkReadTrackerTest, TestInsRmWithWaiterOverlapMultiReads0) {
* 5. Read-2 completes; // free cb should be triggered;
* */
TEST_F(BlkReadTrackerTest, TestInsRmWithWaiterOverlapMultiReads1) {
LOGINFO("Step 0: initialize BlkReadTracker instance. ");
init();

auto align = 8ul;
LOGINFO("Step 1: set entries per record to {}.", align);
get_inst()->set_entries_per_record(align);
Expand Down Expand Up @@ -313,14 +322,11 @@ TEST_F(BlkReadTrackerTest, TestInsRmWithWaiterOverlapMultiReads1) {
* overlapping;
* 4. read-1 completes // callback of free should be called, even though read-2 is not completed yet;
* 5. read-2 completes
* Note: read should never olverap with unfinished free blkid; read-2 is not vialating this rule;
* Note: read should never olverap with unfinished free blkid; read-2 is not violating this rule;
*
* free cb1 should only wait on read-1 to completes, read-2 should not block free;
* */
TEST_F(BlkReadTrackerTest, TestInsRmWithWaiterOverlapMultiReads2) {
LOGINFO("Step 0: initialize BlkReadTracker instance. ");
init();

auto align = 16ul;
LOGINFO("Step 1: set entries per record to {}.", align);
get_inst()->set_entries_per_record(align);
Expand Down Expand Up @@ -352,6 +358,198 @@ TEST_F(BlkReadTrackerTest, TestInsRmWithWaiterOverlapMultiReads2) {
get_inst()->remove(c);
}

//////////////////////////// Multi-thread test cases //////////////////////////////

/*
* Multi-thread Insert and remove, with no free operation;
*
* 1. do insert with a few threads -- (can also be done in massive threads, but not necessary)
* 2. do remove in massive threads (must be same amount of inserts);
* */
TEST_F(BlkReadTrackerTest, TestThreadedInsertAndRemove) {
auto align = 8ul;
LOGINFO("Step 1: set entries per record to {}.", align);
get_inst()->set_entries_per_record(align);

std::vector< BlkId > bids{{10, 8, 0}, {20, 5, 0}, {25, 6, 0}, {43, 16, 0}, {56, 4, 0}, {72, 18, 0}, {122, 4, 0}};

const auto repeat = 100ul;
std::vector< std::thread > op_threads;

for (const auto& b : bids) {
std::thread t([this, &b]() {
for (auto j = 0ul; j < repeat; ++j) {
get_inst()->insert(b);
}
});
op_threads.push_back(std::move(t));
}

LOGINFO("Step 2: threaded insert issued.");

for (auto& t : op_threads) {
t.join();
}

// remove has to wait for insert to complete because if insert thread runs slower than remove, it might assert
// complaining no elements are found in map;

LOGINFO("Step 3: threaded insert joined.");
op_threads.clear();

for (const auto& b : bids) {
for (auto j = 0ul; j < repeat; ++j) {
std::thread t([this, &b]() { get_inst()->remove(b); });
op_threads.push_back(std::move(t));
}
}

LOGINFO("Step 4: threaded remove issued.");
for (auto& t : op_threads) {
t.join();
}

LOGINFO("Step 4: all threads joined.");
}

TEST_F(BlkReadTrackerTest, TestThreadedInsertWaitonThenRemove) {
auto align = 8ul;
LOGINFO("Step 1: set entries per record to {}.", align);
get_inst()->set_entries_per_record(align);

std::vector< BlkId > bids{{12, 6, 0}, {18, 5, 0}, {25, 8, 0}, {36, 16, 0}, {57, 4, 0}, {66, 18, 0}, {92, 14, 0}};
const auto repeat = 100ul;
std::vector< std::thread > op_threads;
for (const auto& b : bids) {
std::thread t([this, &b]() {
for (auto j = 0ul; j < repeat; ++j) {
get_inst()->insert(b);
}
});
op_threads.push_back(std::move(t));
}

LOGINFO("Step 2: threaded insert issued.");

std::vector< bool > called(bids.size(), false);
std::mutex mtx;
for (auto i = 0ul; i < bids.size(); ++i) {
std::thread t([this, &bids, i, &mtx, &called]() {
get_inst()->wait_on(bids[i], [i, &bids, &mtx, &called]() {
std::unique_lock lk(mtx);
LOGMSG_ASSERT(called[i] == false, "not expecting callback to be called more than once!");
called[i] = true;
LOGINFO("wait_on called on blkid: {};", bids[i].to_string());
});
});
op_threads.push_back(std::move(t));
}

// wait for all insert to complete, otherwise it will race with remove thread;
for (auto& t : op_threads) {
t.join();
}

op_threads.clear();

LOGINFO("Step 3: threaded wait_on issued on all bids.");

for (const auto& b : bids) {
for (auto j = 0ul; j < repeat; ++j) {
std::thread t([this, &b]() { get_inst()->remove(b); });
op_threads.push_back(std::move(t));
}
}

LOGINFO("Step 3: threaded remove issued.");
for (auto& t : op_threads) {
t.join();
}

LOGINFO("Step 4: all threads joined.");

for (const auto x : called) {
LOGMSG_ASSERT(x == true, "expecting all waiters to be called");
}

LOGINFO("Step 5: all bids wait_on cb called.");
}

/*
* Purpose of this test:
* 1. Concurrent random insert/remove/wait_on operations in different threads.
*
* */
TEST_F(BlkReadTrackerTest, TestThreadedInsertRemoveAndWait2) {
auto align = 8ul;
LOGINFO("Step 1: set entries per record to {}.", align);
get_inst()->set_entries_per_record(align);

std::mutex mtx;
std::list< BlkId > inserted_bids;

std::atomic< uint32_t > outstanding_wait_bids_cnt = 0ul;

LOGINFO("Step 2: randome threaded insert/remove/wait_on operation:");
std::list< std::thread > op_threads;
auto nitr = 0ul;
while (nitr++ < 200ul || inserted_bids.empty() == false) {
std::thread t([this, &outstanding_wait_bids_cnt, &nitr, &inserted_bids, &mtx]() {
auto op = get_rand_op_type();
if (nitr >= 200) {
// reached maximum iterations, let's do remove only so that we can exit the while loop;
op = op_type_t::remove;
}

if (op == op_type_t::insert) {
BlkId b = gen_random_blkid();
get_inst()->insert(b);
{
std::unique_lock lg(mtx);
inserted_bids.push_front(b);
}
} else if (op == op_type_t::remove) {
BlkId rm_b;
{
std::unique_lock lg(mtx);
if (inserted_bids.size() == 0) {
// remove come ahead of insert, nothing to do;
return;
}
rm_b = inserted_bids.back();
inserted_bids.pop_back();
}

get_inst()->remove(rm_b);
} else if (op == op_type_t::wait_on) {
BlkId wait_b;
{
std::unique_lock lg(mtx);
if (inserted_bids.size() == 0) {
// remove come ahead of insert, nothing to do;
return;
}
wait_b = inserted_bids.back();
}

outstanding_wait_bids_cnt.fetch_add(1);
get_inst()->wait_on(wait_b, [&outstanding_wait_bids_cnt]() { outstanding_wait_bids_cnt.fetch_sub(1); });
}
});
op_threads.push_back(std::move(t));
}

for (auto& t : op_threads) {
t.join();
}

LOGINFO("Step 3: all threads joined.");

LOGMSG_ASSERT(outstanding_wait_bids_cnt.load() == 0, "expecting callback to be called for all waited bids!");

LOGINFO("Step 4: Test Passed.");
}

SISL_OPTION_GROUP(test_blk_read_tracker,
(num_threads, "", "num_threads", "number of threads",
::cxxopts::value< uint32_t >()->default_value("2"), "number"));
Expand Down

0 comments on commit 6ca1be3

Please sign in to comment.