diff --git a/conanfile.py b/conanfile.py index 49abb180e..a83dea6b4 100644 --- a/conanfile.py +++ b/conanfile.py @@ -5,7 +5,7 @@ class HomestoreConan(ConanFile): name = "homestore" - version = "6.2.4" + version = "6.3.1" homepage = "https://github.com/eBay/Homestore" description = "HomeStore Storage Engine" diff --git a/src/include/homestore/logstore/log_store.hpp b/src/include/homestore/logstore/log_store.hpp index 71a1cdcda..6c0b493ec 100644 --- a/src/include/homestore/logstore/log_store.hpp +++ b/src/include/homestore/logstore/log_store.hpp @@ -173,6 +173,8 @@ class HomeLogStore : public std::enable_shared_from_this< HomeLogStore > { * to set this to true on cases where there are multiple log stores, so that once all in-memory truncation is * completed, a device truncation can be triggered for all the logstores. The device truncation is more * expensive and grouping them together yields better results. + * + * Note: this flag currently is not used, meaning all truncate is in memory only; * @return number of records to truncate */ void truncate(logstore_seq_num_t upto_seq_num, bool in_memory_truncate_only = true); @@ -274,18 +276,80 @@ class HomeLogStore : public std::enable_shared_from_this< HomeLogStore > { nlohmann::json get_status(int verbosity) const; + /** + * Retrieves the truncation information before device truncation. + * + * @return A constant reference to the truncation_info object representing the truncation information. + */ const truncation_info& pre_device_truncation(); + + /** + * \brief post device truncation processing. + * + * This function is used to update safe truncation boundary to the specified `trunc_upto_key`. + * + * \param trunc_upto_key The key indicating the log entry up to which truncation has been performed. + */ void post_device_truncation(const logdev_key& trunc_upto_key); + + /** + * Handles the completion of a write operation in the log store. + * + * @param req The logstore_req object representing the completed write operation. + * @param ld_key The logdev_key associated with the completed write operation. + */ void on_write_completion(logstore_req* req, const logdev_key& ld_key); + + /** + * \brief Handles the completion of a read operation in the log store. + * + * This function is called when a read operation in the log store has completed. + * It takes a pointer to a logstore_req object and a logdev_key object as parameters. + * + * \param req The pointer to the logstore_req object representing the read request. + * \param ld_key The logdev_key object representing the key used for the read operation. + */ void on_read_completion(logstore_req* req, const logdev_key& ld_key); + + /** + * @brief Handles the event when a log is found. + * + * This function is called when a log is found in the log store. It takes the sequence number of the log, + * the log device key, the flush log device key, and the log buffer as parameters. + * + * During LogDev::do_load during recovery boot, whenever a log is found, the associated logstore's on_log_found + * method is called. + * + * @param seq_num The sequence number of the log. + * @param ld_key The log device key. + * @param flush_ld_key The flush log device key. + * @param buf The log buffer. + */ void on_log_found(logstore_seq_num_t seq_num, const logdev_key& ld_key, const logdev_key& flush_ld_key, log_buffer buf); + /** + * @brief Handles the completion of a batch flush operation to update internal state. + * + * This function is called when a batch flush operation is completed. + * It takes a `logdev_key` parameter that represents the key of the flushed batch. + * + * This function is also called during log store recovery; + * + * @param flush_batch_ld_key The key of the flushed batch. + */ void on_batch_completion(const logdev_key& flush_batch_ld_key); private: + /** + * Truncates the log store up to the specified sequence number. + * + * @param upto_seq_num The sequence number up to which the log store should be truncated. + */ void do_truncate(logstore_seq_num_t upto_seq_num); + int search_max_le(logstore_seq_num_t input_sn); +private: logstore_id_t m_store_id; std::shared_ptr< LogDev > m_logdev; sisl::StreamTracker< logstore_record > m_records; diff --git a/src/include/homestore/logstore_service.hpp b/src/include/homestore/logstore_service.hpp index 158b6c6ba..44e1acac1 100644 --- a/src/include/homestore/logstore_service.hpp +++ b/src/include/homestore/logstore_service.hpp @@ -158,6 +158,12 @@ class LogStoreService { uint32_t used_size() const; uint32_t total_size() const; iomgr::io_fiber_t flush_thread() { return m_flush_fiber; } + + /** + * This is used when the actual LogDev truncate is triggered; + * + * @return The IO fiber associated with the truncate thread. + */ iomgr::io_fiber_t truncate_thread() { return m_truncate_fiber; } private: diff --git a/src/include/homestore/replication/repl_decls.h b/src/include/homestore/replication/repl_decls.h index 99253b9f5..ac15a53af 100644 --- a/src/include/homestore/replication/repl_decls.h +++ b/src/include/homestore/replication/repl_decls.h @@ -65,6 +65,9 @@ using remote_blkid_list_t = folly::small_vector< RemoteBlkId, 4 >; using replica_id_t = uuid_t; using group_id_t = uuid_t; +using store_lsn_t = int64_t; +using repl_lsn_t = int64_t; + struct peer_info { // Peer ID. replica_id_t id_; diff --git a/src/include/homestore/replication/repl_dev.h b/src/include/homestore/replication/repl_dev.h index c189114aa..943bb84d8 100644 --- a/src/include/homestore/replication/repl_dev.h +++ b/src/include/homestore/replication/repl_dev.h @@ -10,6 +10,7 @@ #include #include #include +#include namespace nuraft { template < typename T > @@ -50,6 +51,11 @@ struct repl_key { std::string to_string() const { return fmt::format("server={}, term={}, dsn={}", server_id, term, dsn); } }; +struct repl_snapshot { + uint64_t last_log_idx_{0}; + uint64_t last_log_term_{0}; +}; + struct repl_journal_entry; struct repl_req_ctx : public boost::intrusive_ref_counter< repl_req_ctx, boost::thread_safe_counter > { friend class SoloReplDev; @@ -192,6 +198,9 @@ class ReplDevListener { /// @brief Called when the replica set is being stopped virtual void on_replica_stop() = 0; + /// @brief Called when the snapshot is being created by nuraft; + virtual AsyncReplResult<> create_snapshot(repl_snapshot& s) = 0; + private: std::weak_ptr< ReplDev > m_repl_dev; }; diff --git a/src/include/homestore/replication_service.hpp b/src/include/homestore/replication_service.hpp index 19ee11701..d24722202 100644 --- a/src/include/homestore/replication_service.hpp +++ b/src/include/homestore/replication_service.hpp @@ -20,7 +20,6 @@ VENUM(repl_impl_type, uint8_t, solo // For single node - no replication ); - class ReplApplication; class ReplicationService { diff --git a/src/lib/checkpoint/cp_mgr.cpp b/src/lib/checkpoint/cp_mgr.cpp index d73a0ddd1..104db1ac2 100644 --- a/src/lib/checkpoint/cp_mgr.cpp +++ b/src/lib/checkpoint/cp_mgr.cpp @@ -37,7 +37,7 @@ CPManager::CPManager() : nullptr); resource_mgr().register_dirty_buf_exceed_cb( - [this]([[maybe_unused]] int64_t dirty_buf_count) { this->trigger_cp_flush(false /* false */); }); + [this]([[maybe_unused]] int64_t dirty_buf_count, bool critical) { this->trigger_cp_flush(false /* force */); }); start_cp_thread(); } diff --git a/src/lib/common/homestore_config.fbs b/src/lib/common/homestore_config.fbs index 408a3ab98..8cb718298 100644 --- a/src/lib/common/homestore_config.fbs +++ b/src/lib/common/homestore_config.fbs @@ -158,8 +158,21 @@ table ResourceLimits { /* precentage of memory used during recovery */ memory_in_recovery_precent: uint32 = 40; - /* journal size used percentage */ - journal_size_percent: uint32 = 50; + /* journal size used percentage high watermark -- trigger cp */ + journal_vdev_size_percent: uint32 = 50; + + /* journal size used percentage critical watermark -- trigger truncation */ + journal_vdev_size_percent_critical: uint32 = 90; + + /* [not used] journal descriptor size (NuObject: Per PG) Threshold in MB -- ready for truncation */ + journal_descriptor_size_threshold_mb: uint32 = 2048(hotswap); + + /* num entries that raft logstore wants to reserve -- its truncate should not across this */ + /* 0 means HomeStore doesn't reserve anything and let nuraft controlls the truncation */ + raft_logstore_reserve_threshold: uint32 = 0 (hotswap); + + /* resource audit timer in ms */ + resource_audit_timer_ms: uint32 = 120000; /* We crash if volume is 95 percent filled and no disk space left */ vol_threshhold_used_size_p: uint32 = 95; @@ -199,14 +212,17 @@ table Consensus { heartbeat_period_ms: uint32 = 250; // Re-election timeout low and high mark - elect_to_low_ms: uint32 = 900; - elect_to_high_ms: uint32 = 1400; + elect_to_low_ms: uint32 = 800; + elect_to_high_ms: uint32 = 1700; // When a new member is being synced, the batch size of number of logs to be shipped log_sync_batch_size: int32 = 100; // Log distance with which snapshot/compact needs to happen. 0 means snapshot is disabled - snapshot_freq_distance: int32 = 0; + snapshot_freq_distance: uint32 = 2000; + + // Num reserved log items while triggering compact from raft server, only consumed by nuraft server; + num_reserved_log_items: uint32 = 20000; // Max append batch size max_append_batch_size: int32 = 64; diff --git a/src/lib/common/resource_mgr.cpp b/src/lib/common/resource_mgr.cpp index 71a2e97d4..482d60549 100644 --- a/src/lib/common/resource_mgr.cpp +++ b/src/lib/common/resource_mgr.cpp @@ -14,13 +14,67 @@ * *********************************************************************************/ #include +#include +#include +#include #include "resource_mgr.hpp" #include "homestore_assert.hpp" +#include "replication/repl_dev/raft_repl_dev.h" namespace homestore { ResourceMgr& resource_mgr() { return hs()->resource_mgr(); } -void ResourceMgr::set_total_cap(uint64_t total_cap) { m_total_cap = total_cap; } +void ResourceMgr::start(uint64_t total_cap) { + m_total_cap = total_cap; + start_timer(); +} + +void ResourceMgr::stop() { + LOGINFO("Cancel resource manager timer."); + iomanager.cancel_timer(m_res_audit_timer_hdl); + m_res_audit_timer_hdl = iomgr::null_timer_handle; +} + +// +// 1. Conceptually in rare case(not poosible for NuObject, possibly true for NuBlox2.0) truncate itself can't garunteen +// the space is freed up upto satisfy resource manager. e.g. multiple log stores on this same descriptor and one +// logstore lagging really behind and not able to truncate much space. Doing multiple truncation won't help in this +// case. +// 2. And any write on any other descriptor will trigger a high_watermark_check, and if it were to trigger critial +// alert on this vdev, truncation will be made immediately on all descriptors; +// 3. If still no space can be freed, there is nothing we can't here to back pressure to above layer by rejecting log +// writes on this descriptor; +// +void ResourceMgr::trigger_truncate() { + if (hs()->has_repl_data_service()) { + // first make sure all repl dev's underlying raft log store make corresponding reservation during + // truncate -- set the safe truncate boundary for each raft log store; + hs()->repl_service().iterate_repl_devs([](cshared< ReplDev >& rd) { + // lock is already taken by repl service layer; + std::dynamic_pointer_cast< RaftReplDev >(rd)->truncate( + HS_DYNAMIC_CONFIG(resource_limits.raft_logstore_reserve_threshold)); + }); + + // next do device truncate which go through all logdevs and truncate them; + hs()->logstore_service().device_truncate(); + } + + // TODO: add device_truncate callback to audit how much space was freed per each LogDev and add related + // metrics; +} + +void ResourceMgr::start_timer() { + auto const res_mgr_timer_ms = HS_DYNAMIC_CONFIG(resource_limits.resource_audit_timer_ms); + LOGINFO("resource audit timer is set to {} usec", res_mgr_timer_ms); + + m_res_audit_timer_hdl = iomanager.schedule_global_timer( + res_mgr_timer_ms * 1000 * 1000, true /* recurring */, nullptr /* cookie */, iomgr::reactor_regex::all_worker, + [this](void*) { + // all resource timely audit routine should arrive here; + this->trigger_truncate(); + }, + true /* wait_to_schedule */); +} /* monitor dirty buffer count */ void ResourceMgr::inc_dirty_buf_size(const uint32_t size) { @@ -28,7 +82,7 @@ void ResourceMgr::inc_dirty_buf_size(const uint32_t size) { const auto dirty_buf_cnt = m_hs_dirty_buf_cnt.fetch_add(size, std::memory_order_relaxed); COUNTER_INCREMENT(m_metrics, dirty_buf_cnt, size); if (m_dirty_buf_exceed_cb && ((dirty_buf_cnt + size) > get_dirty_buf_limit())) { - m_dirty_buf_exceed_cb(dirty_buf_cnt + size); + m_dirty_buf_exceed_cb(dirty_buf_cnt + size, false /* critical */); } } @@ -106,22 +160,37 @@ uint64_t ResourceMgr::get_cache_size() const { return ((HS_STATIC_CONFIG(input.io_mem_size()) * HS_DYNAMIC_CONFIG(resource_limits.cache_size_percent)) / 100); } -/* monitor journal size */ -bool ResourceMgr::check_journal_size(const uint64_t used_size, const uint64_t total_size) { - if (m_journal_exceed_cb) { +bool ResourceMgr::check_journal_descriptor_size(const uint64_t used_size) const { + return (used_size >= get_journal_descriptor_size_limit()); +} + +/* monitor journal vdev size */ +bool ResourceMgr::check_journal_vdev_size(const uint64_t used_size, const uint64_t total_size) { + if (m_journal_vdev_exceed_cb) { const uint32_t used_pct = (100 * used_size / total_size); - if (used_pct >= HS_DYNAMIC_CONFIG(resource_limits.journal_size_percent)) { - m_journal_exceed_cb(used_size); + if (used_pct >= get_journal_vdev_size_limit()) { + m_journal_vdev_exceed_cb(used_size, used_pct >= get_journal_vdev_size_critical_limit() /* is_critical */); HS_LOG_EVERY_N(WARN, base, 50, "high watermark hit, used percentage: {}, high watermark percentage: {}", - used_pct, HS_DYNAMIC_CONFIG(resource_limits.journal_size_percent)); + used_pct, get_journal_vdev_size_limit()); return true; } } return false; } -void ResourceMgr::register_journal_exceed_cb(exceed_limit_cb_t cb) { m_journal_exceed_cb = std::move(cb); } -uint32_t ResourceMgr::get_journal_size_limit() const { return HS_DYNAMIC_CONFIG(resource_limits.journal_size_percent); } +void ResourceMgr::register_journal_vdev_exceed_cb(exceed_limit_cb_t cb) { m_journal_vdev_exceed_cb = std::move(cb); } + +uint32_t ResourceMgr::get_journal_descriptor_size_limit() const { + return HS_DYNAMIC_CONFIG(resource_limits.journal_descriptor_size_threshold_mb) * 1024 * 1024; +} + +uint32_t ResourceMgr::get_journal_vdev_size_critical_limit() const { + return HS_DYNAMIC_CONFIG(resource_limits.journal_vdev_size_percent_critical); +} + +uint32_t ResourceMgr::get_journal_vdev_size_limit() const { + return HS_DYNAMIC_CONFIG(resource_limits.journal_vdev_size_percent); +} /* monitor chunk size */ void ResourceMgr::check_chunk_free_size_and_trigger_cp(uint64_t free_size, uint64_t alloc_size) {} diff --git a/src/lib/common/resource_mgr.hpp b/src/lib/common/resource_mgr.hpp index 54fc459b6..9d4cb10ea 100644 --- a/src/lib/common/resource_mgr.hpp +++ b/src/lib/common/resource_mgr.hpp @@ -39,12 +39,13 @@ class RsrcMgrMetrics : public sisl::MetricsGroup { ~RsrcMgrMetrics() { deregister_me_from_farm(); } }; -typedef std::function< void(int64_t /* dirty_buf_cnt */) > exceed_limit_cb_t; +typedef std::function< void(int64_t /* dirty_buf_cnt */, bool /* critical */) > exceed_limit_cb_t; const uint32_t max_qd_multiplier = 32; class ResourceMgr { public: - void set_total_cap(uint64_t total_cap); + void start(uint64_t total_cap); + void stop(); /* monitor dirty buffer count */ void inc_dirty_buf_size(const uint32_t size); @@ -75,11 +76,42 @@ class ResourceMgr { /* get cache size */ uint64_t get_cache_size() const; - /* monitor journal size */ - bool check_journal_size(const uint64_t used_size, const uint64_t total_size); - void register_journal_exceed_cb(exceed_limit_cb_t cb); - - uint32_t get_journal_size_limit() const; + /** + * @brief Checks if the journal virtual device (vdev) size is within the specified limits. + * + * This function compares the used size of the journal vdev with the total size of the vdev + * and returns true if the used size is within the limits, and false otherwise. + * + * If it exceeds the limit, it will call the callback function registered with register_journal_vdev_exceed_cb(). + * + * @param used_size The used size of the journal vdev. + * @param total_size The total size of the journal vdev. + * @return true if the used size is exceeding the limits, false if not exceeding limit or caller didn't registered + * any callback (caller not interested). + */ + bool check_journal_vdev_size(const uint64_t used_size, const uint64_t total_size); + + /** + * @brief Checks if the given used size is within the acceptable range for the journal descriptor. + * + * This function checks if the used size of the journal descriptor is within the acceptable range. + * The acceptable range is determined by the implementation of the resource manager. + * + * @param used_size The used size of the journal descriptor. + * @return true if the used size is exceeding the acceptable range, false otherwise. + */ + bool check_journal_descriptor_size(const uint64_t used_size) const; + + /** + * Registers a callback function to be called when the journal virtual device exceeds its limit. + * + * @param cb The callback function to be registered. + */ + void register_journal_vdev_exceed_cb(exceed_limit_cb_t cb); + + uint32_t get_journal_vdev_size_limit() const; + uint32_t get_journal_vdev_size_critical_limit() const; + uint32_t get_journal_descriptor_size_limit() const; /* monitor chunk size */ void check_chunk_free_size_and_trigger_cp(uint64_t free_size, uint64_t alloc_size); @@ -90,9 +122,21 @@ class ResourceMgr { void reset_dirty_buf_qd(); + /** + * Triggers the truncation process. + * This function is responsible for initiating the truncation process. + */ + void trigger_truncate(); + private: int64_t get_dirty_buf_limit() const; + /** + * Starts resource manager resource audit timer. + */ + void start_timer(); + +private: std::atomic< int64_t > m_hs_dirty_buf_cnt; std::atomic< int64_t > m_hs_fb_cnt; // free count std::atomic< int64_t > m_hs_fb_size; // free size @@ -100,10 +144,14 @@ class ResourceMgr { std::atomic< int64_t > m_memory_used_in_recovery; std::atomic< uint32_t > m_flush_dirty_buf_q_depth{64}; uint64_t m_total_cap; + + // TODO: make it event_cb exceed_limit_cb_t m_dirty_buf_exceed_cb; exceed_limit_cb_t m_free_blks_exceed_cb; - exceed_limit_cb_t m_journal_exceed_cb; + exceed_limit_cb_t m_journal_vdev_exceed_cb; RsrcMgrMetrics m_metrics; + + iomgr::timer_handle_t m_res_audit_timer_hdl; }; extern ResourceMgr& resource_mgr(); diff --git a/src/lib/device/journal_vdev.cpp b/src/lib/device/journal_vdev.cpp index 424a4e57c..bc20bd5f3 100644 --- a/src/lib/device/journal_vdev.cpp +++ b/src/lib/device/journal_vdev.cpp @@ -23,6 +23,9 @@ #include #include #include +#include +#include +#include "replication/repl_dev/raft_repl_dev.h" #include "device/chunk.h" #include "device/device.h" #include "device/physical_dev.hpp" @@ -51,6 +54,16 @@ JournalVirtualDev::JournalVirtualDev(DeviceManager& dmgr, const vdev_info& vinfo return private_blob; }, m_vdev_info.hs_dev_type, m_vdev_info.vdev_id, m_vdev_info.chunk_size}); + + resource_mgr().register_journal_vdev_exceed_cb([this]([[maybe_unused]] int64_t dirty_buf_count, bool critical) { + // either it is critical or non-critical, call cp_flush; + hs()->cp_mgr().trigger_cp_flush(false /* force */); + + if (critical) { + LOGINFO("Critical journal vdev size threshold reached. Triggering truncate."); + resource_mgr().trigger_truncate(); + } + }); } JournalVirtualDev::~JournalVirtualDev() {} @@ -637,8 +650,20 @@ bool JournalVirtualDev::Descriptor::is_offset_at_last_chunk(off_t bytes_offset) return false; } +// +// This API is ways called in single thread +// void JournalVirtualDev::Descriptor::high_watermark_check() { - if (resource_mgr().check_journal_size(used_size(), size())) { +#if 0 + // high watermark check for the individual journal descriptor; + if (resource_mgr().check_journal_descriptor_size(used_size())) { + // the next resource manager audit will call truncation for this descriptor; + set_ready_for_truncate(); + } +#endif + + // high watermark check for the entire journal vdev; + if (resource_mgr().check_journal_vdev_size(m_vdev.used_size(), m_vdev.size())) { COUNTER_INCREMENT(m_vdev.m_metrics, vdev_high_watermark_count, 1); if (m_vdev.m_event_cb && m_truncate_done) { diff --git a/src/lib/device/journal_vdev.hpp b/src/lib/device/journal_vdev.hpp index b0a304553..b4f3fb163 100644 --- a/src/lib/device/journal_vdev.hpp +++ b/src/lib/device/journal_vdev.hpp @@ -80,14 +80,15 @@ class JournalVirtualDev : public VirtualDev { /** * @brief : allocate space specified by input size. + * this API will always be called in single thread; * * @param size : size to be allocated * * @return : the start unique offset of the allocated space * * Possible calling sequence: - * offset_1 = reserve(size1); - * offset_2 = reserve(size2); + * offset_1 = alloc_next_append_blk(size1); + * offset_2 = alloc_next_append_blk(size2); * write_at_offset(offset_2); * write_at_offset(offset_1); */ diff --git a/src/lib/homestore.cpp b/src/lib/homestore.cpp index 4929da24f..a3c7e3096 100644 --- a/src/lib/homestore.cpp +++ b/src/lib/homestore.cpp @@ -238,7 +238,6 @@ void HomeStore::do_start() { m_meta_service->start(m_dev_mgr->is_first_time_boot()); m_cp_mgr->start(is_first_time_boot()); - m_resource_mgr->set_total_cap(m_dev_mgr->total_capacity()); if (has_index_service()) { m_index_service->start(); } @@ -254,6 +253,8 @@ void HomeStore::do_start() { } m_cp_mgr->start_timer(); + + m_resource_mgr->start(m_dev_mgr->total_capacity()); m_init_done = true; } @@ -268,6 +269,8 @@ void HomeStore::shutdown() { m_cp_mgr->shutdown(); m_cp_mgr.reset(); + m_resource_mgr->stop(); + if (has_repl_data_service()) { // Log and Data services are stopped by repl service s_cast< GenericReplService* >(m_repl_service.get())->stop(); diff --git a/src/lib/logstore/log_dev.cpp b/src/lib/logstore/log_dev.cpp index 541c54768..cb2813543 100644 --- a/src/lib/logstore/log_dev.cpp +++ b/src/lib/logstore/log_dev.cpp @@ -507,6 +507,7 @@ bool LogDev::run_under_flush_lock(const flush_blocked_callback& cb) { } } + // the contract here is if cb return falses, it means it will unlock_flush by itself (in another thread); if (cb()) { unlock_flush(); } return true; } @@ -682,7 +683,7 @@ void LogDev::remove_log_store(logstore_id_t store_id) { unreserve_store_id(store_id); } -void LogDev::device_truncate_under_lock(const std::shared_ptr< truncate_req >& treq) { +void LogDev::device_truncate_under_lock(const std::shared_ptr< truncate_req > treq) { run_under_flush_lock([this, treq]() { iomanager.run_on_forget(logstore_service().truncate_thread(), [this, treq]() { const logdev_key trunc_upto = do_device_truncate(treq->dry_run); @@ -698,6 +699,7 @@ void LogDev::device_truncate_under_lock(const std::shared_ptr< truncate_req >& t if (treq->cb) { treq->cb(treq->m_trunc_upto_result); } if (treq->wait_till_done) { treq->cv.notify_one(); } } + unlock_flush(); }); return false; // Do not release the flush lock yet, the scheduler will unlock it. diff --git a/src/lib/logstore/log_dev.hpp b/src/lib/logstore/log_dev.hpp index 530388d0e..6cdad9df3 100644 --- a/src/lib/logstore/log_dev.hpp +++ b/src/lib/logstore/log_dev.hpp @@ -743,14 +743,6 @@ class LogDev : public std::enable_shared_from_this< LogDev > { */ void unlock_flush(bool do_flush = true); - /** - * @brief : truncate up to input log id; - * - * @param key : the key containing log id that needs to be truncate up to; - * @return number of records to truncate - */ - uint64_t truncate(const logdev_key& key); - /** * @brief Rollback the logid range specific to the given store id. This method persists the information * synchronously to the underlying storage. Once rolledback those logids in this range are ignored (only for @@ -792,13 +784,45 @@ class LogDev : public std::enable_shared_from_this< LogDev > { void on_logfound(logstore_id_t id, logstore_seq_num_t seq_num, logdev_key ld_key, logdev_key flush_ld_key, log_buffer buf, uint32_t nremaining_in_batch); void on_batch_completion(HomeLogStore* log_store, uint32_t nremaining_in_batch, logdev_key flush_ld_key); - void device_truncate_under_lock(const std::shared_ptr< truncate_req >& treq); - logdev_key do_device_truncate(bool dry_run = false); + + /** + * Truncates the device under lock. + * + * This function is responsible for truncating the device based on the provided truncate request. + * The truncation operation is performed under a lock to ensure thread safety. + * + * @param treq The truncate request to be processed. + */ + void device_truncate_under_lock(const std::shared_ptr< truncate_req > treq); + void handle_unopened_log_stores(bool format); logdev_id_t get_id() { return m_logdev_id; } shared< JournalVirtualDev::Descriptor > get_journal_descriptor() const { return m_vdev_jd; } + // bool ready_for_truncate() const { return m_vdev_jd->ready_for_truncate(); } + private: + /** + * @brief : truncate up to input log id; + * + * @param key : the key containing log id that needs to be truncate up to; + * @return number of records to truncate + */ + uint64_t truncate(const logdev_key& key); + + /** + * Truncates the device. + * + * This function truncates the device and returns the corresponding logdev_key. + * + * @param dry_run If set to true, the function performs a dry run without actually truncating the device, it only + * updates the corresponding truncation barriers, pretending the truncation happened without actually discarding the + * log entries on device. + * + * @return The logdev_key representing the truncated device. + */ + logdev_key do_device_truncate(bool dry_run = false); + LogGroup* make_log_group(uint32_t estimated_records) { m_log_group_pool[m_log_group_idx].reset(estimated_records); return &m_log_group_pool[m_log_group_idx]; diff --git a/src/lib/logstore/log_store.cpp b/src/lib/logstore/log_store.cpp index 48023f0e5..bd60291c6 100644 --- a/src/lib/logstore/log_store.cpp +++ b/src/lib/logstore/log_store.cpp @@ -44,6 +44,7 @@ HomeLogStore::HomeLogStore(std::shared_ptr< LogDev > logdev, logstore_id_t id, b m_truncation_barriers.reserve(10000); m_safe_truncation_boundary.ld_key = m_logdev->get_last_flush_ld_key(); m_safe_truncation_boundary.seq_num.store(start_lsn - 1, std::memory_order_release); + THIS_LOGSTORE_LOG(TRACE, "m_safe_truncation_boundary.ld_key={}", m_safe_truncation_boundary.ld_key); } bool HomeLogStore::write_sync(logstore_seq_num_t seq_num, const sisl::io_blob& b) { @@ -83,7 +84,10 @@ void HomeLogStore::write_async(logstore_req* req, const log_req_comp_cb_t& cb) { HS_LOG_ASSERT((cb || m_comp_cb), "Expected either cb is not null or default cb registered"); req->cb = (cb ? cb : m_comp_cb); req->start_time = Clock::now(); - if (req->seq_num == 0) { m_safe_truncation_boundary.ld_key = m_logdev->get_last_flush_ld_key(); } + if (req->seq_num == 0) { + m_safe_truncation_boundary.ld_key = m_logdev->get_last_flush_ld_key(); + THIS_LOGSTORE_LOG(TRACE, "m_safe_truncation_boundary.ld_key={}", m_safe_truncation_boundary.ld_key); + } #ifndef NDEBUG const auto trunc_upto_lsn = truncated_upto(); if (req->seq_num <= trunc_upto_lsn) { @@ -274,6 +278,7 @@ void HomeLogStore::do_truncate(logstore_seq_num_t upto_seq_num) { (ind == static_cast< int >(m_truncation_barriers.size() - 1))); m_safe_truncation_boundary.ld_key = m_truncation_barriers[ind].ld_key; + THIS_LOGSTORE_LOG(TRACE, "m_safe_truncation_boundary.ld_key={}", m_safe_truncation_boundary.ld_key); m_safe_truncation_boundary.pending_dev_truncation = true; m_truncation_barriers.erase(m_truncation_barriers.begin(), m_truncation_barriers.begin() + ind + 1); @@ -291,6 +296,7 @@ void HomeLogStore::post_device_truncation(const logdev_key& trunc_upto_loc) { // This method is expected to be called always with this m_safe_truncation_boundary.pending_dev_truncation = false; m_safe_truncation_boundary.ld_key = trunc_upto_loc; + THIS_LOGSTORE_LOG(TRACE, "m_safe_truncation_boundary.ld_key={}", m_safe_truncation_boundary.ld_key); } else { HS_REL_ASSERT(0, "We expect post_device_truncation to be called only for logstores which has min of all " diff --git a/src/lib/logstore/log_store_service.cpp b/src/lib/logstore/log_store_service.cpp index 16d155da5..46a85e6de 100644 --- a/src/lib/logstore/log_store_service.cpp +++ b/src/lib/logstore/log_store_service.cpp @@ -106,7 +106,7 @@ void LogStoreService::start(bool format) { } void LogStoreService::stop() { - device_truncate(nullptr, true, false); + // device_truncate(nullptr, true, false); for (auto& [id, logdev] : m_id_logdev_map) { logdev->stop(); } @@ -235,9 +235,11 @@ void LogStoreService::device_truncate(const device_truncate_cb_t& cb, bool wait_ treq->cb = cb; if (treq->wait_till_done) { treq->trunc_outstanding = m_id_logdev_map.size(); } + // TODO: make device_truncate_under_lock return future and do collectAllFutures; for (auto& [id, logdev] : m_id_logdev_map) { logdev->device_truncate_under_lock(treq); } + if (treq->wait_till_done) { std::unique_lock< std::mutex > lk{treq->mtx}; treq->cv.wait(lk, [&] { return (treq->trunc_outstanding == 0); }); diff --git a/src/lib/replication/log_store/home_raft_log_store.cpp b/src/lib/replication/log_store/home_raft_log_store.cpp index eff66b33a..4d9b2443a 100644 --- a/src/lib/replication/log_store/home_raft_log_store.cpp +++ b/src/lib/replication/log_store/home_raft_log_store.cpp @@ -16,6 +16,8 @@ #include "home_raft_log_store.h" #include "storage_engine_buffer.h" #include +#include "common/homestore_assert.hpp" +#include using namespace homestore; @@ -60,6 +62,32 @@ static uint64_t extract_term(const log_buffer& log_bytes) { return (*r_cast< uint64_t const* >(raw_ptr)); } +void HomeRaftLogStore::truncate(uint32_t num_reserved_cnt, repl_lsn_t compact_lsn) { + auto const last_lsn = last_index(); + auto const start_lsn = start_index(); + + if (start_lsn + num_reserved_cnt >= last_lsn) { + REPL_STORE_LOG(DEBUG, + "Store={} LogDev={}: Skipping truncating because of reserved logs entries is not enough. " + "start_lsn={}, resv_cnt={}, last_lsn={}", + m_logstore_id, m_logdev_id, start_lsn, num_reserved_cnt, last_lsn); + return; + } else { + // + // truncate_lsn can not accross compact_lsn passed down by raft server; + // + // When will it happen: + // compact_lsn can be smaller than last_lsn - num_reserved_cnt, when raft is configured with + // snapshot_distance of a large value, and dynamic config "resvered log entries" a smaller value. + // + auto truncate_lsn = std::min(last_lsn - num_reserved_cnt, (ulong)to_store_lsn(compact_lsn)); + + REPL_STORE_LOG(INFO, "LogDev={}: Truncating log entries from {} to {}, compact_lsn={}, last_lsn={}", + m_logdev_id, start_lsn, truncate_lsn, compact_lsn, last_lsn); + m_log_store->truncate(truncate_lsn); + } +} + HomeRaftLogStore::HomeRaftLogStore(logdev_id_t logdev_id, logstore_id_t logstore_id) { m_dummy_log_entry = nuraft::cs_new< nuraft::log_entry >(0, nuraft::buffer::alloc(0), nuraft::log_val_type::app_log); @@ -93,6 +121,11 @@ ulong HomeRaftLogStore::next_slot() const { return next_slot; } +ulong HomeRaftLogStore::last_index() const { + uint64_t last_index = m_log_store->get_contiguous_completed_seq_num(m_last_durable_lsn); + return last_index; +} + ulong HomeRaftLogStore::start_index() const { // start_index starts from 1. ulong start_index = std::max((repl_lsn_t)1, to_repl_lsn(m_log_store->truncated_upto()) + 1); @@ -245,13 +278,23 @@ void HomeRaftLogStore::apply_pack(ulong index, nuraft::buffer& pack) { bool HomeRaftLogStore::compact(ulong compact_lsn) { auto cur_max_lsn = m_log_store->get_contiguous_issued_seq_num(m_last_durable_lsn); if (cur_max_lsn < to_store_lsn(compact_lsn)) { + // release this assert if for some use case, we should tolorant this case; + // for now, don't expect this case to happen. + RELEASE_ASSERT(false, "compact_lsn={} is beyond the current max_lsn={}", compact_lsn, cur_max_lsn); + // We need to fill the remaining entries with dummy data. for (auto lsn{cur_max_lsn + 1}; lsn <= to_store_lsn(compact_lsn); ++lsn) { append(m_dummy_log_entry); } } + m_log_store->flush_sync(to_store_lsn(compact_lsn)); - m_log_store->truncate(to_store_lsn(compact_lsn)); + + // we rely on resrouce mgr timer to trigger truncate for all log stores in system; + // this will be friendly for multiple logstore on same logdev; + + // m_log_store->truncate(to_store_lsn(compact_lsn)); + return true; } diff --git a/src/lib/replication/log_store/home_raft_log_store.h b/src/lib/replication/log_store/home_raft_log_store.h index 4e6288d1a..e3da2b379 100644 --- a/src/lib/replication/log_store/home_raft_log_store.h +++ b/src/lib/replication/log_store/home_raft_log_store.h @@ -29,8 +29,6 @@ namespace homestore { -using store_lsn_t = int64_t; -using repl_lsn_t = int64_t; using raft_buf_ptr_t = nuraft::ptr< nuraft::buffer >; class HomeRaftLogStore : public nuraft::log_store { @@ -137,7 +135,7 @@ class HomeRaftLogStore : public nuraft::log_store { * @param index The start log index number (inclusive). * @param pack */ - virtual void apply_pack(ulong index, nuraft::buffer& pack); + virtual void apply_pack(ulong index, nuraft::buffer& pack) override; /** * Compact the log store by purging all log entries, @@ -169,9 +167,27 @@ class HomeRaftLogStore : public nuraft::log_store { */ virtual ulong last_durable_index() override; +public: + // non-override functions from nuraft::log_store logstore_id_t logstore_id() const { return m_logstore_id; } logdev_id_t logdev_id() const { return m_logdev_id; } + /** + * Returns the last completed index in the log store. + * + * @return The last completed index in the log store. + */ + ulong last_index() const; + + /** + * Truncates the log store + * + * @param num_reserved_cnt The number of log entries to be reserved. + * @param compact_lsn This is the truncation barrier passed down by raft server. Truncation should not across this + * LSN; + */ + void truncate(uint32_t num_reserved_cnt, repl_lsn_t compact_lsn); + private: logstore_id_t m_logstore_id; logdev_id_t m_logdev_id; @@ -179,4 +195,4 @@ class HomeRaftLogStore : public nuraft::log_store { nuraft::ptr< nuraft::log_entry > m_dummy_log_entry; store_lsn_t m_last_durable_lsn{-1}; }; -} // namespace homestore \ No newline at end of file +} // namespace homestore diff --git a/src/lib/replication/log_store/repl_log_store.cpp b/src/lib/replication/log_store/repl_log_store.cpp index 54f5dd7b3..5ca44bf78 100644 --- a/src/lib/replication/log_store/repl_log_store.cpp +++ b/src/lib/replication/log_store/repl_log_store.cpp @@ -73,4 +73,9 @@ void ReplLogStore::end_of_append_batch(ulong start_lsn, ulong count) { std::string ReplLogStore::rdev_name() const { return m_rd.rdev_name(); } +bool ReplLogStore::compact(ulong compact_upto_lsn) { + RD_LOG(DEBUG, "Raft Channel: compact_to_lsn={}", compact_upto_lsn); + m_rd.on_compact(compact_upto_lsn); + return HomeRaftLogStore::compact(compact_upto_lsn); +} } // namespace homestore diff --git a/src/lib/replication/log_store/repl_log_store.h b/src/lib/replication/log_store/repl_log_store.h index c2fb615f2..1ae0b2826 100644 --- a/src/lib/replication/log_store/repl_log_store.h +++ b/src/lib/replication/log_store/repl_log_store.h @@ -22,9 +22,11 @@ class ReplLogStore : public HomeRaftLogStore { ReplLogStore(RaftReplDev& rd, RaftStateMachine& sm, Args&&... args) : HomeRaftLogStore{std::forward< Args >(args)...}, m_rd{rd}, m_sm{sm} {} + //////////////////////// function override //////////////////////// uint64_t append(nuraft::ptr< nuraft::log_entry >& entry) override; void write_at(ulong index, nuraft::ptr< nuraft::log_entry >& entry) override; void end_of_append_batch(ulong start_lsn, ulong count) override; + bool compact(ulong last_lsn) override; private: std::string rdev_name() const; diff --git a/src/lib/replication/repl_dev/common.h b/src/lib/replication/repl_dev/common.h index 77ab5dfec..676c7797e 100644 --- a/src/lib/replication/repl_dev/common.h +++ b/src/lib/replication/repl_dev/common.h @@ -16,6 +16,7 @@ #include +#include #include #include #include @@ -63,8 +64,9 @@ struct repl_dev_superblk { uuid_t group_id; // group_id of this replica set logdev_id_t logdev_id; logstore_id_t logstore_id; // Logstore id for the data journal - int64_t commit_lsn; // LSN upto which this replica has committed - int64_t checkpoint_lsn; // LSN upto which this replica have checkpointed the data + repl_lsn_t commit_lsn; // LSN upto which this replica has committed + repl_lsn_t checkpoint_lsn; // LSN upto which this replica have checkpointed the Data + repl_lsn_t compact_lsn; // maximum LSN that can be compacted to uint64_t group_ordinal; // Ordinal number which will be used to indicate the rdevXYZ for debugging uint64_t get_magic() const { return magic; } diff --git a/src/lib/replication/repl_dev/raft_repl_dev.cpp b/src/lib/replication/repl_dev/raft_repl_dev.cpp index 9189ea9a5..a1755e2a0 100644 --- a/src/lib/replication/repl_dev/raft_repl_dev.cpp +++ b/src/lib/replication/repl_dev/raft_repl_dev.cpp @@ -40,6 +40,8 @@ RaftReplDev::RaftReplDev(RaftReplService& svc, superblk< raft_repl_dev_superblk m_next_dsn = m_rd_sb->last_applied_dsn + 1; m_commit_upto_lsn = m_rd_sb->commit_lsn; m_last_flushed_commit_lsn = m_commit_upto_lsn; + m_compact_lsn = m_rd_sb->compact_lsn; + m_rdev_name = fmt::format("rdev{}", m_rd_sb->group_ordinal); // Its ok not to do compare exchange, because loading is always single threaded as of now @@ -108,6 +110,18 @@ bool RaftReplDev::join_group() { void RaftReplDev::use_config(json_superblk raft_config_sb) { m_raft_config_sb = std::move(raft_config_sb); } +void RaftReplDev::on_create_snapshot(nuraft::snapshot& s, nuraft::async_result< bool >::handler_type& when_done) { + RD_LOG(DEBUG, "create_snapshot last_idx={}/term={}", s.get_last_log_idx(), s.get_last_log_term()); + repl_snapshot snapshot{.last_log_idx_ = s.get_last_log_idx(), .last_log_term_ = s.get_last_log_term()}; + auto result = m_listener->create_snapshot(snapshot).get(); + auto null_except = std::shared_ptr< std::exception >(); + HS_REL_ASSERT(result.hasError() == false, "Not expecting creating snapshot to return false. "); + m_last_snapshot = nuraft::cs_new< nuraft::snapshot >(s.get_last_log_idx(), s.get_last_log_term(), + s.get_last_config(), s.size(), s.get_type()); + auto ret_val{true}; + if (when_done) { when_done(ret_val, null_except); } +} + void RaftReplDev::async_alloc_write(sisl::blob const& header, sisl::blob const& key, sisl::sg_list const& value, repl_req_ptr_t rreq) { if (!rreq) { auto rreq = repl_req_ptr_t(new repl_req_ctx{}); } @@ -505,7 +519,7 @@ void RaftReplDev::check_and_fetch_remote_data(std::vector< repl_req_ptr_t > rreq void RaftReplDev::fetch_data_from_remote(std::vector< repl_req_ptr_t > rreqs) { if (rreqs.size() == 0) { return; } - std::vector< ::flatbuffers::Offset< RequestEntry > > entries; + std::vector<::flatbuffers::Offset< RequestEntry > > entries; entries.reserve(rreqs.size()); shared< flatbuffers::FlatBufferBuilder > builder = std::make_shared< flatbuffers::FlatBufferBuilder >(); @@ -986,12 +1000,14 @@ void RaftReplDev::report_committed(repl_req_ptr_t rreq) { } void RaftReplDev::cp_flush(CP*) { - auto lsn = m_commit_upto_lsn.load(); + auto const lsn = m_commit_upto_lsn.load(); + auto const clsn = m_compact_lsn.load(); + if (lsn == m_last_flushed_commit_lsn) { // Not dirtied since last flush ignore return; } - + m_rd_sb->compact_lsn = clsn; m_rd_sb->commit_lsn = lsn; m_rd_sb->checkpoint_lsn = lsn; m_rd_sb->last_applied_dsn = m_next_dsn.load(); diff --git a/src/lib/replication/repl_dev/raft_repl_dev.h b/src/lib/replication/repl_dev/raft_repl_dev.h index 7fcbe393c..6451852f4 100644 --- a/src/lib/replication/repl_dev/raft_repl_dev.h +++ b/src/lib/replication/repl_dev/raft_repl_dev.h @@ -2,6 +2,7 @@ #include +#include #include #include #include @@ -21,7 +22,7 @@ struct raft_repl_dev_superblk : public repl_dev_superblk { uint32_t raft_sb_version{RAFT_REPL_DEV_SB_VERSION}; logstore_id_t free_blks_journal_id; // Logstore id for storing free blkid records uint8_t is_timeline_consistent; // Flag to indicate whether the recovery of followers need to be timeline consistent - uint64_t last_applied_dsn; // Last applied data sequence number + uint64_t last_applied_dsn; // Last applied data sequence Number uint32_t get_raft_sb_version() const { return raft_sb_version; } }; @@ -75,7 +76,9 @@ class RaftReplDev : public ReplDev, raft_repl_dev_superblk m_sb_in_mem; // Cached version which is used to read and for staging std::atomic< repl_lsn_t > m_commit_upto_lsn{0}; // LSN which was lastly written, to track flushes - repl_lsn_t m_last_flushed_commit_lsn{0}; // LSN upto which it was flushed to persistent store + std::atomic< repl_lsn_t > m_compact_lsn{0}; // LSN upto which it was compacted, it is used to track where to + + repl_lsn_t m_last_flushed_commit_lsn{0}; // LSN upto which it was flushed to persistent store iomgr::timer_handle_t m_sb_flush_timer_hdl; std::atomic< uint64_t > m_next_dsn{0}; // Data Sequence Number that will keep incrementing for each data entry @@ -86,6 +89,8 @@ class RaftReplDev : public ReplDev, RaftReplDevMetrics m_metrics; + nuraft::ptr< nuraft::snapshot > m_last_snapshot{nullptr}; + static std::atomic< uint64_t > s_next_group_ordinal; public: @@ -114,6 +119,8 @@ class RaftReplDev : public ReplDev, uint32_t get_blk_size() const override; repl_lsn_t get_last_commit_lsn() const { return m_commit_upto_lsn.load(); } + // void truncate_if_needed() override; + //////////////// Accessor/shortcut methods /////////////////////// nuraft_mesg::repl_service_ctx* group_msg_service(); nuraft::raft_server* raft_server(); @@ -129,6 +136,34 @@ class RaftReplDev : public ReplDev, void cp_flush(CP* cp); void cp_cleanup(CP* cp); + /// @brief This method is called when the data journal is compacted + /// + /// @param upto_lsn : LSN upto which the data journal was compacted + void on_compact(repl_lsn_t upto_lsn) { m_compact_lsn.store(upto_lsn); } + + /** + * \brief Handles the creation of a snapshot. + * + * This function is called when a snapshot needs to be created in the replication process. + * It takes a reference to a `nuraft::snapshot` object and a handler for the asynchronous result. + * The handler will be called when the snapshot creation is completed. + * + * \param s The snapshot object to be created. + * \param when_done The handler to be called when the snapshot creation is completed. + */ + void on_create_snapshot(nuraft::snapshot& s, nuraft::async_result< bool >::handler_type& when_done); + + /** + * Truncates the replication log by providing a specified number of reserved entries. + * + * @param num_reserved_entries The number of reserved entries of the replication log. + */ + void truncate(uint32_t num_reserved_entries) { + m_data_journal->truncate(num_reserved_entries, m_compact_lsn.load()); + } + + nuraft::ptr< nuraft::snapshot > get_last_snapshot() { return m_last_snapshot; } + protected: //////////////// All nuraft::state_mgr overrides /////////////////////// nuraft::ptr< nuraft::cluster_config > load_config() override; diff --git a/src/lib/replication/repl_dev/raft_state_machine.cpp b/src/lib/replication/repl_dev/raft_state_machine.cpp index d2805384b..a79c8224b 100644 --- a/src/lib/replication/repl_dev/raft_state_machine.cpp +++ b/src/lib/replication/repl_dev/raft_state_machine.cpp @@ -2,6 +2,7 @@ #include #include #include +#include #include "service/raft_repl_service.h" #include "repl_dev/raft_state_machine.h" @@ -248,11 +249,10 @@ repl_req_ptr_t RaftStateMachine::lsn_to_req(int64_t lsn) { nuraft_mesg::repl_service_ctx* RaftStateMachine::group_msg_service() { return m_rd.group_msg_service(); } void RaftStateMachine::create_snapshot(nuraft::snapshot& s, nuraft::async_result< bool >::handler_type& when_done) { - RD_LOG(DEBUG, "create_snapshot {}/{}", s.get_last_log_idx(), s.get_last_log_term()); - auto null_except = std::shared_ptr< std::exception >(); - auto ret_val{false}; - if (when_done) when_done(ret_val, null_except); + m_rd.on_create_snapshot(s, when_done); } std::string RaftStateMachine::rdev_name() const { return m_rd.rdev_name(); } + +nuraft::ptr< nuraft::snapshot > RaftStateMachine::last_snapshot() { return m_rd.get_last_snapshot(); } } // namespace homestore diff --git a/src/lib/replication/repl_dev/raft_state_machine.h b/src/lib/replication/repl_dev/raft_state_machine.h index bb1f71071..51902b699 100644 --- a/src/lib/replication/repl_dev/raft_state_machine.h +++ b/src/lib/replication/repl_dev/raft_state_machine.h @@ -104,8 +104,9 @@ class RaftStateMachine : public nuraft::state_machine { void rollback(uint64_t lsn, nuraft::buffer&) override { LOGCRITICAL("Unimplemented rollback on: [{}]", lsn); } bool apply_snapshot(nuraft::snapshot&) override { return false; } + void create_snapshot(nuraft::snapshot& s, nuraft::async_result< bool >::handler_type& when_done) override; - nuraft::ptr< nuraft::snapshot > last_snapshot() override { return nullptr; } + nuraft::ptr< nuraft::snapshot > last_snapshot() override; ////////// APIs outside of nuraft::state_machine requirements //////////////////// ReplServiceError propose_to_raft(repl_req_ptr_t rreq); diff --git a/src/lib/replication/service/generic_repl_svc.h b/src/lib/replication/service/generic_repl_svc.h index e55ac3f05..44aa839f9 100644 --- a/src/lib/replication/service/generic_repl_svc.h +++ b/src/lib/replication/service/generic_repl_svc.h @@ -57,6 +57,8 @@ class GenericReplService : public ReplicationService { hs_stats get_cap_stats() const override; replica_id_t get_my_repl_uuid() const { return m_my_uuid; } + // void resource_audit() override; + protected: virtual void add_repl_dev(group_id_t group_id, shared< ReplDev > rdev); virtual void load_repl_dev(sisl::byte_view const& buf, void* meta_cookie) = 0; @@ -73,7 +75,6 @@ class SoloReplService : public GenericReplService { void load_repl_dev(sisl::byte_view const& buf, void* meta_cookie) override; AsyncReplResult<> replace_member(group_id_t group_id, replica_id_t member_out, replica_id_t member_in) const override; - }; class SoloReplServiceCPHandler : public CPCallbacks { diff --git a/src/lib/replication/service/raft_repl_service.cpp b/src/lib/replication/service/raft_repl_service.cpp index b355b2f68..e6c0c3892 100644 --- a/src/lib/replication/service/raft_repl_service.cpp +++ b/src/lib/replication/service/raft_repl_service.cpp @@ -98,7 +98,7 @@ void RaftReplService::start() { .with_fresh_log_gap(HS_DYNAMIC_CONFIG(consensus.stale_log_gap_lo_threshold)) .with_snapshot_enabled(HS_DYNAMIC_CONFIG(consensus.snapshot_freq_distance)) .with_leadership_expiry(HS_DYNAMIC_CONFIG(consensus.leadership_expiry_ms)) - .with_reserved_log_items(0) // In reality ReplLogStore retains much more than this + .with_reserved_log_items(HS_DYNAMIC_CONFIG(consensus.num_reserved_log_items)) .with_auto_forwarding(false); r_params.return_method_ = nuraft::raft_params::async_handler; m_msg_mgr->register_mgr_type(params.default_group_type_, r_params); diff --git a/src/lib/replication/service/raft_repl_service.h b/src/lib/replication/service/raft_repl_service.h index e12ebf41c..b50ab4004 100644 --- a/src/lib/replication/service/raft_repl_service.h +++ b/src/lib/replication/service/raft_repl_service.h @@ -71,6 +71,7 @@ class RaftReplService : public GenericReplService, private: void raft_group_config_found(sisl::byte_view const& buf, void* meta_cookie); + void start_reaper_thread(); void stop_reaper_thread(); void fetch_pending_data(); diff --git a/src/tests/test_raft_repl_dev.cpp b/src/tests/test_raft_repl_dev.cpp index 1203ae191..5116bca57 100644 --- a/src/tests/test_raft_repl_dev.cpp +++ b/src/tests/test_raft_repl_dev.cpp @@ -49,7 +49,15 @@ SISL_OPTION_GROUP(test_raft_repl_dev, (block_size, "", "block_size", "block size to io", ::cxxopts::value< uint32_t >()->default_value("4096"), "number"), (num_raft_groups, "", "num_raft_groups", "number of raft groups per test", - ::cxxopts::value< uint32_t >()->default_value("1"), "number")); + ::cxxopts::value< uint32_t >()->default_value("1"), "number"), + // for below replication parameter, their default value always get from dynamic config, only used + // when specified by user + (snapshot_distance, "", "snapshot_distance", "distance between snapshots", + ::cxxopts::value< uint32_t >()->default_value("0"), "number"), + (num_raft_logs_resv, "", "num_raft_logs_resv", "number of raft logs reserved", + ::cxxopts::value< uint32_t >()->default_value("0"), "number"), + (res_mgr_audit_timer_ms, "", "res_mgr_audit_timer_ms", "resource manager audit timer", + ::cxxopts::value< uint32_t >()->default_value("0"), "number")); SISL_OPTIONS_ENABLE(logging, test_raft_repl_dev, iomgr, config, test_common_setup, test_repl_common_setup) @@ -147,6 +155,8 @@ class TestReplicatedDB : public homestore::ReplDevListener { *(r_cast< uint64_t const* >(key.cbytes()))); } + AsyncReplResult<> create_snapshot(repl_snapshot& s) override { return make_async_success<>(); } + ReplResult< blk_alloc_hints > get_blk_alloc_hints(sisl::blob const& header, uint32_t data_size) override { return blk_alloc_hints{}; } @@ -548,13 +558,24 @@ TEST_F(RaftReplDevTest, Drop_Raft_Entry_Switch_Leader) { } #endif -// TODO -// double restart: -// 1. restart one follower(F1) while I/O keep running. -// 2. after F1 reboots and leader is resyncing with F1 (after sending the appended entries), this leader also retarts. -// 3. F1 should receive error from grpc saying originator not there. -// 4. F2 should be appending entries to F1 and F1 should be able to catch up with F2 (fetch data from F2). // +// This test case should be run in long running mode to see the effect of snapshot and compaction +// Example: +// ./bin/test_raft_repl_dev --gtest_filter=*Snapshot_and_Compact* --log_mods replication:debug --num_io=999999 +// --snapshot_distance=200 --num_raft_logs_resv=20000 --res_mgr_audit_timer_ms=120000 +// +TEST_F(RaftReplDevTest, Snapshot_and_Compact) { + LOGINFO("Homestore replica={} setup completed", g_helper->replica_num()); + g_helper->sync_for_test_start(); + + uint64_t entries_per_attempt = SISL_OPTIONS["num_io"].as< uint64_t >(); + this->write_on_leader(entries_per_attempt, true /* wait_for_commit on all replicas */); + + g_helper->sync_for_verify_start(); + LOGINFO("Validate all data written so far by reading them"); + this->validate_data(); + g_helper->sync_for_cleanup_start(); +} int main(int argc, char* argv[]) { int parsed_argc = argc; @@ -571,10 +592,25 @@ int main(int argc, char* argv[]) { SISL_OPTIONS_LOAD(parsed_argc, argv, logging, config, test_raft_repl_dev, iomgr, test_common_setup, test_repl_common_setup); + // // Entire test suite assumes that once a replica takes over as leader, it stays until it is explicitly yielded. // Otherwise it is very hard to control or accurately test behavior. Hence we forcibly override the // leadership_expiry time. - HS_SETTINGS_FACTORY().modifiable_settings([](auto& s) { s.consensus.leadership_expiry_ms = -1; }); + // + HS_SETTINGS_FACTORY().modifiable_settings([](auto& s) { + s.consensus.leadership_expiry_ms = -1; // -1 means never expires; + + // only reset when user specified the value for test; + if (SISL_OPTIONS.count("snapshot_distance")) { + s.consensus.snapshot_freq_distance = SISL_OPTIONS["snapshot_distance"].as< uint32_t >(); + } + if (SISL_OPTIONS.count("num_raft_logs_resv")) { + s.resource_limits.raft_logstore_reserve_threshold = SISL_OPTIONS["num_raft_logs_resv"].as< uint32_t >(); + } + if (SISL_OPTIONS.count("res_mgr_audit_timer_ms")) { + s.resource_limits.resource_audit_timer_ms = SISL_OPTIONS["res_mgr_audit_timer_ms"].as< uint32_t >(); + } + }); HS_SETTINGS_FACTORY().save(); FLAGS_folly_global_cpu_executor_threads = 4; diff --git a/src/tests/test_solo_repl_dev.cpp b/src/tests/test_solo_repl_dev.cpp index 92eed8337..401ad8d58 100644 --- a/src/tests/test_solo_repl_dev.cpp +++ b/src/tests/test_solo_repl_dev.cpp @@ -109,6 +109,8 @@ class SoloReplDevTest : public testing::Test { } } + AsyncReplResult<> create_snapshot(repl_snapshot& s) override { return make_async_success<>(); } + bool on_pre_commit(int64_t lsn, const sisl::blob& header, const sisl::blob& key, cintrusive< repl_req_ctx >& ctx) override { return true;