diff --git a/conanfile.py b/conanfile.py
index e23c7f9d7..de79625ed 100644
--- a/conanfile.py
+++ b/conanfile.py
@@ -5,7 +5,7 @@
 class HomestoreConan(ConanFile):
     name = "homestore"
-    version = "6.4.17"
+    version = "6.4.18"
     homepage = "https://github.com/eBay/Homestore"
     description = "HomeStore Storage Engine"
     topics = ("ebay", "nublox")
diff --git a/src/include/homestore/logstore_service.hpp b/src/include/homestore/logstore_service.hpp
index 24af7ef2b..bd5347dd6 100644
--- a/src/include/homestore/logstore_service.hpp
+++ b/src/include/homestore/logstore_service.hpp
@@ -29,6 +29,7 @@
 #include
 #include
+#include

 namespace homestore {

@@ -50,6 +51,17 @@
 struct vdev_info;
 struct log_dump_req;
 struct logdev_superblk;

+static constexpr uint64_t logstore_service_sb_magic{0xb0b0c01b};
+static constexpr uint32_t logstore_service_sb_version{0x1};
+
+#pragma pack(1)
+struct logstore_service_super_block {
+    uint64_t magic{logstore_service_sb_magic};
+    uint32_t version{logstore_service_sb_version};
+    uint32_t m_last_logdev_id{0};
+};
+#pragma pack()
+
 class LogStoreService {
     friend class HomeLogStore;
     friend class LogDev;
@@ -173,9 +185,12 @@ class LogStoreService {
      */
     iomgr::io_fiber_t truncate_thread() { return m_truncate_fiber; }

+    void delete_unopened_logdevs();
+
 private:
     std::shared_ptr< LogDev > create_new_logdev_internal(logdev_id_t logdev_id);
-    void delete_unopened_logdevs();
+    void on_meta_blk_found(const sisl::byte_view& buf, void* meta_cookie);
+    logdev_id_t get_next_logdev_id();
     void logdev_super_blk_found(const sisl::byte_view& buf, void* meta_cookie);
     void rollback_super_blk_found(const sisl::byte_view& buf, void* meta_cookie);
     void start_threads();
@@ -183,7 +198,6 @@ class LogStoreService {
 private:
     std::unordered_map< logdev_id_t, std::shared_ptr< LogDev > > m_id_logdev_map;
-    std::unique_ptr< sisl::IDReserver > m_id_reserver;
     folly::SharedMutexWritePriority m_logdev_map_mtx;

     std::shared_ptr< JournalVirtualDev > m_logdev_vdev;
@@ -191,6 +205,7 @@ class LogStoreService {
     iomgr::io_fiber_t m_flush_fiber;
     LogStoreServiceMetrics m_metrics;
     std::unordered_set< logdev_id_t > m_unopened_logdev;
+    superblk< logstore_service_super_block > m_sb;
 };

 extern LogStoreService& logstore_service();
diff --git a/src/lib/device/journal_vdev.cpp b/src/lib/device/journal_vdev.cpp
index 07ae01d3b..d82feb5bf 100644
--- a/src/lib/device/journal_vdev.cpp
+++ b/src/lib/device/journal_vdev.cpp
@@ -208,6 +208,10 @@ void JournalVirtualDev::destroy(logdev_id_t logdev_id) {
 void JournalVirtualDev::Descriptor::append_chunk() {
     // Get a new chunk from the pool.
     auto new_chunk = m_vdev.m_chunk_pool->dequeue();
+#if 0
+    auto* pdev = new_chunk->physical_dev_mutable();
+    pdev->sync_write_zero(new_chunk->size(), new_chunk->start_offset());
+#endif

     // Increase the right window and total size.
     m_total_size += new_chunk->size();
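The heart of this change is the packed superblock above: it replaces the in-memory sisl::IDReserver so that the last allocated logdev id survives restarts. A minimal standalone sketch of the same layout-and-validation pattern (names are illustrative, not the HomeStore API):

    #include <cstdint>
    #include <cassert>

    // Mirrors the packed on-disk layout above: #pragma pack(1) removes padding,
    // so the struct's bytes are exactly what a later boot reads back,
    // independent of compiler or ABI.
    #pragma pack(1)
    struct logstore_service_sb {
        uint64_t magic{0xb0b0c01b};
        uint32_t version{0x1};
        uint32_t last_logdev_id{0};
    };
    #pragma pack()

    // Recovery-side validation, analogous to the HS_REL_ASSERT_EQ checks the
    // service performs when its meta blk is found.
    bool validate(const logstore_service_sb& sb) {
        return sb.magic == 0xb0b0c01b && sb.version == 0x1;
    }

    int main() {
        static_assert(sizeof(logstore_service_sb) == 16, "no padding expected");
        logstore_service_sb sb;
        assert(validate(sb));
    }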
diff --git a/src/lib/logstore/log_store_service.cpp b/src/lib/logstore/log_store_service.cpp
index 46e55655f..5d9e0049f 100644
--- a/src/lib/logstore/log_store_service.cpp
+++ b/src/lib/logstore/log_store_service.cpp
@@ -37,8 +37,7 @@
 SISL_LOGGING_DECL(logstore)

 LogStoreService& logstore_service() { return hs()->logstore_service(); }

 /////////////////////////////////////// LogStoreService Section ///////////////////////////////////////
-LogStoreService::LogStoreService() {
-    m_id_reserver = std::make_unique< sisl::IDReserver >();
+LogStoreService::LogStoreService() : m_sb{"LogStoreServiceSB"} {
     meta_service().register_handler(
         logdev_sb_meta_name,
         [this](meta_blk* mblk, sisl::byte_view buf, size_t size) {
@@ -52,6 +51,17 @@ LogStoreService::LogStoreService() {
             rollback_super_blk_found(std::move(buf), voidptr_cast(mblk));
         },
         nullptr, true, std::optional< meta_subtype_vec_t >({logdev_sb_meta_name}));
+
+    meta_service().register_handler(
+        "LogStoreServiceSB",
+        [this](meta_blk* mblk, sisl::byte_view buf, size_t size) { on_meta_blk_found(std::move(buf), (void*)mblk); },
+        nullptr);
+}
+
+void LogStoreService::on_meta_blk_found(const sisl::byte_view& buf, void* meta_cookie) {
+    m_sb.load(buf, meta_cookie);
+    HS_REL_ASSERT_EQ(m_sb->magic, logstore_service_sb_magic, "Invalid log service metablk, magic mismatch");
+    HS_REL_ASSERT_EQ(m_sb->version, logstore_service_sb_version, "Invalid version of log service metablk");
 }

 folly::Future< std::error_code > LogStoreService::create_vdev(uint64_t size, HSDevType devType, uint32_t chunk_size) {
@@ -96,8 +106,10 @@ std::shared_ptr< VirtualDev > LogStoreService::open_vdev(const vdev_info& vinfo,
 void LogStoreService::start(bool format) {
     // hs()->status_mgr()->register_status_cb("LogStore", bind_this(LogStoreService::get_status, 1));
-
-    delete_unopened_logdevs();
+    if (format) {
+        m_sb.create(sizeof(logstore_service_super_block));
+        m_sb.write();
+    }

     // Create a truncate thread loop which handles truncation, since it does sync IO
     start_threads();
@@ -116,12 +128,17 @@ void LogStoreService::stop() {
         folly::SharedMutexWritePriority::WriteHolder holder(m_logdev_map_mtx);
         m_id_logdev_map.clear();
     }
-    m_id_reserver.reset();
+}
+
+logdev_id_t LogStoreService::get_next_logdev_id() {
+    auto id = ++(m_sb->m_last_logdev_id);
+    m_sb.write();
+    return id;
 }

 logdev_id_t LogStoreService::create_new_logdev() {
     folly::SharedMutexWritePriority::WriteHolder holder(m_logdev_map_mtx);
-    logdev_id_t logdev_id = m_id_reserver->reserve();
+    logdev_id_t logdev_id = get_next_logdev_id();
     auto logdev = create_new_logdev_internal(logdev_id);
     logdev->start(true /* format */);
     COUNTER_INCREMENT(m_metrics, logdevs_count, 1);
@@ -132,7 +149,10 @@
 void LogStoreService::destroy_log_dev(logdev_id_t logdev_id) {
     folly::SharedMutexWritePriority::WriteHolder holder(m_logdev_map_mtx);
     const auto it = m_id_logdev_map.find(logdev_id);
-    if (it == m_id_logdev_map.end()) { return; }
+    if (it == m_id_logdev_map.end()) {
+        LOGERROR("Logdev not found to destroy {}", logdev_id);
+        return;
+    }

     // Stop the logdev and release all the chunks from the journal vdev.
     auto& logdev = it->second;
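get_next_logdev_id() above persists the bumped counter before returning it, so ids are never reused across restarts, even for logdevs that were later destroyed. A toy illustration of that write-through ordering (the persist hook is a stand-in for m_sb.write()):

    #include <cstdint>
    #include <cassert>
    #include <functional>

    // Write-through counter: the new value is durably recorded before the
    // caller sees it, so recovery can never re-issue an id already in use.
    struct durable_counter {
        uint32_t value{0};
        std::function<void(uint32_t)> persist; // stand-in for m_sb.write()

        uint32_t next() {
            ++value;
            persist(value); // flush first ...
            return value;   // ... then hand out the id
        }
    };

    int main() {
        uint32_t on_disk = 0; // simulated superblock contents
        durable_counter c{0, [&](uint32_t v) { on_disk = v; }};
        assert(c.next() == 1 && on_disk == 1);
        assert(c.next() == 2 && on_disk == 2); // the disk never lags the live value
    }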
@@ -172,7 +192,6 @@ void LogStoreService::open_logdev(logdev_id_t logdev_id) {
     folly::SharedMutexWritePriority::WriteHolder holder(m_logdev_map_mtx);
     const auto it = m_id_logdev_map.find(logdev_id);
     if (it == m_id_logdev_map.end()) {
-        m_id_reserver->reserve(logdev_id);
         auto logdev = std::make_shared< LogDev >(logdev_id, m_logdev_vdev.get());
         m_id_logdev_map.emplace(logdev_id, logdev);
         LOGDEBUGMOD(logstore, "log_dev={} does not exist, created!", logdev_id);
@@ -206,11 +225,11 @@ void LogStoreService::logdev_super_blk_found(const sisl::byte_view& buf, void* m
     folly::SharedMutexWritePriority::WriteHolder holder(m_logdev_map_mtx);
     std::shared_ptr< LogDev > logdev;
     auto id = sb->logdev_id;
-    HS_LOG(DEBUG, logstore, "Log dev superblk found logdev={}", id);
     const auto it = m_id_logdev_map.find(id);
     // We could update the logdev map either with logdev or rollback superblks found callbacks.
     if (it != m_id_logdev_map.end()) {
         logdev = it->second;
+        HS_LOG(DEBUG, logstore, "Log dev superblk found log_dev={}", id);
     } else {
         logdev = std::make_shared< LogDev >(id, m_logdev_vdev.get());
         m_id_logdev_map.emplace(id, logdev);
@@ -219,6 +238,7 @@ void LogStoreService::logdev_super_blk_found(const sisl::byte_view& buf, void* m
         // m_unopened_logdev. so that when we start log service, all the left items in m_unopened_logdev are those
         // not open, which can be destroyed
         m_unopened_logdev.insert(id);
+        HS_LOG(DEBUG, logstore, "Log dev superblk found log_dev={} added to unopened list", id);
     }

     logdev->log_dev_meta().logdev_super_blk_found(buf, meta_cookie);
diff --git a/src/lib/logstore/log_stream.cpp b/src/lib/logstore/log_stream.cpp
index 2a00cd31c..dd5eb5660 100644
--- a/src/lib/logstore/log_stream.cpp
+++ b/src/lib/logstore/log_stream.cpp
@@ -70,6 +70,16 @@ sisl::byte_view log_stream_reader::next_group(off_t* out_dev_offset) {
         return ret_buf;
     }

+    if (header->logdev_id != m_vdev_jd->logdev_id()) {
+        LOGINFOMOD(logstore, "Entries found for different logdev {} at pos {}, must have come to end of log_dev={}",
+                   header->logdev_id, m_vdev_jd->dev_offset(m_cur_read_bytes), m_vdev_jd->logdev_id());
+        *out_dev_offset = m_vdev_jd->dev_offset(m_cur_read_bytes);
+        // move it by dma boundary if header is not valid
+        m_prev_crc = 0;
+        m_cur_read_bytes += m_read_size_multiple;
+        return ret_buf;
+    }
+
     // Because chunks are reused without being cleaned up, we could get chunks used by other logdevs,
     // and the log group headers may not match. In that case, check whether this is the last chunk
     // with is_offset_at_last_chunk; if it is not, raise an assert.
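The new check in log_stream.cpp handles journal chunks that are recycled across logdevs without being zeroed: replay for one logdev can run into a log group header stamped with another logdev's id, which simply marks the end of this logdev's data. A self-contained sketch of that stop condition (struct and field names are illustrative):

    #include <cstdint>
    #include <cstdio>
    #include <vector>

    // Each log group on disk starts with a header stamped with the id of the
    // logdev that wrote it (field names here are illustrative).
    struct group_header {
        uint32_t logdev_id;
        uint32_t payload_len;
    };

    // Replay stops at the first group owned by another logdev: since chunks
    // are recycled without being zeroed, a foreign header marks stale data,
    // i.e. the end of this logdev's journal.
    size_t replay(const std::vector<group_header>& groups, uint32_t my_id) {
        size_t replayed = 0;
        for (const auto& g : groups) {
            if (g.logdev_id != my_id) {
                std::printf("hit group of logdev %u, treating as end of log\n", g.logdev_id);
                break;
            }
            ++replayed; // a real reader would apply g.payload_len bytes here
        }
        return replayed;
    }

    int main() {
        // Chunk previously used by logdev 7, then recycled by logdev 3.
        std::vector<group_header> disk{{3, 100}, {3, 80}, {7, 64}};
        return replay(disk, 3) == 2 ? 0 : 1;
    }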
diff --git a/src/lib/replication/log_store/repl_log_store.cpp b/src/lib/replication/log_store/repl_log_store.cpp
index 79a725bc4..c678a90d4 100644
--- a/src/lib/replication/log_store/repl_log_store.cpp
+++ b/src/lib/replication/log_store/repl_log_store.cpp
@@ -8,7 +8,12 @@ namespace homestore {

 uint64_t ReplLogStore::append(nuraft::ptr< nuraft::log_entry >& entry) {
     // We don't want to transform anything that is not an app log
-    if (entry->get_val_type() != nuraft::log_val_type::app_log) { return HomeRaftLogStore::append(entry); }
+    if (entry->get_val_type() != nuraft::log_val_type::app_log) {
+        ulong lsn = HomeRaftLogStore::append(entry);
+        RD_LOGD("append entry term={}, log_val_type={} lsn={} size={}", entry->get_term(),
+                static_cast< uint32_t >(entry->get_val_type()), lsn, entry->get_buf().size());
+        return lsn;
+    }

     repl_req_ptr_t rreq = m_sm.localize_journal_entry_finish(*entry);
     ulong lsn = HomeRaftLogStore::append(entry);
diff --git a/src/lib/replication/repl_dev/raft_repl_dev.cpp b/src/lib/replication/repl_dev/raft_repl_dev.cpp
index 25f54a3c8..d902bce52 100644
--- a/src/lib/replication/repl_dev/raft_repl_dev.cpp
+++ b/src/lib/replication/repl_dev/raft_repl_dev.cpp
@@ -124,6 +124,7 @@ folly::SemiFuture< ReplServiceError > RaftReplDev::destroy_group() {
     if (err != ReplServiceError::OK) {
         m_stage.update([](auto* stage) { *stage = repl_dev_stage_t::ACTIVE; });
+        LOGERROR("RaftReplDev::destroy_group failed {}", err);
         return folly::makeSemiFuture< ReplServiceError >(std::move(err));
     }

     return m_destroy_promise.getSemiFuture();
@@ -896,6 +897,7 @@ nuraft::ptr< nuraft::log_store > RaftReplDev::load_log_store() { return m_data_j
 int32_t RaftReplDev::server_id() { return m_raft_server_id; }

 bool RaftReplDev::is_destroy_pending() const { return (m_rd_sb->destroy_pending == 0x1); }
+bool RaftReplDev::is_destroyed() const { return (*m_stage.access().get() == repl_dev_stage_t::PERMANENT_DESTROYED); }

 /////////////////////////////////// nuraft_mesg::mesg_state_mgr overrides ////////////////////////////////////
 void RaftReplDev::become_ready() {
@@ -907,10 +909,12 @@ uint32_t RaftReplDev::get_logstore_id() const { return m_data_journal->logstore_
 std::shared_ptr< nuraft::state_machine > RaftReplDev::get_state_machine() { return m_state_machine; }

 void RaftReplDev::permanent_destroy() {
+    RD_LOGI("Permanent destroy for raft repl dev");
     m_rd_sb.destroy();
     m_raft_config_sb.destroy();
     m_data_journal->remove_store();
     logstore_service().destroy_log_dev(m_data_journal->logdev_id());
+    m_stage.update([](auto* stage) { *stage = repl_dev_stage_t::PERMANENT_DESTROYED; });
 }

 void RaftReplDev::leave() {
@@ -927,6 +931,7 @@ void RaftReplDev::leave() {
     m_rd_sb->destroy_pending = 0x1;
     m_rd_sb.write();

+    RD_LOGI("RaftReplDev leave group");
     m_destroy_promise.setValue(ReplServiceError::OK); // In case proposer is waiting for the destroy to complete
 }
diff --git a/src/lib/replication/repl_dev/raft_repl_dev.h b/src/lib/replication/repl_dev/raft_repl_dev.h
index 1395dfeb4..a8c308c1d 100644
--- a/src/lib/replication/repl_dev/raft_repl_dev.h
+++ b/src/lib/replication/repl_dev/raft_repl_dev.h
@@ -32,7 +32,7 @@ struct raft_repl_dev_superblk : public repl_dev_superblk {

 using raft_buf_ptr_t = nuraft::ptr< nuraft::buffer >;

-ENUM(repl_dev_stage_t, uint8_t, INIT, ACTIVE, DESTROYING, DESTROYED);
+ENUM(repl_dev_stage_t, uint8_t, INIT, ACTIVE, DESTROYING, DESTROYED, PERMANENT_DESTROYED);

 class RaftReplDevMetrics : public sisl::MetricsGroup {
 public:
@@ -128,6 +128,7 @@ class RaftReplDev : public ReplDev,
     uint32_t get_blk_size() const override;
     repl_lsn_t get_last_commit_lsn() const { return m_commit_upto_lsn.load(); }
     bool is_destroy_pending() const;
+    bool is_destroyed() const;
     Clock::time_point destroyed_time() const { return m_destroyed_time; }

     //////////////// Accessor/shortcut methods ///////////////////////
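The raft_repl_dev changes add a terminal PERMANENT_DESTROYED stage past DESTROYED, entered only in permanent_destroy() after the superblocks and the data journal's logdev are released; is_destroyed() exposes it so callers (such as a GC pass) can tell when reclamation is truly complete. A compressed sketch of that lifecycle, using a plain atomic where the real code guards the stage with a synchronized wrapper:

    #include <atomic>
    #include <cassert>
    #include <cstdint>

    // Sketch of the stage lifecycle the new PERMANENT_DESTROYED state extends.
    enum class stage_t : uint8_t { INIT, ACTIVE, DESTROYING, DESTROYED, PERMANENT_DESTROYED };

    struct repl_dev_sketch {
        std::atomic<stage_t> stage{stage_t::ACTIVE};

        // leave(): membership is gone, but superblocks still exist, so a crash
        // here can resume the destroy on the next boot.
        void leave() { stage.store(stage_t::DESTROYED); }

        // permanent_destroy(): superblocks and logdev are released; only now is
        // the device safe to garbage-collect.
        void permanent_destroy() { stage.store(stage_t::PERMANENT_DESTROYED); }

        bool is_destroyed() const { return stage.load() == stage_t::PERMANENT_DESTROYED; }
    };

    int main() {
        repl_dev_sketch rd;
        rd.leave();
        assert(!rd.is_destroyed()); // destroyed, but resources not yet reclaimed
        rd.permanent_destroy();
        assert(rd.is_destroyed()); // now a GC pass may drop the object
    }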
diff --git a/src/lib/replication/repl_dev/raft_state_machine.cpp b/src/lib/replication/repl_dev/raft_state_machine.cpp
index 4f0d3f80a..a805b229b 100644
--- a/src/lib/replication/repl_dev/raft_state_machine.cpp
+++ b/src/lib/replication/repl_dev/raft_state_machine.cpp
@@ -184,8 +184,10 @@ raft_buf_ptr_t RaftStateMachine::pre_commit_ext(nuraft::state_machine::ext_op_pa

 raft_buf_ptr_t RaftStateMachine::commit_ext(nuraft::state_machine::ext_op_params const& params) {
     int64_t lsn = s_cast< int64_t >(params.log_idx);
-
+    RD_LOGD("Raft channel: Received Commit message lsn {} store {} logdev {} size {}", lsn,
+            m_rd.m_data_journal->logstore_id(), m_rd.m_data_journal->logdev_id(), params.data->size());
     repl_req_ptr_t rreq = lsn_to_req(lsn);
+    if (!rreq) {
+        RD_LOGD("Raft channel got null rreq");
+        return nullptr;
+    }
     RD_LOGD("Raft channel: Received Commit message rreq=[{}]", rreq->to_compact_string());
     if (rreq->is_proposer()) {
         // This is the time to ensure flushing of journal happens in the proposer
diff --git a/src/lib/replication/service/generic_repl_svc.h b/src/lib/replication/service/generic_repl_svc.h
index 67894bdfe..e2d445427 100644
--- a/src/lib/replication/service/generic_repl_svc.h
+++ b/src/lib/replication/service/generic_repl_svc.h
@@ -56,7 +56,6 @@ class GenericReplService : public ReplicationService {
     hs_stats get_cap_stats() const override;
     replica_id_t get_my_repl_uuid() const { return m_my_uuid; }

-    // void resource_audit() override;

 protected:
diff --git a/src/lib/replication/service/raft_repl_service.cpp b/src/lib/replication/service/raft_repl_service.cpp
index a8291d8d1..c03fb9003 100644
--- a/src/lib/replication/service/raft_repl_service.cpp
+++ b/src/lib/replication/service/raft_repl_service.cpp
@@ -141,6 +141,9 @@ void RaftReplService::start() {

     // Step 8: Start a reaper thread which wakes up from time to time and fetches pending data or cleans up old requests etc
     start_reaper_thread();
+
+    // Delete any unopened logstores.
+    hs()->logstore_service().delete_unopened_logdevs();
 }

 void RaftReplService::stop() {
diff --git a/src/lib/replication/service/raft_repl_service.h b/src/lib/replication/service/raft_repl_service.h
index 4553adb59..fcd0b85de 100644
--- a/src/lib/replication/service/raft_repl_service.h
+++ b/src/lib/replication/service/raft_repl_service.h
@@ -74,7 +74,6 @@ class RaftReplService : public GenericReplService,
 private:
     void raft_group_config_found(sisl::byte_view const& buf, void* meta_cookie);
-
     void start_reaper_thread();
     void stop_reaper_thread();
     void fetch_pending_data();
diff --git a/src/tests/test_log_dev.cpp b/src/tests/test_log_dev.cpp
index bea956c7a..5a6a3c04b 100644
--- a/src/tests/test_log_dev.cpp
+++ b/src/tests/test_log_dev.cpp
@@ -436,6 +436,8 @@ TEST_F(LogDevTest, DeleteUnopenedLogDev) {
     LOGINFO("Restart homestore");
     restart();

+    // Explicitly call delete for unopened ones.
+    hs()->logstore_service().delete_unopened_logdevs();
     auto log_devs = logstore_service().get_all_logdevs();
     ASSERT_EQ(log_devs.size(), id_set.size() / 2);
     for (auto& logdev : log_devs) {
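Note the ordering change threaded through the files above: delete_unopened_logdevs() no longer runs inside LogStoreService::start(); it is called at the end of RaftReplService::start(), after every recovered repl dev has had a chance to open its logdev. A toy model of why that ordering matters:

    #include <cstdint>
    #include <cstdio>
    #include <set>

    // Logdevs recovered from superblocks start out in an "unopened" set, and
    // each consumer that opens one removes it. Only after every consumer has
    // started is the leftover set safe to delete.
    struct logdev_registry {
        std::set<uint32_t> unopened;

        void on_superblk_found(uint32_t id) { unopened.insert(id); }
        void open(uint32_t id) { unopened.erase(id); }

        void delete_unopened() {
            for (auto id : unopened) std::printf("destroying orphan logdev %u\n", id);
            unopened.clear();
        }
    };

    int main() {
        logdev_registry reg;
        for (uint32_t id : {1u, 2u, 3u}) reg.on_superblk_found(id); // recovery
        reg.open(1); // consumer A claims its logdev
        reg.open(3); // consumer B claims its logdev
        // Calling delete_unopened() before the consumers ran would have destroyed
        // live logdevs 1 and 3; called last, it only reaps the orphan (2).
        reg.delete_unopened();
    }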
diff --git a/src/tests/test_raft_repl_dev.cpp b/src/tests/test_raft_repl_dev.cpp
index 6a791cbdf..97c118edd 100644
--- a/src/tests/test_raft_repl_dev.cpp
+++ b/src/tests/test_raft_repl_dev.cpp
@@ -36,6 +36,8 @@
 #include "common/homestore_config.hpp"
 #include "common/homestore_assert.hpp"
 #include "common/homestore_utils.hpp"
+
+#define private public
 #include "test_common/hs_repl_test_common.hpp"
 #include "replication/service/raft_repl_service.h"
 #include "replication/repl_dev/raft_repl_dev.h"
@@ -251,10 +253,21 @@ class RaftReplDevTest : public testing::Test {
                 ASSERT_EQ(err, ReplServiceError::OK) << "Error in destroying the group";
             });
         }
+
+        for (auto const& db : dbs_) {
+            auto repl_dev = std::dynamic_pointer_cast< RaftReplDev >(db->repl_dev());
+            do {
+                std::this_thread::sleep_for(std::chrono::seconds(1));
+                auto& raft_repl_svc = dynamic_cast< RaftReplService& >(hs()->repl_service());
+                raft_repl_svc.gc_repl_devs();
+                LOGINFO("Waiting for repl dev to get destroyed");
+            } while (!repl_dev->is_destroyed());
+        }
+    }

     void generate_writes(uint64_t data_size, uint32_t max_size_per_iov, shared< TestReplicatedDB > db = nullptr) {
         if (db == nullptr) { db = pick_one_db(); }
+        LOGINFO("Writing on group_id={}", db->repl_dev()->group_id());
         db->db_write(data_size, max_size_per_iov);
     }
@@ -563,9 +576,12 @@ TEST_F(RaftReplDevTest, Leader_Restart) {
     // Step 1: Fill up entries on all replicas
     uint64_t entries_per_attempt = SISL_OPTIONS["num_io"].as< uint64_t >();
     this->write_on_leader(entries_per_attempt, true /* wait for commit on all replicas */);
+    std::this_thread::sleep_for(std::chrono::seconds(3));

     // Step 2: Restart replica-0 (Leader) with a very long delay so that it is lagging behind
-    this->restart_replica(0, 10 /* shutdown_delay_sec */);
+    LOGINFO("Restart leader");
+    this->restart_replica(0, 15 /* shutdown_delay_sec */);
+    std::this_thread::sleep_for(std::chrono::seconds(3));

     // Step 3: While the original leader is down, write entries into the new leader
     LOGINFO("After original leader is shutdown, insert more entries into the new leader");
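The teardown loop added to the test polls gc_repl_devs() once a second until is_destroyed() turns true (with #define private public used to reach the private GC method), which never terminates if destruction stalls. A bounded variant of the same polling pattern, shown here as a hypothetical helper:

    #include <chrono>
    #include <cstdio>
    #include <functional>
    #include <thread>

    // Poll pred() at a fixed interval until it holds or the deadline passes.
    bool wait_until(const std::function<bool()>& pred, std::chrono::seconds timeout,
                    std::chrono::seconds interval = std::chrono::seconds(1)) {
        auto deadline = std::chrono::steady_clock::now() + timeout;
        while (std::chrono::steady_clock::now() < deadline) {
            if (pred()) return true;
            std::this_thread::sleep_for(interval);
        }
        return pred(); // one final check at the deadline
    }

    int main() {
        int polls = 0;
        bool ok = wait_until([&] { return ++polls >= 3; }, std::chrono::seconds(10));
        std::printf("destroyed=%d after %d polls\n", ok, polls);
    }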
@@ -647,14 +663,16 @@ TEST_F(RaftReplDevTest, RemoveReplDev) {
     this->write_on_leader(entries_per_attempt, false /* wait for commit on all */, dbs_.back());
     std::this_thread::sleep_for(std::chrono::milliseconds(2));
     this->remove_db(dbs_.back(), true /* wait_for_removal */);
+    std::this_thread::sleep_for(std::chrono::seconds(2));

     // Step 3: Shutdown one of the followers and remove another repl_dev; once the follower is up, it should remove
     // the repl_dev and proceed
     LOGINFO("Shutdown one of the followers (replica=1) and then remove dbs on other members. Expect replica=1 to "
             "remove after it is up");
     this->restart_replica(1, 15 /* shutdown_delay_sec */);
+    LOGINFO("After restart replica 1 {}", dbs_.size());
     this->remove_db(dbs_.back(), true /* wait_for_removal */);
-
+    LOGINFO("Remove last db {}", dbs_.size());
     // TODO: Once generic crash flip/test_infra is available, use flip to crash during removal and restart them to see
     // if records are being removed
     g_helper->sync_for_cleanup_start();
@@ -682,9 +700,11 @@ TEST_F(RaftReplDevTest, GCReplReqs) {
     }

     this->write_on_leader(100 /* num_entries */, true /* wait_for_commit */);
+    std::this_thread::sleep_for(std::chrono::seconds(2));

     // Step 2: Restart replica-0 (Leader)
     this->restart_replica(0, 10);
+    std::this_thread::sleep_for(std::chrono::seconds(2));

     LOGINFO("After original leader is shutdown, insert more entries into the new leader");
     this->write_on_leader(100, true /* wait for commit on all replicas */);
@@ -726,6 +746,7 @@ int main(int argc, char* argv[]) {
     HS_SETTINGS_FACTORY().modifiable_settings([](auto& s) {
         s.consensus.leadership_expiry_ms = -1; // -1 means never expires;
+        s.generic.repl_dev_cleanup_interval_sec = 0; // only reset when the user specifies a value for the test
         if (SISL_OPTIONS.count("snapshot_distance")) {
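The test's main() pins s.generic.repl_dev_cleanup_interval_sec to 0 so destroyed repl devs become eligible for the reaper's GC immediately rather than after the production delay. A sketch of how such an interval typically gates eligibility (the struct and its semantics are illustrative, not the HomeStore settings schema):

    #include <chrono>
    #include <cstdio>

    // Each reaper wakeup, devices destroyed longer ago than cleanup_interval
    // are reaped, so an interval of 0 makes eligibility immediate, which is
    // what the polling teardown loops in the tests above rely on.
    struct reaper_sketch {
        std::chrono::seconds cleanup_interval;

        bool eligible(std::chrono::steady_clock::time_point destroyed_at) const {
            return std::chrono::steady_clock::now() - destroyed_at >= cleanup_interval;
        }
    };

    int main() {
        reaper_sketch prod{std::chrono::seconds{60}};
        reaper_sketch test{std::chrono::seconds{0}};
        auto now = std::chrono::steady_clock::now();
        std::printf("prod reaps now? %d, test reaps now? %d\n", prod.eligible(now), test.eligible(now));
        // prints: prod reaps now? 0, test reaps now? 1
    }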