diff --git a/src/include/homestore/checkpoint/cp_mgr.hpp b/src/include/homestore/checkpoint/cp_mgr.hpp index 9c783eb84..c3324711e 100644 --- a/src/include/homestore/checkpoint/cp_mgr.hpp +++ b/src/include/homestore/checkpoint/cp_mgr.hpp @@ -160,7 +160,13 @@ class CPManager { CPManager(); virtual ~CPManager(); + /// @brief Start the CPManager, which creates a first cp session. + /// @param first_time_boot void start(bool first_time_boot); + + /// @brief Start the cp timer so that periodic cps are started + void start_timer(); + /// @brief Shutdown the checkpoint manager services. It will not trigger a flush, but cancels any existing /// checkpoint session abruptly. If caller needs clean shutdown, then they explicitly needs to trigger cp flush /// before calling shutdown. diff --git a/src/include/homestore/logstore_service.hpp b/src/include/homestore/logstore_service.hpp index c10423e27..6d8fc09f9 100644 --- a/src/include/homestore/logstore_service.hpp +++ b/src/include/homestore/logstore_service.hpp @@ -92,8 +92,7 @@ class LogStoreService { * * @return std::shared_ptr< HomeLogStore > */ - std::shared_ptr< HomeLogStore > create_new_log_store(const logstore_family_id_t family_id, - const bool append_mode = false); + shared< HomeLogStore > create_new_log_store(logstore_family_id_t family_id, bool append_mode = false); /** * @brief Open an existing log store and does a recovery. It then creates an instance of this logstore and @@ -102,8 +101,8 @@ class LogStoreService { * @param store_id: Store ID of the log store to open * @return std::shared_ptr< HomeLogStore > */ - void open_log_store(const logstore_family_id_t family_id, const logstore_id_t store_id, const bool append_mode, - const log_store_opened_cb_t& on_open_cb); + folly::Future< shared< HomeLogStore > > open_log_store(logstore_family_id_t family_id, logstore_id_t store_id, + bool append_mode); /** * @brief Close the log store instance and free-up the resources diff --git a/src/include/homestore/replication/repl_decls.h b/src/include/homestore/replication/repl_decls.h index 9f9fee69f..fd347044f 100644 --- a/src/include/homestore/replication/repl_decls.h +++ b/src/include/homestore/replication/repl_decls.h @@ -3,6 +3,8 @@ #include #include +#include + #include #include #include @@ -13,6 +15,39 @@ SISL_LOGGING_DECL(replication) #define REPL_LOG_MODS grpc_server, HOMESTORE_LOG_MODS, nuraft_mesg, nuraft, replication namespace homestore { +// clang-format off +VENUM(ReplServiceError, int32_t, + OK = 0, // Everything OK + CANCELLED = -1, // Request was cancelled + TIMEOUT = -2, + NOT_LEADER = -3, + BAD_REQUEST = -4, + SERVER_ALREADY_EXISTS = -5, + CONFIG_CHANGING = -6, + SERVER_IS_JOINING = -7, + SERVER_NOT_FOUND = -8, + CANNOT_REMOVE_LEADER = -9, + SERVER_IS_LEAVING = -10, + TERM_MISMATCH = -11, + RESULT_NOT_EXIST_YET = -10000, + NOT_IMPLEMENTED = -10001, + NO_SPACE_LEFT = -20000, + DRIVE_WRITE_ERROR = -20001, + FAILED = -32768); +// clang-format on + +template < typename V, typename E > +using Result = folly::Expected< V, E >; + +template < class V > +using ReplResult = Result< V, ReplServiceError >; + +template < class V, class E > +using AsyncResult = folly::SemiFuture< Result< V, E > >; + +template < class V = folly::Unit > +using AsyncReplResult = AsyncResult< V, ReplServiceError >; + using blkid_list_t = folly::small_vector< BlkId, 4 >; // Fully qualified domain pba, unique pba id across replica set diff --git a/src/include/homestore/replication/repl_dev.h b/src/include/homestore/replication/repl_dev.h index 27727c943..28b494f0f 100644 
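Note on the relocated types above: `ReplServiceError` (now extended with `NO_SPACE_LEFT` and `DRIVE_WRITE_ERROR`) and the `Result`/`ReplResult`/`AsyncResult`/`AsyncReplResult` aliases move from `replication_service.hpp` into `repl_decls.h` so that `repl_dev.h` can also use them. A minimal caller-side sketch (values are illustrative only) of what the aliases expand to:

```cpp
// Illustrative only -- shows how the relocated aliases compose:
//   ReplResult< V >      == folly::Expected< V, ReplServiceError >
//   AsyncReplResult< V > == folly::SemiFuture< folly::Expected< V, ReplServiceError > >
ReplResult< uint32_t > res = folly::makeUnexpected(ReplServiceError::NO_SPACE_LEFT);
if (res.hasError()) { LOGERROR("operation failed with error={}", res.error()); }
```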
--- a/src/include/homestore/replication/repl_dev.h +++ b/src/include/homestore/replication/repl_dev.h @@ -29,7 +29,8 @@ VENUM(repl_req_state_t, uint32_t, DATA_RECEIVED = 1 << 1, // Data has been received and being written to the storage DATA_WRITTEN = 1 << 2, // Data has been written to the storage LOG_RECEIVED = 1 << 3, // Log is received and waiting for data - LOG_FLUSHED = 1 << 4 // Log has been flushed + LOG_FLUSHED = 1 << 4, // Log has been flushed + ERRORED = 1 << 5 // Error has happened and cleaned up ) struct repl_key { @@ -149,12 +150,25 @@ class ReplDevListener { /// NOTE: Listener should do the free any resources created as part of pre-commit. /// /// @param lsn - The log sequence number getting rolled back - /// @param header - Header originally passed with repl_dev::write() api - /// @param key - Key originally passed with repl_dev::write() api - /// @param ctx - Context passed as part of the replica_set::write() api + /// @param header - Header originally passed with ReplDev::async_alloc_write() api + /// @param key - Key originally passed with ReplDev::async_alloc_write() api + /// @param ctx - Context passed as part of the ReplDev::async_alloc_write() api virtual void on_rollback(int64_t lsn, const sisl::blob& header, const sisl::blob& key, cintrusive< repl_req_ctx >& ctx) = 0; + /// @brief Called when the async_alloc_write call failed to initiate replication + /// + /// Called only on the node which called async_alloc_write + /// + /// + /// NOTE: Listener should do the free any resources created as part of pre-commit. + /// + /// @param header - Header originally passed with ReplDev::async_alloc_write() api + /// @param key - Key originally passed with ReplDev::async_alloc_write() api + /// @param ctx - Context passed as part of the ReplDev::async_alloc_write() api + virtual void on_error(ReplServiceError error, const sisl::blob& header, const sisl::blob& key, + cintrusive< repl_req_ctx >& ctx) = 0; + /// @brief Called when replication module is trying to allocate a block to write the value /// /// This function can be called both on leader and follower when it is trying to allocate a block to write the @@ -176,7 +190,7 @@ class ReplDevListener { class ReplDev { public: ReplDev() = default; - virtual ~ReplDev() = default; + virtual ~ReplDev() { detach_listener(); } /// @brief Replicate the data to the replica set. This method goes through the /// following steps: @@ -217,6 +231,10 @@ class ReplDev { /// @param blkids - blkids to be freed. virtual void async_free_blks(int64_t lsn, MultiBlkId const& blkid) = 0; + /// @brief Try to switch the current replica where this method called to become a leader. + /// @return True if it is successful, false otherwise. 
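The new `ReplDevListener::on_error()` hook added above fires only on the node that called `async_alloc_write()` when replication could not be initiated. A hypothetical listener override (`MyListener` and its bookkeeping are illustrative, not part of this diff) might look like:

```cpp
void MyListener::on_error(ReplServiceError error, const sisl::blob& header, const sisl::blob& key,
                          cintrusive< repl_req_ctx >& ctx) {
    // Free whatever was staged for the eventual pre-commit of this request
    LOGERROR("Replication request failed with error={}, releasing staged resources", error);
    // e.g. erase any in-memory bookkeeping keyed by 'key' (illustrative only)
}
```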
+ virtual AsyncReplResult<> become_leader() = 0; + /// @brief Checks if this replica is the leader in this ReplDev /// @return true or false virtual bool is_leader() const = 0; @@ -231,6 +249,13 @@ class ReplDev { virtual void attach_listener(shared< ReplDevListener > listener) { m_listener = std::move(listener); } + virtual void detach_listener() { + if (m_listener) { + m_listener->set_repl_dev(nullptr); + m_listener.reset(); + } + } + protected: shared< ReplDevListener > m_listener; }; diff --git a/src/include/homestore/replication_service.hpp b/src/include/homestore/replication_service.hpp index 72d24d626..d24722202 100644 --- a/src/include/homestore/replication_service.hpp +++ b/src/include/homestore/replication_service.hpp @@ -4,48 +4,16 @@ #include #include -#include #include #include #include namespace homestore { -// clang-format off -VENUM(ReplServiceError, int32_t, - OK = 0, // Everything OK - CANCELLED = -1, // Request was cancelled - TIMEOUT = -2, - NOT_LEADER = -3, - BAD_REQUEST = -4, - SERVER_ALREADY_EXISTS = -5, - CONFIG_CHANGING = -6, - SERVER_IS_JOINING = -7, - SERVER_NOT_FOUND = -8, - CANNOT_REMOVE_LEADER = -9, - SERVER_IS_LEAVING = -10, - TERM_MISMATCH = -11, - RESULT_NOT_EXIST_YET = -10000, - NOT_IMPLEMENTED = -10001, - FAILED = -32768); -// clang-format on - class ReplDev; class ReplDevListener; struct hs_stats; -template < typename V, typename E > -using Result = folly::Expected< V, E >; - -template < class V > -using ReplResult = Result< V, ReplServiceError >; - -template < class V, class E > -using AsyncResult = folly::SemiFuture< Result< V, E > >; - -template < class V = folly::Unit > -using AsyncReplResult = AsyncResult< V, ReplServiceError >; - VENUM(repl_impl_type, uint8_t, server_side, // Completely homestore controlled replication client_assisted, // Client assisting in replication diff --git a/src/include/homestore/superblk_handler.hpp b/src/include/homestore/superblk_handler.hpp index 699edc5b6..bf07efca2 100644 --- a/src/include/homestore/superblk_handler.hpp +++ b/src/include/homestore/superblk_handler.hpp @@ -31,42 +31,42 @@ class superblk { return ++s_count; } - superblk(const std::string& meta_name = "") { set_name(meta_name); } + superblk(const std::string& sub_name = "") { set_name(sub_name); } superblk(const superblk&) = delete; superblk& operator=(const superblk&) = delete; - superblk(superblk&& rhs) noexcept - : m_meta_mgr_cookie(rhs.m_meta_mgr_cookie) - , m_raw_buf(std::move(rhs.m_raw_buf)) - , m_sb(rhs.m_sb) - , m_metablk_name(std::move(rhs.m_metablk_name)) { - rhs.m_meta_mgr_cookie = nullptr; - rhs.m_sb = nullptr; + superblk(superblk&& rhs) noexcept : + m_meta_blk(rhs.m_meta_blk), + m_raw_buf(std::move(rhs.m_raw_buf)), + m_sb(rhs.m_sb), + m_meta_sub_name(std::move(rhs.m_meta_sub_name)) { + rhs.m_meta_blk = nullptr; + rhs.m_sb = nullptr; } superblk& operator=(superblk&& rhs) noexcept { if (this != &rhs) { - m_meta_mgr_cookie = rhs.m_meta_mgr_cookie; + m_meta_blk = rhs.m_meta_blk; m_raw_buf = std::move(rhs.m_raw_buf); m_sb = rhs.m_sb; - m_metablk_name = std::move(rhs.m_metablk_name); - rhs.m_meta_mgr_cookie = nullptr; + m_meta_sub_name = std::move(rhs.m_meta_sub_name); + rhs.m_meta_blk = nullptr; rhs.m_sb = nullptr; - } - return *this; + } + return *this; } - void set_name(const std::string& meta_name) { - if (meta_name.empty()) { - m_metablk_name = "meta_blk_" + std::to_string(next_count()); + void set_name(const std::string& sub_name) { + if (sub_name.empty()) { + m_meta_sub_name = "meta_blk_" + std::to_string(next_count()); } else { - 
m_metablk_name = meta_name; + m_meta_sub_name = sub_name; } } - T* load(const sisl::byte_view& buf, void* meta_cookie) { - m_meta_mgr_cookie = voidptr_cast(meta_cookie); + T* load(const sisl::byte_view& buf, void* meta_blk) { + m_meta_blk = voidptr_cast(meta_blk); m_raw_buf = meta_service().is_aligned_buf_needed(buf.size()) ? buf.extract(meta_service().align_size()) : buf.extract(0); m_sb = r_cast< T* >(m_raw_buf->bytes()); @@ -85,9 +85,9 @@ class superblk { } void destroy() { - if (m_meta_mgr_cookie) { - meta_service().remove_sub_sb(m_meta_mgr_cookie); - m_meta_mgr_cookie = nullptr; + if (m_meta_blk) { + meta_service().remove_sub_sb(m_meta_blk); + m_meta_blk = nullptr; } m_raw_buf.reset(); m_sb = nullptr; @@ -97,10 +97,10 @@ class superblk { sisl::byte_array raw_buf() { return m_raw_buf; } void write() { - if (m_meta_mgr_cookie) { - meta_service().update_sub_sb(m_raw_buf->cbytes(), m_raw_buf->size(), m_meta_mgr_cookie); + if (m_meta_blk) { + meta_service().update_sub_sb(m_raw_buf->cbytes(), m_raw_buf->size(), m_meta_blk); } else { - meta_service().add_sub_sb(m_metablk_name, m_raw_buf->cbytes(), m_raw_buf->size(), m_meta_mgr_cookie); + meta_service().add_sub_sb(m_meta_sub_name, m_raw_buf->cbytes(), m_raw_buf->size(), m_meta_blk); } } @@ -111,17 +111,17 @@ class superblk { T& operator*() { return *m_sb; } private: - void* m_meta_mgr_cookie{nullptr}; + void* m_meta_blk{nullptr}; sisl::byte_array m_raw_buf; T* m_sb{nullptr}; - std::string m_metablk_name; + std::string m_meta_sub_name; }; class json_superblk { private: - void* m_meta_mgr_cookie{nullptr}; + void* m_meta_blk{nullptr}; nlohmann::json m_json_sb; - std::string m_metablk_name; + std::string m_meta_sub_name; public: static uint64_t next_count() { @@ -129,24 +129,24 @@ class json_superblk { return ++s_count; } - json_superblk(const std::string& meta_name = "") { set_name(meta_name); } + json_superblk(const std::string& sub_name = "") { set_name(sub_name); } - void set_name(const std::string& meta_name) { - if (meta_name.empty()) { - m_metablk_name = "meta_blk_" + std::to_string(next_count()); + void set_name(const std::string& sub_name) { + if (sub_name.empty()) { + m_meta_sub_name = "meta_blk_" + std::to_string(next_count()); } else { - m_metablk_name = meta_name; + m_meta_sub_name = sub_name; } } - nlohmann::json& load(const sisl::byte_view& buf, void* meta_cookie) { - m_meta_mgr_cookie = voidptr_cast(meta_cookie); + nlohmann::json& load(const sisl::byte_view& buf, void* meta_blk) { + m_meta_blk = voidptr_cast(meta_blk); std::string_view const b{c_charptr_cast(buf.bytes()), buf.size()}; try { m_json_sb = nlohmann::json::from_msgpack(b); } catch (nlohmann::json::exception const& e) { - DEBUG_ASSERT(false, "Failed to load superblk for meta_blk={}", m_metablk_name); + DEBUG_ASSERT(false, "Failed to load superblk for meta_blk={}", m_meta_sub_name); return m_json_sb; } return m_json_sb; @@ -155,9 +155,9 @@ class json_superblk { nlohmann::json& create() { return m_json_sb; } void destroy() { - if (m_meta_mgr_cookie) { - meta_service().remove_sub_sb(m_meta_mgr_cookie); - m_meta_mgr_cookie = nullptr; + if (m_meta_blk) { + meta_service().remove_sub_sb(m_meta_blk); + m_meta_blk = nullptr; } m_json_sb = nlohmann::json{}; } @@ -166,10 +166,10 @@ class json_superblk { void write() { auto do_write = [this](sisl::blob const& b) { - if (m_meta_mgr_cookie) { - meta_service().update_sub_sb(b.cbytes(), b.size(), m_meta_mgr_cookie); + if (m_meta_blk) { + meta_service().update_sub_sb(b.cbytes(), b.size(), m_meta_blk); } else { - 
meta_service().add_sub_sb(m_metablk_name, b.cbytes(), b.size(), m_meta_mgr_cookie); + meta_service().add_sub_sb(m_meta_sub_name, b.cbytes(), b.size(), m_meta_blk); } }; diff --git a/src/lib/checkpoint/cp_mgr.cpp b/src/lib/checkpoint/cp_mgr.cpp index ad6e3efdb..fba5a6099 100644 --- a/src/lib/checkpoint/cp_mgr.cpp +++ b/src/lib/checkpoint/cp_mgr.cpp @@ -50,7 +50,9 @@ void CPManager::start(bool first_time_boot) { create_first_cp(); m_sb.write(); } +} +void CPManager::start_timer() { LOGINFO("cp timer is set to {} usec", HS_DYNAMIC_CONFIG(generic.cp_timer_us)); m_cp_timer_hdl = iomanager.schedule_global_timer( HS_DYNAMIC_CONFIG(generic.cp_timer_us) * 1000, true, nullptr /*cookie*/, iomgr::reactor_regex::all_worker, diff --git a/src/lib/homestore.cpp b/src/lib/homestore.cpp index b22164b5d..b05387ee5 100644 --- a/src/lib/homestore.cpp +++ b/src/lib/homestore.cpp @@ -116,19 +116,23 @@ bool HomeStore::start(const hs_input_params& input, hs_before_services_starting_ std::call_once(flag1, [this]() { m_periodic_logger = sisl::logging::CreateCustomLogger("homestore", "_periodic", false, true /* tee_to_stdout_stderr */); + sisl::logging::SetLogPattern("[%D %T.%f] [%^%L%$] [%t] %v", m_periodic_logger); }); - sisl::logging::SetLogPattern("[%D %T.%f] [%^%L%$] [%t] %v", m_periodic_logger); HomeStoreDynamicConfig::init_settings_default(); LOGINFO("Homestore is loading with following services: {}", m_services.list()); if (has_meta_service()) { m_meta_service = std::make_unique< MetaBlkService >(); } - if (has_log_service()) { m_log_service = std::make_unique< LogStoreService >(); } - if (has_data_service()) { m_data_service = std::make_unique< BlkDataService >(std::move(s_custom_chunk_selector)); } if (has_index_service()) { m_index_service = std::make_unique< IndexService >(std::move(s_index_cbs)); } if (has_repl_data_service()) { m_repl_service = GenericReplService::create(std::move(s_repl_app)); + m_log_service = std::make_unique< LogStoreService >(); m_data_service = std::make_unique< BlkDataService >(std::move(s_custom_chunk_selector)); + } else { + if (has_log_service()) { m_log_service = std::make_unique< LogStoreService >(); } + if (has_data_service()) { + m_data_service = std::make_unique< BlkDataService >(std::move(s_custom_chunk_selector)); + } } m_cp_mgr = std::make_unique< CPManager >(); m_dev_mgr = std::make_unique< DeviceManager >(input.devices, bind_this(HomeStore::create_vdev_cb, 2)); @@ -208,17 +212,18 @@ void HomeStore::do_start() { if (has_index_service()) { m_index_service->start(); } - if (has_data_service()) { - m_data_service->start(); - } else if (has_repl_data_service()) { - m_data_service->start(); - s_cast< GenericReplService* >(m_repl_service.get())->start(); + if (has_repl_data_service()) { + s_cast< GenericReplService* >(m_repl_service.get())->start(); // Replservice starts logstore & data service + } else { + if (has_data_service()) { m_data_service->start(); } + if (has_log_service() && inp_params.auto_recovery) { + // In case of custom recovery, let consumer starts the recovery and it is consumer module's responsibilities + // to start log store + m_log_service->start(is_first_time_boot() /* format */); + } } - // In case of custom recovery, let consumer starts the recovery and it is consumer module's responsibilities - // to start log store - if (has_log_service() && inp_params.auto_recovery) { m_log_service->start(is_first_time_boot() /* format */); } - + m_cp_mgr->start_timer(); m_init_done = true; } @@ -234,8 +239,17 @@ void HomeStore::shutdown() { m_cp_mgr.reset(); if 
(has_repl_data_service()) { + // Log and Data services are stopped by repl service s_cast< GenericReplService* >(m_repl_service.get())->stop(); + m_log_service.reset(); + m_data_service.reset(); m_repl_service.reset(); + } else { + if (has_log_service()) { + m_log_service->stop(); + m_log_service.reset(); + } + if (has_data_service()) { m_data_service.reset(); } } if (has_index_service()) { @@ -243,18 +257,11 @@ void HomeStore::shutdown() { // m_index_service.reset(); } - if (has_log_service()) { - m_log_service->stop(); - m_log_service.reset(); - } - if (has_meta_service()) { m_meta_service->stop(); m_meta_service.reset(); } - if (has_data_service()) { m_data_service.reset(); } - m_dev_mgr->close_devices(); m_dev_mgr.reset(); diff --git a/src/lib/logstore/log_store_family.cpp b/src/lib/logstore/log_store_family.cpp index 0af47625d..33e1e4313 100644 --- a/src/lib/logstore/log_store_family.cpp +++ b/src/lib/logstore/log_store_family.cpp @@ -63,7 +63,7 @@ void LogStoreFamily::start(bool format, JournalVirtualDev* blk_store) { // Also call the logstore to inform that start/replay is completed. if (!format) { for (auto& p : m_id_logstore_map) { - auto& lstore{p.second.m_log_store}; + auto& lstore{p.second.log_store}; if (lstore && lstore->get_log_replay_done_cb()) { lstore->get_log_replay_done_cb()(lstore, lstore->seq_num() - 1); lstore->truncate(lstore->truncated_upto()); @@ -90,18 +90,25 @@ std::shared_ptr< HomeLogStore > LogStoreFamily::create_new_log_store(bool append folly::SharedMutexWritePriority::WriteHolder holder(m_store_map_mtx); const auto it = m_id_logstore_map.find(store_id); HS_REL_ASSERT((it == m_id_logstore_map.end()), "store_id {}-{} already exists", m_family_id, store_id); - m_id_logstore_map.insert(std::make_pair<>(store_id, logstore_info_t{lstore, nullptr, append_mode})); + m_id_logstore_map.insert(std::pair(store_id, logstore_info{.log_store = lstore, .append_mode = append_mode})); } LOGINFO("Created log store id {}-{}", m_family_id, store_id); return lstore; } -void LogStoreFamily::open_log_store(logstore_id_t store_id, bool append_mode, const log_store_opened_cb_t& on_open_cb) { +folly::Future< shared< HomeLogStore > > LogStoreFamily::open_log_store(logstore_id_t store_id, bool append_mode) { folly::SharedMutexWritePriority::WriteHolder holder(m_store_map_mtx); - const auto it = m_id_logstore_map.find(store_id); - HS_REL_ASSERT((it == m_id_logstore_map.end()), "store_id {}-{} already exists", m_family_id, store_id); - LOGINFO("Opening log store id {}-{}", m_family_id, store_id); - m_id_logstore_map.insert(std::make_pair<>(store_id, logstore_info_t{nullptr, on_open_cb, append_mode})); + auto it = m_id_logstore_map.find(store_id); + if (it == m_id_logstore_map.end()) { + bool happened; + std::tie(it, happened) = m_id_logstore_map.insert(std::pair(store_id, + logstore_info{ + .log_store = nullptr, + .append_mode = append_mode, + })); + HS_REL_ASSERT_EQ(happened, true, "Unable to insert logstore into id_logstore_map"); + } + return it->second.promise.getFuture(); } void LogStoreFamily::remove_log_store(logstore_id_t store_id) { @@ -143,9 +150,10 @@ void LogStoreFamily::device_truncate(const std::shared_ptr< truncate_req >& treq void LogStoreFamily::on_log_store_found(logstore_id_t store_id, const logstore_superblk& sb) { folly::SharedMutexWritePriority::ReadHolder holder(m_store_map_mtx); - const auto it = m_id_logstore_map.find(store_id); + auto it = m_id_logstore_map.find(store_id); if (it == m_id_logstore_map.end()) { - LOGERROR("Store Id {}-{} found but not opened 
yet.", m_family_id, store_id); + LOGERROR("Store Id {}-{} found but not opened yet, it will be discarded after logstore is started", m_family_id, + store_id); m_unopened_store_id.insert(store_id); m_unopened_store_io.insert(std::make_pair<>(store_id, 0)); return; @@ -153,9 +161,9 @@ void LogStoreFamily::on_log_store_found(logstore_id_t store_id, const logstore_s LOGINFO("Found a logstore store_id={}-{} with start seq_num={}, Creating a new HomeLogStore instance", m_family_id, store_id, sb.m_first_seq_num); - auto& l_info = const_cast< logstore_info_t& >(it->second); - l_info.m_log_store = std::make_shared< HomeLogStore >(*this, store_id, l_info.append_mode, sb.m_first_seq_num); - if (l_info.m_on_log_store_opened) l_info.m_on_log_store_opened(l_info.m_log_store); + logstore_info& info = it->second; + info.log_store = std::make_shared< HomeLogStore >(*this, store_id, info.append_mode, sb.m_first_seq_num); + info.promise.setValue(info.log_store); } static thread_local std::vector< std::shared_ptr< HomeLogStore > > s_cur_flush_batch_stores; @@ -189,7 +197,7 @@ void LogStoreFamily::on_logfound(logstore_id_t id, logstore_seq_num_t seq_num, l ++unopened_it->second; return; } - log_store = it->second.m_log_store.get(); + log_store = it->second.log_store.get(); } if (!log_store) { return; } log_store->on_log_found(seq_num, ld_key, flush_ld_key, buf); @@ -232,7 +240,7 @@ logdev_key LogStoreFamily::do_device_truncate(bool dry_run) { { folly::SharedMutexWritePriority::ReadHolder holder(m_store_map_mtx); for (auto& id_logstore : m_id_logstore_map) { - auto& store_ptr = id_logstore.second.m_log_store; + auto& store_ptr = id_logstore.second.log_store; const auto& trunc_info = store_ptr->pre_device_truncation(); if (!trunc_info.pending_dev_truncation && !trunc_info.active_writes_not_part_of_truncation) { @@ -290,8 +298,8 @@ nlohmann::json LogStoreFamily::dump_log_store(const log_dump_req& dump_req) { if (dump_req.log_store == nullptr) { folly::SharedMutexWritePriority::ReadHolder holder(m_store_map_mtx); for (auto& id_logstore : m_id_logstore_map) { - auto store_ptr{id_logstore.second.m_log_store}; - const std::string id{std::to_string(store_ptr->get_store_id())}; + auto store_ptr = id_logstore.second.log_store; + const std::string id = std::to_string(store_ptr->get_store_id()); // must use operator= construction as copy construction results in error nlohmann::json val = store_ptr->dump_log_store(dump_req); json_dump[id] = std::move(val); @@ -320,7 +328,7 @@ nlohmann::json LogStoreFamily::get_status(int verbosity) const { { folly::SharedMutexWritePriority::ReadHolder holder(m_store_map_mtx); for (const auto& [id, lstore] : m_id_logstore_map) { - js["logstore_id_" + std::to_string(id)] = lstore.m_log_store->get_status(verbosity); + js["logstore_id_" + std::to_string(id)] = lstore.log_store->get_status(verbosity); } } return js; diff --git a/src/lib/logstore/log_store_family.hpp b/src/lib/logstore/log_store_family.hpp index c92d05633..b451a70bf 100644 --- a/src/lib/logstore/log_store_family.hpp +++ b/src/lib/logstore/log_store_family.hpp @@ -28,7 +28,7 @@ #include #include -#include +#include #include #include @@ -37,10 +37,10 @@ namespace homestore { struct log_dump_req; -struct logstore_info_t { - std::shared_ptr< HomeLogStore > m_log_store; - log_store_opened_cb_t m_on_log_store_opened; +struct logstore_info { + std::shared_ptr< HomeLogStore > log_store; bool append_mode; + folly::SharedPromise< std::shared_ptr< HomeLogStore > > promise{}; }; struct truncate_req { @@ -71,8 +71,9 @@ class LogStoreFamily { 
void start(const bool format, JournalVirtualDev* blk_store); void stop(); - std::shared_ptr< HomeLogStore > create_new_log_store(bool append_mode = false); - void open_log_store(logstore_id_t store_id, bool append_mode, const log_store_opened_cb_t& on_open_cb); + shared< HomeLogStore > create_new_log_store(bool append_mode = false); + folly::Future< shared< HomeLogStore > > open_log_store(logstore_id_t store_id, bool append_mode); + bool close_log_store(logstore_id_t store_id) { // TODO: Implement this method return true; @@ -101,8 +102,8 @@ class LogStoreFamily { void on_batch_completion(HomeLogStore* log_store, uint32_t nremaining_in_batch, logdev_key flush_ld_key); private: - folly::SharedMutexWritePriority m_store_map_mtx; - std::unordered_map< logstore_id_t, logstore_info_t > m_id_logstore_map; + mutable folly::SharedMutexWritePriority m_store_map_mtx; + std::unordered_map< logstore_id_t, logstore_info > m_id_logstore_map; std::unordered_map< logstore_id_t, uint64_t > m_unopened_store_io; std::unordered_set< logstore_id_t > m_unopened_store_id; std::unordered_map< logstore_id_t, logid_t > m_last_flush_info; diff --git a/src/lib/logstore/log_store_service.cpp b/src/lib/logstore/log_store_service.cpp index 67f2dc1dd..e8f74d60e 100644 --- a/src/lib/logstore/log_store_service.cpp +++ b/src/lib/logstore/log_store_service.cpp @@ -105,18 +105,18 @@ void LogStoreService::stop() { } } -std::shared_ptr< HomeLogStore > LogStoreService::create_new_log_store(const logstore_family_id_t family_id, - const bool append_mode) { +shared< HomeLogStore > LogStoreService::create_new_log_store(const logstore_family_id_t family_id, + const bool append_mode) { HS_REL_ASSERT_LT(family_id, num_log_families); COUNTER_INCREMENT(m_metrics, logstores_count, 1); return m_logstore_families[family_id]->create_new_log_store(append_mode); } -void LogStoreService::open_log_store(const logstore_family_id_t family_id, const logstore_id_t store_id, - const bool append_mode, const log_store_opened_cb_t& on_open_cb) { +folly::Future< shared< HomeLogStore > > LogStoreService::open_log_store(logstore_family_id_t family_id, + logstore_id_t store_id, bool append_mode) { HS_REL_ASSERT_LT(family_id, num_log_families); COUNTER_INCREMENT(m_metrics, logstores_count, 1); - return m_logstore_families[family_id]->open_log_store(store_id, append_mode, on_open_cb); + return m_logstore_families[family_id]->open_log_store(store_id, append_mode); } void LogStoreService::remove_log_store(const logstore_family_id_t family_id, const logstore_id_t store_id) { diff --git a/src/lib/replication/log_store/home_raft_log_store.cpp b/src/lib/replication/log_store/home_raft_log_store.cpp index bb3b7d2fb..a8718053d 100644 --- a/src/lib/replication/log_store/home_raft_log_store.cpp +++ b/src/lib/replication/log_store/home_raft_log_store.cpp @@ -67,13 +67,14 @@ HomeRaftLogStore::HomeRaftLogStore(logstore_id_t logstore_id) { } else { m_logstore_id = logstore_id; LOGDEBUGMOD(replication, "Opening existing home log store id={}", logstore_id); - logstore_service().open_log_store(LogStoreService::DATA_LOG_FAMILY_IDX, logstore_id, true, - [this](shared< HomeLogStore > log_store) { - m_log_store = std::move(log_store); - DEBUG_ASSERT_EQ(m_logstore_id, m_log_store->get_store_id(), - "Mismatch in passed and create logstore id"); - REPL_STORE_LOG(DEBUG, "Home Log store created/opened successfully"); - }); + logstore_service() + .open_log_store(LogStoreService::DATA_LOG_FAMILY_IDX, logstore_id, true) + .thenValue([this](auto log_store) { + m_log_store = 
std::move(log_store); + DEBUG_ASSERT_EQ(m_logstore_id, m_log_store->get_store_id(), + "Mismatch in passed and create logstore id"); + REPL_STORE_LOG(DEBUG, "Home Log store created/opened successfully"); + }); } } @@ -85,20 +86,17 @@ void HomeRaftLogStore::remove_store() { ulong HomeRaftLogStore::next_slot() const { uint64_t next_slot = to_repl_lsn(m_log_store->get_contiguous_issued_seq_num(m_last_durable_lsn)) + 1; - REPL_STORE_LOG(DEBUG, "next_slot()={}", next_slot); return next_slot; } ulong HomeRaftLogStore::start_index() const { // start_index starts from 1. ulong start_index = std::max((repl_lsn_t)1, to_repl_lsn(m_log_store->truncated_upto()) + 1); - REPL_STORE_LOG(DEBUG, "start_index()={}", start_index); return start_index; } nuraft::ptr< nuraft::log_entry > HomeRaftLogStore::last_entry() const { store_lsn_t max_seq = m_log_store->get_contiguous_issued_seq_num(m_last_durable_lsn); - REPL_STORE_LOG(DEBUG, "last_entry() store seqnum={}", max_seq); if (max_seq < 0) { return m_dummy_log_entry; } nuraft::ptr< nuraft::log_entry > nle; diff --git a/src/lib/replication/repl_dev/raft_repl_dev.cpp b/src/lib/replication/repl_dev/raft_repl_dev.cpp index 0ee5885e8..6ecf7dfd8 100644 --- a/src/lib/replication/repl_dev/raft_repl_dev.cpp +++ b/src/lib/replication/repl_dev/raft_repl_dev.cpp @@ -38,11 +38,12 @@ RaftReplDev::RaftReplDev(RaftReplService& svc, superblk< raft_repl_dev_superblk } if (m_rd_sb->is_timeline_consistent) { - logstore_service().open_log_store(LogStoreService::CTRL_LOG_FAMILY_IDX, m_rd_sb->free_blks_journal_id, - false, [this](shared< HomeLogStore > log_store) { - m_free_blks_journal = std::move(log_store); - m_rd_sb->free_blks_journal_id = m_free_blks_journal->get_store_id(); - }); + logstore_service() + .open_log_store(LogStoreService::CTRL_LOG_FAMILY_IDX, m_rd_sb->free_blks_journal_id, false) + .thenValue([this](auto log_store) { + m_free_blks_journal = std::move(log_store); + m_rd_sb->free_blks_journal_id = m_free_blks_journal->get_store_id(); + }); } } else { m_data_journal = std::make_shared< ReplLogStore >(*this, *m_state_machine); @@ -66,6 +67,18 @@ RaftReplDev::RaftReplDev(RaftReplService& svc, superblk< raft_repl_dev_superblk // m_msg_mgr.bind_data_service_request(FETCH_DATA, m_group_id, bind_this(RaftReplDev::on_fetch_data_received, 2)); } +bool RaftReplDev::join_group() { + auto raft_result = + m_msg_mgr.join_group(m_group_id, "homestore_replication", + std::dynamic_pointer_cast< nuraft_mesg::mesg_state_mgr >(shared_from_this())); + if (!raft_result) { + HS_DBG_ASSERT(false, "Unable to join the group_id={} with error={}", boost::uuids::to_string(m_group_id), + raft_result.error()); + return false; + } + return true; +} + void RaftReplDev::use_config(json_superblk raft_config_sb) { m_raft_config_sb = std::move(raft_config_sb); } void RaftReplDev::async_alloc_write(sisl::blob const& header, sisl::blob const& key, sisl::sg_list const& value, @@ -85,18 +98,29 @@ void RaftReplDev::async_alloc_write(sisl::blob const& header, sisl::blob const& auto status = data_service().alloc_blks(uint32_cast(rreq->value.size), m_listener->get_blk_alloc_hints(rreq->header, rreq->value.size), rreq->local_blkid); - HS_REL_ASSERT_EQ(status, BlkAllocStatus::SUCCESS); + if (status != BlkAllocStatus::SUCCESS) { + HS_DBG_ASSERT_EQ(status, BlkAllocStatus::SUCCESS, "Unable to allocate blks"); + handle_error(rreq, ReplServiceError::NO_SPACE_LEFT); + return; + } + rreq->state.fetch_or(uint32_cast(repl_req_state_t::BLK_ALLOCATED)); // Write the data data_service().async_write(rreq->value, 
rreq->local_blkid).thenValue([this, rreq](auto&& err) { - HS_REL_ASSERT(!err, "Error in writing data"); // TODO: Find a way to return error to the Listener - rreq->state.fetch_or(uint32_cast(repl_req_state_t::DATA_WRITTEN)); - m_state_machine->propose_to_raft(std::move(rreq)); + if (!err) { + rreq->state.fetch_or(uint32_cast(repl_req_state_t::DATA_WRITTEN)); + auto raft_status = m_state_machine->propose_to_raft(std::move(rreq)); + if (raft_status != ReplServiceError::OK) { handle_error(rreq, raft_status); } + } else { + HS_DBG_ASSERT(false, "Error in writing data"); + handle_error(rreq, ReplServiceError::DRIVE_WRITE_ERROR); + } }); } else { RD_LOG(INFO, "Skipping data channel send since value size is 0"); rreq->state.fetch_or(uint32_cast(repl_req_state_t::DATA_WRITTEN)); - m_state_machine->propose_to_raft(std::move(rreq)); + auto raft_status = m_state_machine->propose_to_raft(std::move(rreq)); + if (raft_status != ReplServiceError::OK) { handle_error(rreq, raft_status); } } } @@ -120,8 +144,13 @@ void RaftReplDev::push_data_to_all_followers(repl_req_ptr_t rreq) { group_msg_service() ->data_service_request_unidirectional(nuraft_mesg::role_regex::ALL, PUSH_DATA, rreq->pkts) - .via(&folly::InlineExecutor::instance()) - .thenValue([this, rreq = std::move(rreq)](auto e) { + .deferValue([this, rreq = std::move(rreq)](auto e) { + if (e.hasError()) { + RD_LOG(ERROR, "Data Channel: Error in pushing data to all followers: rreq=[{}] error={}", + rreq->to_compact_string(), e.error()); + handle_error(rreq, RaftReplService::to_repl_error(e.error())); + return; + } // Release the buffer which holds the packets RD_LOG(INFO, "Data Channel: Data push completed for rreq=[{}]", rreq->to_compact_string()); rreq->fb_builder.Release(); @@ -129,6 +158,46 @@ void RaftReplDev::push_data_to_all_followers(repl_req_ptr_t rreq) { }); } +void RaftReplDev::handle_error(repl_req_ptr_t const& rreq, ReplServiceError err) { + if (err == ReplServiceError::OK) { return; } + + auto s = rreq->state.load(); + if ((s & uint32_cast(repl_req_state_t::ERRORED)) || + !(rreq->state.compare_exchange_strong(s, s | uint32_cast(repl_req_state_t::ERRORED)))) { + RD_LOG(INFO, "Raft Channel: Error in processing rreq=[{}] error={} already errored", rreq->to_compact_string(), + err); + return; + } + + // Free the blks which is allocated already + RD_LOG(INFO, "Raft Channel: Error in processing rreq=[{}] error={}", rreq->to_compact_string(), err); + if (rreq->state.load() & uint32_cast(repl_req_state_t::BLK_ALLOCATED)) { + auto blkid = rreq->local_blkid; + data_service().async_free_blk(blkid).thenValue([blkid](auto&& err) { + HS_LOG_ASSERT(!err, "freeing blkid={} upon error failed, potential to cause blk leak", blkid.to_string()); + }); + } + + HS_DBG_ASSERT(!(rreq->state.load() & uint32_cast(repl_req_state_t::LOG_FLUSHED)), + "Unexpected state, received error after log is flushed for rreq=[{}]", rreq->to_compact_string()); + + if (rreq->is_proposer) { + // Notify the proposer about the error + m_listener->on_error(err, rreq->header, rreq->key, rreq); + rreq->fb_builder.Release(); + rreq->pkts.clear(); + } else { + // Complete the response hence proposer can free up its resources + rreq->header = sisl::blob{}; + rreq->key = sisl::blob{}; + rreq->pkts = sisl::io_blob_list_t{}; + if (rreq->rpc_data) { + rreq->rpc_data->send_response(); + rreq->rpc_data = nullptr; + } + } +} + void RaftReplDev::on_push_data_received(intrusive< sisl::GenericRpcData >& rpc_data) { auto const& incoming_buf = rpc_data->request_blob(); auto const fb_size = @@ -171,10 
+240,14 @@ void RaftReplDev::on_push_data_received(intrusive< sisl::GenericRpcData >& rpc_d data_service() .async_write(r_cast< const char* >(data), push_req->data_size(), rreq->local_blkid) .thenValue([this, rreq](auto&& err) { - RD_REL_ASSERT(!err, "Error in writing data"); // TODO: Find a way to return error to the Listener - rreq->state.fetch_or(uint32_cast(repl_req_state_t::DATA_WRITTEN)); - rreq->data_written_promise.setValue(); - RD_LOG(INFO, "Data Channel: Data Write completed rreq=[{}]", rreq->to_compact_string()); + if (err) { + RD_DBG_ASSERT(false, "Error in writing data"); + handle_error(rreq, ReplServiceError::DRIVE_WRITE_ERROR); + } else { + rreq->state.fetch_or(uint32_cast(repl_req_state_t::DATA_WRITTEN)); + rreq->data_written_promise.setValue(); + RD_LOG(INFO, "Data Channel: Data Write completed rreq=[{}]", rreq->to_compact_string()); + } }); } @@ -289,6 +362,16 @@ void RaftReplDev::async_free_blks(int64_t, MultiBlkId const& bid) { data_service().async_free_blk(bid); } +AsyncReplResult<> RaftReplDev::become_leader() { + return m_msg_mgr.become_leader(m_group_id).deferValue([this](auto&& e) { + if (e.hasError()) { + RD_LOG(ERROR, "Error in becoming leader: {}", e.error()); + return make_async_error<>(RaftReplService::to_repl_error(e.error())); + } + return make_async_success<>(); + }); +} + bool RaftReplDev::is_leader() const { return m_repl_svc_ctx->is_raft_leader(); } uint32_t RaftReplDev::get_blk_size() const { return data_service().get_blk_size(); } @@ -418,6 +501,8 @@ void RaftReplDev::leave() { /////////////////////////////////// Private metohds //////////////////////////////////// void RaftReplDev::report_committed(repl_req_ptr_t rreq) { + if (rreq->local_blkid.is_valid()) { data_service().commit_blk(rreq->local_blkid); } + auto prev_lsn = m_commit_upto_lsn.exchange(rreq->lsn); RD_DBG_ASSERT_GT(rreq->lsn, prev_lsn, "Out of order commit of lsns, it is not expected in RaftReplDev"); diff --git a/src/lib/replication/repl_dev/raft_repl_dev.h b/src/lib/replication/repl_dev/raft_repl_dev.h index 64336d9a9..066c34174 100644 --- a/src/lib/replication/repl_dev/raft_repl_dev.h +++ b/src/lib/replication/repl_dev/raft_repl_dev.h @@ -31,7 +31,9 @@ using raft_buf_ptr_t = nuraft::ptr< nuraft::buffer >; class RaftReplService; class CP; -class RaftReplDev : public ReplDev, public nuraft_mesg::mesg_state_mgr { +class RaftReplDev : public ReplDev, + public nuraft_mesg::mesg_state_mgr, + public std::enable_shared_from_this< RaftReplDev > { private: shared< RaftStateMachine > m_state_machine; RaftReplService& m_repl_svc; @@ -64,6 +66,7 @@ class RaftReplDev : public ReplDev, public nuraft_mesg::mesg_state_mgr { RaftReplDev(RaftReplService& svc, superblk< raft_repl_dev_superblk >&& rd_sb, bool load_existing); virtual ~RaftReplDev() = default; + bool join_group(); void destroy(); //////////////// All ReplDev overrides/implementation /////////////////////// @@ -72,6 +75,7 @@ class RaftReplDev : public ReplDev, public nuraft_mesg::mesg_state_mgr { folly::Future< std::error_code > async_read(MultiBlkId const& blkid, sisl::sg_list& sgs, uint32_t size, bool part_of_batch = false) override; void async_free_blks(int64_t lsn, MultiBlkId const& blkid) override; + AsyncReplResult<> become_leader() override; bool is_leader() const override; group_id_t group_id() const override { return m_group_id; } std::string group_id_str() const { return boost::uuids::to_string(m_group_id); } @@ -113,6 +117,7 @@ class RaftReplDev : public ReplDev, public nuraft_mesg::mesg_state_mgr { shared< nuraft::log_store > 
data_journal() { return m_data_journal; } void push_data_to_all_followers(repl_req_ptr_t rreq); void on_push_data_received(intrusive< sisl::GenericRpcData >& rpc_data); + void handle_error(repl_req_ptr_t const& rreq, ReplServiceError err); }; } // namespace homestore diff --git a/src/lib/replication/repl_dev/raft_state_machine.cpp b/src/lib/replication/repl_dev/raft_state_machine.cpp index e25f965f3..f1811fd77 100644 --- a/src/lib/replication/repl_dev/raft_state_machine.cpp +++ b/src/lib/replication/repl_dev/raft_state_machine.cpp @@ -3,6 +3,7 @@ #include #include +#include "service/raft_repl_service.h" #include "repl_dev/raft_state_machine.h" #include "repl_dev/raft_repl_dev.h" @@ -59,7 +60,7 @@ raft_buf_ptr_t RaftStateMachine::commit_ext(nuraft::state_machine::ext_op_params uint64_t RaftStateMachine::last_commit_index() { return uint64_cast(m_rd.get_last_commit_lsn()); } -void RaftStateMachine::propose_to_raft(repl_req_ptr_t rreq) { +ReplServiceError RaftStateMachine::propose_to_raft(repl_req_ptr_t rreq) { uint32_t val_size = rreq->value.size ? rreq->local_blkid.serialized_size() : 0; uint32_t entry_size = sizeof(repl_journal_entry) + rreq->header.size() + rreq->key.size() + val_size; rreq->alloc_journal_entry(entry_size, true /* raft_buf */); @@ -98,8 +99,15 @@ void RaftStateMachine::propose_to_raft(repl_req_ptr_t rreq) { RD_LOG(TRACE, "Raft Channel: journal_entry=[{}] ", rreq->journal_entry->to_string()); - m_rd.raft_server()->append_entries_ext(*vec, param); + auto append_status = m_rd.raft_server()->append_entries_ext(*vec, param); sisl::VectorPool< raft_buf_ptr_t >::free(vec); + + if (append_status && !append_status->get_accepted()) { + RD_LOG(ERROR, "Raft Channel: Failed to propose rreq=[{}] result_code={}", rreq->to_compact_string(), + append_status->get_result_code()); + return RaftReplService::to_repl_error(append_status->get_result_code()); + } + return ReplServiceError::OK; } repl_req_ptr_t RaftStateMachine::transform_journal_entry(nuraft::ptr< nuraft::log_entry >& lentry) { @@ -109,25 +117,41 @@ repl_req_ptr_t RaftStateMachine::transform_journal_entry(nuraft::ptr< nuraft::lo // We don't want to transform anything that is not an app log if (lentry->get_val_type() != nuraft::log_val_type::app_log) { return nullptr; } - repl_journal_entry* jentry = r_cast< repl_journal_entry* >(lentry->get_buf().data_begin()); - RELEASE_ASSERT_EQ(jentry->major_version, repl_journal_entry::JOURNAL_ENTRY_MAJOR, - "Mismatched version of journal entry received from RAFT peer"); + // Validate the journal entry and see if it needs to be processed + { + repl_journal_entry* tmp_jentry = r_cast< repl_journal_entry* >(lentry->get_buf().data_begin()); + RELEASE_ASSERT_EQ(tmp_jentry->major_version, repl_journal_entry::JOURNAL_ENTRY_MAJOR, + "Mismatched version of journal entry received from RAFT peer"); + + RD_LOG(TRACE, "Received Raft log_entry=[term={}], journal_entry=[{}] ", lentry->get_term(), + tmp_jentry->to_string()); - RD_LOG(TRACE, "Received Raft log_entry=[term={}], journal_entry=[{}] ", lentry->get_term(), jentry->to_string()); + // For inline data we don't need to transform anything + if (tmp_jentry->code != journal_type_t::HS_LARGE_DATA) { return nullptr; } + + DEBUG_ASSERT_GT(tmp_jentry->value_size, 0, "Entry marked as large data, but value size is notified as 0"); + } - // For inline data we don't need to transform anything - if (jentry->code != journal_type_t::HS_LARGE_DATA) { return nullptr; } + auto log_to_journal_entry = [](raft_buf_ptr_t const& log_buf, auto const log_buf_data_offset) { 
+ repl_journal_entry* jentry = r_cast< repl_journal_entry* >(log_buf->data_begin() + log_buf_data_offset); + sisl::blob const header = + sisl::blob{uintptr_cast(jentry) + sizeof(repl_journal_entry), jentry->user_header_size}; + sisl::blob const key = sisl::blob{header.cbytes() + header.size(), jentry->key_size}; + return std::make_tuple(jentry, header, key); + }; - sisl::blob const header = sisl::blob{uintptr_cast(jentry) + sizeof(repl_journal_entry), jentry->user_header_size}; - sisl::blob const key = sisl::blob{header.cbytes() + header.size(), jentry->key_size}; - DEBUG_ASSERT_GT(jentry->value_size, 0, "Entry marked as large data, but value size is notified as 0"); + // Serialize the log_entry buffer which returns the actual raft log_entry buffer. + auto log_buf = lentry->serialize(); + auto const log_buf_data_offset = log_buf->size() - lentry->get_buf().size(); + auto const [jentry, header, key] = log_to_journal_entry(log_buf, log_buf_data_offset); // From the repl_key, get the repl_req. In cases where log stream got here first, this method will create a new // repl_req and return that back. Fill up all of the required journal entry inside the repl_req auto rreq = m_rd.follower_create_req( repl_key{.server_id = jentry->server_id, .term = lentry->get_term(), .dsn = jentry->dsn}, header, key, jentry->value_size); - rreq->journal_buf = lentry->serialize(); + rreq->journal_buf = std::move(log_buf); + rreq->journal_entry = jentry; MultiBlkId entry_blkid; entry_blkid.deserialize(sisl::blob{key.cbytes() + key.size(), jentry->value_size}, true /* copy */); @@ -141,6 +165,7 @@ repl_req_ptr_t RaftStateMachine::transform_journal_entry(nuraft::ptr< nuraft::lo auto new_buf = nuraft::buffer::expand(*rreq->raft_journal_buf(), rreq->raft_journal_buf()->size() + local_size - remote_size); blkid_location = uintptr_cast(new_buf->data_begin()) + rreq->raft_journal_buf()->size() - jentry->value_size; + std::tie(rreq->journal_entry, rreq->header, rreq->key) = log_to_journal_entry(new_buf, log_buf_data_offset); rreq->journal_buf = std::move(new_buf); } else { // Can do in-place replace of remote blkid with local blkid. 
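For context on the hunk above: as implied by the offset computation, `lentry->serialize()` places raft log-entry metadata ahead of the application payload, so the journal entry is located by offset into the serialized buffer, and `jentry`/`header`/`key` must be re-derived whenever the buffer is replaced (e.g. after `nuraft::buffer::expand()`), since the old pointers would dangle. A condensed sketch of that assumption (mirrors the code above):

```cpp
//   [raft log-entry metadata][repl_journal_entry][user header][key][value blkid...]
//   ^-- log_buf begins        ^-- log_buf_data_offset = log_buf->size() - lentry->get_buf().size()
auto log_buf = lentry->serialize();
auto const log_buf_data_offset = log_buf->size() - lentry->get_buf().size();
auto const [jentry, header, key] = log_to_journal_entry(log_buf, log_buf_data_offset);
```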
@@ -148,7 +173,6 @@ repl_req_ptr_t RaftStateMachine::transform_journal_entry(nuraft::ptr< nuraft::lo jentry->value_size; } std::memcpy(blkid_location, rreq->local_blkid.serialize().cbytes(), local_size); - rreq->journal_entry = r_cast< repl_journal_entry* >(rreq->raft_journal_buf()->data_begin()); return rreq; } diff --git a/src/lib/replication/repl_dev/raft_state_machine.h b/src/lib/replication/repl_dev/raft_state_machine.h index c341ebd3b..3f9d10eaf 100644 --- a/src/lib/replication/repl_dev/raft_state_machine.h +++ b/src/lib/replication/repl_dev/raft_state_machine.h @@ -108,7 +108,7 @@ class RaftStateMachine : public nuraft::state_machine { nuraft::ptr< nuraft::snapshot > last_snapshot() override { return nullptr; } ////////// APIs outside of nuraft::state_machine requirements //////////////////// - void propose_to_raft(repl_req_ptr_t rreq); + ReplServiceError propose_to_raft(repl_req_ptr_t rreq); repl_req_ptr_t transform_journal_entry(nuraft::ptr< nuraft::log_entry >& lentry); void link_lsn_to_req(repl_req_ptr_t rreq, int64_t lsn); repl_req_ptr_t lsn_to_req(int64_t lsn); diff --git a/src/lib/replication/repl_dev/solo_repl_dev.cpp b/src/lib/replication/repl_dev/solo_repl_dev.cpp index 3d62182ea..6277c2368 100644 --- a/src/lib/replication/repl_dev/solo_repl_dev.cpp +++ b/src/lib/replication/repl_dev/solo_repl_dev.cpp @@ -10,8 +10,13 @@ namespace homestore { SoloReplDev::SoloReplDev(superblk< repl_dev_superblk >&& rd_sb, bool load_existing) : m_rd_sb{std::move(rd_sb)}, m_group_id{m_rd_sb->group_id} { if (load_existing) { - logstore_service().open_log_store(LogStoreService::DATA_LOG_FAMILY_IDX, m_rd_sb->data_journal_id, true, - bind_this(SoloReplDev::on_data_journal_created, 1)); + logstore_service() + .open_log_store(LogStoreService::DATA_LOG_FAMILY_IDX, m_rd_sb->data_journal_id, true /* append_mode */) + .thenValue([this](auto log_store) { + m_data_journal = std::move(log_store); + m_rd_sb->data_journal_id = m_data_journal->get_store_id(); + m_data_journal->register_log_found_cb(bind_this(SoloReplDev::on_log_found, 3)); + }); } else { m_data_journal = logstore_service().create_new_log_store(LogStoreService::DATA_LOG_FAMILY_IDX, true /* append_mode */); @@ -20,12 +25,6 @@ SoloReplDev::SoloReplDev(superblk< repl_dev_superblk >&& rd_sb, bool load_existi } } -void SoloReplDev::on_data_journal_created(shared< HomeLogStore > log_store) { - m_data_journal = std::move(log_store); - m_rd_sb->data_journal_id = m_data_journal->get_store_id(); - m_data_journal->register_log_found_cb(bind_this(SoloReplDev::on_log_found, 3)); -} - void SoloReplDev::async_alloc_write(sisl::blob const& header, sisl::blob const& key, sisl::sg_list const& value, repl_req_ptr_t rreq) { if (!rreq) { auto rreq = repl_req_ptr_t(new repl_req_ctx{}); } diff --git a/src/lib/replication/repl_dev/solo_repl_dev.h b/src/lib/replication/repl_dev/solo_repl_dev.h index 331003f4a..1c104c2fc 100644 --- a/src/lib/replication/repl_dev/solo_repl_dev.h +++ b/src/lib/replication/repl_dev/solo_repl_dev.h @@ -46,6 +46,7 @@ class SoloReplDev : public ReplDev { void async_free_blks(int64_t lsn, MultiBlkId const& blkid) override; + AsyncReplResult<> become_leader() override { return make_async_error(ReplServiceError::OK); } bool is_leader() const override { return true; } uuid_t group_id() const override { return m_group_id; } @@ -56,7 +57,6 @@ class SoloReplDev : public ReplDev { void cp_cleanup(CP* cp); private: - void on_data_journal_created(shared< HomeLogStore > log_store); void write_journal(repl_req_ptr_t rreq); void 
on_log_found(logstore_seq_num_t lsn, log_buffer buf, void* ctx); }; diff --git a/src/lib/replication/service/generic_repl_svc.cpp b/src/lib/replication/service/generic_repl_svc.cpp index d169d4ce2..451c355dc 100644 --- a/src/lib/replication/service/generic_repl_svc.cpp +++ b/src/lib/replication/service/generic_repl_svc.cpp @@ -15,6 +15,7 @@ #include #include #include +#include #include "common/homestore_assert.hpp" #include "replication/service/generic_repl_svc.h" #include "replication/service/raft_repl_service.h" @@ -36,15 +37,20 @@ std::shared_ptr< GenericReplService > GenericReplService::create(cshared< ReplAp GenericReplService::GenericReplService(cshared< ReplApplication >& repl_app) : m_repl_app{repl_app}, m_my_uuid{repl_app->get_my_repl_id()} { + m_sb_bufs.reserve(100); meta_service().register_handler( get_meta_blk_name(), - [this](meta_blk* mblk, sisl::byte_view buf, size_t) { load_repl_dev(std::move(buf), voidptr_cast(mblk)); }, + [this](meta_blk* mblk, sisl::byte_view buf, size_t) { + m_sb_bufs.emplace_back(std::pair(std::move(buf), voidptr_cast(mblk))); + }, nullptr); } void GenericReplService::stop() { - std::unique_lock lg{m_rd_map_mtx}; - m_rd_map.clear(); + { + std::unique_lock lg{m_rd_map_mtx}; + m_rd_map.clear(); + } } ReplResult< shared< ReplDev > > GenericReplService::get_repl_dev(group_id_t group_id) const { @@ -77,10 +83,23 @@ hs_stats GenericReplService::get_cap_stats() const { SoloReplService::SoloReplService(cshared< ReplApplication >& repl_app) : GenericReplService{repl_app} {} void SoloReplService::start() { + for (auto const& [buf, mblk] : m_sb_bufs) { + load_repl_dev(buf, voidptr_cast(mblk)); + } + m_sb_bufs.clear(); + + hs()->data_service().start(); + hs()->logstore_service().start(hs()->is_first_time_boot()); + // Register to CP to flush the super blk and truncate the logstore hs()->cp_mgr().register_consumer(cp_consumer_t::REPLICATION_SVC, std::make_unique< SoloReplServiceCPHandler >()); } +void SoloReplService::stop() { + GenericReplService::stop(); + hs()->logstore_service().stop(); +} + AsyncReplResult< shared< ReplDev > > SoloReplService::create_repl_dev(group_id_t group_id, std::set< replica_id_t > const& members) { superblk< repl_dev_superblk > rd_sb{get_meta_blk_name()}; @@ -120,7 +139,7 @@ void SoloReplService::load_repl_dev(sisl::byte_view const& buf, void* meta_cooki { std::unique_lock lg(m_rd_map_mtx); auto [_, happened] = m_rd_map.emplace(group_id, rdev); - (void) happened; + (void)happened; HS_DBG_ASSERT(happened, "Unable to put the repl_dev in rd map for group_id={}", group_id); } } diff --git a/src/lib/replication/service/generic_repl_svc.h b/src/lib/replication/service/generic_repl_svc.h index 64b8ea47a..4169e5f80 100644 --- a/src/lib/replication/service/generic_repl_svc.h +++ b/src/lib/replication/service/generic_repl_svc.h @@ -41,6 +41,7 @@ class GenericReplService : public ReplicationService { std::shared_mutex m_rd_map_mtx; std::map< group_id_t, shared< ReplDev > > m_rd_map; replica_id_t m_my_uuid; + std::vector< std::pair< sisl::byte_view, void* > > m_sb_bufs; public: static std::shared_ptr< GenericReplService > create(cshared< ReplApplication >& repl_app); @@ -65,6 +66,7 @@ class SoloReplService : public GenericReplService { public: SoloReplService(cshared< ReplApplication >& repl_app); void start() override; + void stop() override; AsyncReplResult< shared< ReplDev > > create_repl_dev(group_id_t group_id, std::set< replica_id_t > const& members) override; diff --git a/src/lib/replication/service/raft_repl_service.cpp 
b/src/lib/replication/service/raft_repl_service.cpp index d25b49bc6..dde2da93c 100644 --- a/src/lib/replication/service/raft_repl_service.cpp +++ b/src/lib/replication/service/raft_repl_service.cpp @@ -17,6 +17,8 @@ #include #include +#include +#include #include "common/homestore_config.hpp" #include "common/homestore_assert.hpp" #include "replication/service/raft_repl_service.h" @@ -58,32 +60,31 @@ ReplServiceError RaftReplService::to_repl_error(nuraft::cmd_result_code code) { } RaftReplService::RaftReplService(cshared< ReplApplication >& repl_app) : GenericReplService{repl_app} { + m_config_sb_bufs.reserve(100); meta_service().register_handler( get_meta_blk_name() + "_raft_config", [this](meta_blk* mblk, sisl::byte_view buf, size_t) { - raft_group_config_found(std::move(buf), voidptr_cast(mblk)); + m_config_sb_bufs.emplace_back(std::pair(std::move(buf), voidptr_cast(mblk))); }, nullptr, false, std::optional< meta_subtype_vec_t >({get_meta_blk_name()})); } void RaftReplService::start() { - /*auto params = nuraft_mesg::Manager::Params{ + // Step 1: Initialize the Nuraft messaging service, which starts the nuraft service + auto params = nuraft_mesg::Manager::Params{ .server_uuid_ = m_my_uuid, .mesg_port_ = m_repl_app->lookup_peer(m_my_uuid).second, .default_group_type_ = "homestore_replication", .ssl_key_ = ioenvironment.get_ssl_key(), .ssl_cert_ = ioenvironment.get_ssl_cert(), .token_verifier_ = std::dynamic_pointer_cast< sisl::GrpcTokenVerifier >(ioenvironment.get_token_verifier()), - .token_client_ = std::dynamic_pointer_cast< sisl::GrpcTokenClient >(ioenvironment.get_token_client())};*/ - auto params = nuraft_mesg::Manager::Params(); - params.server_uuid_ = m_my_uuid; - params.mesg_port_ = m_repl_app->lookup_peer(m_my_uuid).second; - params.default_group_type_ = "homestore_replication"; + .token_client_ = std::dynamic_pointer_cast< sisl::GrpcTokenClient >(ioenvironment.get_token_client())}; m_msg_mgr = nuraft_mesg::init_messaging(params, weak_from_this(), true /* with_data_channel */); LOGINFOMOD(replication, "Starting RaftReplService with server_uuid={} port={}", boost::uuids::to_string(params.server_uuid_), params.mesg_port_); + // Step 2: Register all RAFT parameters. At the end of this step, raft is ready to be created/join group auto r_params = nuraft::raft_params() .with_election_timeout_lower(HS_DYNAMIC_CONFIG(consensus.elect_to_low_ms)) .with_election_timeout_upper(HS_DYNAMIC_CONFIG(consensus.elect_to_high_ms)) @@ -100,22 +101,68 @@ void RaftReplService::start() { r_params.return_method_ = nuraft::raft_params::async_handler; m_msg_mgr->register_mgr_type(params.default_group_type_, r_params); + // Step 3: Load all the repl devs from the cached superblks. This step creates the ReplDev instances and adds to + // list. It is still not joined the Raft group yet + for (auto const& [buf, mblk] : m_sb_bufs) { + load_repl_dev(buf, voidptr_cast(mblk)); + } + m_sb_bufs.clear(); + + // Step 4: Load all the raft group configs from the cached superblks. We have 2 superblks for each raft group + // a) repl_dev configuration (loaded in step 3). This block is updated on every append and persisted on next cp. + // b) raft group configuration (loaded in this step). This block is updated on every config change and persisted + // instantly + // + // We need to first load the repl_dev with its config and then attach the raft config to that repl dev. 
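To make the two-superblk comment above concrete, each raft group now persists a repl_dev superblk (updated on every append, flushed at the next CP) plus a JSON raft-config superblk that is rewritten immediately on config change and, later in this diff in `create_state_mgr()`, tagged with its owning group so recovery can pair the two. Illustrative summary (field names as in the diff):

```cpp
// 1) repl_dev configuration:  superblk< raft_repl_dev_superblk > rd_sb;
// 2) raft group configuration, now carrying its group_id for recovery matching:
auto raft_config_sb = json_superblk{get_meta_blk_name() + "_raft_config"};
(*raft_config_sb)["group_id"] = boost::uuids::to_string(group_id);
raft_config_sb.write();
```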
+ for (auto const& [buf, mblk] : m_config_sb_bufs) { + raft_group_config_found(buf, voidptr_cast(mblk)); + } + m_config_sb_bufs.clear(); + + // Step 5: Start the data and logstore service now. This step is essential before we can ask Raft to join groups etc + hs()->data_service().start(); + hs()->logstore_service().start(hs()->is_first_time_boot()); + + // Step 6: Iterate all the repl dev and ask each one of the join the raft group. + for (auto it = m_rd_map.begin(); it != m_rd_map.end();) { + auto rdev = std::dynamic_pointer_cast< RaftReplDev >(it->second); + if (!rdev->join_group()) { + it = m_rd_map.erase(it); + } else { + ++it; + } + } + + // Step 7: Register to CPManager to ensure we can flush the superblk. hs()->cp_mgr().register_consumer(cp_consumer_t::REPLICATION_SVC, std::make_unique< RaftReplServiceCPHandler >()); } +void RaftReplService::stop() { + GenericReplService::stop(); + m_msg_mgr.reset(); + hs()->logstore_service().stop(); +} + void RaftReplService::raft_group_config_found(sisl::byte_view const& buf, void* meta_cookie) { json_superblk group_config; auto& js = group_config.load(buf, meta_cookie); + + DEBUG_ASSERT(js.contains("group_id"), "Missing group_id field in raft_config superblk"); std::string gid_str = js["group_id"]; RELEASE_ASSERT(!gid_str.empty(), "Invalid raft_group config found"); boost::uuids::string_generator gen; - uuid_t uuid = gen(gid_str); + uuid_t group_id = gen(gid_str); - auto v = get_repl_dev(uuid); - RELEASE_ASSERT(bool(v), "Not able to find the group_id corresponding, has repl_dev superblk not loaded yet?"); + auto v = get_repl_dev(group_id); + RELEASE_ASSERT(bool(v), "Can't find the group_id={}, has repl_dev superblk not loaded yet?", + boost::uuids::to_string(group_id)); - (std::dynamic_pointer_cast< RaftReplDev >(*v))->use_config(std::move(group_config)); + auto rdev = std::dynamic_pointer_cast< RaftReplDev >(*v); + auto listener = m_repl_app->create_repl_dev_listener(group_id); + listener->set_repl_dev(rdev.get()); + rdev->attach_listener(std::move(listener)); + rdev->use_config(std::move(group_config)); } std::string RaftReplService::lookup_peer(nuraft_mesg::peer_id_t const& peer) { @@ -125,6 +172,8 @@ std::string RaftReplService::lookup_peer(nuraft_mesg::peer_id_t const& peer) { shared< nuraft_mesg::mesg_state_mgr > RaftReplService::create_state_mgr(int32_t srv_id, nuraft_mesg::group_id_t const& group_id) { + LOGINFO("Creating RAFT state manager for server_id={} group_id={}", srv_id, boost::uuids::to_string(group_id)); + auto result = get_repl_dev(group_id); if (result) { return std::dynamic_pointer_cast< nuraft_mesg::mesg_state_mgr >(result.value()); } @@ -136,7 +185,12 @@ shared< nuraft_mesg::mesg_state_mgr > RaftReplService::create_state_mgr(int32_t // Create a new instance of Raft ReplDev (which is the state manager this method is looking for) auto rdev = std::make_shared< RaftReplDev >(*this, std::move(rd_sb), false /* load_existing */); - rdev->use_config(json_superblk{get_meta_blk_name() + "_raft_config"}); + + // Create a raft config for this repl_dev and assign it to the repl_dev + auto raft_config_sb = json_superblk{get_meta_blk_name() + "_raft_config"}; + (*raft_config_sb)["group_id"] = boost::uuids::to_string(group_id); + raft_config_sb.write(); + rdev->use_config(std::move(raft_config_sb)); // Attach the listener to the raft auto listener = m_repl_app->create_repl_dev_listener(group_id); @@ -170,6 +224,8 @@ AsyncReplResult< shared< ReplDev > > RaftReplService::create_repl_dev(group_id_t boost::uuids::to_string(member)); break; } 
         } else if (result.error() != nuraft::CONFIG_CHANGING) {
+            LOGWARN("group_id={}, add member={} failed with error={}", boost::uuids::to_string(group_id),
+                    boost::uuids::to_string(member), result.error());
             return make_async_error< shared< ReplDev > >(to_repl_error(result.error()));
         } else {
             LOGWARN("Config is changing for group_id={} while adding member={}, retry operation in a second",
@@ -205,14 +261,6 @@ void RaftReplService::load_repl_dev(sisl::byte_view const& buf, void* meta_cooki
     // Create an instance of ReplDev from loaded superblk
     auto rdev = std::make_shared< RaftReplDev >(*this, std::move(rd_sb), true /* load_existing */);
 
-    // Try to join the RAFT group
-    auto raft_result = m_msg_mgr->join_group(group_id, "homestore_replication",
-                                             std::dynamic_pointer_cast< nuraft_mesg::mesg_state_mgr >(rdev));
-    if (!raft_result) {
-        HS_DBG_ASSERT(false, "Unable to join the group_id={} with error={}", boost::uuids::to_string(group_id).c_str(),
-                      raft_result.error());
-    }
-
     // Add the RaftReplDev to the list of repl_devs
     add_repl_dev(group_id, rdev);
 }
diff --git a/src/lib/replication/service/raft_repl_service.h b/src/lib/replication/service/raft_repl_service.h
index fa12cd07e..48d496a23 100644
--- a/src/lib/replication/service/raft_repl_service.h
+++ b/src/lib/replication/service/raft_repl_service.h
@@ -37,6 +37,7 @@ class RaftReplService : public GenericReplService,
 private:
     shared< nuraft_mesg::Manager > m_msg_mgr;
     json_superblk m_config_sb;
+    std::vector< std::pair< sisl::byte_view, void* > > m_config_sb_bufs;
 
 public:
     RaftReplService(cshared< ReplApplication >& repl_app);
@@ -52,6 +53,8 @@ class RaftReplService : public GenericReplService,
 protected:
     ///////////////////// Overrides of GenericReplService ////////////////////
     void start() override;
+    void stop() override;
+
     AsyncReplResult< shared< ReplDev > > create_repl_dev(group_id_t group_id,
                                                          std::set< replica_id_t > const& members) override;
     void load_repl_dev(sisl::byte_view const& buf, void* meta_cookie) override;
diff --git a/src/tests/test_common/homestore_test_common.hpp b/src/tests/test_common/homestore_test_common.hpp
index b108358dc..2cb946bd8 100644
--- a/src/tests/test_common/homestore_test_common.hpp
+++ b/src/tests/test_common/homestore_test_common.hpp
@@ -173,12 +173,6 @@ class HSTestHelper {
         vdev_size_type_t vdev_size_type{vdev_size_type_t::VDEV_SIZE_STATIC};
     };
 
-#if 0
-    static void start_homestore(const std::string& test_name, float meta_pct, float data_log_pct, float ctrl_log_pct,
-                                float data_pct, float index_pct, hs_before_services_starting_cb_t cb,
-                                bool restart = false, std::unique_ptr< IndexServiceCallbacks > index_svc_cb = nullptr,
-                                bool default_data_svc_alloc_type = true);
-#endif
     static void start_homestore(const std::string& test_name, std::map< uint32_t, test_params >&& svc_params,
                                 hs_before_services_starting_cb_t cb = nullptr, bool fake_restart = false,
                                 bool init_device = true) {
@@ -190,6 +184,7 @@ class HSTestHelper {
 
         if (fake_restart) {
             shutdown_homestore(false);
+            // sisl::GrpcAsyncClientWorker::shutdown_all();
             std::this_thread::sleep_for(std::chrono::seconds{5});
         }
 
diff --git a/src/tests/test_common/hs_repl_test_common.hpp b/src/tests/test_common/hs_repl_test_common.hpp
index 19dec9959..005ea97db 100644
--- a/src/tests/test_common/hs_repl_test_common.hpp
+++ b/src/tests/test_common/hs_repl_test_common.hpp
@@ -59,6 +59,7 @@ class HSReplTestHelper {
     struct IPCData {
         bip::interprocess_mutex mtx_;
         bip::interprocess_condition cv_;
+        bip::interprocess_mutex exec_mtx_;
 
         repl_test_phase_t phase_{repl_test_phase_t::REGISTER};
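Editor's note: the CONFIG_CHANGING branch above relies on a retry-until-the-config-settles loop. A minimal, standalone sketch of that pattern follows, with the nuraft add-member call replaced by a stand-in callable; add_member_with_retry, AddMemberError and the attempt cap are illustrative, not HomeStore APIs.

```cpp
// Retry a membership change while the raft config is in flux; fail out on any other error.
#include <chrono>
#include <functional>
#include <optional>
#include <thread>

enum class AddMemberError { kOk, kConfigChanging, kOther };

std::optional<AddMemberError> add_member_with_retry(std::function<AddMemberError()> const& add_member_once,
                                                    int max_attempts = 10) {
    for (int attempt = 0; attempt < max_attempts; ++attempt) {
        auto const err = add_member_once();
        if (err != AddMemberError::kConfigChanging) { return err; } // kOk or a hard failure
        // Config change in progress: wait a second and retry, as the LOGWARN in the PR suggests.
        std::this_thread::sleep_for(std::chrono::seconds{1});
    }
    return std::nullopt; // config still changing after all attempts
}
```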
         uint32_t registered_count_{0};
@@ -79,8 +80,10 @@ class HSReplTestHelper {
             if (count == SISL_OPTIONS["replicas"].as< uint32_t >()) {
                 phase_ = new_phase;
                 cv_.notify_all();
-            } else
+            } else {
                 cv_.wait(lg, [this, new_phase]() { return (phase_ == new_phase); });
+            }
+            count = 0;
         }
     };
 
@@ -123,6 +126,7 @@ class HSReplTestHelper {
     void setup() {
         replica_num_ = SISL_OPTIONS["replica_num"].as< uint16_t >();
         sisl::logging::SetLogger(name_ + std::string("_replica_") + std::to_string(replica_num_));
+        sisl::logging::SetLogPattern("[%D %T%z] [%^%L%$] [%n] [%t] %v");
 
         auto const num_replicas = SISL_OPTIONS["replicas"].as< uint32_t >();
         boost::uuids::string_generator gen;
@@ -149,7 +153,8 @@ class HSReplTestHelper {
 
             for (uint32_t i{1}; i < num_replicas; ++i) {
                 LOGINFO("Spawning Homestore replica={} instance", i);
-                boost::process::child c(argv_[0], "--replica_num", std::to_string(i), proc_grp_);
+                boost::process::child c(argv_[0], "--log_mods", "replication:trace", "--replica_num", std::to_string(i),
+                                        proc_grp_);
                 c.detach();
             }
         } else {
@@ -182,6 +187,27 @@ class HSReplTestHelper {
         setup();
     }
 
+    void restart() {
+        test_common::HSTestHelper::start_homestore(
+            name_ + std::to_string(replica_num_),
+            {{HS_SERVICE::REPLICATION, {.repl_app = std::make_unique< TestReplApplication >(*this)}},
+             {HS_SERVICE::LOG_REPLICATED, {}},
+             {HS_SERVICE::LOG_LOCAL, {}}},
+            nullptr, true /* restart */);
+    }
+
+    void restart_one_by_one() {
+        exclusive_replica([&]() {
+            LOGINFO("Restarting Homestore replica={}", replica_num_);
+            test_common::HSTestHelper::start_homestore(
+                name_ + std::to_string(replica_num_),
+                {{HS_SERVICE::REPLICATION, {.repl_app = std::make_unique< TestReplApplication >(*this)}},
+                 {HS_SERVICE::LOG_REPLICATED, {}},
+                 {HS_SERVICE::LOG_LOCAL, {}}},
+                nullptr, true /* restart */);
+        });
+    }
+
     uint16_t replica_num() const { return replica_num_; }
 
     Runner& runner() { return io_runner_; }
@@ -222,12 +248,24 @@ class HSReplTestHelper {
         return listener;
     }
 
+    void unregister_listener(shared< ReplDevListener > listener) {
+        {
+            std::unique_lock lg(groups_mtx_);
+            repl_groups_.erase(listener->repl_dev()->group_id());
+        }
+    }
+
     void sync_for_test_start() { ipc_data_->sync_for_test_start(); }
     void sync_for_verify_start() { ipc_data_->sync_for_verify_start(); }
     void sync_for_cleanup_start() { ipc_data_->sync_for_cleanup_start(); }
     void sync_dataset_size(uint64_t dataset_size) { ipc_data_->test_dataset_size_ = dataset_size; }
     uint64_t dataset_size() const { return ipc_data_->test_dataset_size_; }
 
+    void exclusive_replica(std::function< void() > const& f) {
+        std::unique_lock< bip::interprocess_mutex > lg(ipc_data_->exec_mtx_);
+        f();
+    }
+
     void check_and_kill(int port) {
         std::string command = "lsof -t -i:" + std::to_string(port);
         if (system(command.c_str())) {
diff --git a/src/tests/test_log_store.cpp b/src/tests/test_log_store.cpp
index 2cfb94b74..bf71cfa66 100644
--- a/src/tests/test_log_store.cpp
+++ b/src/tests/test_log_store.cpp
@@ -442,10 +442,9 @@ class SampleDB {
             if (restart) {
                 for (uint32_t i{0}; i < n_log_stores; ++i) {
                     SampleLogStoreClient* client = m_log_store_clients[i].get();
-                    logstore_service().open_log_store(client->m_family, client->m_store_id, false /* append_mode */,
-                                                      [i, this, client](std::shared_ptr< HomeLogStore > log_store) {
-                                                          client->set_log_store(log_store);
-                                                      });
+                    logstore_service()
+                        .open_log_store(client->m_family, client->m_store_id, false /* append_mode */)
+                        .thenValue([i, this, client](auto log_store) { client->set_log_store(log_store); });
                 }
             }
         },
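Editor's note: exclusive_replica() serializes a section across the spawned replica processes using the new exec_mtx_ in the shared IPCData block. The standalone sketch below shows the same boost::interprocess pattern under assumed names (SharedCtrl, the "repl_test_ctrl" segment); the real helper keeps the mutex inside its existing shared-memory region rather than creating one.

```cpp
// One-process-at-a-time critical section shared across OS processes.
#include <functional>
#include <boost/interprocess/managed_shared_memory.hpp>
#include <boost/interprocess/shared_memory_object.hpp>
#include <boost/interprocess/sync/interprocess_mutex.hpp>
#include <boost/interprocess/sync/scoped_lock.hpp>

namespace bip = boost::interprocess;

struct SharedCtrl {
    bip::interprocess_mutex exec_mtx_; // lives in shared memory, visible to all test processes
};

void exclusive_section(SharedCtrl& ctrl, std::function<void()> const& f) {
    bip::scoped_lock<bip::interprocess_mutex> lg(ctrl.exec_mtx_);
    f();
}

int main() {
    bip::managed_shared_memory shm(bip::open_or_create, "repl_test_ctrl", 65536);
    auto* ctrl = shm.find_or_construct<SharedCtrl>("ctrl")();
    exclusive_section(*ctrl, [] { /* e.g. restart this replica while the others wait */ });
    bip::shared_memory_object::remove("repl_test_ctrl"); // cleanup for this sketch only
    return 0;
}
```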
diff --git a/src/tests/test_raft_repl_dev.cpp b/src/tests/test_raft_repl_dev.cpp
index b42f04d94..2ffaf2d70 100644
--- a/src/tests/test_raft_repl_dev.cpp
+++ b/src/tests/test_raft_repl_dev.cpp
@@ -45,8 +45,10 @@ SISL_LOGGING_INIT(HOMESTORE_LOG_MODS)
 SISL_OPTION_GROUP(test_raft_repl_dev,
                   (block_size, "", "block_size", "block size to io",
-                   ::cxxopts::value< uint32_t >()->default_value("4096"), "number"));
-SISL_OPTIONS_ENABLE(logging, test_raft_repl_dev, iomgr, test_common_setup, test_repl_common_setup)
+                   ::cxxopts::value< uint32_t >()->default_value("4096"), "number"),
+                  (num_raft_groups, "", "num_raft_groups", "number of raft groups per test",
+                   ::cxxopts::value< uint32_t >()->default_value("1"), "number"));
+SISL_OPTIONS_ENABLE(logging, test_raft_repl_dev, iomgr, config, test_common_setup, test_repl_common_setup)
 
 static std::unique_ptr< test_common::HSReplTestHelper > g_helper;
 
@@ -100,7 +102,6 @@ class TestReplicatedDB : public homestore::ReplDevListener {
 
     void on_commit(int64_t lsn, sisl::blob const& header, sisl::blob const& key, MultiBlkId const& blkids,
                    cintrusive< repl_req_ctx >& ctx) override {
-        LOGINFO("[Replica={}] Received commit on lsn={}", g_helper->replica_num(), lsn);
 
         ASSERT_EQ(header.size(), sizeof(test_req::journal_header));
         auto jheader = r_cast< test_req::journal_header const* >(header.cbytes());
@@ -108,6 +109,9 @@ class TestReplicatedDB : public homestore::ReplDevListener {
         Value v{
             .lsn_ = lsn, .data_size_ = jheader->data_size, .data_pattern_ = jheader->data_pattern, .blkid_ = blkids};
 
+        LOGINFO("[Replica={}] Received commit on lsn={} key={} value[blkid={} pattern={}]", g_helper->replica_num(),
+                lsn, k.id_, v.blkid_.to_string(), v.data_pattern_);
+
         {
             std::unique_lock lk(db_mtx_);
             inmem_db_.insert_or_assign(k, v);
@@ -127,6 +131,12 @@ class TestReplicatedDB : public homestore::ReplDevListener {
         LOGINFO("[Replica={}] Received rollback on lsn={}", g_helper->replica_num(), lsn);
     }
 
+    void on_error(ReplServiceError error, const sisl::blob& header, const sisl::blob& key,
+                  cintrusive< repl_req_ctx >& ctx) override {
+        LOGINFO("[Replica={}] Received error={} on key={}", g_helper->replica_num(), enum_name(error),
+                *(r_cast< uint64_t const* >(key.cbytes())));
+    }
+
     blk_alloc_hints get_blk_alloc_hints(sisl::blob const& header, uint32_t data_size) override {
         return blk_alloc_hints{};
     }
@@ -134,9 +144,10 @@ class TestReplicatedDB : public homestore::ReplDevListener {
     void on_replica_stop() override {}
 
     void db_write(uint64_t data_size, uint32_t max_size_per_iov) {
+        static std::atomic< uint32_t > s_uniq_num{0};
         auto req = intrusive< test_req >(new test_req());
         req->jheader.data_size = data_size;
-        req->jheader.data_pattern = ((long long)rand() << 32) | rand();
+        req->jheader.data_pattern = ((long long)rand() << 32) | ++s_uniq_num;
         auto block_size = SISL_OPTIONS["block_size"].as< uint32_t >();
 
         if (data_size != 0) {
@@ -171,6 +182,8 @@ class TestReplicatedDB : public homestore::ReplDevListener {
                               v.data_pattern_);
                     iomanager.iobuf_free(uintptr_cast(iov.iov_base));
                 }
+                LOGINFO("Validated successfully key={} value[blkid={} pattern={}]", k.id_, v.blkid_.to_string(),
+                        v.data_pattern_);
                 g_helper->runner().next_task();
             });
         });
@@ -191,9 +204,11 @@ class RaftReplDevTest : public testing::Test {
 public:
     void SetUp() override {
         // By default it will create one db
-        auto db = std::make_shared< TestReplicatedDB >();
-        g_helper->register_listener(db);
-        dbs_.emplace_back(std::move(db));
+        for (uint32_t i{0}; i < SISL_OPTIONS["num_raft_groups"].as< uint32_t >(); ++i) {
+            auto db = std::make_shared< TestReplicatedDB >();
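Editor's note: db_write() now builds data_pattern from an atomic counter in the low 32 bits instead of a second rand(), so concurrent writes can no longer collide on the same pattern and commit/read validation can attribute each block unambiguously. A small self-contained illustration of that guarantee follows; the function name and the 1000-iteration check are illustrative, not part of the test.

```cpp
// Demonstrates why (rand() << 32) | ++counter yields process-unique patterns:
// the low 32 bits are strictly increasing, so no two calls can produce the same value.
#include <atomic>
#include <cstdint>
#include <cstdlib>
#include <iostream>
#include <unordered_set>

static std::atomic<uint32_t> s_uniq_num{0};

uint64_t next_pattern() {
    // High 32 bits stay random (varies across runs); low 32 bits are unique within this process.
    return (static_cast<uint64_t>(std::rand()) << 32) | ++s_uniq_num;
}

int main() {
    std::unordered_set<uint64_t> seen;
    for (int i = 0; i < 1000; ++i) {
        if (!seen.insert(next_pattern()).second) {
            std::cout << "duplicate pattern!\n";
            return 1;
        }
    }
    std::cout << "1000 unique patterns generated\n";
    return 0;
}
```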
+            g_helper->register_listener(db);
+            dbs_.emplace_back(std::move(db));
+        }
     }
 
     void generate_writes(uint64_t data_size, uint32_t max_size_per_iov) {
@@ -220,28 +235,59 @@ class RaftReplDevTest : public testing::Test {
 
     TestReplicatedDB& pick_one_db() { return *dbs_[0]; }
 
+    void switch_all_db_leader() {
+        for (auto const& db : dbs_) {
+            do {
+                auto result = db->repl_dev()->become_leader().get();
+                if (result.hasError()) {
+                    std::this_thread::sleep_for(std::chrono::milliseconds(1000));
+                } else {
+                    break;
+                }
+            } while (true);
+        }
+    }
+
 private:
     std::vector< std::shared_ptr< TestReplicatedDB > > dbs_;
 };
 
-TEST_F(RaftReplDevTest, All_Append) {
+TEST_F(RaftReplDevTest, All_Append_Restart_Append) {
     LOGINFO("Homestore replica={} setup completed", g_helper->replica_num());
     g_helper->sync_for_test_start();
 
+    uint64_t exp_entries = SISL_OPTIONS["num_io"].as< uint64_t >();
     if (g_helper->replica_num() == 0) {
-        g_helper->sync_dataset_size(SISL_OPTIONS["num_io"].as< uint64_t >());
         auto block_size = SISL_OPTIONS["block_size"].as< uint32_t >();
         LOGINFO("Run on worker threads to schedule append on repldev for {} Bytes.", block_size);
         g_helper->runner().set_task([this, block_size]() { this->generate_writes(block_size, block_size); });
         g_helper->runner().execute().get();
     }
-
-    this->wait_for_all_writes(g_helper->dataset_size());
+    this->wait_for_all_writes(exp_entries);
 
     g_helper->sync_for_verify_start();
     LOGINFO("Validate all data written so far by reading them");
     this->validate_all_data();
+    g_helper->sync_for_cleanup_start();
+
+    LOGINFO("Restart all the homestore replicas");
+    g_helper->restart();
+    g_helper->sync_for_test_start();
+
+    exp_entries += SISL_OPTIONS["num_io"].as< uint64_t >();
+    if (g_helper->replica_num() == 0) {
+        LOGINFO("Switch the leader to replica_num = 0");
+        this->switch_all_db_leader();
+
+        LOGINFO("Post restart write the data again");
+        auto block_size = SISL_OPTIONS["block_size"].as< uint32_t >();
+        g_helper->runner().set_task([this, block_size]() { this->generate_writes(block_size, block_size); });
+        g_helper->runner().execute().get();
+    }
+    this->wait_for_all_writes(exp_entries);
+
+    LOGINFO("Validate all data written (including pre-restart data) by reading them");
+    this->validate_all_data();
     g_helper->sync_for_cleanup_start();
 }
 
@@ -250,7 +296,8 @@ int main(int argc, char* argv[]) {
     char** orig_argv = argv;
     ::testing::InitGoogleTest(&parsed_argc, argv);
 
-    SISL_OPTIONS_LOAD(parsed_argc, argv, logging, test_raft_repl_dev, iomgr, test_common_setup, test_repl_common_setup);
+    SISL_OPTIONS_LOAD(parsed_argc, argv, logging, config, test_raft_repl_dev, iomgr, test_common_setup,
+                      test_repl_common_setup);
     FLAGS_folly_global_cpu_executor_threads = 4;
     g_helper = std::make_unique< test_common::HSReplTestHelper >("test_raft_repl_dev", orig_argv);
 
diff --git a/src/tests/test_solo_repl_dev.cpp b/src/tests/test_solo_repl_dev.cpp
index 492a8006a..90b5bcca0 100644
--- a/src/tests/test_solo_repl_dev.cpp
+++ b/src/tests/test_solo_repl_dev.cpp
@@ -121,6 +121,10 @@ class SoloReplDevTest : public testing::Test {
         return blk_alloc_hints{};
     }
 
+    void on_error(ReplServiceError error, const sisl::blob& header, const sisl::blob& key,
+                  cintrusive< repl_req_ctx >& ctx) override {
+        LOGINFO("Received error={} on repl_dev", enum_name(error));
+    }
     void on_replica_stop() override {}
 };
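Editor's note: both new on_error() overrides only log today. If a test ever needs to fail fast on replication errors, one option is to hand the error to the waiting test body through a folly::Promise, as in the sketch below; ErrorSink, ReplError and first_error() are illustrative stand-ins, not HomeStore types.

```cpp
// Surface the first listener error to a waiting test thread instead of only logging it.
#include <folly/futures/Future.h>
#include <iostream>

enum class ReplError { OK, NOT_LEADER, DRIVE_WRITE_ERROR };

struct ErrorSink {
    folly::Promise<ReplError> promise_;

    // What an on_error() override might do with the error it receives.
    void on_error(ReplError err) { promise_.setValue(err); }

    folly::SemiFuture<ReplError> first_error() { return promise_.getSemiFuture(); }
};

int main() {
    ErrorSink sink;
    auto fut = sink.first_error();
    sink.on_error(ReplError::NOT_LEADER); // simulate a failed replication request
    std::cout << "got error code=" << static_cast<int>(std::move(fut).get()) << '\n';
    return 0;
}
```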