From f4f732c32014202073b434756ecd12ad35efe6e7 Mon Sep 17 00:00:00 2001 From: Yaming Kuang Date: Thu, 28 Sep 2023 13:22:27 -0700 Subject: [PATCH] add new concurrent test case --- src/tests/test_blk_read_tracker.cpp | 102 +++++++++++++++++++++++++++- 1 file changed, 100 insertions(+), 2 deletions(-) diff --git a/src/tests/test_blk_read_tracker.cpp b/src/tests/test_blk_read_tracker.cpp index c407551af..f7a030ac5 100644 --- a/src/tests/test_blk_read_tracker.cpp +++ b/src/tests/test_blk_read_tracker.cpp @@ -13,7 +13,12 @@ * specific language governing permissions and limitations under the License. * *********************************************************************************/ - +#include +#include +#include +#include +#include +#include #include #include "blkdata_svc/blk_read_tracker.hpp" @@ -23,6 +28,7 @@ using namespace homestore; SISL_LOGGING_INIT(test_blk_read_tracker, iomgr, flip, io_wd) SISL_OPTIONS_ENABLE(logging, test_blk_read_tracker) +VENUM(op_type_t, uint8_t, insert = 0, remove = 1, wait_on = 2, max_op = 3); class BlkReadTrackerTest : public testing::Test { public: virtual void SetUp() override { @@ -33,6 +39,24 @@ class BlkReadTrackerTest : public testing::Test { void init() { m_blk_read_tracker = std::make_unique< BlkReadTracker >(); } std::shared_ptr< BlkReadTracker > get_inst() { return m_blk_read_tracker; } + op_type_t get_rand_op_type() { + return static_cast< op_type_t >(rand() % static_cast< uint8_t >(op_type_t::max_op)); + } + + void gen_random_blkids(std::vector< BlkId >& out_bids, blk_count_t nblks) { + for (auto i = 0ul; i < nblks; ++i) { + out_bids.emplace_back(gen_random_blkid()); + } + } + + BlkId gen_random_blkid() { + static thread_local std::random_device rd; + static thread_local std::default_random_engine re{rd()}; + std::uniform_int_distribution< blk_num_t > blk_num{0, 1000}; + std::uniform_int_distribution< blk_count_t > nblks{0, 64}; + return BlkId{blk_num(re), nblks(re), static_cast< chunk_num_t >(0ul) /* chunk_num */}; + } + private: std::shared_ptr< BlkReadTracker > m_blk_read_tracker; }; @@ -342,7 +366,6 @@ TEST_F(BlkReadTrackerTest, TestInsRmWithWaiterOverlapMultiReads2) { * 1. do insert with a few threads -- (can also be done in massive threads, but not necessary) * 2. do remove in massive threads (must be same amount of inserts); * */ -VENUM(op_type_t, uint8_t, insert = 1, remove = 2, wait_on = 3, no_op = 4); TEST_F(BlkReadTrackerTest, TestThreadedInsertAndRemove) { auto align = 8ul; LOGINFO("Step 1: set entries per record to {}.", align); @@ -452,6 +475,81 @@ TEST_F(BlkReadTrackerTest, TestThreadedInsertWaitonThenRemove) { LOGINFO("Step 5: all bids wait_on cb called."); } +/* + * Purpose of this test: + * 1. Concurrent random insert/remove/wait_on operations in different threads. + * + * */ +TEST_F(BlkReadTrackerTest, TestThreadedInsertRemoveAndWait2) { + auto align = 8ul; + LOGINFO("Step 1: set entries per record to {}.", align); + get_inst()->set_entries_per_record(align); + + std::mutex mtx; + std::list< BlkId > inserted_bids; + + std::atomic< uint32_t > outstanding_wait_bids_cnt = 0ul; + + LOGINFO("Step 2: randome threaded insert/remove/wait_on operation:"); + std::list< std::thread > op_threads; + auto nitr = 0ul; + while (nitr++ < 200ul || inserted_bids.empty() == false) { + std::thread t([this, &outstanding_wait_bids_cnt, &nitr, &inserted_bids, &mtx]() { + auto op = get_rand_op_type(); + if (nitr >= 200) { + // reached maximum iterations, let's do remove only so that we can exit the while loop; + op = op_type_t::remove; + } + + if (op == op_type_t::insert) { + BlkId b = gen_random_blkid(); + get_inst()->insert(b); + { + std::unique_lock lg(mtx); + inserted_bids.push_front(b); + } + } else if (op == op_type_t::remove) { + BlkId rm_b; + { + std::unique_lock lg(mtx); + if (inserted_bids.size() == 0) { + // remove come ahead of insert, nothing to do; + return; + } + rm_b = inserted_bids.back(); + inserted_bids.pop_back(); + } + + get_inst()->remove(rm_b); + } else if (op == op_type_t::wait_on) { + BlkId wait_b; + { + std::unique_lock lg(mtx); + if (inserted_bids.size() == 0) { + // remove come ahead of insert, nothing to do; + return; + } + wait_b = inserted_bids.back(); + } + + outstanding_wait_bids_cnt.fetch_add(1); + get_inst()->wait_on(wait_b, [&outstanding_wait_bids_cnt]() { outstanding_wait_bids_cnt.fetch_sub(1); }); + } + }); + op_threads.push_back(std::move(t)); + } + + for (auto& t : op_threads) { + t.join(); + } + + LOGINFO("Step 3: all threads joined."); + + LOGMSG_ASSERT(outstanding_wait_bids_cnt.load() == 0, "expecting callback to be called for all waited bids!"); + + LOGINFO("Step 4: Test Passed."); +} + SISL_OPTION_GROUP(test_blk_read_tracker, (num_threads, "", "num_threads", "number of threads", ::cxxopts::value< uint32_t >()->default_value("2"), "number"));