diff --git a/conanfile.py b/conanfile.py index 91946e9eb..de3a12195 100644 --- a/conanfile.py +++ b/conanfile.py @@ -5,7 +5,7 @@ class HomestoreConan(ConanFile): name = "homestore" - version = "5.1.9" + version = "5.1.10" homepage = "https://github.com/eBay/Homestore" description = "HomeStore Storage Engine" diff --git a/src/lib/replication/repl_dev/raft_repl_dev.cpp b/src/lib/replication/repl_dev/raft_repl_dev.cpp index 30f98570c..abddc8bdf 100644 --- a/src/lib/replication/repl_dev/raft_repl_dev.cpp +++ b/src/lib/replication/repl_dev/raft_repl_dev.cpp @@ -28,7 +28,8 @@ RaftReplDev::RaftReplDev(RaftReplService& svc, superblk< raft_repl_dev_superblk m_group_id{rd_sb->group_id}, m_my_repl_id{svc.get_my_repl_uuid()}, m_raft_server_id{nuraft_mesg::to_server_id(m_my_repl_id)}, - m_rd_sb{std::move(rd_sb)} { + m_rd_sb{std::move(rd_sb)}, + m_metrics{fmt::format("{}_{}", group_id_str(), m_raft_server_id).c_str()} { m_state_machine = std::make_shared< RaftStateMachine >(*this); if (load_existing) { @@ -216,7 +217,11 @@ void RaftReplDev::on_fetch_data_received(intrusive< sisl::GenericRpcData >& rpc_ sgs_vec.push_back(sgs); async_read(local_blkid, sgs, total_size).thenValue([this, &ctx](auto&& err) { - RD_REL_ASSERT(!err, "Error in reading data"); // TODO: Find a way to return error to the Listener + if (err) { + COUNTER_INCREMENT(m_metrics, read_err_cnt, 1); + RD_REL_ASSERT(false, "Error in reading data"); // TODO: Find a way to return error to the Listener + } + { std::unique_lock< std::mutex > lk{ctx->mtx}; --(ctx->outstanding_read_cnt); @@ -337,6 +342,7 @@ void RaftReplDev::on_push_data_received(intrusive< sisl::GenericRpcData >& rpc_d .async_write(r_cast< const char* >(data), push_req->data_size(), rreq->local_blkid) .thenValue([this, rreq](auto&& err) { if (err) { + COUNTER_INCREMENT(m_metrics, write_err_cnt, 1); RD_DBG_ASSERT(false, "Error in writing data"); handle_error(rreq, ReplServiceError::DRIVE_WRITE_ERROR); } else { @@ -460,7 +466,7 @@ void RaftReplDev::check_and_fetch_remote_data(std::vector< repl_req_ptr_t >* rre void RaftReplDev::fetch_data_from_remote(std::vector< repl_req_ptr_t > rreqs) { if (rreqs.size() == 0) { return; } - std::vector< ::flatbuffers::Offset< RequestEntry > > entries; + std::vector<::flatbuffers::Offset< RequestEntry > > entries; entries.reserve(rreqs.size()); shared< flatbuffers::FlatBufferBuilder > builder = std::make_shared< flatbuffers::FlatBufferBuilder >(); @@ -487,6 +493,9 @@ void RaftReplDev::fetch_data_from_remote(std::vector< repl_req_ptr_t > rreqs) { builder->FinishSizePrefixed( CreateFetchData(*builder, CreateFetchDataRequest(*builder, builder->CreateVector(entries)))); + COUNTER_INCREMENT(m_metrics, fetch_rreq_cnt, 1); + COUNTER_INCREMENT(m_metrics, fetch_total_entries_cnt, rreqs.size()); + // leader can change, on the receiving side, we need to check if the leader is still the one who originated the // blkid; group_msg_service() @@ -503,6 +512,7 @@ void RaftReplDev::fetch_data_from_remote(std::vector< repl_req_ptr_t > rreqs) { "Not able to fetching data from originator={}, error={}, probably originator is down. Will " "retry when new leader start appending log entries", rreqs.front()->remote_blkid.server_id, e.error()); + COUNTER_INCREMENT(m_metrics, fetch_err_cnt, 1); for (auto const& rreq : rreqs) { handle_error(rreq, RaftReplService::to_repl_error(e.error())); } @@ -512,6 +522,8 @@ void RaftReplDev::fetch_data_from_remote(std::vector< repl_req_ptr_t > rreqs) { auto raw_data = e.value().response_blob().cbytes(); auto total_size = e.value().response_blob().size(); + COUNTER_INCREMENT(m_metrics, fetch_total_blk_size, total_size); + RD_DBG_ASSERT_GT(total_size, 0, "Empty response from remote"); RD_DBG_ASSERT(raw_data, "Empty response from remote"); @@ -591,6 +603,7 @@ void RaftReplDev::fetch_data_from_remote(std::vector< repl_req_ptr_t > rreqs) { for (auto const& err_c : vf) { if (sisl_unlikely(err_c.value())) { auto ec = err_c.value(); + COUNTER_INCREMENT(m_metrics, write_err_cnt, 1); RD_LOG(ERROR, "Error in writing data: {}", ec.value()); // TODO: actually will never arrive here as iomgr will assert (should not assert but // to raise alert and leave the raft group); diff --git a/src/lib/replication/repl_dev/raft_repl_dev.h b/src/lib/replication/repl_dev/raft_repl_dev.h index 5c5f80627..e7e56c1ef 100644 --- a/src/lib/replication/repl_dev/raft_repl_dev.h +++ b/src/lib/replication/repl_dev/raft_repl_dev.h @@ -29,6 +29,28 @@ struct raft_repl_dev_superblk : public repl_dev_superblk { using raft_buf_ptr_t = nuraft::ptr< nuraft::buffer >; +class RaftReplDevMetrics : public sisl::MetricsGroup { +public: + explicit RaftReplDevMetrics(const char* inst_name) : sisl::MetricsGroup("RaftReplDev", inst_name) { + REGISTER_COUNTER(read_err_cnt, "total read error count", "read_err_cnt", {"op", "read"}); + REGISTER_COUNTER(write_err_cnt, "total write error count", "write_err_cnt", {"op", "write"}); + REGISTER_COUNTER(fetch_err_cnt, "total fetch data error count", "fetch_err_cnt", {"op", "fetch"}); + + REGISTER_COUNTER(fetch_rreq_cnt, "total fetch data count", "fetch_data_req_cnt", {"op", "fetch"}); + REGISTER_COUNTER(fetch_total_blk_size, "total fetch data blocks size", "fetch_total_blk_size", {"op", "fetch"}); + REGISTER_COUNTER(fetch_total_entries_cnt, "total fetch total entries count", "fetch_total_entries_cnt", + {"op", "fetch"}); + + register_me_to_farm(); + } + + RaftReplDevMetrics(const RaftReplDevMetrics&) = delete; + RaftReplDevMetrics(RaftReplDevMetrics&&) noexcept = delete; + RaftReplDevMetrics& operator=(const RaftReplDevMetrics&) = delete; + RaftReplDevMetrics& operator=(RaftReplDevMetrics&&) noexcept = delete; + ~RaftReplDevMetrics() { deregister_me_from_farm(); } +}; + class RaftReplService; class CP; class RaftReplDev : public ReplDev, @@ -62,6 +84,8 @@ class RaftReplDev : public ReplDev, iomgr::null_timer_handle}; // non-recurring timer doesn't need to be cancelled on shutdown; bool m_resync_mode{false}; + RaftReplDevMetrics m_metrics; + static std::atomic< uint64_t > s_next_group_ordinal; public: diff --git a/src/tests/test_common/hs_repl_test_common.hpp b/src/tests/test_common/hs_repl_test_common.hpp index fbdcd6861..3084d0a16 100644 --- a/src/tests/test_common/hs_repl_test_common.hpp +++ b/src/tests/test_common/hs_repl_test_common.hpp @@ -83,6 +83,7 @@ class HSReplTestHelper { } else { cv_.wait(lg, [this, new_phase]() { return (phase_ == new_phase); }); } + count = 0; } }; @@ -182,7 +183,7 @@ class HSReplTestHelper { void teardown() { LOGINFO("Stopping Homestore replica={}", replica_num_); - sisl::GrpcAsyncClientWorker::shutdown_all(); + // sisl::GrpcAsyncClientWorker::shutdown_all(); test_common::HSTestHelper::shutdown_homestore(); } diff --git a/src/tests/test_raft_repl_dev.cpp b/src/tests/test_raft_repl_dev.cpp index 3e8e1ae7e..22384917f 100644 --- a/src/tests/test_raft_repl_dev.cpp +++ b/src/tests/test_raft_repl_dev.cpp @@ -105,6 +105,9 @@ class TestReplicatedDB : public homestore::ReplDevListener { void on_commit(int64_t lsn, sisl::blob const& header, sisl::blob const& key, MultiBlkId const& blkids, cintrusive< repl_req_ctx >& ctx) override { + + m_num_commits.fetch_add(1, std::memory_order_relaxed); + ASSERT_EQ(header.size(), sizeof(test_req::journal_header)); auto jheader = r_cast< test_req::journal_header const* >(header.cbytes()); @@ -199,6 +202,8 @@ class TestReplicatedDB : public homestore::ReplDevListener { g_helper->runner().execute().get(); } + uint64_t db_num_writes() const { return m_num_commits.load(std::memory_order_relaxed); } + uint64_t db_size() const { std::shared_lock lk(db_mtx_); return inmem_db_.size(); @@ -207,6 +212,7 @@ class TestReplicatedDB : public homestore::ReplDevListener { private: std::map< Key, Value > inmem_db_; std::shared_mutex db_mtx_; + std::atomic< uint64_t > m_num_commits; }; class RaftReplDevTest : public testing::Test { @@ -228,7 +234,7 @@ class RaftReplDevTest : public testing::Test { while (true) { uint64_t total_writes{0}; for (auto const& db : dbs_) { - total_writes += db->db_size(); + total_writes += db->db_num_writes(); } if (total_writes >= exp_writes) { break; } @@ -373,7 +379,7 @@ TEST_F(RaftReplDevTest, All_restart_one_follower_inc_resync) { if (g_helper->replica_num() == 1) { LOGINFO("Restart homestore: replica_num = 1"); g_helper->restart(10 /* shutdown_delay_sec */); - g_helper->sync_for_test_start(); + // g_helper->sync_for_test_start(); } exp_entries += SISL_OPTIONS["num_io"].as< uint64_t >(); @@ -401,7 +407,6 @@ TEST_F(RaftReplDevTest, All_restart_one_follower_inc_resync) { this->validate_all_data(); g_helper->sync_for_cleanup_start(); } - // // staging the fetch remote data with flip point; // @@ -432,7 +437,7 @@ TEST_F(RaftReplDevTest, All_restart_one_follower_inc_resync_with_staging) { if (g_helper->replica_num() == 1) { LOGINFO("Restart homestore: replica_num = 1"); g_helper->restart(10 /* shutdown_delay_sec */); - g_helper->sync_for_test_start(); + // g_helper->sync_for_test_start(); } exp_entries += SISL_OPTIONS["num_io"].as< uint64_t >(); @@ -461,6 +466,65 @@ TEST_F(RaftReplDevTest, All_restart_one_follower_inc_resync_with_staging) { g_helper->sync_for_cleanup_start(); } +// do some io before restart; +TEST_F(RaftReplDevTest, All_restart_leader) { + LOGINFO("Homestore replica={} setup completed", g_helper->replica_num()); + g_helper->sync_for_test_start(); + + // step-0: do some IO before restart one member; + uint64_t exp_entries = 20; + if (g_helper->replica_num() == 0) { + g_helper->runner().set_num_tasks(exp_entries); + auto block_size = SISL_OPTIONS["block_size"].as< uint32_t >(); + LOGINFO("Run on worker threads to schedule append on repldev for {} Bytes.", block_size); + g_helper->runner().set_task([this, block_size]() { + this->generate_writes(block_size /* data_size */, block_size /* max_size_per_iov */); + }); + g_helper->runner().execute().get(); + } + + // step-1: wait for all writes to be completed + this->wait_for_all_writes(exp_entries); + + // step-2: restart leader replica + if (g_helper->replica_num() == 0) { + LOGINFO("Restart homestore: replica_num = 0"); + g_helper->restart(10 /* shutdown_delay_sec */); + // g_helper->sync_for_test_start(); + } + + exp_entries += SISL_OPTIONS["num_io"].as< uint64_t >(); + // step-3: on leader, wait for a while for replica-1 to finish shutdown so that it can be removed from raft-groups + // and following I/O issued by leader won't be pushed to relica-1; + if (g_helper->replica_num() == 1) { + LOGINFO("Wait for grpc connection to replica-0 to be removed from raft-groups, and wait for awhile before " + "sending new I/O."); + std::this_thread::sleep_for(std::chrono::seconds{5}); + + LOGINFO("Switch the leader to replica_num = 1"); + switch_all_db_leader(); + + g_helper->runner().set_num_tasks(SISL_OPTIONS["num_io"].as< uint64_t >()); + + // before replica-1 started, issue I/O so that replica-1 is lagging behind; + auto block_size = SISL_OPTIONS["block_size"].as< uint32_t >(); + LOGINFO("Run on worker threads to schedule append on repldev for {} Bytes.", block_size); + g_helper->runner().set_task([this, block_size]() { + this->generate_writes(block_size /* data_size */, block_size /* max_size_per_iov */); + }); + g_helper->runner().execute().get(); + } + + this->wait_for_all_writes(exp_entries); + + if (g_helper->replica_num() != 0) { std::this_thread::sleep_for(std::chrono::seconds{10}); } + + g_helper->sync_for_verify_start(); + LOGINFO("Validate all data written so far by reading them"); + this->validate_all_data(); + g_helper->sync_for_cleanup_start(); +} + // TODO // double restart: // 1. restart one follower(F1) while I/O keep running.