Skip to content

Commit

Permalink
add new concurrent test case
Browse files Browse the repository at this point in the history
  • Loading branch information
yamingk committed Sep 28, 2023
1 parent debcf24 commit f4f732c
Showing 1 changed file with 100 additions and 2 deletions.
102 changes: 100 additions & 2 deletions src/tests/test_blk_read_tracker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,12 @@
* specific language governing permissions and limitations under the License.
*
*********************************************************************************/

#include <memory>
#include <mutex>
#include <random>
#include <string>
#include <vector>
#include <list>
#include <gtest/gtest.h>

#include "blkdata_svc/blk_read_tracker.hpp"
Expand All @@ -23,6 +28,7 @@ using namespace homestore;
SISL_LOGGING_INIT(test_blk_read_tracker, iomgr, flip, io_wd)
SISL_OPTIONS_ENABLE(logging, test_blk_read_tracker)

VENUM(op_type_t, uint8_t, insert = 0, remove = 1, wait_on = 2, max_op = 3);
class BlkReadTrackerTest : public testing::Test {
public:
virtual void SetUp() override {
Expand All @@ -33,6 +39,24 @@ class BlkReadTrackerTest : public testing::Test {
void init() { m_blk_read_tracker = std::make_unique< BlkReadTracker >(); }
std::shared_ptr< BlkReadTracker > get_inst() { return m_blk_read_tracker; }

op_type_t get_rand_op_type() {
return static_cast< op_type_t >(rand() % static_cast< uint8_t >(op_type_t::max_op));
}

void gen_random_blkids(std::vector< BlkId >& out_bids, blk_count_t nblks) {
for (auto i = 0ul; i < nblks; ++i) {
out_bids.emplace_back(gen_random_blkid());
}
}

BlkId gen_random_blkid() {
static thread_local std::random_device rd;
static thread_local std::default_random_engine re{rd()};
std::uniform_int_distribution< blk_num_t > blk_num{0, 1000};
std::uniform_int_distribution< blk_count_t > nblks{0, 64};
return BlkId{blk_num(re), nblks(re), static_cast< chunk_num_t >(0ul) /* chunk_num */};
}

private:
std::shared_ptr< BlkReadTracker > m_blk_read_tracker;
};
Expand Down Expand Up @@ -342,7 +366,6 @@ TEST_F(BlkReadTrackerTest, TestInsRmWithWaiterOverlapMultiReads2) {
* 1. do insert with a few threads -- (can also be done in massive threads, but not necessary)
* 2. do remove in massive threads (must be same amount of inserts);
* */
VENUM(op_type_t, uint8_t, insert = 1, remove = 2, wait_on = 3, no_op = 4);
TEST_F(BlkReadTrackerTest, TestThreadedInsertAndRemove) {
auto align = 8ul;
LOGINFO("Step 1: set entries per record to {}.", align);
Expand Down Expand Up @@ -452,6 +475,81 @@ TEST_F(BlkReadTrackerTest, TestThreadedInsertWaitonThenRemove) {
LOGINFO("Step 5: all bids wait_on cb called.");
}

/*
* Purpose of this test:
* 1. Concurrent random insert/remove/wait_on operations in different threads.
*
* */
TEST_F(BlkReadTrackerTest, TestThreadedInsertRemoveAndWait2) {
auto align = 8ul;
LOGINFO("Step 1: set entries per record to {}.", align);
get_inst()->set_entries_per_record(align);

std::mutex mtx;
std::list< BlkId > inserted_bids;

std::atomic< uint32_t > outstanding_wait_bids_cnt = 0ul;

LOGINFO("Step 2: randome threaded insert/remove/wait_on operation:");
std::list< std::thread > op_threads;
auto nitr = 0ul;
while (nitr++ < 200ul || inserted_bids.empty() == false) {
std::thread t([this, &outstanding_wait_bids_cnt, &nitr, &inserted_bids, &mtx]() {
auto op = get_rand_op_type();
if (nitr >= 200) {
// reached maximum iterations, let's do remove only so that we can exit the while loop;
op = op_type_t::remove;
}

if (op == op_type_t::insert) {
BlkId b = gen_random_blkid();
get_inst()->insert(b);
{
std::unique_lock lg(mtx);
inserted_bids.push_front(b);
}
} else if (op == op_type_t::remove) {
BlkId rm_b;
{
std::unique_lock lg(mtx);
if (inserted_bids.size() == 0) {
// remove come ahead of insert, nothing to do;
return;
}
rm_b = inserted_bids.back();
inserted_bids.pop_back();
}

get_inst()->remove(rm_b);
} else if (op == op_type_t::wait_on) {
BlkId wait_b;
{
std::unique_lock lg(mtx);
if (inserted_bids.size() == 0) {
// remove come ahead of insert, nothing to do;
return;
}
wait_b = inserted_bids.back();
}

outstanding_wait_bids_cnt.fetch_add(1);
get_inst()->wait_on(wait_b, [&outstanding_wait_bids_cnt]() { outstanding_wait_bids_cnt.fetch_sub(1); });
}
});
op_threads.push_back(std::move(t));
}

for (auto& t : op_threads) {
t.join();
}

LOGINFO("Step 3: all threads joined.");

LOGMSG_ASSERT(outstanding_wait_bids_cnt.load() == 0, "expecting callback to be called for all waited bids!");

LOGINFO("Step 4: Test Passed.");
}

SISL_OPTION_GROUP(test_blk_read_tracker,
(num_threads, "", "num_threads", "number of threads",
::cxxopts::value< uint32_t >()->default_value("2"), "number"));
Expand Down

0 comments on commit f4f732c

Please sign in to comment.