issue 329: RaftReplDev Metrics and Restart Leader Test (#337)
yamingk authored Feb 29, 2024
1 parent e0cd9a8 commit dcaad8f
Showing 5 changed files with 111 additions and 9 deletions.
2 changes: 1 addition & 1 deletion conanfile.py
@@ -5,7 +5,7 @@

class HomestoreConan(ConanFile):
name = "homestore"
version = "5.1.9"
version = "5.1.10"

homepage = "https://github.com/eBay/Homestore"
description = "HomeStore Storage Engine"
19 changes: 16 additions & 3 deletions src/lib/replication/repl_dev/raft_repl_dev.cpp
@@ -28,7 +28,8 @@ RaftReplDev::RaftReplDev(RaftReplService& svc, superblk< raft_repl_dev_superblk
m_group_id{rd_sb->group_id},
m_my_repl_id{svc.get_my_repl_uuid()},
m_raft_server_id{nuraft_mesg::to_server_id(m_my_repl_id)},
m_rd_sb{std::move(rd_sb)} {
m_rd_sb{std::move(rd_sb)},
m_metrics{fmt::format("{}_{}", group_id_str(), m_raft_server_id).c_str()} {
m_state_machine = std::make_shared< RaftStateMachine >(*this);

if (load_existing) {
@@ -216,7 +217,11 @@ void RaftReplDev::on_fetch_data_received(intrusive< sisl::GenericRpcData >& rpc_
sgs_vec.push_back(sgs);

async_read(local_blkid, sgs, total_size).thenValue([this, &ctx](auto&& err) {
RD_REL_ASSERT(!err, "Error in reading data"); // TODO: Find a way to return error to the Listener
if (err) {
COUNTER_INCREMENT(m_metrics, read_err_cnt, 1);
RD_REL_ASSERT(false, "Error in reading data"); // TODO: Find a way to return error to the Listener
}

{
std::unique_lock< std::mutex > lk{ctx->mtx};
--(ctx->outstanding_read_cnt);
@@ -337,6 +342,7 @@ void RaftReplDev::on_push_data_received(intrusive< sisl::GenericRpcData >& rpc_d
.async_write(r_cast< const char* >(data), push_req->data_size(), rreq->local_blkid)
.thenValue([this, rreq](auto&& err) {
if (err) {
COUNTER_INCREMENT(m_metrics, write_err_cnt, 1);
RD_DBG_ASSERT(false, "Error in writing data");
handle_error(rreq, ReplServiceError::DRIVE_WRITE_ERROR);
} else {
@@ -460,7 +466,7 @@ void RaftReplDev::check_and_fetch_remote_data(std::vector< repl_req_ptr_t >* rre
void RaftReplDev::fetch_data_from_remote(std::vector< repl_req_ptr_t > rreqs) {
if (rreqs.size() == 0) { return; }

std::vector< ::flatbuffers::Offset< RequestEntry > > entries;
std::vector<::flatbuffers::Offset< RequestEntry > > entries;
entries.reserve(rreqs.size());

shared< flatbuffers::FlatBufferBuilder > builder = std::make_shared< flatbuffers::FlatBufferBuilder >();
@@ -487,6 +493,9 @@ void RaftReplDev::fetch_data_from_remote(std::vector< repl_req_ptr_t > rreqs) {
builder->FinishSizePrefixed(
CreateFetchData(*builder, CreateFetchDataRequest(*builder, builder->CreateVector(entries))));

COUNTER_INCREMENT(m_metrics, fetch_rreq_cnt, 1);
COUNTER_INCREMENT(m_metrics, fetch_total_entries_cnt, rreqs.size());

// leader can change, on the receiving side, we need to check if the leader is still the one who originated the
// blkid;
group_msg_service()
@@ -503,6 +512,7 @@
"Not able to fetching data from originator={}, error={}, probably originator is down. Will "
"retry when new leader start appending log entries",
rreqs.front()->remote_blkid.server_id, e.error());
COUNTER_INCREMENT(m_metrics, fetch_err_cnt, 1);
for (auto const& rreq : rreqs) {
handle_error(rreq, RaftReplService::to_repl_error(e.error()));
}
@@ -512,6 +522,8 @@
auto raw_data = e.value().response_blob().cbytes();
auto total_size = e.value().response_blob().size();

COUNTER_INCREMENT(m_metrics, fetch_total_blk_size, total_size);

RD_DBG_ASSERT_GT(total_size, 0, "Empty response from remote");
RD_DBG_ASSERT(raw_data, "Empty response from remote");

@@ -591,6 +603,7 @@ void RaftReplDev::fetch_data_from_remote(std::vector< repl_req_ptr_t > rreqs) {
for (auto const& err_c : vf) {
if (sisl_unlikely(err_c.value())) {
auto ec = err_c.value();
COUNTER_INCREMENT(m_metrics, write_err_cnt, 1);
RD_LOG(ERROR, "Error in writing data: {}", ec.value());
// TODO: actually will never arrive here as iomgr will assert (should not assert but
// to raise alert and leave the raft group);
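
Every error path touched above follows the same shape: increment the per-instance counter first, then assert or hand off to handle_error(). That ordering is why the one-line RD_REL_ASSERT(!err, ...) became an explicit if (err) block: the counter is recorded even in builds where the debug assert compiles to a no-op. A minimal sketch of the shape, using a hypothetical on_write_done() helper (the macros, counter name, and error enum are the ones in the diff):

// Sketch only -- hypothetical helper showing the error-path pattern above.
void RaftReplDev::on_write_done(repl_req_ptr_t rreq, std::error_code err) {
    if (err) {
        COUNTER_INCREMENT(m_metrics, write_err_cnt, 1);           // count first
        RD_DBG_ASSERT(false, "Error in writing data");            // assert in debug builds
        handle_error(rreq, ReplServiceError::DRIVE_WRITE_ERROR);  // recover in release
    }
}
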
24 changes: 24 additions & 0 deletions src/lib/replication/repl_dev/raft_repl_dev.h
@@ -29,6 +29,28 @@ struct raft_repl_dev_superblk : public repl_dev_superblk {

using raft_buf_ptr_t = nuraft::ptr< nuraft::buffer >;

class RaftReplDevMetrics : public sisl::MetricsGroup {
public:
explicit RaftReplDevMetrics(const char* inst_name) : sisl::MetricsGroup("RaftReplDev", inst_name) {
REGISTER_COUNTER(read_err_cnt, "total read error count", "read_err_cnt", {"op", "read"});
REGISTER_COUNTER(write_err_cnt, "total write error count", "write_err_cnt", {"op", "write"});
REGISTER_COUNTER(fetch_err_cnt, "total fetch data error count", "fetch_err_cnt", {"op", "fetch"});

REGISTER_COUNTER(fetch_rreq_cnt, "total fetch data request count", "fetch_data_req_cnt", {"op", "fetch"});
REGISTER_COUNTER(fetch_total_blk_size, "total fetch data blocks size", "fetch_total_blk_size", {"op", "fetch"});
REGISTER_COUNTER(fetch_total_entries_cnt, "total fetched entries count", "fetch_total_entries_cnt",
{"op", "fetch"});

register_me_to_farm();
}

RaftReplDevMetrics(const RaftReplDevMetrics&) = delete;
RaftReplDevMetrics(RaftReplDevMetrics&&) noexcept = delete;
RaftReplDevMetrics& operator=(const RaftReplDevMetrics&) = delete;
RaftReplDevMetrics& operator=(RaftReplDevMetrics&&) noexcept = delete;
~RaftReplDevMetrics() { deregister_me_from_farm(); }
};

class RaftReplService;
class CP;
class RaftReplDev : public ReplDev,
@@ -62,6 +84,8 @@ class RaftReplDev : public ReplDev,
iomgr::null_timer_handle}; // non-recurring timer doesn't need to be cancelled on shutdown;
bool m_resync_mode{false};

RaftReplDevMetrics m_metrics;

static std::atomic< uint64_t > s_next_group_ordinal;

public:
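
RaftReplDevMetrics follows sisl's RAII registration pattern: the constructor registers the group with the metrics farm, the destructor deregisters it, and copy/move are deleted so each instance stays uniquely registered. Per the constructor change in raft_repl_dev.cpp above, every repl-dev names its instance "<group_id>_<raft_server_id>", keeping counters from different raft groups and replicas distinguishable. A rough usage sketch (group_id_str and raft_server_id here are placeholder variables):

// Sketch: one metrics group per repl-dev, uniquely named per raft group/replica.
RaftReplDevMetrics metrics{fmt::format("{}_{}", group_id_str, raft_server_id).c_str()};
COUNTER_INCREMENT(metrics, fetch_rreq_cnt, 1); // e.g. record one fetch round-trip
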
3 changes: 2 additions & 1 deletion src/tests/test_common/hs_repl_test_common.hpp
@@ -83,6 +83,7 @@ class HSReplTestHelper {
} else {
cv_.wait(lg, [this, new_phase]() { return (phase_ == new_phase); });
}

count = 0;
}
};
@@ -182,7 +183,7 @@

void teardown() {
LOGINFO("Stopping Homestore replica={}", replica_num_);
sisl::GrpcAsyncClientWorker::shutdown_all();
// sisl::GrpcAsyncClientWorker::shutdown_all();
test_common::HSTestHelper::shutdown_homestore();
}

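
The count = 0; added after the condition-variable wait resets the barrier so the same helper can gate each phase of a test run in turn (test start, verify start, cleanup start). A sketch of the barrier shape; count, phase_, and cv_ appear in the snippet above, while mtx_, total_replicas_, and phase_t are assumed names:

// Reusable phase barrier (sketch; some member names assumed).
void sync_phase(phase_t new_phase) {
    std::unique_lock< std::mutex > lg{mtx_};
    if (++count == total_replicas_) {
        phase_ = new_phase;  // last replica to arrive flips the phase
        cv_.notify_all();
    } else {
        cv_.wait(lg, [this, new_phase]() { return (phase_ == new_phase); });
    }
    count = 0;  // reset so the barrier can be reused for the next phase
}
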
72 changes: 68 additions & 4 deletions src/tests/test_raft_repl_dev.cpp
@@ -105,6 +105,9 @@ class TestReplicatedDB : public homestore::ReplDevListener {

void on_commit(int64_t lsn, sisl::blob const& header, sisl::blob const& key, MultiBlkId const& blkids,
cintrusive< repl_req_ctx >& ctx) override {

m_num_commits.fetch_add(1, std::memory_order_relaxed);

ASSERT_EQ(header.size(), sizeof(test_req::journal_header));

auto jheader = r_cast< test_req::journal_header const* >(header.cbytes());
@@ -199,6 +202,8 @@ class TestReplicatedDB : public homestore::ReplDevListener {
g_helper->runner().execute().get();
}

uint64_t db_num_writes() const { return m_num_commits.load(std::memory_order_relaxed); }

uint64_t db_size() const {
std::shared_lock lk(db_mtx_);
return inmem_db_.size();
@@ -207,6 +212,7 @@
private:
std::map< Key, Value > inmem_db_;
std::shared_mutex db_mtx_;
std::atomic< uint64_t > m_num_commits{0}; // commits observed via on_commit()
};

class RaftReplDevTest : public testing::Test {
@@ -228,7 +234,7 @@
while (true) {
uint64_t total_writes{0};
for (auto const& db : dbs_) {
total_writes += db->db_size();
total_writes += db->db_num_writes();
}

if (total_writes >= exp_writes) { break; }
@@ -373,7 +379,7 @@ TEST_F(RaftReplDevTest, All_restart_one_follower_inc_resync) {
if (g_helper->replica_num() == 1) {
LOGINFO("Restart homestore: replica_num = 1");
g_helper->restart(10 /* shutdown_delay_sec */);
g_helper->sync_for_test_start();
// g_helper->sync_for_test_start();
}

exp_entries += SISL_OPTIONS["num_io"].as< uint64_t >();
@@ -401,7 +407,6 @@ TEST_F(RaftReplDevTest, All_restart_one_follower_inc_resync) {
this->validate_all_data();
g_helper->sync_for_cleanup_start();
}

//
// staging the fetch remote data with flip point;
//
@@ -432,7 +437,7 @@ TEST_F(RaftReplDevTest, All_restart_one_follower_inc_resync_with_staging) {
if (g_helper->replica_num() == 1) {
LOGINFO("Restart homestore: replica_num = 1");
g_helper->restart(10 /* shutdown_delay_sec */);
g_helper->sync_for_test_start();
// g_helper->sync_for_test_start();
}

exp_entries += SISL_OPTIONS["num_io"].as< uint64_t >();
@@ -461,6 +466,65 @@
g_helper->sync_for_cleanup_start();
}

// do some io before restart;
TEST_F(RaftReplDevTest, All_restart_leader) {
LOGINFO("Homestore replica={} setup completed", g_helper->replica_num());
g_helper->sync_for_test_start();

// step-0: do some IO before restart one member;
uint64_t exp_entries = 20;
if (g_helper->replica_num() == 0) {
g_helper->runner().set_num_tasks(exp_entries);
auto block_size = SISL_OPTIONS["block_size"].as< uint32_t >();
LOGINFO("Run on worker threads to schedule append on repldev for {} Bytes.", block_size);
g_helper->runner().set_task([this, block_size]() {
this->generate_writes(block_size /* data_size */, block_size /* max_size_per_iov */);
});
g_helper->runner().execute().get();
}

// step-1: wait for all writes to be completed
this->wait_for_all_writes(exp_entries);

// step-2: restart leader replica
if (g_helper->replica_num() == 0) {
LOGINFO("Restart homestore: replica_num = 0");
g_helper->restart(10 /* shutdown_delay_sec */);
// g_helper->sync_for_test_start();
}

exp_entries += SISL_OPTIONS["num_io"].as< uint64_t >();
// step-3: on the new leader, wait a while for replica-0 to finish shutdown so that it can be removed from
// raft-groups and subsequent I/O issued by the leader won't be pushed to replica-0;
if (g_helper->replica_num() == 1) {
LOGINFO("Wait for grpc connection to replica-0 to be removed from raft-groups, and wait for awhile before "
"sending new I/O.");
std::this_thread::sleep_for(std::chrono::seconds{5});

LOGINFO("Switch the leader to replica_num = 1");
switch_all_db_leader();

g_helper->runner().set_num_tasks(SISL_OPTIONS["num_io"].as< uint64_t >());

// before replica-0 comes back, issue I/O so that replica-0 is lagging behind;
auto block_size = SISL_OPTIONS["block_size"].as< uint32_t >();
LOGINFO("Run on worker threads to schedule append on repldev for {} Bytes.", block_size);
g_helper->runner().set_task([this, block_size]() {
this->generate_writes(block_size /* data_size */, block_size /* max_size_per_iov */);
});
g_helper->runner().execute().get();
}

this->wait_for_all_writes(exp_entries);

if (g_helper->replica_num() != 0) { std::this_thread::sleep_for(std::chrono::seconds{10}); }

g_helper->sync_for_verify_start();
LOGINFO("Validate all data written so far by reading them");
this->validate_all_data();
g_helper->sync_for_cleanup_start();
}

// TODO
// double restart:
// 1. restart one follower(F1) while I/O keep running.
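
A note on the wait helper these tests rely on: wait_for_all_writes() now polls db_num_writes(), the atomic on_commit counter added in this commit, instead of db_size(), the number of distinct keys, so progress tracking no longer depends on the contents of the in-memory map. Roughly (the sleep interval is an assumption for the elided part of the loop):

// Sketch of the polling wait; the sleep interval is assumed.
void wait_for_all_writes(uint64_t exp_writes) {
    while (true) {
        uint64_t total_writes{0};
        for (auto const& db : dbs_) {
            total_writes += db->db_num_writes(); // commits, not unique keys
        }
        if (total_writes >= exp_writes) { break; }
        std::this_thread::sleep_for(std::chrono::milliseconds{500});
    }
}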
