diff --git a/src/lib/homestore_backend/tests/homeobj_fixture.hpp b/src/lib/homestore_backend/tests/homeobj_fixture.hpp index bca4821c..c0f3bb66 100644 --- a/src/lib/homestore_backend/tests/homeobj_fixture.hpp +++ b/src/lib/homestore_backend/tests/homeobj_fixture.hpp @@ -37,17 +37,17 @@ class HomeObjectFixture : public ::testing::Test { void SetUp() override { _obj_inst = std::dynamic_pointer_cast< HSHomeObject >(g_helper->build_new_homeobject()); - g_helper->sync_for_test_start(); + g_helper->sync(); } void TearDown() override { - g_helper->sync_for_cleanup_start(); + g_helper->sync(); _obj_inst.reset(); g_helper->delete_homeobject(); } void restart() { - g_helper->sync_for_cleanup_start(); + g_helper->sync(); trigger_cp(true); _obj_inst.reset(); _obj_inst = std::dynamic_pointer_cast< HSHomeObject >(g_helper->restart()); @@ -82,9 +82,7 @@ class HomeObjectFixture : public ::testing::Test { } ShardInfo create_shard(pg_id_t pg_id, uint64_t size_bytes) { - g_helper->sync_for_uint64_id(); - RELEASE_ASSERT(pg_exist(pg_id), "PG {} does not exist", pg_id); - + g_helper->sync(); // schedule create_shard only on leader run_on_pg_leader(pg_id, [&]() { auto s = _obj_inst->shard_manager()->create_shard(pg_id, size_bytes).get(); @@ -93,13 +91,15 @@ class HomeObjectFixture : public ::testing::Test { g_helper->set_uint64_id(ret.id); }); - // all the members need to wait for shard creation to complete locally + // wait for create_shard finished on leader and shard_id set to the uint64_id in IPC. while (g_helper->get_uint64_id() == INVALID_UINT64_ID) { std::this_thread::sleep_for(std::chrono::milliseconds(1000)); } + // get shard_id from IPC auto shard_id = g_helper->get_uint64_id(); + // all the members need to wait for shard creation to complete locally while (!shard_exist(shard_id)) { // for leader, shard creation is done locally and will nor reach here. but for follower, we need to wait for // shard creation to complete locally @@ -113,7 +113,7 @@ class HomeObjectFixture : public ::testing::Test { ShardInfo seal_shard(shard_id_t shard_id) { // before seal shard, we need to wait all the memebers to complete shard state verification - g_helper->sync_for_verify_start(); + g_helper->sync(); auto r = _obj_inst->shard_manager()->get_shard(shard_id).get(); RELEASE_ASSERT(!!r, "failed to get shard {}", shard_id); auto pg_id = r.value().placement_group; @@ -135,7 +135,7 @@ class HomeObjectFixture : public ::testing::Test { } void put_blob(shard_id_t shard_id, Blob&& blob) { - g_helper->sync_for_uint64_id(); + g_helper->sync(); auto r = _obj_inst->shard_manager()->get_shard(shard_id).get(); RELEASE_ASSERT(!!r, "failed to get shard {}", shard_id); auto pg_id = r.value().placement_group; @@ -157,8 +157,10 @@ class HomeObjectFixture : public ::testing::Test { } } + // TODO:make this run in parallel void put_blobs(std::map< pg_id_t, std::vector< shard_id_t > > const& pg_shard_id_vec, uint64_t const num_blobs_per_shard, std::map< pg_id_t, blob_id_t >& pg_blob_id) { + g_helper->sync(); for (const auto& [pg_id, shard_vec] : pg_shard_id_vec) { // the blob_id of a pg is a continuous number starting from 0 and increasing by 1 blob_id_t current_blob_id{pg_blob_id[pg_id]}; @@ -199,8 +201,10 @@ class HomeObjectFixture : public ::testing::Test { } } + // TODO:make this run in parallel void del_all_blobs(std::map< pg_id_t, std::vector< shard_id_t > > const& pg_shard_id_vec, uint64_t const num_blobs_per_shard, std::map< pg_id_t, blob_id_t >& pg_blob_id) { + g_helper->sync(); for (const auto& [pg_id, shard_vec] : pg_shard_id_vec) { run_on_pg_leader(pg_id, [&]() { blob_id_t current_blob_id{0}; @@ -224,6 +228,7 @@ class HomeObjectFixture : public ::testing::Test { } } + // TODO:make this run in parallel void verify_get_blob(std::map< pg_id_t, std::vector< shard_id_t > > const& pg_shard_id_vec, uint64_t const num_blobs_per_shard, bool const use_random_offset = false) { uint32_t off = 0, len = 0; diff --git a/src/lib/homestore_backend/tests/hs_blob_tests.cpp b/src/lib/homestore_backend/tests/hs_blob_tests.cpp index 0232fd7e..c4685575 100644 --- a/src/lib/homestore_backend/tests/hs_blob_tests.cpp +++ b/src/lib/homestore_backend/tests/hs_blob_tests.cpp @@ -51,10 +51,6 @@ TEST_F(HomeObjectFixture, BasicPutGetDelBlobWRestart) { // Verify the stats after restart verify_obj_count(num_pgs, num_blobs_per_shard, num_shards_per_pg, false /* deleted */); - // we need to sync here. if not then the leader deletion op will bring impact to follower`s verification if the - // follower is very slow. for example, the follower is still verifying blob abd obj_count , but the leader has - // already start deletion. so the follower will fail the verification. - g_helper->sync_for_test_start(); // Put blob after restart to test the persistance of blob sequence number put_blobs(pg_shard_id_vec, num_blobs_per_shard, pg_blob_id); @@ -64,9 +60,6 @@ TEST_F(HomeObjectFixture, BasicPutGetDelBlobWRestart) { // Verify the stats after put blobs after restart verify_obj_count(num_pgs, num_blobs_per_shard * 2, num_shards_per_pg, false /* deleted */); - // we need to sync here, same reason as above.we can not use sync_for_test_start() twice without a new type of sync, - // so use sync_for_verify_start() here - g_helper->sync_for_verify_start(); // Delete all blobs del_all_blobs(pg_shard_id_vec, num_blobs_per_shard, pg_blob_id); diff --git a/src/lib/homestore_backend/tests/hs_repl_test_helper.hpp b/src/lib/homestore_backend/tests/hs_repl_test_helper.hpp index ee25b384..24654e43 100644 --- a/src/lib/homestore_backend/tests/hs_repl_test_helper.hpp +++ b/src/lib/homestore_backend/tests/hs_repl_test_helper.hpp @@ -46,50 +46,21 @@ using namespace homeobject; namespace test_common { -ENUM(repl_test_phase_t, uint32_t, REGISTER, TEST_RUN, VALIDATE, CLEANUP); - class HSReplTestHelper { protected: struct IPCData { - bip::interprocess_mutex mtx_; - bip::interprocess_condition cv_; - bip::interprocess_mutex exec_mtx_; - - repl_test_phase_t phase_{repl_test_phase_t::REGISTER}; - uint32_t test_start_count_{0}; - uint32_t verify_start_count_{0}; - uint32_t cleanup_start_count_{0}; - - // the following variables are used to sync for shard and blob - uint64_t uint64_id_{0}; - uint8_t homeobject_replica_count_{0}; - - // TODO:: now, we can not call the same syc type twice continuously, this will make the second sync ineffective. - // this will bring a little complexity to the UT writter to make sure the sync type is different between - // continuous syncs. will use another PR to remove this limitation and make the sync more flexible. - - void sync_for_test_start(uint32_t num_members = 0) { - sync_for(test_start_count_, repl_test_phase_t::TEST_RUN, num_members); - } - void sync_for_verify_start(uint32_t num_members = 0) { - sync_for(verify_start_count_, repl_test_phase_t::VALIDATE, num_members); - } - void sync_for_cleanup_start(uint32_t num_members = 0) { - sync_for(cleanup_start_count_, repl_test_phase_t::CLEANUP, num_members); - } - - // this is used for sync blob_id or shard_id among different replicas - void sync_for_uint64_id(uint32_t max_count = 0) { - if (max_count == 0) { max_count = SISL_OPTIONS["replicas"].as< uint32_t >(); } + void sync(uint64_t sync_point, uint32_t max_count = 0) { + if (max_count == 0) { max_count = SISL_OPTIONS["replicas"].as< uint8_t >(); } std::unique_lock< bip::interprocess_mutex > lg(mtx_); ++homeobject_replica_count_; if (homeobject_replica_count_ == max_count) { + sync_point_num_ = sync_point; uint64_id_ = INVALID_UINT64_ID; - homeobject_replica_count_ = 0; cv_.notify_all(); } else { - cv_.wait(lg, [this]() { return uint64_id_ == INVALID_UINT64_ID; }); + cv_.wait(lg, [this, sync_point]() { return sync_point_num_ == sync_point; }); } + homeobject_replica_count_ = 0; } void set_uint64_id(uint64_t input_uint64_id) { @@ -103,19 +74,15 @@ class HSReplTestHelper { } private: - void sync_for(uint32_t& count, repl_test_phase_t new_phase, uint32_t max_count = 0) { - if (max_count == 0) { max_count = SISL_OPTIONS["replicas"].as< uint32_t >(); } - std::unique_lock< bip::interprocess_mutex > lg(mtx_); - ++count; - if (count == max_count) { - phase_ = new_phase; - cv_.notify_all(); - } else { - cv_.wait(lg, [this, new_phase]() { return (phase_ == new_phase); }); - } + bip::interprocess_mutex mtx_; + bip::interprocess_condition cv_; + uint8_t homeobject_replica_count_{0}; - count = 0; - } + // the following variables are used to share shard_id and blob_id among different replicas + uint64_t uint64_id_{0}; + + // the nth synchronization point, thas is how many times different replicas have synced + uint64_t sync_point_num_{UINT64_MAX}; }; public: @@ -265,7 +232,7 @@ class HSReplTestHelper { return homeobj_; } - uint16_t replica_num() const { return replica_num_; } + uint8_t replica_num() const { return replica_num_; } peer_id_t my_replica_id() const { return my_replica_id_; } @@ -288,11 +255,7 @@ class HSReplTestHelper { void teardown() { sisl::GrpcAsyncClientWorker::shutdown_all(); } - void sync_for_test_start(uint32_t num_members = 0) { ipc_data_->sync_for_test_start(num_members); } - void sync_for_verify_start(uint32_t num_members = 0) { ipc_data_->sync_for_verify_start(num_members); } - void sync_for_cleanup_start(uint32_t num_members = 0) { ipc_data_->sync_for_cleanup_start(num_members); } - - void sync_for_uint64_id(uint32_t num_members = 0) { ipc_data_->sync_for_uint64_id(num_members); } + void sync(uint32_t num_members = 0) { ipc_data_->sync(sync_point_num++, num_members); } void set_uint64_id(uint64_t uint64_id) { ipc_data_->set_uint64_id(uint64_id); } uint64_t get_uint64_id() { return ipc_data_->get_uint64_id(); } @@ -365,7 +328,8 @@ class HSReplTestHelper { } private: - uint16_t replica_num_; + uint8_t replica_num_; + uint64_t sync_point_num{0}; std::string name_; std::vector< std::string > args_; char** argv_; diff --git a/src/lib/homestore_backend/tests/test_homestore_backend.cpp b/src/lib/homestore_backend/tests/test_homestore_backend.cpp index 0eb47da1..73269875 100644 --- a/src/lib/homestore_backend/tests/test_homestore_backend.cpp +++ b/src/lib/homestore_backend/tests/test_homestore_backend.cpp @@ -26,8 +26,7 @@ SISL_OPTION_GROUP( "true or false"), (init_device, "", "init_device", "init real device", ::cxxopts::value< bool >()->default_value("false"), "true or false"), - (replicas, "", "replicas", "Total number of replicas", ::cxxopts::value< uint32_t >()->default_value("3"), - "number"), + (replicas, "", "replicas", "Total number of replicas", ::cxxopts::value< uint8_t >()->default_value("3"), "number"), (spare_replicas, "", "spare_replicas", "Additional number of spare replicas not part of repldev", ::cxxopts::value< uint32_t >()->default_value("1"), "number"), (base_port, "", "base_port", "Port number of first replica", ::cxxopts::value< uint16_t >()->default_value("4000"), @@ -60,7 +59,7 @@ int main(int argc, char* argv[]) { SISL_OPTIONS_LOAD(parsed_argc, argv, test_options); g_helper = std::make_unique< test_common::HSReplTestHelper >("test_homeobject", args, orig_argv); - g_helper->setup(SISL_OPTIONS["replicas"].as< uint32_t >()); + g_helper->setup(SISL_OPTIONS["replicas"].as< uint8_t >()); auto ret = RUN_ALL_TESTS(); g_helper->teardown(); return ret;