From be6234a393667a8914e19b43e9dff6037b0a0946 Mon Sep 17 00:00:00 2001
From: Sanal P
Date: Tue, 28 May 2024 13:30:41 -0700
Subject: [PATCH] Fix raft repl test case for destroying logdev and removing
 group.

Fix a race condition in the logdev metablk callbacks. Start deleting
unopened logdevs only after all logdevs are loaded. In the test, wait
until the logdev is permanently destroyed.
---
 conanfile.py                                   |  2 +-
 src/include/homestore/logstore_service.hpp     | 19 +++++++++-
 src/lib/device/journal_vdev.cpp                |  4 ++
 src/lib/logstore/log_store_service.cpp         | 38 ++++++++++++++-----
 src/lib/logstore/log_stream.cpp                | 10 +++++
 .../replication/log_store/repl_log_store.cpp   |  7 +++-
 .../replication/repl_dev/raft_repl_dev.cpp     |  5 +++
 src/lib/replication/repl_dev/raft_repl_dev.h   |  3 +-
 .../repl_dev/raft_state_machine.cpp            |  4 +-
 .../replication/service/generic_repl_svc.h     |  1 -
 .../replication/service/raft_repl_service.cpp  |  3 ++
 .../replication/service/raft_repl_service.h    |  1 -
 src/tests/test_log_dev.cpp                     |  2 +
 src/tests/test_raft_repl_dev.cpp               | 25 +++++++++++-
 14 files changed, 105 insertions(+), 19 deletions(-)

diff --git a/conanfile.py b/conanfile.py
index e23c7f9d7..de79625ed 100644
--- a/conanfile.py
+++ b/conanfile.py
@@ -5,7 +5,7 @@
 class HomestoreConan(ConanFile):
     name = "homestore"
-    version = "6.4.17"
+    version = "6.4.18"
     homepage = "https://github.com/eBay/Homestore"
     description = "HomeStore Storage Engine"
     topics = ("ebay", "nublox")
diff --git a/src/include/homestore/logstore_service.hpp b/src/include/homestore/logstore_service.hpp
index 24af7ef2b..bd5347dd6 100644
--- a/src/include/homestore/logstore_service.hpp
+++ b/src/include/homestore/logstore_service.hpp
@@ -29,6 +29,7 @@
 #include
 #include
+#include

 namespace homestore {
@@ -50,6 +51,17 @@
 struct vdev_info;
 struct log_dump_req;
 struct logdev_superblk;
+static constexpr uint64_t logstore_service_sb_magic{0xb0b0c01b};
+static constexpr uint32_t logstore_service_sb_version{0x1};
+
+#pragma pack(1)
+struct logstore_service_super_block {
+    uint64_t magic{logstore_service_sb_magic};
+    uint32_t version{logstore_service_sb_version};
+    uint32_t m_last_logdev_id{0};
+};
+#pragma pack()
+
 class LogStoreService {
     friend class HomeLogStore;
     friend class LogDev;
@@ -173,9 +185,12 @@ class LogStoreService {
      */
     iomgr::io_fiber_t truncate_thread() { return m_truncate_fiber; }

+    void delete_unopened_logdevs();
+
 private:
     std::shared_ptr< LogDev > create_new_logdev_internal(logdev_id_t logdev_id);
-    void delete_unopened_logdevs();
+    void on_meta_blk_found(const sisl::byte_view& buf, void* meta_cookie);
+    logdev_id_t get_next_logdev_id();
     void logdev_super_blk_found(const sisl::byte_view& buf, void* meta_cookie);
     void rollback_super_blk_found(const sisl::byte_view& buf, void* meta_cookie);
     void start_threads();
@@ -183,7 +198,6 @@ class LogStoreService {

 private:
     std::unordered_map< logdev_id_t, std::shared_ptr< LogDev > > m_id_logdev_map;
-    std::unique_ptr< sisl::IDReserver > m_id_reserver;
     folly::SharedMutexWritePriority m_logdev_map_mtx;
     std::shared_ptr< JournalVirtualDev > m_logdev_vdev;
@@ -191,6 +205,7 @@ class LogStoreService {
     iomgr::io_fiber_t m_flush_fiber;
     LogStoreServiceMetrics m_metrics;
     std::unordered_set< logdev_id_t > m_unopened_logdev;
+    superblk< logstore_service_super_block > m_sb;
 };

 extern LogStoreService& logstore_service();
diff --git a/src/lib/device/journal_vdev.cpp b/src/lib/device/journal_vdev.cpp
index 07ae01d3b..d82feb5bf 100644
--- a/src/lib/device/journal_vdev.cpp
+++ b/src/lib/device/journal_vdev.cpp
@@ -208,6 +208,10 @@ void JournalVirtualDev::destroy(logdev_id_t logdev_id) {
 void JournalVirtualDev::Descriptor::append_chunk() {
     // Get a new chunk from the pool.
     auto new_chunk = m_vdev.m_chunk_pool->dequeue();
+#if 0
+    auto* pdev = new_chunk->physical_dev_mutable();
+    pdev->sync_write_zero(new_chunk->size(), new_chunk->start_offset());
+#endif

     // Increase the right window and total size.
     m_total_size += new_chunk->size();
diff --git a/src/lib/logstore/log_store_service.cpp b/src/lib/logstore/log_store_service.cpp
index 46e55655f..5d9e0049f 100644
--- a/src/lib/logstore/log_store_service.cpp
+++ b/src/lib/logstore/log_store_service.cpp
@@ -37,8 +37,7 @@ SISL_LOGGING_DECL(logstore)
 LogStoreService& logstore_service() { return hs()->logstore_service(); }

 /////////////////////////////////////// LogStoreService Section ///////////////////////////////////////
-LogStoreService::LogStoreService() {
-    m_id_reserver = std::make_unique< sisl::IDReserver >();
+LogStoreService::LogStoreService() : m_sb{"LogStoreServiceSB"} {
     meta_service().register_handler(
         logdev_sb_meta_name,
         [this](meta_blk* mblk, sisl::byte_view buf, size_t size) {
@@ -52,6 +51,17 @@ LogStoreService::LogStoreService() {
             rollback_super_blk_found(std::move(buf), voidptr_cast(mblk));
         },
         nullptr, true, std::optional< meta_subtype_vec_t >({logdev_sb_meta_name}));
+
+    meta_service().register_handler(
+        "LogStoreServiceSB",
+        [this](meta_blk* mblk, sisl::byte_view buf, size_t size) { on_meta_blk_found(std::move(buf), voidptr_cast(mblk)); },
+        nullptr);
+}
+
+void LogStoreService::on_meta_blk_found(const sisl::byte_view& buf, void* meta_cookie) {
+    m_sb.load(buf, meta_cookie);
+    HS_REL_ASSERT_EQ(m_sb->magic, logstore_service_sb_magic, "Invalid log service metablk, magic mismatch");
+    HS_REL_ASSERT_EQ(m_sb->version, logstore_service_sb_version, "Invalid version of log service metablk");
 }

 folly::Future< std::error_code > LogStoreService::create_vdev(uint64_t size, HSDevType devType, uint32_t chunk_size) {
@@ -96,8 +106,10 @@ std::shared_ptr< VirtualDev > LogStoreService::open_vdev(const vdev_info& vinfo,

 void LogStoreService::start(bool format) {
     // hs()->status_mgr()->register_status_cb("LogStore", bind_this(LogStoreService::get_status, 1));
-
-    delete_unopened_logdevs();
+    if (format) {
+        m_sb.create(sizeof(logstore_service_super_block));
+        m_sb.write();
+    }

     // Create a truncate thread loop which handles truncation, which does sync IO
     start_threads();
@@ -116,12 +128,17 @@ void LogStoreService::stop() {
         folly::SharedMutexWritePriority::WriteHolder holder(m_logdev_map_mtx);
         m_id_logdev_map.clear();
     }
-    m_id_reserver.reset();
+}
+
+logdev_id_t LogStoreService::get_next_logdev_id() {
+    auto id = ++(m_sb->m_last_logdev_id);
+    m_sb.write();
+    return id;
 }

 logdev_id_t LogStoreService::create_new_logdev() {
     folly::SharedMutexWritePriority::WriteHolder holder(m_logdev_map_mtx);
-    logdev_id_t logdev_id = m_id_reserver->reserve();
+    logdev_id_t logdev_id = get_next_logdev_id();
     auto logdev = create_new_logdev_internal(logdev_id);
     logdev->start(true /* format */);
     COUNTER_INCREMENT(m_metrics, logdevs_count, 1);
@@ -132,7 +149,10 @@ logdev_id_t LogStoreService::create_new_logdev() {
 void LogStoreService::destroy_log_dev(logdev_id_t logdev_id) {
     folly::SharedMutexWritePriority::WriteHolder holder(m_logdev_map_mtx);
     const auto it = m_id_logdev_map.find(logdev_id);
-    if (it == m_id_logdev_map.end()) { return; }
+    if (it == m_id_logdev_map.end()) {
+        LOGERROR("Logdev {} not found to destroy", logdev_id);
+        return;
+    }

     // Stop the logdev and release all the chunks from the journal vdev.
     auto& logdev = it->second;
@@ -172,7 +192,6 @@ void LogStoreService::open_logdev(logdev_id_t logdev_id) {
     folly::SharedMutexWritePriority::WriteHolder holder(m_logdev_map_mtx);
     const auto it = m_id_logdev_map.find(logdev_id);
     if (it == m_id_logdev_map.end()) {
-        m_id_reserver->reserve(logdev_id);
         auto logdev = std::make_shared< LogDev >(logdev_id, m_logdev_vdev.get());
         m_id_logdev_map.emplace(logdev_id, logdev);
         LOGDEBUGMOD(logstore, "log_dev={} does not exist, created!", logdev_id);
@@ -206,11 +225,11 @@ void LogStoreService::logdev_super_blk_found(const sisl::byte_view& buf, void* m
     folly::SharedMutexWritePriority::WriteHolder holder(m_logdev_map_mtx);
     std::shared_ptr< LogDev > logdev;
     auto id = sb->logdev_id;
-    HS_LOG(DEBUG, logstore, "Log dev superblk found logdev={}", id);
     const auto it = m_id_logdev_map.find(id);
     // The logdev map could be updated by either the logdev or the rollback superblk found callbacks.
     if (it != m_id_logdev_map.end()) {
         logdev = it->second;
+        HS_LOG(DEBUG, logstore, "Log dev superblk found log_dev={}", id);
     } else {
         logdev = std::make_shared< LogDev >(id, m_logdev_vdev.get());
         m_id_logdev_map.emplace(id, logdev);
@@ -219,6 +238,7 @@ void LogStoreService::logdev_super_blk_found(const sisl::byte_view& buf, void* m
         // m_unopened_logdev, so that when we start the log service, the items left in m_unopened_logdev are those
         // not opened, which can be destroyed
         m_unopened_logdev.insert(id);
+        HS_LOG(DEBUG, logstore, "Log dev superblk found log_dev={} added to unopened list", id);
     }

     logdev->log_dev_meta().logdev_super_blk_found(buf, meta_cookie);
diff --git a/src/lib/logstore/log_stream.cpp b/src/lib/logstore/log_stream.cpp
index 2a00cd31c..dd5eb5660 100644
--- a/src/lib/logstore/log_stream.cpp
+++ b/src/lib/logstore/log_stream.cpp
@@ -70,6 +70,16 @@ sisl::byte_view log_stream_reader::next_group(off_t* out_dev_offset) {
         return ret_buf;
     }

+    if (header->logdev_id != m_vdev_jd->logdev_id()) {
+        LOGINFOMOD(logstore, "Entries found for different logdev {} at pos {}, must have reached the end of log_dev={}",
+                   header->logdev_id, m_vdev_jd->dev_offset(m_cur_read_bytes), m_vdev_jd->logdev_id());
+        *out_dev_offset = m_vdev_jd->dev_offset(m_cur_read_bytes);
+        // This group belongs to another logdev; skip ahead by one dma boundary and reset the crc chain.
+        m_prev_crc = 0;
+        m_cur_read_bytes += m_read_size_multiple;
+        return ret_buf;
+    }
+
     // Because we reuse chunks without cleaning them up, we could get chunks used by other logdevs,
     // and log group headers may not match. In that case, check with is_offset_at_last_chunk whether this
     // is the last chunk; if so, don't error, else raise an assert.
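Reviewer note: the LogStoreService hunks above replace the in-memory sisl::IDReserver with a small persisted superblock that records the last allocated logdev id, so ids survive restarts and a destroyed id is never handed out again. Below is a minimal sketch of that allocation pattern; IdAllocator, id_alloc_sb and persist() are illustrative names for this note, not the actual Homestore API.

    // Sketch: hand out monotonically increasing ids, persisting the counter
    // before the id is used so a crash cannot cause the same id to be reissued.
    #include <cstdint>
    #include <mutex>

    struct id_alloc_sb {
        uint64_t magic{0xb0b0c01b};
        uint32_t version{0x1};
        uint32_t last_id{0};
    };

    class IdAllocator {
    public:
        // On recovery, the persisted copy would be loaded back into m_sb
        // (the on_meta_blk_found() path in the patch).
        uint32_t allocate() {
            std::lock_guard< std::mutex > lg{m_mtx};
            auto id = ++m_sb.last_id;
            persist(m_sb); // write-through, like m_sb.write() in get_next_logdev_id()
            return id;
        }

    private:
        void persist(id_alloc_sb const&) { /* stand-in for the meta service write */ }
        id_alloc_sb m_sb;
        std::mutex m_mtx;
    };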
diff --git a/src/lib/replication/log_store/repl_log_store.cpp b/src/lib/replication/log_store/repl_log_store.cpp
index 79a725bc4..c678a90d4 100644
--- a/src/lib/replication/log_store/repl_log_store.cpp
+++ b/src/lib/replication/log_store/repl_log_store.cpp
@@ -8,7 +8,12 @@ namespace homestore {

 uint64_t ReplLogStore::append(nuraft::ptr< nuraft::log_entry >& entry) {
     // We don't want to transform anything that is not an app log
-    if (entry->get_val_type() != nuraft::log_val_type::app_log) { return HomeRaftLogStore::append(entry); }
+    if (entry->get_val_type() != nuraft::log_val_type::app_log) {
+        ulong lsn = HomeRaftLogStore::append(entry);
+        RD_LOGD("append entry term={}, log_val_type={} lsn={} size={}", entry->get_term(),
+                static_cast< uint32_t >(entry->get_val_type()), lsn, entry->get_buf().size());
+        return lsn;
+    }

     repl_req_ptr_t rreq = m_sm.localize_journal_entry_finish(*entry);
     ulong lsn = HomeRaftLogStore::append(entry);
diff --git a/src/lib/replication/repl_dev/raft_repl_dev.cpp b/src/lib/replication/repl_dev/raft_repl_dev.cpp
index 25f54a3c8..d902bce52 100644
--- a/src/lib/replication/repl_dev/raft_repl_dev.cpp
+++ b/src/lib/replication/repl_dev/raft_repl_dev.cpp
@@ -124,6 +124,7 @@ folly::SemiFuture< ReplServiceError > RaftReplDev::destroy_group() {
     if (err != ReplServiceError::OK) {
         m_stage.update([](auto* stage) { *stage = repl_dev_stage_t::ACTIVE; });
+        LOGERROR("RaftReplDev::destroy_group failed {}", err);
         return folly::makeSemiFuture< ReplServiceError >(std::move(err));
     }

     return m_destroy_promise.getSemiFuture();
@@ -896,6 +897,7 @@ nuraft::ptr< nuraft::log_store > RaftReplDev::load_log_store() { return m_data_j
 int32_t RaftReplDev::server_id() { return m_raft_server_id; }

 bool RaftReplDev::is_destroy_pending() const { return (m_rd_sb->destroy_pending == 0x1); }
+bool RaftReplDev::is_destroyed() const { return (*m_stage.access().get() == repl_dev_stage_t::PERMANENT_DESTROYED); }

 /////////////////////////////////// nuraft_mesg::mesg_state_mgr overrides ////////////////////////////////////
 void RaftReplDev::become_ready() {
@@ -907,10 +909,12 @@ uint32_t RaftReplDev::get_logstore_id() const { return m_data_journal->logstore_
 std::shared_ptr< nuraft::state_machine > RaftReplDev::get_state_machine() { return m_state_machine; }

 void RaftReplDev::permanent_destroy() {
+    RD_LOGI("Permanent destroy for raft repl dev");
     m_rd_sb.destroy();
     m_raft_config_sb.destroy();
     m_data_journal->remove_store();
     logstore_service().destroy_log_dev(m_data_journal->logdev_id());
+    m_stage.update([](auto* stage) { *stage = repl_dev_stage_t::PERMANENT_DESTROYED; });
 }

 void RaftReplDev::leave() {
@@ -927,6 +931,7 @@ void RaftReplDev::leave() {
     m_rd_sb->destroy_pending = 0x1;
     m_rd_sb.write();

+    RD_LOGI("RaftReplDev leave group");
     m_destroy_promise.setValue(ReplServiceError::OK); // In case proposer is waiting for the destroy to complete
 }
diff --git a/src/lib/replication/repl_dev/raft_repl_dev.h b/src/lib/replication/repl_dev/raft_repl_dev.h
index 1395dfeb4..a8c308c1d 100644
--- a/src/lib/replication/repl_dev/raft_repl_dev.h
+++ b/src/lib/replication/repl_dev/raft_repl_dev.h
@@ -32,7 +32,7 @@ struct raft_repl_dev_superblk : public repl_dev_superblk {

 using raft_buf_ptr_t = nuraft::ptr< nuraft::buffer >;

-ENUM(repl_dev_stage_t, uint8_t, INIT, ACTIVE, DESTROYING, DESTROYED);
+ENUM(repl_dev_stage_t, uint8_t, INIT, ACTIVE, DESTROYING, DESTROYED, PERMANENT_DESTROYED);

 class RaftReplDevMetrics : public sisl::MetricsGroup {
 public:
@@ -128,6 +128,7 @@ class RaftReplDev : public ReplDev,
     uint32_t get_blk_size() const override;
     repl_lsn_t get_last_commit_lsn() const { return m_commit_upto_lsn.load(); }
     bool is_destroy_pending() const;
+    bool is_destroyed() const;
     Clock::time_point destroyed_time() const { return m_destroyed_time; }

     //////////////// Accessor/shortcut methods ///////////////////////
diff --git a/src/lib/replication/repl_dev/raft_state_machine.cpp b/src/lib/replication/repl_dev/raft_state_machine.cpp
index 4f0d3f80a..a805b229b 100644
--- a/src/lib/replication/repl_dev/raft_state_machine.cpp
+++ b/src/lib/replication/repl_dev/raft_state_machine.cpp
@@ -184,8 +184,10 @@ raft_buf_ptr_t RaftStateMachine::pre_commit_ext(nuraft::state_machine::ext_op_pa

 raft_buf_ptr_t RaftStateMachine::commit_ext(nuraft::state_machine::ext_op_params const& params) {
     int64_t lsn = s_cast< int64_t >(params.log_idx);
-
+    RD_LOGD("Raft channel: Received Commit message lsn {} store {} logdev {} size {}", lsn,
+            m_rd.m_data_journal->logstore_id(), m_rd.m_data_journal->logdev_id(), params.data->size());
     repl_req_ptr_t rreq = lsn_to_req(lsn);
+    if (!rreq) { RD_LOGD("Raft channel got null rreq"); }
     RD_LOGD("Raft channel: Received Commit message rreq=[{}]", rreq->to_compact_string());
     if (rreq->is_proposer()) {
         // This is the time to ensure flushing of journal happens in the proposer
diff --git a/src/lib/replication/service/generic_repl_svc.h b/src/lib/replication/service/generic_repl_svc.h
index 67894bdfe..e2d445427 100644
--- a/src/lib/replication/service/generic_repl_svc.h
+++ b/src/lib/replication/service/generic_repl_svc.h
@@ -56,7 +56,6 @@ class GenericReplService : public ReplicationService {
     hs_stats get_cap_stats() const override;
     replica_id_t get_my_repl_uuid() const { return m_my_uuid; }

-    // void resource_audit() override;

 protected:
diff --git a/src/lib/replication/service/raft_repl_service.cpp b/src/lib/replication/service/raft_repl_service.cpp
index a8291d8d1..c03fb9003 100644
--- a/src/lib/replication/service/raft_repl_service.cpp
+++ b/src/lib/replication/service/raft_repl_service.cpp
@@ -141,6 +141,9 @@ void RaftReplService::start() {
     // Step 8: Start a reaper thread which wakes up from time to time and fetches pending data or cleans up old requests etc
     start_reaper_thread();
+
+    // Delete any unopened logdevs.
+    hs()->logstore_service().delete_unopened_logdevs();
 }

 void RaftReplService::stop() {
diff --git a/src/lib/replication/service/raft_repl_service.h b/src/lib/replication/service/raft_repl_service.h
index 4553adb59..fcd0b85de 100644
--- a/src/lib/replication/service/raft_repl_service.h
+++ b/src/lib/replication/service/raft_repl_service.h
@@ -74,7 +74,6 @@ class RaftReplService : public GenericReplService,

 private:
     void raft_group_config_found(sisl::byte_view const& buf, void* meta_cookie);
-
     void start_reaper_thread();
     void stop_reaper_thread();
     void fetch_pending_data();
diff --git a/src/tests/test_log_dev.cpp b/src/tests/test_log_dev.cpp
index bea956c7a..5a6a3c04b 100644
--- a/src/tests/test_log_dev.cpp
+++ b/src/tests/test_log_dev.cpp
@@ -436,6 +436,8 @@ TEST_F(LogDevTest, DeleteUnopenedLogDev) {
     LOGINFO("Restart homestore");
     restart();

+    // Explicitly call delete for unopened ones.
+    hs()->logstore_service().delete_unopened_logdevs();
     auto log_devs = logstore_service().get_all_logdevs();
     ASSERT_EQ(log_devs.size(), id_set.size() / 2);
     for (auto& logdev : log_devs) {
diff --git a/src/tests/test_raft_repl_dev.cpp b/src/tests/test_raft_repl_dev.cpp
index 6a791cbdf..97c118edd 100644
--- a/src/tests/test_raft_repl_dev.cpp
+++ b/src/tests/test_raft_repl_dev.cpp
@@ -36,6 +36,8 @@
 #include "common/homestore_config.hpp"
 #include "common/homestore_assert.hpp"
 #include "common/homestore_utils.hpp"
+
+#define private public
 #include "test_common/hs_repl_test_common.hpp"
 #include "replication/service/raft_repl_service.h"
 #include "replication/repl_dev/raft_repl_dev.h"
@@ -251,10 +253,21 @@ class RaftReplDevTest : public testing::Test {
                 ASSERT_EQ(err, ReplServiceError::OK) << "Error in destroying the group";
             });
         }
+
+        for (auto const& db : dbs_) {
+            auto repl_dev = std::dynamic_pointer_cast< RaftReplDev >(db->repl_dev());
+            do {
+                std::this_thread::sleep_for(std::chrono::seconds(1));
+                auto& raft_repl_svc = dynamic_cast< RaftReplService& >(hs()->repl_service());
+                raft_repl_svc.gc_repl_devs();
+                LOGINFO("Waiting for repl dev to get destroyed");
+            } while (!repl_dev->is_destroyed());
+        }
     }

     void generate_writes(uint64_t data_size, uint32_t max_size_per_iov, shared< TestReplicatedDB > db = nullptr) {
         if (db == nullptr) { db = pick_one_db(); }
+        LOGINFO("Writing on group_id={}", db->repl_dev()->group_id());
         db->db_write(data_size, max_size_per_iov);
     }
@@ -563,9 +576,12 @@ TEST_F(RaftReplDevTest, Leader_Restart) {
     // Step 1: Fill up entries on all replicas
     uint64_t entries_per_attempt = SISL_OPTIONS["num_io"].as< uint64_t >();
     this->write_on_leader(entries_per_attempt, true /* wait for commit on all replicas */);
+    std::this_thread::sleep_for(std::chrono::seconds(3));

     // Step 2: Restart replica-0 (Leader) with a very long delay so that it is lagging behind
-    this->restart_replica(0, 10 /* shutdown_delay_sec */);
+    LOGINFO("Restart leader");
+    this->restart_replica(0, 15 /* shutdown_delay_sec */);
+    std::this_thread::sleep_for(std::chrono::seconds(3));

     // Step 3: While the original leader is down, write entries into the new leader
     LOGINFO("After original leader is shutdown, insert more entries into the new leader");
@@ -647,14 +663,16 @@ TEST_F(RaftReplDevTest, RemoveReplDev) {
     this->write_on_leader(entries_per_attempt, false /* wait for commit on all */, dbs_.back());
     std::this_thread::sleep_for(std::chrono::milliseconds(2));
     this->remove_db(dbs_.back(), true /* wait_for_removal */);
+    std::this_thread::sleep_for(std::chrono::seconds(2));

     // Step 3: Shutdown one of the followers and remove another repl_dev; once the follower is up, it should remove
     // the repl_dev and proceed
     LOGINFO("Shutdown one of the followers (replica=1) and then remove dbs on other members. Expect replica=1 to "
             "remove after it is up");
     this->restart_replica(1, 15 /* shutdown_delay_sec */);
+    LOGINFO("After restarting replica 1, num_dbs={}", dbs_.size());
     this->remove_db(dbs_.back(), true /* wait_for_removal */);
-
+    LOGINFO("Removed the last db, num_dbs={}", dbs_.size());
     // TODO: Once generic crash flip/test_infra is available, use flip to crash during removal and restart them to see
     // if records are being removed
     g_helper->sync_for_cleanup_start();
@@ -682,9 +700,11 @@ TEST_F(RaftReplDevTest, GCReplReqs) {
     }

     this->write_on_leader(100 /* num_entries */, true /* wait_for_commit */);
+    std::this_thread::sleep_for(std::chrono::seconds(2));

     // Step 2: Restart replica-0 (Leader)
     this->restart_replica(0, 10);
+    std::this_thread::sleep_for(std::chrono::seconds(2));

     LOGINFO("After original leader is shutdown, insert more entries into the new leader");
     this->write_on_leader(100, true /* wait for commit on all replicas */);
@@ -726,6 +746,7 @@ int main(int argc, char* argv[]) {
     //
     HS_SETTINGS_FACTORY().modifiable_settings([](auto& s) {
         s.consensus.leadership_expiry_ms = -1; // -1 means never expires;
+        s.generic.repl_dev_cleanup_interval_sec = 0;
         // only reset when user specified the value for test;
         if (SISL_OPTIONS.count("snapshot_distance")) {
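Reviewer note: the test teardown above polls until each repl dev reaches PERMANENT_DESTROYED, driving raft_repl_svc.gc_repl_devs() on each iteration, but the loop has no upper bound and will spin forever if destruction never completes. A bounded variant of the same wait is sketched below; wait_for_destroyed and its parameters are hypothetical helpers for this note, not part of the test harness.

    #include <chrono>
    #include <functional>
    #include <thread>

    // Sketch: drive a gc step and poll a predicate until it holds or a deadline passes.
    bool wait_for_destroyed(std::function< bool() > is_destroyed, std::function< void() > gc_step,
                            std::chrono::seconds timeout) {
        auto const deadline = std::chrono::steady_clock::now() + timeout;
        while (!is_destroyed()) {
            if (std::chrono::steady_clock::now() >= deadline) { return false; }
            gc_step(); // e.g. raft_repl_svc.gc_repl_devs() in the teardown above
            std::this_thread::sleep_for(std::chrono::seconds(1));
        }
        return true;
    }

A failed wait could then be surfaced with an ASSERT rather than hanging the test run.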