diff --git a/conanfile.py b/conanfile.py
index 69cf3a1e9..369fcbbdb 100644
--- a/conanfile.py
+++ b/conanfile.py
@@ -5,7 +5,7 @@ class HomestoreConan(ConanFile):
     name = "homestore"
-    version = "5.1.5"
+    version = "5.1.6"
 
     homepage = "https://github.com/eBay/Homestore"
     description = "HomeStore Storage Engine"
diff --git a/src/lib/common/homestore_config.fbs b/src/lib/common/homestore_config.fbs
index 1ac3545c7..faa2a1358 100644
--- a/src/lib/common/homestore_config.fbs
+++ b/src/lib/common/homestore_config.fbs
@@ -225,6 +225,9 @@ table Consensus {
     // Leadership expiry 120 seconds
     leadership_expiry_ms: uint32 = 120000;
+
+    // data fetch max size limit in MB
+    data_fetch_max_size_mb: uint32 = 2;
 }
 
 table HomeStoreSettings {
diff --git a/src/lib/device/physical_dev.cpp b/src/lib/device/physical_dev.cpp
index fe44059de..9517c4422 100644
--- a/src/lib/device/physical_dev.cpp
+++ b/src/lib/device/physical_dev.cpp
@@ -360,6 +360,8 @@ void PhysicalDev::load_chunks(std::function< bool(cshared< Chunk >&) >&& chunk_f
         cinfo->checksum = info_crc;
 
         auto chunk = std::make_shared< Chunk >(this, *cinfo, cslot);
+        m_chunk_data_area.insert(
+            ChunkInterval::right_open(cinfo->chunk_start_offset, cinfo->chunk_start_offset + cinfo->chunk_size));
         if (chunk_found_cb(chunk)) { get_stream(chunk).m_chunks_map.insert(std::pair{cinfo->chunk_id, chunk}); }
     }
     hs_utils::iobuf_free(buf, sisl::buftag::superblk);
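Note on the new `m_chunk_data_area` bookkeeping above: the `ChunkInterval::right_open` call suggests a Boost.ICL interval type, so loaded chunks land in an interval set where adjacent areas coalesce and overlap checks are cheap. A minimal standalone sketch under that assumption (`ChunkInterval` below is a local alias for illustration, not HomeStore's own typedef):

```cpp
#include <boost/icl/interval_set.hpp>
#include <cstdint>
#include <iostream>

int main() {
    // Local alias for illustration; mirrors the right_open usage in load_chunks().
    using ChunkInterval = boost::icl::right_open_interval< uint64_t >;
    boost::icl::interval_set< uint64_t, std::less, ChunkInterval > chunk_data_area;

    // Two adjacent chunks [0, 4MiB) and [4MiB, 8MiB) coalesce into a single interval.
    chunk_data_area.insert(ChunkInterval{0, 4ull << 20});
    chunk_data_area.insert(ChunkInterval{4ull << 20, 8ull << 20});
    std::cout << "intervals tracked: " << chunk_data_area.iterative_size() << "\n"; // 1

    // A candidate chunk area overlapping an existing chunk is detected cheaply.
    ChunkInterval candidate{6ull << 20, 10ull << 20};
    std::cout << "candidate overlaps: " << std::boolalpha
              << boost::icl::intersects(chunk_data_area, candidate) << "\n"; // true
    return 0;
}
```

Whatever the concrete container is, the idea is the same: record each chunk's `[start, start + size)` area at load time so later chunk carving can reject overlapping offsets.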
diff --git a/src/lib/replication/repl_dev/raft_repl_dev.cpp b/src/lib/replication/repl_dev/raft_repl_dev.cpp
index b7cc53b4f..d03e0563d 100644
--- a/src/lib/replication/repl_dev/raft_repl_dev.cpp
+++ b/src/lib/replication/repl_dev/raft_repl_dev.cpp
@@ -148,7 +148,7 @@ void RaftReplDev::push_data_to_all_followers(repl_req_ptr_t rreq) {
                                              flatbuffers::FlatBufferToString(builder.GetBufferPointer() + sizeof(flatbuffers::uoffset_t),
                                                                              PushDataRequestTypeTable()));*/
 
-    LOGINFO("Data Channel: Pushing data to all followers: rreq=[{}]", rreq->to_compact_string());
+    RD_LOG(DEBUG, "Data Channel: Pushing data to all followers: rreq=[{}]", rreq->to_compact_string());
 
     group_msg_service()
         ->data_service_request_unidirectional(nuraft_mesg::role_regex::ALL, PUSH_DATA, rreq->pkts)
@@ -184,16 +184,16 @@ void RaftReplDev::on_fetch_data_received(intrusive< sisl::GenericRpcData >& rpc_
     ctx->outstanding_read_cnt = fetch_req->request()->entries()->size();
 
     for (auto const& req : *(fetch_req->request()->entries())) {
-        RD_LOG(INFO, "Data Channel: FetchData received: lsn={}", req->lsn());
-        auto const& lsn = req->lsn();
-        auto const& term = req->raft_term();
-        auto const& dsn = req->dsn();
-        auto const& header = req->user_header();
-        auto const& key = req->user_key();
         auto const& originator = req->blkid_originator();
         auto const& remote_blkid = req->remote_blkid();
+        RD_LOG(DEBUG, "Data Channel: FetchData received: lsn={}", req->lsn());
+
+        // relax this assert if in the future we want to fetch from a non-originator;
+        RD_REL_ASSERT(originator == server_id(),
+                      "Not expecting to receive fetch data from remote when I am not the originator of this request");
+
         // fetch data based on the remote_blkid
         if (originator == server_id()) {
            // We are the originator of the blkid, read data locally;
@@ -220,11 +220,6 @@ void RaftReplDev::on_fetch_data_received(intrusive< sisl::GenericRpcData >& rpc_
                 }
                 ctx->cv.notify_one();
             });
-        } else {
-            // TODO: if we are not the originator, we need to fetch based on lsn;
-            // To be implemented;
-            RD_LOG(INFO, "I am not the originaltor for the requested blks, originaltor: {}, server_id: {}.", originator,
-                   server_id());
         }
     }
@@ -395,6 +390,16 @@ repl_req_ptr_t RaftReplDev::follower_create_req(repl_key const& rkey, sisl::blob
     return rreq;
 }
 
+auto RaftReplDev::get_max_data_fetch_size() const {
+#ifdef _PRERELEASE
+    if (iomgr_flip::instance()->test_flip("simulate_staging_fetch_data")) {
+        LOGINFO("Flip simulate_staging_fetch_data is enabled, return max_data_fetch_size: 16K");
+        return 4 * 4096ull;
+    }
+#endif
+    return HS_DYNAMIC_CONFIG(consensus.data_fetch_max_size_mb) * 1024 * 1024ull;
+}
+
 void RaftReplDev::check_and_fetch_remote_data(std::vector< repl_req_ptr_t >* rreqs) {
     // Pop any entries that are already completed - from the entries list as well as from map
     rreqs->erase(std::remove_if(
@@ -416,25 +421,51 @@ void RaftReplDev::check_and_fetch_remote_data(std::vector< repl_req_ptr_t >* rre
 
     if (rreqs->size()) {
         // Some data not completed yet, let's fetch from remote;
-        fetch_data_from_remote(rreqs);
+        auto total_size_to_fetch = 0ul;
+        std::vector< repl_req_ptr_t > next_batch_rreqs;
+        const auto max_batch_size = get_max_data_fetch_size();
+        for (auto const& rreq : *rreqs) {
+            auto const& size = rreq->remote_blkid.blkid.blk_count() * get_blk_size();
+            if ((total_size_to_fetch + size) >= max_batch_size) {
+                fetch_data_from_remote(std::move(next_batch_rreqs));
+                next_batch_rreqs.clear();
+                total_size_to_fetch = 0;
+            }
+
+            total_size_to_fetch += size;
+            next_batch_rreqs.emplace_back(rreq);
+        }
+
+        // check if there is any leftover batch not yet processed;
+        if (next_batch_rreqs.size()) { fetch_data_from_remote(std::move(next_batch_rreqs)); }
     }
 }
 
-void RaftReplDev::fetch_data_from_remote(std::vector< repl_req_ptr_t >* rreqs) {
+void RaftReplDev::fetch_data_from_remote(std::vector< repl_req_ptr_t > rreqs) {
+    if (rreqs.size() == 0) { return; }
+
     std::vector<::flatbuffers::Offset< RequestEntry > > entries;
-    entries.reserve(rreqs->size());
+    entries.reserve(rreqs.size());
 
     shared< flatbuffers::FlatBufferBuilder > builder = std::make_shared< flatbuffers::FlatBufferBuilder >();
+    RD_LOG(DEBUG, "Data Channel: FetchData from remote: rreqs.size={}, my server_id={}", rreqs.size(), server_id());
+    auto const& originator = rreqs.front()->remote_blkid.server_id;
 
-    for (auto const& rreq : *rreqs) {
+    for (auto const& rreq : rreqs) {
         entries.push_back(CreateRequestEntry(*builder, rreq->get_lsn(), rreq->term(), rreq->dsn(),
                                              builder->CreateVector(rreq->header.cbytes(), rreq->header.size()),
                                              builder->CreateVector(rreq->key.cbytes(), rreq->key.size()),
                                              rreq->remote_blkid.server_id /* blkid_originator */,
                                              builder->CreateVector(rreq->remote_blkid.blkid.serialize().cbytes(),
                                                                    rreq->remote_blkid.blkid.serialized_size())));
-        LOGINFO("Fetching data from remote: rreq=[{}], remote_blkid={}", rreq->to_compact_string(),
-                rreq->remote_blkid.blkid.to_string());
+        // relax this assert if there is a case where originators within the same batch can differ (can't think of
+        // one now); but if there were such a case, we would need to group rreqs by originator and send them in
+        // separate batches;
+        RD_DBG_ASSERT(rreq->remote_blkid.server_id == originator, "Unexpected originator for rreq={}",
+                      rreq->to_compact_string());
+
+        RD_LOG(TRACE, "Fetching data from originator={}, remote: rreq=[{}], remote_blkid={}, my server_id={}",
+               originator, rreq->to_compact_string(), rreq->remote_blkid.blkid.to_string(), server_id());
     }
 
     builder->FinishSizePrefixed(
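The batching loop above flushes the current batch before it would reach `max_batch_size` (note the `>=`), so each fetch RPC stays under the configured cap, and a single oversized request still goes out in its own batch. A standalone sketch of just that packing rule; `batch_by_size` is an illustrative helper, not a HomeStore function:

```cpp
#include <cstdint>
#include <iostream>
#include <vector>

// Greedily packs request sizes into batches, flushing the current batch before it
// would reach max_batch_size (same rule as check_and_fetch_remote_data above).
std::vector< std::vector< uint64_t > > batch_by_size(std::vector< uint64_t > const& sizes, uint64_t max_batch_size) {
    std::vector< std::vector< uint64_t > > batches;
    std::vector< uint64_t > cur;
    uint64_t total{0};
    for (auto const sz : sizes) {
        if (((total + sz) >= max_batch_size) && !cur.empty()) {
            batches.emplace_back(std::move(cur));
            cur.clear(); // moved-from vector is reset explicitly, mirroring the patch
            total = 0;
        }
        total += sz;
        cur.emplace_back(sz);
    }
    if (!cur.empty()) { batches.emplace_back(std::move(cur)); } // leftover batch
    return batches;
}

int main() {
    auto const KiB = 1024ull;
    auto const MiB = 1024ull * KiB;
    // Four 512 KiB requests plus one oversized 3 MiB request, with the default 2 MiB cap:
    // expect a batch of 3, then two single-request batches.
    for (auto const& batch : batch_by_size({512 * KiB, 512 * KiB, 512 * KiB, 512 * KiB, 3 * MiB}, 2 * MiB)) {
        uint64_t bytes{0};
        for (auto const sz : batch) { bytes += sz; }
        std::cout << "batch: " << batch.size() << " request(s), " << bytes << " bytes\n";
    }
    return 0;
}
```

With the new `data_fetch_max_size_mb = 2` default, the example prints a 3-request batch of 1.5 MiB followed by two single-request batches.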
@@ -444,17 +475,36 @@ void RaftReplDev::fetch_data_from_remote(std::vector< repl_req_ptr_t >* rreqs) {
     // blkid;
     group_msg_service()
         ->data_service_request_bidirectional(
-            nuraft_mesg::role_regex::LEADER, FETCH_DATA,
+            originator, FETCH_DATA,
             sisl::io_blob_list_t{
                 sisl::io_blob{builder->GetBufferPointer(), builder->GetSize(), false /* is_aligned */}})
         .via(&folly::InlineExecutor::instance())
         .thenValue([this, builder, rreqs](auto e) {
-            RD_REL_ASSERT(!!e, "Error in fetching data");
+            if (!e) {
+                // if we are here, it means the originator who sent the log entries is down.
+                // we need to handle the error; when another member becomes leader, it will resend the log entries;
+                RD_LOG(INFO,
+                       "Not able to fetch data from originator={}, error={}; the originator is probably down. Will "
+                       "retry when the new leader starts appending log entries",
+                       rreqs.front()->remote_blkid.server_id, e.error());
+                for (auto const& rreq : rreqs) {
+                    handle_error(rreq, RaftReplService::to_repl_error(e.error()));
+                }
+                return;
+            }
 
             auto raw_data = e.value().response_blob().cbytes();
             auto total_size = e.value().response_blob().size();
 
-            for (auto const& rreq : *rreqs) {
+            RD_DBG_ASSERT_GT(total_size, 0, "Empty response from remote");
+            RD_DBG_ASSERT(raw_data, "Empty response from remote");
+
+            RD_LOG(INFO, "Data Channel: FetchData completed for rreqs.size()={}", rreqs.size());
+
+            thread_local std::vector< folly::Future< std::error_code > > futs; // static is implied
+            futs.clear();
+
+            for (auto const& rreq : rreqs) {
                 auto const data_size = rreq->remote_blkid.blkid.blk_count() * get_blk_size();
 
                 // if data is already received, skip it because someone is already doing the write;
                 if (rreq->state.load() & uint32_cast(repl_req_state_t::DATA_RECEIVED)) {
@@ -508,25 +558,36 @@ void RaftReplDev::fetch_data_from_remote(std::vector< repl_req_ptr_t >* rreqs) {
                 }
 
                 // Schedule a write and upon completion, mark the data as written.
-                data_service()
-                    .async_write(r_cast< const char* >(data), data_size, rreq->local_blkid)
-                    .thenValue([this, rreq](auto&& err) {
-                        RD_REL_ASSERT(!err,
-                                      "Error in writing data"); // TODO: Find a way to return error to the Listener
-                        rreq->state.fetch_or(uint32_cast(repl_req_state_t::DATA_WRITTEN));
-                        rreq->data_written_promise.setValue();
-                        RD_LOG(INFO, "Data Channel: Data Write completed rreq=[{}]", rreq->to_compact_string());
-                    });
+                futs.emplace_back(
+                    data_service().async_write(r_cast< const char* >(data), data_size, rreq->local_blkid));
 
                 // move the raw_data pointer to next rreq's data;
                 raw_data += data_size;
                 total_size -= data_size;
 
-                LOGINFO(
-                    "Data Channel: Data fetched from remote: rreq=[{}], data_size: {}, total_size: {}, local_blkid: {}",
-                    rreq->to_compact_string(), data_size, total_size, rreq->local_blkid.to_string());
+                RD_LOG(INFO,
+                       "Data Channel: Data fetched from remote: rreq=[{}], data_size: {}, total_size: {}, "
+                       "local_blkid: {}",
+                       rreq->to_compact_string(), data_size, total_size, rreq->local_blkid.to_string());
             }
 
+            folly::collectAllUnsafe(futs).thenValue([this, rreqs, e = std::move(e)](auto&& vf) {
+                for (auto const& err_c : vf) {
+                    if (sisl_unlikely(err_c.value())) {
+                        auto ec = err_c.value();
+                        RD_LOG(ERROR, "Error in writing data: {}", ec.value());
+                        // TODO: we will never actually arrive here, as iomgr will assert today (it should not
+                        // assert; instead it should raise an alert and leave the raft group);
+                    }
+                }
+
+                for (auto const& rreq : rreqs) {
+                    rreq->state.fetch_or(uint32_cast(repl_req_state_t::DATA_WRITTEN));
+                    rreq->data_written_promise.setValue();
+                    RD_LOG(TRACE, "Data Channel: Data Write completed rreq=[{}]", rreq->to_compact_string());
+                }
+            });
+
             builder->Release();
 
             RD_DBG_ASSERT_EQ(total_size, 0, "Total size mismatch, some data is not consumed");
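Instead of completing each write inside its own `.thenValue`, the fetch path now collects the per-request `async_write` futures and resolves all `data_written_promise`s in a single `folly::collectAllUnsafe` continuation. A minimal, self-contained illustration of that aggregation pattern, assuming folly is available (the promises below stand in for the real async writes):

```cpp
#include <folly/futures/Future.h>
#include <folly/futures/Promise.h>
#include <iostream>
#include <system_error>
#include <vector>

int main() {
    // Three promises stand in for three scheduled async writes.
    std::vector< folly::Promise< std::error_code > > promises(3);
    std::vector< folly::Future< std::error_code > > futs;
    for (auto& p : promises) { futs.emplace_back(p.getFuture()); }

    // Single continuation that runs once every write has completed (success or error).
    folly::collectAllUnsafe(futs).thenValue([](std::vector< folly::Try< std::error_code > >&& vf) {
        for (auto const& err_c : vf) {
            if (err_c.value()) { std::cout << "write failed: " << err_c.value().message() << "\n"; }
        }
        std::cout << "all writes completed, promises can be fulfilled now\n";
    });

    // Simulate the writes finishing.
    for (auto& p : promises) { p.setValue(std::error_code{}); }
    return 0;
}
```

The `thread_local` futs vector in the patch appears to be just a capacity-reuse optimization per reactor thread; the futures themselves are consumed by `collectAllUnsafe`, so clearing the vector on the next invocation is safe.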
@@ -564,17 +625,14 @@ AsyncNotify RaftReplDev::notify_after_data_written(std::vector< repl_req_ptr_t >
         // if in resync mode, fetch data from remote immediately;
         check_and_fetch_remote_data(rreqs);
     } else {
-        check_and_fetch_remote_data(rreqs);
         // some data are not in completed state, let's schedule a timer to check it again;
         // we wait for data channel to fill in the data. Still if its not done we trigger a fetch from remote;
-#if 0
         m_wait_data_timer_hdl = iomanager.schedule_global_timer( // timer wakes up in current thread;
             HS_DYNAMIC_CONFIG(consensus.wait_data_write_timer_sec) * 1000 * 1000 * 1000, false /* recurring */,
             nullptr /* cookie */, iomgr::reactor_regex::all_worker, [this, rreqs](auto /*cookie*/) {
-                LOGINFO("Data Channel: Wait data write timer fired, checking if data is written");
+                RD_LOG(INFO, "Data Channel: Wait data write timer fired, checking if data is written");
                 check_and_fetch_remote_data(rreqs);
             });
-#endif
     }
 
     // block waiting here until all the futs are ready (data channel filled in and promises are made);
diff --git a/src/lib/replication/repl_dev/raft_repl_dev.h b/src/lib/replication/repl_dev/raft_repl_dev.h
index 2a0a98fcf..257598a32 100644
--- a/src/lib/replication/repl_dev/raft_repl_dev.h
+++ b/src/lib/replication/repl_dev/raft_repl_dev.h
@@ -58,7 +58,8 @@ class RaftReplDev : public ReplDev,
     std::atomic< uint64_t > m_next_dsn{0}; // Data Sequence Number that will keep incrementing for each data entry
 
     //
-    iomgr::timer_handle_t m_wait_data_timer_hdl{iomgr::null_timer_handle};
+    iomgr::timer_handle_t m_wait_data_timer_hdl{
+        iomgr::null_timer_handle}; // non-recurring timer doesn't need to be cancelled on shutdown;
     bool m_resync_mode{false};
 
     static std::atomic< uint64_t > s_next_group_ordinal;
@@ -81,7 +82,7 @@ class RaftReplDev : public ReplDev,
     AsyncReplResult<> become_leader() override;
     bool is_leader() const override;
     const replica_id_t get_leader_id() const override;
-    std::vector<peer_info> get_replication_status() const override;
+    std::vector< peer_info > get_replication_status() const override;
     group_id_t group_id() const override { return m_group_id; }
     std::string group_id_str() const { return boost::uuids::to_string(m_group_id); }
     std::string rdev_name() const { return m_rdev_name; }
@@ -124,8 +125,8 @@ class RaftReplDev : public ReplDev,
     void on_push_data_received(intrusive< sisl::GenericRpcData >& rpc_data);
     void on_fetch_data_received(intrusive< sisl::GenericRpcData >& rpc_data);
     void check_and_fetch_remote_data(std::vector< repl_req_ptr_t >* rreqs);
-    void fetch_data_from_remote(std::vector< repl_req_ptr_t >* rreqs);
-
+    void fetch_data_from_remote(std::vector< repl_req_ptr_t > rreqs);
+    auto get_max_data_fetch_size() const;
     bool is_resync_mode() { return m_resync_mode; }
 
     void handle_error(repl_req_ptr_t const& rreq, ReplServiceError err);
 };
diff --git a/src/tests/test_common/homestore_test_common.hpp b/src/tests/test_common/homestore_test_common.hpp
index c30d30bb1..33b6d63a6 100644
--- a/src/tests/test_common/homestore_test_common.hpp
+++ b/src/tests/test_common/homestore_test_common.hpp
@@ -175,7 +175,7 @@ class HSTestHelper {
 
     static void start_homestore(const std::string& test_name, std::map< uint32_t, test_params >&& svc_params,
                                 hs_before_services_starting_cb_t cb = nullptr, bool fake_restart = false,
-                                bool init_device = true) {
+                                bool init_device = true, uint32_t shutdown_delay_sec = 5) {
         auto const ndevices = SISL_OPTIONS["num_devs"].as< uint32_t >();
         auto const dev_size = SISL_OPTIONS["dev_size_mb"].as< uint64_t >() * 1024 * 1024;
         auto num_threads = SISL_OPTIONS["num_threads"].as< uint32_t >();
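The `notify_after_data_written` hunk above re-enables the previously `#if 0`'d fallback: instead of eagerly fetching, a follower arms a one-shot timer for `wait_data_write_timer_sec` and only fetches from the remote if the data channel has not filled in the data by then. A standalone model of that "wait, then fetch" pattern using plain std::thread/condition_variable (this is not the iomgr timer API, just the shape of the logic):

```cpp
#include <chrono>
#include <condition_variable>
#include <functional>
#include <iostream>
#include <mutex>
#include <thread>

class OneShotWaitTimer {
public:
    // Fires on_timeout after 'wait' unless notify_data_written() is called first.
    OneShotWaitTimer(std::chrono::milliseconds wait, std::function< void() > on_timeout) :
            m_worker([this, wait, cb = std::move(on_timeout)] {
                std::unique_lock lk(m_mtx);
                if (!m_cv.wait_for(lk, wait, [this] { return m_done; })) { cb(); }
            }) {}

    // Data channel delivered the data in time; the timer callback is skipped.
    void notify_data_written() {
        {
            std::lock_guard lk(m_mtx);
            m_done = true;
        }
        m_cv.notify_one();
    }

    ~OneShotWaitTimer() { m_worker.join(); }

private:
    std::mutex m_mtx;
    std::condition_variable m_cv;
    bool m_done{false};
    std::thread m_worker;
};

int main() {
    OneShotWaitTimer timer(std::chrono::milliseconds{100},
                           [] { std::cout << "timer fired: fetch data from remote\n"; });
    // Uncomment to simulate the data channel filling in first:
    // timer.notify_data_written();
    return 0;
}
```

As the new comment on `m_wait_data_timer_hdl` notes, a non-recurring timer like this fires at most once, so it does not need to be cancelled on shutdown.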
@@ -185,7 +185,7 @@ class HSTestHelper {
 
         if (fake_restart) {
             shutdown_homestore(false);
             // sisl::GrpcAsyncClientWorker::shutdown_all();
-            std::this_thread::sleep_for(std::chrono::seconds{5});
+            std::this_thread::sleep_for(std::chrono::seconds{shutdown_delay_sec});
         }
 
         std::vector< homestore::dev_info > device_info;
diff --git a/src/tests/test_common/hs_repl_test_common.hpp b/src/tests/test_common/hs_repl_test_common.hpp
index 1d78bc432..6367be928 100644
--- a/src/tests/test_common/hs_repl_test_common.hpp
+++ b/src/tests/test_common/hs_repl_test_common.hpp
@@ -186,12 +186,12 @@ class HSReplTestHelper {
         setup();
     }
 
-    void restart() {
+    void restart(uint32_t shutdown_delay_secs = 5) {
         test_common::HSTestHelper::start_homestore(
             name_ + std::to_string(replica_num_),
             {{HS_SERVICE::REPLICATION, {.repl_app = std::make_unique< TestReplApplication >(*this)}},
              {HS_SERVICE::LOG, {}}},
-            nullptr, true /* restart */);
+            nullptr, true /* restart */, true /* init_device */, shutdown_delay_secs);
     }
 
     void restart_one_by_one() {
@@ -305,4 +305,4 @@ class HSReplTestHelper {
     Runner io_runner_;
 };
 
-} // namespace test_common
\ No newline at end of file
+} // namespace test_common
diff --git a/src/tests/test_raft_repl_dev.cpp b/src/tests/test_raft_repl_dev.cpp
index 0a5cfb7d1..e31b61a1a 100644
--- a/src/tests/test_raft_repl_dev.cpp
+++ b/src/tests/test_raft_repl_dev.cpp
@@ -41,7 +41,7 @@ using namespace homestore;
 SISL_LOGGING_DEF(test_raft_repl_dev)
-SISL_LOGGING_INIT(HOMESTORE_LOG_MODS)
+SISL_LOGGING_INIT(HOMESTORE_LOG_MODS, nuraft_mesg)
 
 SISL_OPTION_GROUP(test_raft_repl_dev,
                   (block_size, "", "block_size", "block size to io",
@@ -331,45 +331,132 @@ TEST_F(RaftReplDevTest, All_Append_Fetch_Remote_Data) {
 
     g_helper->sync_for_verify_start();
-    // TODO: seems with filip and fetch remote, the data size is not correct;
     LOGINFO("Validate all data written so far by reading them");
     this->validate_all_data();
+
     g_helper->sync_for_cleanup_start();
 }
 
-TEST_F(RaftReplDevTest, All_ReplService) {
+// do some io before restart;
+TEST_F(RaftReplDevTest, All_restart_one_follower_inc_resync) {
     LOGINFO("Homestore replica={} setup completed", g_helper->replica_num());
     g_helper->sync_for_test_start();
 
-    auto repl_dev = dynamic_pointer_cast< RaftReplDev >(pick_one_db().repl_dev());
-    auto group_id = repl_dev->group_id();
-    auto my_id_str = repl_dev->my_replica_id_str();
-
-    auto leader = repl_dev->get_leader_id();
-    ASSERT_TRUE(leader != replica_id_t())
-        << "Error getting leader id for group_id=" << boost::uuids::to_string(group_id).c_str();
-
-    auto leader_str = boost::uuids::to_string(leader);
-    LOGINFO("Got raft leader {} for group {}", leader_str, group_id);
+    // step-0: do some IO before restart one member;
+    uint64_t exp_entries = 20;
+    if (g_helper->replica_num() == 0) {
+        g_helper->runner().set_num_tasks(exp_entries);
+        auto block_size = SISL_OPTIONS["block_size"].as< uint32_t >();
+        LOGINFO("Run on worker threads to schedule append on repldev for {} Bytes.", block_size);
+        g_helper->runner().set_task([this, block_size]() {
+            this->generate_writes(block_size /* data_size */, block_size /* max_size_per_iov */);
+        });
+        g_helper->runner().execute().get();
+    }
 
+    // step-1: wait for all writes to be completed
+    this->wait_for_all_writes(exp_entries);
+
+    // step-2: restart one non-leader replica
+    if (g_helper->replica_num() == 1) {
+        LOGINFO("Restart homestore: replica_num = 1");
+        g_helper->restart(10 /* shutdown_delay_sec */);
+        g_helper->sync_for_test_start();
+    }
+
+    exp_entries += SISL_OPTIONS["num_io"].as< uint64_t >();
+    // step-3: on leader, wait for a while for replica-1 to finish shutdown so that it can be removed from raft-groups
+    // and the following I/O issued by the leader won't be pushed to replica-1;
     if (g_helper->replica_num() == 0) {
-        ASSERT_TRUE(leader_str == my_id_str)
-            << "Leader id " << leader_str.c_str() << " should equals to my ID " << my_id_str.c_str();
-    } else {
-        ASSERT_TRUE(leader_str != my_id_str) << "I am a follower, Leader id " << leader_str.c_str()
-                                             << " should not equals to my ID " << my_id_str.c_str();
+        LOGINFO("Wait for grpc connection to replica-1 to expire and be removed from raft-groups.");
+        std::this_thread::sleep_for(std::chrono::seconds{5});
+
+        g_helper->runner().set_num_tasks(SISL_OPTIONS["num_io"].as< uint64_t >());
+
+        // before replica-1 is started, issue I/O so that replica-1 is lagging behind;
+        auto block_size = SISL_OPTIONS["block_size"].as< uint32_t >();
+        LOGINFO("Run on worker threads to schedule append on repldev for {} Bytes.", block_size);
+        g_helper->runner().set_task([this, block_size]() {
+            this->generate_writes(block_size /* data_size */, block_size /* max_size_per_iov */);
+        });
+        g_helper->runner().execute().get();
     }
 
-    auto peers_info = repl_dev->get_replication_status();
-    LOGINFO("Got peers_info size {} for group {}", peers_info.size(), group_id);
+    this->wait_for_all_writes(exp_entries);
+
+    g_helper->sync_for_verify_start();
+    LOGINFO("Validate all data written so far by reading them");
+    this->validate_all_data();
+    g_helper->sync_for_cleanup_start();
+}
+
+//
+// stage the fetch-remote-data path with a flip point;
+//
+TEST_F(RaftReplDevTest, All_restart_one_follower_inc_resync_with_staging) {
+    LOGINFO("Homestore replica={} setup completed", g_helper->replica_num());
+    g_helper->sync_for_test_start();
+
+#ifdef _PRERELEASE
+    set_flip_point("simulate_staging_fetch_data");
+#endif
+
+    // step-0: do some IO before restart one member;
+    uint64_t exp_entries = 20;
+    if (g_helper->replica_num() == 0) {
+        g_helper->runner().set_num_tasks(20);
+        auto block_size = SISL_OPTIONS["block_size"].as< uint32_t >();
+        LOGINFO("Run on worker threads to schedule append on repldev for {} Bytes.", block_size);
+        g_helper->runner().set_task([this, block_size]() {
+            this->generate_writes(block_size /* data_size */, block_size /* max_size_per_iov */);
+        });
+        g_helper->runner().execute().get();
+    }
+
+    // step-1: wait for all writes to be completed
+    this->wait_for_all_writes(exp_entries);
+
+    // step-2: restart one non-leader replica
+    if (g_helper->replica_num() == 1) {
+        LOGINFO("Restart homestore: replica_num = 1");
+        g_helper->restart(10 /* shutdown_delay_sec */);
+        g_helper->sync_for_test_start();
+    }
+
+    exp_entries += SISL_OPTIONS["num_io"].as< uint64_t >();
+    // step-3: on leader, wait for a while for replica-1 to finish shutdown so that it can be removed from raft-groups
+    // and the following I/O issued by the leader won't be pushed to replica-1;
     if (g_helper->replica_num() == 0) {
-        auto const num_replicas = SISL_OPTIONS["replicas"].as< uint32_t >();
-        EXPECT_TRUE(peers_info.size() == num_replicas)
-            << "Expecting peers_info size " << peers_info.size() << " but got " << peers_info.size();
-    } else {
-        EXPECT_TRUE(peers_info.size() == 0) << "Expecting zero length on follower, got " << peers_info.size();
+        LOGINFO("Wait for grpc connection to replica-1 to expire and be removed from raft-groups.");
+        std::this_thread::sleep_for(std::chrono::seconds{5});
+
+        g_helper->runner().set_num_tasks(SISL_OPTIONS["num_io"].as< uint64_t >());
+
+        // before replica-1 is started, issue I/O so that replica-1 is lagging behind;
+        auto block_size = SISL_OPTIONS["block_size"].as< uint32_t >();
+        LOGINFO("Run on worker threads to schedule append on repldev for {} Bytes.", block_size);
+        g_helper->runner().set_task([this, block_size]() {
+            this->generate_writes(block_size /* data_size */, block_size /* max_size_per_iov */);
+        });
+        g_helper->runner().execute().get();
     }
+
+    this->wait_for_all_writes(exp_entries);
+
+    g_helper->sync_for_verify_start();
+    LOGINFO("Validate all data written so far by reading them");
+    this->validate_all_data();
     g_helper->sync_for_cleanup_start();
 }
 
+// TODO
+// double restart:
+// 1. restart one follower (F1) while I/O keeps running.
+// 2. after F1 reboots and the leader is resyncing with F1 (after sending the appended entries), this leader also restarts.
+// 3. F1 should receive an error from grpc saying the originator is not there.
+// 4. F2 should then be appending entries to F1, and F1 should be able to catch up with F2 (fetch data from F2).
+//
+
 int main(int argc, char* argv[]) {
     int parsed_argc{argc};
     char** orig_argv = argv;
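For reference, a standalone model of how the `simulate_staging_fetch_data` flip interacts with the new config knob: with the flip armed (as the `_with_staging` test does), the fetch cap drops from `data_fetch_max_size_mb` (2 MiB by default) to 16 KiB, so even a small test exercises multiple fetch batches. The `flip_registry`/`set_flip_point`/`test_flip` helpers below are illustrative stand-ins, not the real iomgr_flip API:

```cpp
#include <cstdint>
#include <iostream>
#include <set>
#include <string>

// Illustrative flip registry: a named switch that tests can arm at runtime.
static std::set< std::string >& flip_registry() {
    static std::set< std::string > s;
    return s;
}
static bool test_flip(std::string const& name) { return flip_registry().count(name) > 0; }
static void set_flip_point(std::string const& name) { flip_registry().insert(name); }

// Mirrors the decision made by get_max_data_fetch_size() in the patch.
static uint64_t max_data_fetch_size(uint32_t data_fetch_max_size_mb) {
    if (test_flip("simulate_staging_fetch_data")) { return 4 * 4096ull; } // 16 KiB staging cap
    return data_fetch_max_size_mb * 1024 * 1024ull;
}

int main() {
    std::cout << "default cap = " << max_data_fetch_size(2) << " bytes\n"; // 2 MiB
    set_flip_point("simulate_staging_fetch_data");
    std::cout << "with flip   = " << max_data_fetch_size(2) << " bytes\n"; // 16 KiB
    return 0;
}
```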