Skip to content

Commit

Permalink
refine
Browse files Browse the repository at this point in the history
  • Loading branch information
JacksonYao287 committed Oct 28, 2024
1 parent 6656058 commit 1bd815e
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 72 deletions.
23 changes: 14 additions & 9 deletions src/lib/homestore_backend/tests/homeobj_fixture.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,17 +37,17 @@ class HomeObjectFixture : public ::testing::Test {

void SetUp() override {
_obj_inst = std::dynamic_pointer_cast< HSHomeObject >(g_helper->build_new_homeobject());
g_helper->sync_for_test_start();
g_helper->sync();
}

void TearDown() override {
g_helper->sync_for_cleanup_start();
g_helper->sync();
_obj_inst.reset();
g_helper->delete_homeobject();
}

void restart() {
g_helper->sync_for_cleanup_start();
g_helper->sync();
trigger_cp(true);
_obj_inst.reset();
_obj_inst = std::dynamic_pointer_cast< HSHomeObject >(g_helper->restart());
Expand Down Expand Up @@ -82,9 +82,7 @@ class HomeObjectFixture : public ::testing::Test {
}

ShardInfo create_shard(pg_id_t pg_id, uint64_t size_bytes) {
g_helper->sync_for_uint64_id();
RELEASE_ASSERT(pg_exist(pg_id), "PG {} does not exist", pg_id);

g_helper->sync();
// schedule create_shard only on leader
run_on_pg_leader(pg_id, [&]() {
auto s = _obj_inst->shard_manager()->create_shard(pg_id, size_bytes).get();
Expand All @@ -93,13 +91,15 @@ class HomeObjectFixture : public ::testing::Test {
g_helper->set_uint64_id(ret.id);
});

// all the members need to wait for shard creation to complete locally
// wait for create_shard finished on leader and shard_id set to the uint64_id in IPC.
while (g_helper->get_uint64_id() == INVALID_UINT64_ID) {
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
}

// get shard_id from IPC
auto shard_id = g_helper->get_uint64_id();

// all the members need to wait for shard creation to complete locally
while (!shard_exist(shard_id)) {
// for leader, shard creation is done locally and will nor reach here. but for follower, we need to wait for
// shard creation to complete locally
Expand All @@ -113,7 +113,7 @@ class HomeObjectFixture : public ::testing::Test {

ShardInfo seal_shard(shard_id_t shard_id) {
// before seal shard, we need to wait all the memebers to complete shard state verification
g_helper->sync_for_verify_start();
g_helper->sync();
auto r = _obj_inst->shard_manager()->get_shard(shard_id).get();
RELEASE_ASSERT(!!r, "failed to get shard {}", shard_id);
auto pg_id = r.value().placement_group;
Expand All @@ -135,7 +135,7 @@ class HomeObjectFixture : public ::testing::Test {
}

void put_blob(shard_id_t shard_id, Blob&& blob) {
g_helper->sync_for_uint64_id();
g_helper->sync();
auto r = _obj_inst->shard_manager()->get_shard(shard_id).get();
RELEASE_ASSERT(!!r, "failed to get shard {}", shard_id);
auto pg_id = r.value().placement_group;
Expand All @@ -157,8 +157,10 @@ class HomeObjectFixture : public ::testing::Test {
}
}

// TODO:make this run in parallel
void put_blobs(std::map< pg_id_t, std::vector< shard_id_t > > const& pg_shard_id_vec,
uint64_t const num_blobs_per_shard, std::map< pg_id_t, blob_id_t >& pg_blob_id) {
g_helper->sync();
for (const auto& [pg_id, shard_vec] : pg_shard_id_vec) {
// the blob_id of a pg is a continuous number starting from 0 and increasing by 1
blob_id_t current_blob_id{pg_blob_id[pg_id]};
Expand Down Expand Up @@ -199,8 +201,10 @@ class HomeObjectFixture : public ::testing::Test {
}
}

// TODO:make this run in parallel
void del_all_blobs(std::map< pg_id_t, std::vector< shard_id_t > > const& pg_shard_id_vec,
uint64_t const num_blobs_per_shard, std::map< pg_id_t, blob_id_t >& pg_blob_id) {
g_helper->sync();
for (const auto& [pg_id, shard_vec] : pg_shard_id_vec) {
run_on_pg_leader(pg_id, [&]() {
blob_id_t current_blob_id{0};
Expand All @@ -224,6 +228,7 @@ class HomeObjectFixture : public ::testing::Test {
}
}

// TODO:make this run in parallel
void verify_get_blob(std::map< pg_id_t, std::vector< shard_id_t > > const& pg_shard_id_vec,
uint64_t const num_blobs_per_shard, bool const use_random_offset = false) {
uint32_t off = 0, len = 0;
Expand Down
7 changes: 0 additions & 7 deletions src/lib/homestore_backend/tests/hs_blob_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,6 @@ TEST_F(HomeObjectFixture, BasicPutGetDelBlobWRestart) {
// Verify the stats after restart
verify_obj_count(num_pgs, num_blobs_per_shard, num_shards_per_pg, false /* deleted */);

// we need to sync here. if not then the leader deletion op will bring impact to follower`s verification if the
// follower is very slow. for example, the follower is still verifying blob abd obj_count , but the leader has
// already start deletion. so the follower will fail the verification.
g_helper->sync_for_test_start();
// Put blob after restart to test the persistance of blob sequence number
put_blobs(pg_shard_id_vec, num_blobs_per_shard, pg_blob_id);

Expand All @@ -64,9 +60,6 @@ TEST_F(HomeObjectFixture, BasicPutGetDelBlobWRestart) {
// Verify the stats after put blobs after restart
verify_obj_count(num_pgs, num_blobs_per_shard * 2, num_shards_per_pg, false /* deleted */);

// we need to sync here, same reason as above.we can not use sync_for_test_start() twice without a new type of sync,
// so use sync_for_verify_start() here
g_helper->sync_for_verify_start();
// Delete all blobs
del_all_blobs(pg_shard_id_vec, num_blobs_per_shard, pg_blob_id);

Expand Down
70 changes: 17 additions & 53 deletions src/lib/homestore_backend/tests/hs_repl_test_helper.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,50 +46,21 @@ using namespace homeobject;

namespace test_common {

ENUM(repl_test_phase_t, uint32_t, REGISTER, TEST_RUN, VALIDATE, CLEANUP);

class HSReplTestHelper {
protected:
struct IPCData {
bip::interprocess_mutex mtx_;
bip::interprocess_condition cv_;
bip::interprocess_mutex exec_mtx_;

repl_test_phase_t phase_{repl_test_phase_t::REGISTER};
uint32_t test_start_count_{0};
uint32_t verify_start_count_{0};
uint32_t cleanup_start_count_{0};

// the following variables are used to sync for shard and blob
uint64_t uint64_id_{0};
uint8_t homeobject_replica_count_{0};

// TODO:: now, we can not call the same syc type twice continuously, this will make the second sync ineffective.
// this will bring a little complexity to the UT writter to make sure the sync type is different between
// continuous syncs. will use another PR to remove this limitation and make the sync more flexible.

void sync_for_test_start(uint32_t num_members = 0) {
sync_for(test_start_count_, repl_test_phase_t::TEST_RUN, num_members);
}
void sync_for_verify_start(uint32_t num_members = 0) {
sync_for(verify_start_count_, repl_test_phase_t::VALIDATE, num_members);
}
void sync_for_cleanup_start(uint32_t num_members = 0) {
sync_for(cleanup_start_count_, repl_test_phase_t::CLEANUP, num_members);
}

// this is used for sync blob_id or shard_id among different replicas
void sync_for_uint64_id(uint32_t max_count = 0) {
if (max_count == 0) { max_count = SISL_OPTIONS["replicas"].as< uint32_t >(); }
void sync(uint64_t sync_point, uint32_t max_count = 0) {
if (max_count == 0) { max_count = SISL_OPTIONS["replicas"].as< uint8_t >(); }
std::unique_lock< bip::interprocess_mutex > lg(mtx_);
++homeobject_replica_count_;
if (homeobject_replica_count_ == max_count) {
sync_point_num_ = sync_point;
uint64_id_ = INVALID_UINT64_ID;
homeobject_replica_count_ = 0;
cv_.notify_all();
} else {
cv_.wait(lg, [this]() { return uint64_id_ == INVALID_UINT64_ID; });
cv_.wait(lg, [this, sync_point]() { return sync_point_num_ == sync_point; });
}
homeobject_replica_count_ = 0;
}

void set_uint64_id(uint64_t input_uint64_id) {
Expand All @@ -103,19 +74,15 @@ class HSReplTestHelper {
}

private:
void sync_for(uint32_t& count, repl_test_phase_t new_phase, uint32_t max_count = 0) {
if (max_count == 0) { max_count = SISL_OPTIONS["replicas"].as< uint32_t >(); }
std::unique_lock< bip::interprocess_mutex > lg(mtx_);
++count;
if (count == max_count) {
phase_ = new_phase;
cv_.notify_all();
} else {
cv_.wait(lg, [this, new_phase]() { return (phase_ == new_phase); });
}
bip::interprocess_mutex mtx_;
bip::interprocess_condition cv_;
uint8_t homeobject_replica_count_{0};

count = 0;
}
// the following variables are used to share shard_id and blob_id among different replicas
uint64_t uint64_id_{0};

// the nth synchronization point, thas is how many times different replicas have synced
uint64_t sync_point_num_{UINT64_MAX};
};

public:
Expand Down Expand Up @@ -265,7 +232,7 @@ class HSReplTestHelper {
return homeobj_;
}

uint16_t replica_num() const { return replica_num_; }
uint8_t replica_num() const { return replica_num_; }

peer_id_t my_replica_id() const { return my_replica_id_; }

Expand All @@ -288,11 +255,7 @@ class HSReplTestHelper {

void teardown() { sisl::GrpcAsyncClientWorker::shutdown_all(); }

void sync_for_test_start(uint32_t num_members = 0) { ipc_data_->sync_for_test_start(num_members); }
void sync_for_verify_start(uint32_t num_members = 0) { ipc_data_->sync_for_verify_start(num_members); }
void sync_for_cleanup_start(uint32_t num_members = 0) { ipc_data_->sync_for_cleanup_start(num_members); }

void sync_for_uint64_id(uint32_t num_members = 0) { ipc_data_->sync_for_uint64_id(num_members); }
void sync(uint32_t num_members = 0) { ipc_data_->sync(sync_point_num++, num_members); }
void set_uint64_id(uint64_t uint64_id) { ipc_data_->set_uint64_id(uint64_id); }
uint64_t get_uint64_id() { return ipc_data_->get_uint64_id(); }

Expand Down Expand Up @@ -365,7 +328,8 @@ class HSReplTestHelper {
}

private:
uint16_t replica_num_;
uint8_t replica_num_;
uint64_t sync_point_num{0};
std::string name_;
std::vector< std::string > args_;
char** argv_;
Expand Down
5 changes: 2 additions & 3 deletions src/lib/homestore_backend/tests/test_homestore_backend.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,7 @@ SISL_OPTION_GROUP(
"true or false"),
(init_device, "", "init_device", "init real device", ::cxxopts::value< bool >()->default_value("false"),
"true or false"),
(replicas, "", "replicas", "Total number of replicas", ::cxxopts::value< uint32_t >()->default_value("3"),
"number"),
(replicas, "", "replicas", "Total number of replicas", ::cxxopts::value< uint8_t >()->default_value("3"), "number"),
(spare_replicas, "", "spare_replicas", "Additional number of spare replicas not part of repldev",
::cxxopts::value< uint32_t >()->default_value("1"), "number"),
(base_port, "", "base_port", "Port number of first replica", ::cxxopts::value< uint16_t >()->default_value("4000"),
Expand Down Expand Up @@ -60,7 +59,7 @@ int main(int argc, char* argv[]) {
SISL_OPTIONS_LOAD(parsed_argc, argv, test_options);

g_helper = std::make_unique< test_common::HSReplTestHelper >("test_homeobject", args, orig_argv);
g_helper->setup(SISL_OPTIONS["replicas"].as< uint32_t >());
g_helper->setup(SISL_OPTIONS["replicas"].as< uint8_t >());
auto ret = RUN_ALL_TESTS();
g_helper->teardown();
return ret;
Expand Down

0 comments on commit 1bd815e

Please sign in to comment.