diff --git a/src/lib/blkdata_svc/blk_read_tracker.cpp b/src/lib/blkdata_svc/blk_read_tracker.cpp index 6111689c1..7dbe84fde 100644 --- a/src/lib/blkdata_svc/blk_read_tracker.cpp +++ b/src/lib/blkdata_svc/blk_read_tracker.cpp @@ -47,11 +47,13 @@ void BlkReadTracker::merge(const BlkId& blkid, int64_t new_ref_count, }); } else if (new_ref_count < 0) { // This is a remove operation - m_pending_reads_map.upsert_or_delete(base_blkid, [new_ref_count](BlkTrackRecord& rec, bool existing) { - HS_DBG_ASSERT_EQ(existing, true, "Decrement a ref count which does not exist in map"); - rec.m_ref_cnt += new_ref_count; - return (rec.m_ref_cnt == 0); - }); + m_pending_reads_map.upsert_or_delete( + base_blkid, [new_ref_count, &base_blkid](BlkTrackRecord& rec, bool existing) { + HS_DBG_ASSERT_EQ(existing, true, "Decrement a ref count (blk: {}) which does not exist in map", + base_blkid.to_string()); + rec.m_ref_cnt += new_ref_count; + return (rec.m_ref_cnt == 0); + }); } else { // this is wait_on operation m_pending_reads_map.update(base_blkid, [&waiter_rescheduled, &waiter](BlkTrackRecord& rec) { @@ -59,6 +61,7 @@ void BlkReadTracker::merge(const BlkId& blkid, int64_t new_ref_count, waiter_rescheduled = true; }); } + cur_base_blk_num += entries_per_record(); } diff --git a/src/tests/test_blk_read_tracker.cpp b/src/tests/test_blk_read_tracker.cpp index 26ba98007..f7a030ac5 100644 --- a/src/tests/test_blk_read_tracker.cpp +++ b/src/tests/test_blk_read_tracker.cpp @@ -13,7 +13,12 @@ * specific language governing permissions and limitations under the License. 
* *********************************************************************************/ - +#include <atomic> +#include <list> +#include <mutex> +#include <random> +#include <thread> +#include <vector> #include <gtest/gtest.h> #include "blkdata_svc/blk_read_tracker.hpp" @@ -23,11 +28,35 @@ using namespace homestore; SISL_LOGGING_INIT(test_blk_read_tracker, iomgr, flip, io_wd) SISL_OPTIONS_ENABLE(logging, test_blk_read_tracker) +VENUM(op_type_t, uint8_t, insert = 0, remove = 1, wait_on = 2, max_op = 3); class BlkReadTrackerTest : public testing::Test { public: + virtual void SetUp() override { + LOGINFO("Step 0: initialize BlkReadTracker instance. "); + init(); + } + void init() { m_blk_read_tracker = std::make_unique< BlkReadTracker >(); } std::shared_ptr< BlkReadTracker > get_inst() { return m_blk_read_tracker; } + op_type_t get_rand_op_type() { + return static_cast< op_type_t >(rand() % static_cast< uint8_t >(op_type_t::max_op)); + } + + void gen_random_blkids(std::vector< BlkId >& out_bids, blk_count_t nblks) { + for (auto i = 0ul; i < nblks; ++i) { + out_bids.emplace_back(gen_random_blkid()); + } + } + + BlkId gen_random_blkid() { + static thread_local std::random_device rd; + static thread_local std::default_random_engine re{rd()}; + std::uniform_int_distribution< blk_num_t > blk_num{0, 1000}; + std::uniform_int_distribution< blk_count_t > nblks{0, 64}; + return BlkId{blk_num(re), nblks(re), static_cast< chunk_num_t >(0ul) /* chunk_num */}; + } + private: std::shared_ptr< BlkReadTracker > m_blk_read_tracker; }; @@ -37,9 +66,6 @@ class BlkReadTrackerTest : public testing::Test { * 2. no overlap insert and remove without any waiter, * */ TEST_F(BlkReadTrackerTest, TestBaiscInsertRemoveWithNoWaiter) { - LOGINFO("Step 0: initialize BlkReadTracker instance. 
"); - init(); - LOGINFO("Step 1: set entries per record to 16"); get_inst()->set_entries_per_record(16); @@ -69,9 +95,6 @@ TEST_F(BlkReadTrackerTest, TestBaiscInsertRemoveWithNoWaiter) { * alignment: 16 * */ TEST_F(BlkReadTrackerTest, TestOverlapInsertThenRemoveWithNoWaiter) { - LOGINFO("Step 0: initialize BlkReadTracker instance. "); - init(); - LOGINFO("Step 1: set entries per record to 16"); get_inst()->set_entries_per_record(16); @@ -109,8 +132,6 @@ TEST_F(BlkReadTrackerTest, TestOverlapInsertThenRemoveWithNoWaiter) { * waiter overlap with read, but there is no read completes, waiter's cb should NOT be triggered; * */ TEST_F(BlkReadTrackerTest, TestInsertWithWaiter) { - LOGINFO("Step 0: initialize BlkReadTracker instance. "); - init(); BlkId b{16, 20, 0}; get_inst()->insert(b); @@ -135,9 +156,6 @@ TEST_F(BlkReadTrackerTest, TestInsertWithWaiter) { * free bid callback should be called after read completes * */ TEST_F(BlkReadTrackerTest, TestInsRmWithWaiterOnSameBid) { - LOGINFO("Step 0: initialize BlkReadTracker instance. "); - init(); - BlkId b{16, 20, 0}; LOGINFO("Step 1: read blkid: {} into hash map.", b.to_string()); get_inst()->insert(b); @@ -169,9 +187,6 @@ TEST_F(BlkReadTrackerTest, TestInsRmWithWaiterOnSameBid) { * free cb1 should be called only after read-1 completes; * */ TEST_F(BlkReadTrackerTest, TestInsRmeWithWaiterOverlapOneRead) { - LOGINFO("Step 0: initialize BlkReadTracker instance. "); - init(); - auto align = 16ul; LOGINFO("Step 1: set entries per record to {}.", align); get_inst()->set_entries_per_record(align); @@ -217,9 +232,6 @@ TEST_F(BlkReadTrackerTest, TestInsRmeWithWaiterOverlapOneRead) { * 5. Read-1 completes // <<< free cb should be triggered * */ TEST_F(BlkReadTrackerTest, TestInsRmWithWaiterOverlapMultiReads0) { - LOGINFO("Step 0: initialize BlkReadTracker instance. 
"); - init(); - auto align = 16ul; LOGINFO("Step 1: set entries per record to {}.", align); get_inst()->set_entries_per_record(align); @@ -265,9 +277,6 @@ * 5. Read-2 completes; // free cb should be triggered; * */ TEST_F(BlkReadTrackerTest, TestInsRmWithWaiterOverlapMultiReads1) { - LOGINFO("Step 0: initialize BlkReadTracker instance. "); - init(); - auto align = 8ul; LOGINFO("Step 1: set entries per record to {}.", align); get_inst()->set_entries_per_record(align); @@ -313,14 +322,11 @@ * overlapping; * 4. read-1 completes // callback of free should be called, even though read-2 is not completed yet; * 5. read-2 completes - * Note: read should never olverap with unfinished free blkid; read-2 is not vialating this rule; + * Note: read should never overlap with unfinished free blkid; read-2 is not violating this rule; * * free cb1 should only wait on read-1 to completes, read-2 should not block free; * */ TEST_F(BlkReadTrackerTest, TestInsRmWithWaiterOverlapMultiReads2) { - LOGINFO("Step 0: initialize BlkReadTracker instance. "); - init(); - auto align = 16ul; LOGINFO("Step 1: set entries per record to {}.", align); get_inst()->set_entries_per_record(align); @@ -352,6 +358,198 @@ get_inst()->remove(c); } +//////////////////////////// Multi-thread test cases ////////////////////////////// + +/* + * Multi-thread Insert and remove, with no free operation; + * + * 1. do insert with a few threads -- (can also be done in massive threads, but not necessary) + * 2. 
do remove in massive threads (must be same amount of inserts); + * */ +TEST_F(BlkReadTrackerTest, TestThreadedInsertAndRemove) { + auto align = 8ul; + LOGINFO("Step 1: set entries per record to {}.", align); + get_inst()->set_entries_per_record(align); + + std::vector< BlkId > bids{{10, 8, 0}, {20, 5, 0}, {25, 6, 0}, {43, 16, 0}, {56, 4, 0}, {72, 18, 0}, {122, 4, 0}}; + + const auto repeat = 100ul; + std::vector< std::thread > op_threads; + + for (const auto& b : bids) { + std::thread t([this, &b]() { + for (auto j = 0ul; j < repeat; ++j) { + get_inst()->insert(b); + } + }); + op_threads.push_back(std::move(t)); + } + + LOGINFO("Step 2: threaded insert issued."); + + for (auto& t : op_threads) { + t.join(); + } + + // remove has to wait for insert to complete because if insert thread runs slower than remove, it might assert + // complaining no elements are found in map; + + LOGINFO("Step 3: threaded insert joined."); + op_threads.clear(); + + for (const auto& b : bids) { + for (auto j = 0ul; j < repeat; ++j) { + std::thread t([this, &b]() { get_inst()->remove(b); }); + op_threads.push_back(std::move(t)); + } + } + + LOGINFO("Step 4: threaded remove issued."); + for (auto& t : op_threads) { + t.join(); + } + + LOGINFO("Step 5: all threads joined."); +} + +TEST_F(BlkReadTrackerTest, TestThreadedInsertWaitonThenRemove) { + auto align = 8ul; + LOGINFO("Step 1: set entries per record to {}.", align); + get_inst()->set_entries_per_record(align); + + std::vector< BlkId > bids{{12, 6, 0}, {18, 5, 0}, {25, 8, 0}, {36, 16, 0}, {57, 4, 0}, {66, 18, 0}, {92, 14, 0}}; + const auto repeat = 100ul; + std::vector< std::thread > op_threads; + for (const auto& b : bids) { + std::thread t([this, &b]() { + for (auto j = 0ul; j < repeat; ++j) { + get_inst()->insert(b); + } + }); + op_threads.push_back(std::move(t)); + } + + LOGINFO("Step 2: threaded insert issued."); + + std::vector< bool > called(bids.size(), false); + std::mutex mtx; + for (auto i = 0ul; i < bids.size(); ++i) { + 
std::thread t([this, &bids, i, &mtx, &called]() { + get_inst()->wait_on(bids[i], [i, &bids, &mtx, &called]() { + std::unique_lock lk(mtx); + LOGMSG_ASSERT(called[i] == false, "not expecting callback to be called more than once!"); + called[i] = true; + LOGINFO("wait_on called on blkid: {};", bids[i].to_string()); + }); + }); + op_threads.push_back(std::move(t)); + } + + // wait for all insert to complete, otherwise it will race with remove thread; + for (auto& t : op_threads) { + t.join(); + } + + op_threads.clear(); + + LOGINFO("Step 3: threaded wait_on issued on all bids."); + + for (const auto& b : bids) { + for (auto j = 0ul; j < repeat; ++j) { + std::thread t([this, &b]() { get_inst()->remove(b); }); + op_threads.push_back(std::move(t)); + } + } + + LOGINFO("Step 4: threaded remove issued."); + for (auto& t : op_threads) { + t.join(); + } + + LOGINFO("Step 5: all threads joined."); + + for (const auto x : called) { + LOGMSG_ASSERT(x == true, "expecting all waiters to be called"); + } + + LOGINFO("Step 6: all bids wait_on cb called."); +} + +/* + * Purpose of this test: + * 1. Concurrent random insert/remove/wait_on operations in different threads. 
+ * + * */ +TEST_F(BlkReadTrackerTest, TestThreadedInsertRemoveAndWait2) { + auto align = 8ul; + LOGINFO("Step 1: set entries per record to {}.", align); + get_inst()->set_entries_per_record(align); + + std::mutex mtx; + std::list< BlkId > inserted_bids; + + std::atomic< uint32_t > outstanding_wait_bids_cnt = 0ul; + + LOGINFO("Step 2: random threaded insert/remove/wait_on operations:"); + std::list< std::thread > op_threads; + auto nitr = 0ul; + while (nitr++ < 200ul || inserted_bids.empty() == false) { /* NOTE(review): empty() is read here without holding mtx while worker threads mutate the list -- guard this check with mtx; TODO confirm */ + std::thread t([this, &outstanding_wait_bids_cnt, nitr /* by value: main thread keeps incrementing nitr, by-ref capture is a data race */, &inserted_bids, &mtx]() { + auto op = get_rand_op_type(); + if (nitr >= 200) { + // reached maximum iterations, let's do remove only so that we can exit the while loop; + op = op_type_t::remove; + } + + if (op == op_type_t::insert) { + BlkId b = gen_random_blkid(); + get_inst()->insert(b); + { + std::unique_lock lg(mtx); + inserted_bids.push_front(b); + } + } else if (op == op_type_t::remove) { + BlkId rm_b; + { + std::unique_lock lg(mtx); + if (inserted_bids.size() == 0) { + // remove came ahead of insert, nothing to do; + return; + } + rm_b = inserted_bids.back(); + inserted_bids.pop_back(); + } + + get_inst()->remove(rm_b); + } else if (op == op_type_t::wait_on) { + BlkId wait_b; + { + std::unique_lock lg(mtx); + if (inserted_bids.size() == 0) { + // wait_on came ahead of insert, nothing to do; + return; + } + wait_b = inserted_bids.back(); + } + + outstanding_wait_bids_cnt.fetch_add(1); + get_inst()->wait_on(wait_b, [&outstanding_wait_bids_cnt]() { outstanding_wait_bids_cnt.fetch_sub(1); }); + } + }); + op_threads.push_back(std::move(t)); + } + + for (auto& t : op_threads) { + t.join(); + } + + LOGINFO("Step 3: all threads joined."); + + LOGMSG_ASSERT(outstanding_wait_bids_cnt.load() == 0, "expecting callback to be called for all waited bids!"); + + LOGINFO("Step 4: Test Passed."); +} + SISL_OPTION_GROUP(test_blk_read_tracker, (num_threads, "", "num_threads", "number of threads", ::cxxopts::value< uint32_t 
>()->default_value("2"), "number"));