From f064bc6ba87f1171a51f80a70228cfa772378e52 Mon Sep 17 00:00:00 2001 From: Jie Yao Date: Thu, 25 Jul 2024 21:27:50 -0700 Subject: [PATCH] log store sync flush --- conanfile.py | 1 - src/include/homestore/logstore/log_store.hpp | 154 +---- .../homestore/logstore/log_store_internal.hpp | 40 +- src/include/homestore/logstore_service.hpp | 17 +- src/lib/common/homestore_config.fbs | 5 + src/lib/logstore/log_dev.cpp | 542 +++++------------- src/lib/logstore/log_dev.hpp | 334 +++++------ src/lib/logstore/log_store.cpp | 451 +++++---------- src/lib/logstore/log_store_service.cpp | 48 +- src/lib/logstore/log_stream.cpp | 4 +- .../log_store/home_raft_log_store.cpp | 23 +- src/tests/test_log_dev.cpp | 47 +- src/tests/test_log_store.cpp | 189 +++--- src/tests/test_log_store_long_run.cpp | 43 +- 14 files changed, 603 insertions(+), 1295 deletions(-) diff --git a/conanfile.py b/conanfile.py index d70d168fd..5030e6c2e 100644 --- a/conanfile.py +++ b/conanfile.py @@ -10,7 +10,6 @@ class HomestoreConan(ConanFile): name = "homestore" version = "6.4.32" - homepage = "https://github.com/eBay/Homestore" description = "HomeStore Storage Engine" topics = ("ebay", "nublox") diff --git a/src/include/homestore/logstore/log_store.hpp b/src/include/homestore/logstore/log_store.hpp index eec7eac50..0ec046e66 100644 --- a/src/include/homestore/logstore/log_store.hpp +++ b/src/include/homestore/logstore/log_store.hpp @@ -25,6 +25,7 @@ #include #include #include +#include #include #include @@ -81,18 +82,6 @@ class HomeLogStore : public std::enable_shared_from_this< HomeLogStore > { */ log_replay_done_cb_t get_log_replay_done_cb() const { return m_replay_done_cb; } - /** - * @brief Write the blob at the user specified seq number in sync manner. Under the covers it will call async - * write and then wait for its completion. As such this is much lesser performing than async version since it - * involves mutex/cv combination - * - * @param seq_num : Sequence number to insert data - * @param b : Data blob to write to log - * - * @return is write completed successfully. - */ - bool write_sync(logstore_seq_num_t seq_num, const sisl::io_blob& b); - /** * @brief Write the blob at the user specified seq number - prepared as a request in async fashion. * @@ -110,17 +99,7 @@ class HomeLogStore : public std::enable_shared_from_this< HomeLogStore > { * @param cookie : Any cookie or context which will passed back in the callback * @param cb Callback upon completion which is called with the status, seq_num and cookie that was passed. */ - void write_async(logstore_seq_num_t seq_num, const sisl::io_blob& b, void* cookie, const log_write_comp_cb_t& cb, - bool flush_wait = false); - - /** - * @brief This method appends the blob into the log and it returns the generated seq number - * - * @param b Blob of data to append - * @return logstore_seq_num_t Returns the seqnum generated by the log - */ - // This method is not implemented yet - logstore_seq_num_t append_sync(const sisl::io_blob& b); + void write_async(logstore_seq_num_t seq_num, const sisl::io_blob& b, void* cookie, const log_write_comp_cb_t& cb); /** * @brief This method appends the blob into the log and makes a callback at the end of the append. 
@@ -132,6 +111,14 @@ class HomeLogStore : public std::enable_shared_from_this< HomeLogStore > { */ logstore_seq_num_t append_async(const sisl::io_blob& b, void* cookie, const log_write_comp_cb_t& completion_cb); + /** + * @brief Write the blob at the user specified seq number and flush, just like write_sync + * + * @param seq_num: Seq number to write to + * @param b : Blob of data + */ + void write_and_flush(logstore_seq_num_t seq_num, const sisl::io_blob& b); + /** * @brief Read the log provided the sequence number synchronously. This is not the most efficient way to read * as reader will be blocked until read is completed. In addition, it is built on-top of async system by doing @@ -144,25 +131,6 @@ class HomeLogStore : public std::enable_shared_from_this< HomeLogStore > { */ log_buffer read_sync(logstore_seq_num_t seq_num); - /** - * @brief Read the log based on the logstore_req prepared. In case callback is supplied, it uses the callback - * to provide the data it has read. If not overridden, use default callback registered during initialization. - * - * @param req Request containing seq_num - * @param cb [OPTIONAL] Callback to get the data back, if it needs to be different from the default registered - * one. - */ - void read_async(logstore_req* req, const log_found_cb_t& cb = nullptr); - - /** - * @brief Read the log for the seq_num and make the callback with the data - * - * @param seq_num Seqnumber to read the log from - * @param cookie Any cookie or context which will passed back in the callback - * @param cb Callback which contains seq_num, cookie and - */ - void read_async(logstore_seq_num_t seq_num, void* cookie, const log_found_cb_t& cb); - /** * @brief Truncate the logs for this log store upto the seq_num provided (inclusive). Once truncated, the reads * on seq_num <= upto_seq_num will return an error. The truncation in general is a 2 step process, where first @@ -175,7 +143,6 @@ class HomeLogStore : public std::enable_shared_from_this< HomeLogStore > { * expensive and grouping them together yields better results. * * Note: this flag currently is not used, meaning all truncate is in memory only; - * @return number of records to truncate */ void truncate(logstore_seq_num_t upto_seq_num, bool in_memory_truncate_only = true); @@ -188,30 +155,17 @@ class HomeLogStore : public std::enable_shared_from_this< HomeLogStore > { */ void fill_gap(logstore_seq_num_t seq_num); - /** - * @brief Get the safe truncation log dev key from this log store perspective. Please note that the safe idx is - * not globally safe, but it is safe from this log store perspective only. To get global safe id, one should - * access all log stores and get the minimum of them before truncating. - * - * It could return invalid logdev_key which indicates that this log store does not have any valid logdev key - * to truncate. This could happen when there were no ios on this logstore since last truncation or at least no - * ios are flushed yet. The caller should simply ignore this return value. - * - * @return truncation_entry_t: Which contains the logdev key and its corresponding seq_num to truncate and also - * is that entry represents the entire log store. - */ - // truncation_entry_t get_safe_truncation_boundary() const; - /** * @brief Get the last truncated seqnum upto which we have truncated. If called after recovery, it returns the * first seq_num it has seen-1. 
* - * @return logstore_seq_num_t + * @return the last truncated seqnum upto which we have truncated */ - logstore_seq_num_t truncated_upto() const { - const auto ts{m_safe_truncation_boundary.seq_num.load(std::memory_order_acquire)}; - return (ts == std::numeric_limits< logstore_seq_num_t >::max()) ? -1 : ts; - } + logstore_seq_num_t truncated_upto() const { return m_start_lsn.load(std::memory_order_acquire) - 1; } + + logdev_key get_trunc_ld_key() const { return m_trunc_ld_key; } + + std::tuple< logstore_seq_num_t, logdev_key, logstore_seq_num_t > truncate_info() const; sisl::StreamTracker< logstore_record >& log_records() { return m_records; } @@ -256,42 +210,23 @@ class HomeLogStore : public std::enable_shared_from_this< HomeLogStore > { * * @param seq_num Sequence number upto which logs are to be flushed. If not provided, will wait to flush all seq * numbers issued prior. - * @return True on success */ - void flush_sync(logstore_seq_num_t upto_seq_num = invalid_lsn()); + void flush(logstore_seq_num_t upto_seq_num = invalid_lsn()); /** * @brief Rollback the given instance to the given sequence number * - * @param seq_num Sequence number back which logs are to be rollbacked + * @param to_lsn Sequence number back which logs are to be rollbacked * @return True on success */ - uint64_t rollback_async(logstore_seq_num_t to_lsn, on_rollback_cb_t cb); - - auto seq_num() const { return m_seq_num.load(std::memory_order_acquire); } + bool rollback(logstore_seq_num_t to_lsn); - std::shared_ptr< LogDev > get_logdev() { return m_logdev; } + auto start_lsn() const { return m_start_lsn.load(std::memory_order_acquire); } nlohmann::json dump_log_store(const log_dump_req& dump_req = log_dump_req()); nlohmann::json get_status(int verbosity) const; - /** - * Retrieves the truncation information before device truncation. - * - * @return A constant reference to the truncation_info object representing the truncation information. - */ - const truncation_info& pre_device_truncation(); - - /** - * \brief post device truncation processing. - * - * This function is used to update safe truncation boundary to the specified `trunc_upto_key`. - * - * \param trunc_upto_key The key indicating the log entry up to which truncation has been performed. - */ - void post_device_truncation(const logdev_key& trunc_upto_key); - /** * Handles the completion of a write operation in the log store. * @@ -300,17 +235,6 @@ class HomeLogStore : public std::enable_shared_from_this< HomeLogStore > { */ void on_write_completion(logstore_req* req, const logdev_key& ld_key); - /** - * \brief Handles the completion of a read operation in the log store. - * - * This function is called when a read operation in the log store has completed. - * It takes a pointer to a logstore_req object and a logdev_key object as parameters. - * - * \param req The pointer to the logstore_req object representing the read request. - * \param ld_key The logdev_key object representing the key used for the read operation. - */ - void on_read_completion(logstore_req* req, const logdev_key& ld_key); - /** * @brief Handles the event when a log is found. * @@ -327,27 +251,8 @@ class HomeLogStore : public std::enable_shared_from_this< HomeLogStore > { */ void on_log_found(logstore_seq_num_t seq_num, const logdev_key& ld_key, const logdev_key& flush_ld_key, log_buffer buf); - /** - * @brief Handles the completion of a batch flush operation to update internal state. - * - * This function is called when a batch flush operation is completed. 
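With write_sync()/flush_sync() removed, durability is now driven by append_async() followed by an explicit flush() (or by write_and_flush() for a caller-chosen seq_num). A minimal usage sketch against the declarations in this header; the store handle and payload are assumed to already exist, and the generic lambda simply matches the log_write_comp_cb_t callback shape:

```cpp
// Sketch only: intended call pattern of the revised HomeLogStore API, not production code.
#include <homestore/logstore/log_store.hpp>

void persist_one_entry(homestore::HomeLogStore& store, const sisl::io_blob& payload) {
    // Queue the record; the completion callback fires only after the owning LogDev flushes it.
    auto const lsn = store.append_async(payload, nullptr /* cookie */,
                                        [](auto /*seq_num*/, auto& /*blob*/, auto /*ld_key*/, auto* /*cookie*/) {
                                            // the record is durable at this point
                                        });

    // With flush_mode EXPLICIT nothing reaches media until someone flushes; this call flushes
    // everything up to lsn to the journal before returning.
    store.flush(lsn);

    // write_and_flush(seq_num, payload) combines both steps for a caller-chosen sequence number.
}
```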
- * It takes a `logdev_key` parameter that represents the key of the flushed batch. - * - * This function is also called during log store recovery; - * - * @param flush_batch_ld_key The key of the flushed batch. - */ - void on_batch_completion(const logdev_key& flush_batch_ld_key); -private: - /** - * Truncates the log store up to the specified sequence number. - * - * @param upto_seq_num The sequence number up to which the log store should be truncated. - */ - void do_truncate(logstore_seq_num_t upto_seq_num); - - int search_max_le(logstore_seq_num_t input_sn); + std::shared_ptr< LogDev > get_logdev() { return m_logdev; } private: logstore_id_t m_store_id; @@ -357,21 +262,12 @@ class HomeLogStore : public std::enable_shared_from_this< HomeLogStore > { log_req_comp_cb_t m_comp_cb; log_found_cb_t m_found_cb; log_replay_done_cb_t m_replay_done_cb; - std::atomic< logstore_seq_num_t > m_seq_num; + std::atomic< logstore_seq_num_t > m_start_lsn; + std::atomic< logstore_seq_num_t > m_next_lsn; + std::atomic< logstore_seq_num_t > m_tail_lsn; std::string m_fq_name; LogStoreServiceMetrics& m_metrics; - // seq_ld_key_pair m_flush_batch_max = {-1, {0, 0}}; // The maximum seqnum we have seen in the prev flushed - // batch - logstore_seq_num_t m_flush_batch_max_lsn{std::numeric_limits< logstore_seq_num_t >::min()}; - - // Sync flush sections - std::atomic< logstore_seq_num_t > m_sync_flush_waiter_lsn{invalid_lsn()}; - std::mutex m_sync_flush_mtx; - std::mutex m_single_sync_flush_mtx; - std::condition_variable m_sync_flush_cv; - - std::vector< seq_ld_key_pair > m_truncation_barriers; // List of truncation barriers - truncation_info m_safe_truncation_boundary; + logdev_key m_trunc_ld_key; }; } // namespace homestore diff --git a/src/include/homestore/logstore/log_store_internal.hpp b/src/include/homestore/logstore/log_store_internal.hpp index 125e35daa..50a60720a 100644 --- a/src/include/homestore/logstore/log_store_internal.hpp +++ b/src/include/homestore/logstore/log_store_internal.hpp @@ -32,7 +32,6 @@ #include namespace homestore { - ///////////////////// All typedefs /////////////////////////////// class logstore_req; class HomeLogStore; @@ -50,7 +49,6 @@ typedef std::function< void(logstore_seq_num_t, sisl::io_blob&, logdev_key, void typedef std::function< void(logstore_seq_num_t, log_buffer, void*) > log_found_cb_t; typedef std::function< void(std::shared_ptr< HomeLogStore >) > log_store_opened_cb_t; typedef std::function< void(std::shared_ptr< HomeLogStore >, logstore_seq_num_t) > log_replay_done_cb_t; -typedef std::function< void(const std::unordered_map< logdev_id_t, logdev_key >&) > device_truncate_cb_t; typedef int64_t logid_t; @@ -58,8 +56,7 @@ struct logdev_key { logid_t idx; off_t dev_offset; - constexpr logdev_key(const logid_t idx = std::numeric_limits< logid_t >::min(), - const off_t dev_offset = std::numeric_limits< uint64_t >::min()) : + constexpr logdev_key(const logid_t idx = -1, const off_t dev_offset = std::numeric_limits< uint64_t >::min()) : idx{idx}, dev_offset{dev_offset} {} logdev_key(const logdev_key&) = default; logdev_key& operator=(const logdev_key&) = default; @@ -72,11 +69,11 @@ struct logdev_key { operator bool() const { return is_valid(); } bool is_valid() const { return !is_lowest() && !is_highest(); } - bool is_lowest() const { return (idx == std::numeric_limits< logid_t >::min()); } + bool is_lowest() const { return (idx == -1); } bool is_highest() const { return (idx == std::numeric_limits< logid_t >::max()); } void set_lowest() { - idx = std::numeric_limits< 
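Note the sentinel change above: the "lowest" logdev_key::idx is now -1 instead of std::numeric_limits<logid_t>::min(). A tiny standalone mirror of those semantics (an illustrative struct, not the homestore type) shows what is_valid() accepts after this change:

```cpp
// Illustrative mirror of the logdev_key sentinel rules in this patch; logid_t is int64_t.
#include <cassert>
#include <cstdint>
#include <limits>

struct key_sketch {
    int64_t idx{-1}; // -1 is the "lowest" (not-yet-assigned) sentinel after this change
    bool is_lowest() const { return idx == -1; }
    bool is_highest() const { return idx == std::numeric_limits<int64_t>::max(); }
    bool is_valid() const { return !is_lowest() && !is_highest(); }
};

int main() {
    assert(!key_sketch{}.is_valid());   // default-constructed key is not a usable position
    assert(key_sketch{0}.is_valid());   // the first flushed log id is a valid key
    assert(!key_sketch{std::numeric_limits<int64_t>::max()}.is_valid()); // out-of-bound sentinel
    return 0;
}
```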
logid_t >::min(); + idx = -1; dev_offset = std::numeric_limits< uint64_t >::min(); } @@ -109,9 +106,12 @@ struct log_dump_req { struct logstore_record { logdev_key m_dev_key; + logdev_key m_trunc_key; + // only when out-of-order write happens and the higher lsn is flushed in a lower LogGroup while the lower lsn is + // flushed in a higher LogGroup , they are different logstore_record() = default; - logstore_record(const logdev_key& key) : m_dev_key{key} {} + logstore_record(const logdev_key& key, const logdev_key& trunc_key) : m_dev_key{key}, m_trunc_key{trunc_key} {} }; class HomeLogStore; @@ -121,7 +121,6 @@ struct logstore_req { logstore_seq_num_t seq_num; // Log store specific seq_num (which could be monotonically increaseing with logstore) sisl::io_blob data; // Data blob containing data void* cookie; // User generated cookie (considered as opaque) - bool is_write; // Directon of IO bool is_internal_req; // If the req is created internally by HomeLogStore itself log_req_comp_cb_t cb; // Callback upon completion of write (overridden than default) Clock::time_point start_time; @@ -138,13 +137,11 @@ struct logstore_req { // TODO: Implement this method return 0; } - static logstore_req* make(HomeLogStore* store, logstore_seq_num_t seq_num, const sisl::io_blob& data, - bool is_write_req = true) { + static logstore_req* make(HomeLogStore* store, logstore_seq_num_t seq_num, const sisl::io_blob& data) { logstore_req* req = new logstore_req(); req->log_store = store; req->seq_num = seq_num; req->data = data; - req->is_write = is_write_req; req->is_internal_req = true; req->cb = nullptr; @@ -158,25 +155,6 @@ struct logstore_req { logstore_req() = default; }; -struct seq_ld_key_pair { - logstore_seq_num_t seq_num{-1}; - logdev_key ld_key; -}; - -struct truncation_info { - // Safe log dev location upto which it is truncatable - logdev_key ld_key{std::numeric_limits< logid_t >::min(), 0}; - - // LSN of this log store upto which it is truncated - std::atomic< logstore_seq_num_t > seq_num{-1}; - - // Is there any entry which is already store truncated but waiting for device truncation - bool pending_dev_truncation{false}; - - // Any truncation entries/barriers which are not part of this truncation - bool active_writes_not_part_of_truncation{false}; -}; - #pragma pack(1) struct logstore_superblk { logstore_superblk(const logstore_seq_num_t seq_num = 0) : m_first_seq_num{seq_num} {} @@ -194,4 +172,4 @@ struct logstore_superblk { logstore_seq_num_t m_first_seq_num{0}; }; #pragma pack() -} // namespace homestore +} // namespace homestore \ No newline at end of file diff --git a/src/include/homestore/logstore_service.hpp b/src/include/homestore/logstore_service.hpp index bd5347dd6..44ba1ab53 100644 --- a/src/include/homestore/logstore_service.hpp +++ b/src/include/homestore/logstore_service.hpp @@ -155,13 +155,8 @@ class LogStoreService { /** * @brief Schedule a truncate all the log stores physically on the device. - * - * @param cb [OPTIONAL] Callback once truncation is completed, if provided (Default no callback) - * @param wait_till_done [OPTIONAL] Wait for the truncation to complete before returning from this method. 
- * Default to false - * @param dry_run: If the truncate is a real one or just dry run to simulate the truncation */ - void device_truncate(const device_truncate_cb_t& cb = nullptr, bool wait_till_done = false, bool dry_run = false); + void device_truncate(); folly::Future< std::error_code > create_vdev(uint64_t size, HSDevType devType, uint32_t chunk_size); std::shared_ptr< VirtualDev > open_vdev(const vdev_info& vinfo, bool load_existing); @@ -178,13 +173,6 @@ class LogStoreService { uint32_t total_size() const; iomgr::io_fiber_t flush_thread() { return m_flush_fiber; } - /** - * This is used when the actual LogDev truncate is triggered; - * - * @return The IO fiber associated with the truncate thread. - */ - iomgr::io_fiber_t truncate_thread() { return m_truncate_fiber; } - void delete_unopened_logdevs(); private: @@ -194,14 +182,13 @@ class LogStoreService { void logdev_super_blk_found(const sisl::byte_view& buf, void* meta_cookie); void rollback_super_blk_found(const sisl::byte_view& buf, void* meta_cookie); void start_threads(); - void flush_if_needed(); + void flush(); private: std::unordered_map< logdev_id_t, std::shared_ptr< LogDev > > m_id_logdev_map; folly::SharedMutexWritePriority m_logdev_map_mtx; std::shared_ptr< JournalVirtualDev > m_logdev_vdev; - iomgr::io_fiber_t m_truncate_fiber; iomgr::io_fiber_t m_flush_fiber; LogStoreServiceMetrics m_metrics; std::unordered_set< logdev_id_t > m_unopened_logdev; diff --git a/src/lib/common/homestore_config.fbs b/src/lib/common/homestore_config.fbs index c283bc9ba..b17927d26 100644 --- a/src/lib/common/homestore_config.fbs +++ b/src/lib/common/homestore_config.fbs @@ -112,6 +112,11 @@ table LogStore { // Logdev will flush the logs only in a dedicated thread. Turn this on, if flush IO doesn't want to // intervene with data IO path. flush_only_in_dedicated_thread: bool = true; + + //we support 3 flush mode , 1(inline), 2 (timer) and 4(explicitly), mixed flush mode is also supportted + //for example, if we want inline and explicitly, we just set the flush mode to 1+4 = 5 + //for nuobject case, we only support explicitly mode + flush_mode: uint32 = 4; } table Generic { diff --git a/src/lib/logstore/log_dev.cpp b/src/lib/logstore/log_dev.cpp index 12e4de193..b72e21957 100644 --- a/src/lib/logstore/log_dev.cpp +++ b/src/lib/logstore/log_dev.cpp @@ -43,16 +43,18 @@ SISL_LOGGING_DECL(logstore) static bool has_data_service() { return HomeStore::instance()->has_data_service(); } // static BlkDataService& data_service() { return HomeStore::instance()->data_service(); } -LogDev::LogDev(const logdev_id_t id, JournalVirtualDev* vdev) : m_logdev_id{id}, m_vdev(vdev) { +LogDev::LogDev(logdev_id_t id, flush_mode_t flush_mode) : m_logdev_id{id}, m_flush_mode{flush_mode} { m_flush_size_multiple = HS_DYNAMIC_CONFIG(logstore->flush_size_multiple_logdev); - // Each logdev has one journal descriptor. - m_vdev_jd = m_vdev->open(m_logdev_id); - RELEASE_ASSERT(m_vdev_jd, "Journal descriptor is null"); } LogDev::~LogDev() = default; -void LogDev::start(bool format) { +void LogDev::start(bool format, std::shared_ptr< JournalVirtualDev > vdev) { + // Each logdev has one journal descriptor. 
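The new flush_mode setting is a bitmask (1 = inline, 2 = timer, 4 = explicit) whose values may be OR'ed together; later in this patch it is modelled as the flush_mode_t VENUM and consulted through helpers such as allow_inline_flush()/allow_timer_flush(). A standalone sketch of that bitmask logic; the enum and helper names below are illustrative, the values match the config comment:

```cpp
#include <cstdint>
#include <iostream>

// "inline" and "explicit" are C++ keywords, hence the trailing underscores in this sketch.
enum class flush_mode : uint32_t { inline_ = 1u << 0, timer = 1u << 1, explicit_ = 1u << 2 };

constexpr bool has_mode(uint32_t cfg, flush_mode m) { return (cfg & static_cast<uint32_t>(m)) != 0; }

int main() {
    uint32_t const cfg = 1 + 4; // inline + explicit, the combination used as an example above
    std::cout << std::boolalpha;
    std::cout << "inline flush allowed:   " << has_mode(cfg, flush_mode::inline_) << '\n';
    std::cout << "timer flush allowed:    " << has_mode(cfg, flush_mode::timer) << '\n';
    std::cout << "explicit flush allowed: " << has_mode(cfg, flush_mode::explicit_) << '\n';
    return 0;
}
```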
+ m_vdev = vdev; + m_vdev_jd = m_vdev->open(m_logdev_id); + RELEASE_ASSERT(m_vdev_jd, "Journal descriptor is null"); + if (m_flush_size_multiple == 0) { m_flush_size_multiple = m_vdev->optimal_page_size(); } THIS_LOGDEV_LOG(INFO, "Initializing logdev with flush size multiple={}", m_flush_size_multiple); @@ -83,10 +85,11 @@ void LogDev::start(bool format) { m_log_idx = m_logdev_meta.get_start_log_idx(); do_load(m_logdev_meta.get_start_dev_offset()); m_log_records->reinit(m_log_idx); + m_log_records_last_truncate_to_idx = m_log_idx - 1; m_last_flush_idx = m_log_idx - 1; } - start_timer(); + if (allow_timer_flush()) start_timer(); handle_unopened_log_stores(format); { @@ -96,7 +99,7 @@ void LogDev::start(bool format) { for (auto& p : m_id_logstore_map) { auto& lstore{p.second.log_store}; if (lstore && lstore->get_log_replay_done_cb()) { - lstore->get_log_replay_done_cb()(lstore, lstore->seq_num() - 1); + lstore->get_log_replay_done_cb()(lstore, lstore->start_lsn() - 1); lstore->truncate(lstore->truncated_upto()); } } @@ -105,26 +108,17 @@ void LogDev::start(bool format) { } void LogDev::stop() { - THIS_LOGDEV_LOG(INFO, "Logdev stopping log_dev={}", m_logdev_id); - HS_LOG_ASSERT((m_pending_flush_size == 0), "LogDev stop attempted while writes to logdev are pending completion"); - const bool locked_now = run_under_flush_lock([this]() { - { - std::unique_lock< std::mutex > lk{m_block_flush_q_mutex}; - m_stopped = true; - } - m_block_flush_q_cv.notify_one(); - return true; - }); - - if (!locked_now) { THIS_LOGDEV_LOG(INFO, "LogDev stop is queued because of pending flush or truncation ongoing"); } - + THIS_LOGDEV_LOG(INFO, "Logdev stopping id {}", m_logdev_id); + HS_LOG_ASSERT((m_pending_flush_size.load() == 0), + "LogDev stop attempted while writes to logdev are pending completion"); { - // Wait for the stopped to be completed - std::unique_lock< std::mutex > lk{m_block_flush_q_mutex}; - m_block_flush_q_cv.wait(lk, [&] { return m_stopped; }); + std::unique_lock lg = flush_guard(); + m_stopped = true; } + // after we call stop, we need to do any pending device truncations + truncate(); - stop_timer(); + if (allow_timer_flush()) stop_timer(); { folly::SharedMutexWritePriority::WriteHolder holder(m_store_map_mtx); @@ -135,13 +129,10 @@ void LogDev::stop() { m_logdev_meta.reset(); m_log_idx.store(0); m_pending_flush_size.store(0); - m_is_flushing.store(false); m_last_flush_idx = -1; m_last_truncate_idx = -1; m_last_crc = INVALID_CRC32_VALUE; - if (m_block_flush_q != nullptr) { - sisl::VectorPool< flush_blocked_callback >::free(m_block_flush_q, false /* no_cache */); - } + for (size_t i{0}; i < max_log_group; ++i) { m_log_group_pool[i].stop(); } @@ -150,6 +141,11 @@ void LogDev::stop() { m_hs.reset(); } +bool LogDev::is_stopped() { + std::unique_lock lg = flush_guard(); + return m_stopped; +} + void LogDev::destroy() { THIS_LOGDEV_LOG(INFO, "Logdev destroy metablks log_dev={}", m_logdev_id); m_logdev_meta.destroy(); @@ -157,13 +153,12 @@ void LogDev::destroy() { void LogDev::start_timer() { // Currently only tests set it to 0. 
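For the TIMER mode, start_timer() below arms a recurring iomanager thread timer on the flush fiber that invokes flush_if_necessary() every flush_timer_frequency_us. The stand-in below expresses the same idea with std::thread purely to keep the sketch self-contained; it is not how the patch schedules the timer:

```cpp
// Conceptual stand-in for timer-driven flushing: call a flush callback once per period until stopped.
#include <atomic>
#include <chrono>
#include <functional>
#include <thread>

class periodic_flusher {
public:
    periodic_flusher(std::chrono::microseconds period, std::function<void()> flush_cb)
            : m_thread{[this, period, cb = std::move(flush_cb)] {
                  while (!m_stop.load(std::memory_order_acquire)) {
                      std::this_thread::sleep_for(period);
                      if (m_stop.load(std::memory_order_acquire)) { break; }
                      cb(); // e.g. [&logdev] { logdev.flush_if_necessary(); }
                  }
              }} {}

    ~periodic_flusher() {
        m_stop.store(true, std::memory_order_release);
        m_thread.join();
    }

private:
    std::atomic<bool> m_stop{false}; // declared before m_thread so it is initialized first
    std::thread m_thread;
};
```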
- if (HS_DYNAMIC_CONFIG(logstore.flush_timer_frequency_us) == 0) { return; } - - iomanager.run_on_wait(logstore_service().flush_thread(), [this]() { - m_flush_timer_hdl = iomanager.schedule_thread_timer(HS_DYNAMIC_CONFIG(logstore.flush_timer_frequency_us) * 1000, - true /* recurring */, nullptr /* cookie */, - [this](void*) { flush_if_needed(); }); - }); + if (HS_DYNAMIC_CONFIG(logstore.flush_timer_frequency_us)) + iomanager.run_on_wait(logstore_service().flush_thread(), [this]() { + m_flush_timer_hdl = iomanager.schedule_thread_timer( + HS_DYNAMIC_CONFIG(logstore.flush_timer_frequency_us) * 1000, true /* recurring */, nullptr /* cookie */, + [this](void*) { flush_if_necessary(); }); + }); } void LogDev::stop_timer() { @@ -174,7 +169,7 @@ void LogDev::stop_timer() { } } -void LogDev::do_load(const off_t device_cursor) { +void LogDev::do_load(off_t device_cursor) { log_stream_reader lstream{device_cursor, m_vdev, m_vdev_jd, m_flush_size_multiple}; logid_t loaded_from{-1}; off_t group_dev_offset = 0; @@ -255,31 +250,21 @@ void LogDev::assert_next_pages(log_stream_reader& lstream) { } } -int64_t LogDev::append_async(const logstore_id_t store_id, const logstore_seq_num_t seq_num, const sisl::io_blob& data, - void* cb_context, bool flush_wait) { - auto prev_size = m_pending_flush_size.fetch_add(data.size(), std::memory_order_relaxed); +int64_t LogDev::append_async(logstore_id_t store_id, logstore_seq_num_t seq_num, const sisl::io_blob& data, + void* cb_context) { const auto idx = m_log_idx.fetch_add(1, std::memory_order_acq_rel); - auto threshold_size = LogDev::flush_data_threshold_size(); m_log_records->create(idx, store_id, seq_num, data, cb_context); - - if (HS_DYNAMIC_CONFIG(logstore.flush_threshold_size) == 0) { - // This is set in tests to disable implicit flush. This will be removed in future. - return idx; - } - - if (flush_wait || - ((prev_size < threshold_size && ((prev_size + data.size()) >= threshold_size) && - !m_is_flushing.load(std::memory_order_relaxed)))) { - flush_if_needed(flush_wait ? 1 : -1); - } + m_pending_flush_size.fetch_add(data.size(), std::memory_order_relaxed); + if (allow_inline_flush()) flush_if_necessary(); return idx; } log_buffer LogDev::read(const logdev_key& key, serialized_log_record& return_record_header) { + auto buf = sisl::make_byte_array(initial_read_size, m_flush_size_multiple, sisl::buftag::logread); auto ec = m_vdev_jd->sync_pread(buf->bytes(), initial_read_size, key.dev_offset); if (ec) { - LOGERROR("Failed to read from ournal vdev log_dev={} {} {}", m_logdev_id, ec.value(), ec.message()); + LOGERROR("Failed to read from Journal vdev log_dev={} {} {}", m_logdev_id, ec.value(), ec.message()); return {}; } @@ -330,7 +315,7 @@ logstore_id_t LogDev::reserve_store_id() { return m_logdev_meta.reserve_store(true /* persist_now */); } -void LogDev::unreserve_store_id(const logstore_id_t store_id) { +void LogDev::unreserve_store_id(logstore_id_t store_id) { std::unique_lock lg{m_meta_mutex}; /* Get the current log_idx as marker and insert into garbage store id. 
Upon device truncation, these ids will @@ -354,7 +339,7 @@ void LogDev::get_registered_store_ids(std::vector< logstore_id_t >& registered, /* * This method prepares the log records to be flushed and returns the log_group which is fully prepared */ -LogGroup* LogDev::prepare_flush(const int32_t estimated_records) { +LogGroup* LogDev::prepare_flush(int32_t estimated_records) { int64_t flushing_upto_idx{-1}; assert(estimated_records > 0); @@ -369,7 +354,7 @@ LogGroup* LogDev::prepare_flush(const int32_t estimated_records) { } }); - lg->finish(m_logdev_id, get_prev_crc()); + lg->finish(m_logdev_id, m_last_crc); if (sisl_unlikely(flushing_upto_idx == -1)) { return nullptr; } lg->m_flush_log_idx_from = m_last_flush_idx + 1; lg->m_flush_log_idx_upto = flushing_upto_idx; @@ -386,10 +371,13 @@ bool LogDev::can_flush_in_this_thread() { return (!HS_DYNAMIC_CONFIG(logstore.flush_only_in_dedicated_thread) && iomanager.am_i_worker_reactor()); } -// This method checks if in case we were to add a record of size provided, do we enter into a state which exceeds -// our threshold. If so, it first flushes whats accumulated so far and then add the pending flush size counter with -// the new record size -bool LogDev::flush_if_needed(int64_t threshold_size) { +bool LogDev::flush_if_necessary(int64_t threshold_size) { + if (!can_flush_in_this_thread()) { + iomanager.run_on_forget(logstore_service().flush_thread(), + [this, threshold_size]() { flush_if_necessary(threshold_size); }); + return false; + } + // If after adding the record size, if we have enough to flush or if its been too much time before we actually // flushed, attempt to flush by setting the atomic bool variable. if (threshold_size < 0) { threshold_size = LogDev::flush_data_threshold_size(); } @@ -399,91 +387,59 @@ bool LogDev::flush_if_needed(int64_t threshold_size) { bool const flush_by_size = (pending_sz >= threshold_size); bool const flush_by_time = !flush_by_size && pending_sz && (elapsed_time > HS_DYNAMIC_CONFIG(logstore.max_time_between_flush_us)); - if (flush_by_size || flush_by_time) { - // First off, check if we can flush in this thread itself, if not, schedule it into different thread - if (!can_flush_in_this_thread()) { - iomanager.run_on_forget(logstore_service().flush_thread(), - [this, threshold_size]() { flush_if_needed(threshold_size); }); - return false; + std::unique_lock lck(m_flush_mtx, std::try_to_lock); + if (lck.owns_lock()) { + if (m_stopped) return false; + return flush(); } + } + return false; +} - bool expected_flushing{false}; - if (!m_is_flushing.compare_exchange_strong(expected_flushing, true, std::memory_order_acq_rel)) { - return false; - } - THIS_LOGDEV_LOG(TRACE, - "Flushing now because either pending_size={} is greater than data_threshold={} or " - "elapsed time since last flush={} us is greater than max_time_between_flush={} us", - pending_sz, threshold_size, elapsed_time, - HS_DYNAMIC_CONFIG(logstore.max_time_between_flush_us)); - - m_last_flush_time = Clock::now(); - // We were able to win the flushing competition and now we gather all the flush data and reserve a slot. 
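flush_if_necessary() now combines three pieces: an atomic pending-size counter fed by append_async(), a size-or-elapsed-time trigger, and a try_lock on the flush mutex so that exactly one caller flushes while everyone else returns immediately (flush_under_guard() takes the same lock unconditionally). A standalone model of that coordination; member names mirror the patch, but the class, thresholds and clock handling are illustrative:

```cpp
#include <atomic>
#include <chrono>
#include <cstdint>
#include <mutex>

class flush_coordinator {
    using clock = std::chrono::steady_clock;

public:
    // Called on every append with the size of the record just queued
    // (the patch gates this call on the INLINE flush mode).
    void on_append(int64_t record_size) {
        m_pending_flush_size.fetch_add(record_size, std::memory_order_relaxed);
        flush_if_necessary();
    }

    // Flush only if a threshold is crossed and no other thread is flushing right now.
    bool flush_if_necessary(int64_t threshold_size = 64 * 1024 /* arbitrary for the sketch */) {
        auto const pending = m_pending_flush_size.load(std::memory_order_relaxed);
        auto const elapsed = clock::now() - m_last_flush_time.load();
        bool const by_size = (pending >= threshold_size);
        bool const by_time = !by_size && (pending > 0) && (elapsed > std::chrono::milliseconds(300));
        if (!by_size && !by_time) { return false; }

        std::unique_lock lck(m_flush_mtx, std::try_to_lock);
        if (!lck.owns_lock()) { return false; } // another thread won the flush race; skip
        return do_flush();
    }

    // Unconditional flush, waiting for the lock (what flush_under_guard() does).
    bool flush_under_guard() {
        std::unique_lock lck(m_flush_mtx);
        return do_flush();
    }

private:
    bool do_flush() {
        auto const flushed = m_pending_flush_size.exchange(0, std::memory_order_relaxed);
        m_last_flush_time.store(clock::now());
        // ... gather pending records into a log group, write it synchronously, run completions ...
        return flushed > 0;
    }

    std::atomic<int64_t> m_pending_flush_size{0};
    std::atomic<clock::time_point> m_last_flush_time{clock::now()};
    std::mutex m_flush_mtx;
};
```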
- auto new_idx = m_log_idx.load(std::memory_order_relaxed) - 1; - if (m_last_flush_idx >= new_idx) { - THIS_LOGDEV_LOG(TRACE, "Log idx {} is just flushed", new_idx); - unlock_flush(false); - return false; - } +bool LogDev::flush_under_guard() { + std::unique_lock lg = flush_guard(); + return flush(); +} - auto* lg = prepare_flush(new_idx - m_last_flush_idx + 4); // Estimate 4 more extra in case of parallel writes - if (sisl_unlikely(!lg)) { - THIS_LOGDEV_LOG(TRACE, "Log idx {} last_flush_idx {} prepare flush failed", new_idx, m_last_flush_idx); - unlock_flush(false); - return false; - } - auto sz = m_pending_flush_size.fetch_sub(lg->actual_data_size(), std::memory_order_relaxed); - HS_REL_ASSERT_GE((sz - lg->actual_data_size()), 0, "size {} lg size{}", sz, lg->actual_data_size()); - - off_t offset = m_vdev_jd->alloc_next_append_blk(lg->header()->total_size()); - lg->m_log_dev_offset = offset; - HS_REL_ASSERT_NE(lg->m_log_dev_offset, INVALID_OFFSET, "log dev is full"); - THIS_LOGDEV_LOG(TRACE, "Flushing log group data size={} at offset=0x{} log_group={}", lg->actual_data_size(), - to_hex(offset), *lg); - // THIS_LOGDEV_LOG(DEBUG, "Log Group: {}", *lg); - do_flush(lg); - return true; - } else { +bool LogDev::flush() { + m_last_flush_time = Clock::now(); + // We were able to win the flushing competition and now we gather all the flush data and reserve a slot. + auto new_idx = m_log_idx.load(std::memory_order_acquire) - 1; + if (m_last_flush_idx >= new_idx) { + THIS_LOGDEV_LOG(TRACE, "Log idx {} is just flushed", new_idx); return false; } -} - -void LogDev::do_flush(LogGroup* lg) { -#ifdef _PRERELEASE - if (iomgr_flip::instance()->delay_flip< int >( - "simulate_log_flush_delay", [this, lg]() { do_flush_write(lg); }, m_logdev_id)) { - THIS_LOGDEV_LOG(INFO, "Delaying flush by rescheduling the async write"); - return; + LogGroup* lg = prepare_flush(new_idx - m_last_flush_idx + 4); // Estimate 4 more extra in case of parallel writes + if (sisl_unlikely(!lg)) { + THIS_LOGDEV_LOG(TRACE, "Log idx {} last_flush_idx {} prepare flush failed", new_idx, m_last_flush_idx); + return false; } -#endif - - // if (has_data_service() && data_service().is_fsync_needed()) { - // data_service().fsync([this, lg]() { do_flush_write(lg); }) - // } else { - // do_flush_write(lg); - // } - do_flush_write(lg); -} + auto sz = m_pending_flush_size.fetch_sub(lg->actual_data_size(), std::memory_order_relaxed); + HS_REL_ASSERT_GE((sz - lg->actual_data_size()), 0, "size {} lg size{}", sz, lg->actual_data_size()); + off_t offset = m_vdev_jd->alloc_next_append_blk(lg->header()->total_size()); + lg->m_log_dev_offset = offset; + + HS_REL_ASSERT_NE(lg->m_log_dev_offset, INVALID_OFFSET, "log dev is full"); + THIS_LOGDEV_LOG(TRACE, "Flushing log group data size={} at offset=0x{} log_group={}", lg->actual_data_size(), + to_hex(offset), *lg); -void LogDev::do_flush_write(LogGroup* lg) { HISTOGRAM_OBSERVE(logstore_service().m_metrics, logdev_flush_records_distribution, lg->nrecords()); HISTOGRAM_OBSERVE(logstore_service().m_metrics, logdev_flush_size_distribution, lg->actual_data_size()); - THIS_LOGDEV_LOG(TRACE, "vdev offset={} log group total size={}", lg->m_log_dev_offset, lg->header()->total_size()); // write log - m_vdev_jd->async_pwritev(lg->iovecs().data(), int_cast(lg->iovecs().size()), lg->m_log_dev_offset) - .thenValue([this, lg](auto) { on_flush_completion(lg); }); + m_vdev_jd->sync_pwritev(lg->iovecs().data(), int_cast(lg->iovecs().size()), lg->m_log_dev_offset); + + on_flush_completion(lg); + return true; } void 
LogDev::on_flush_completion(LogGroup* lg) { - lg->m_flush_finish_time = Clock::now(); - lg->m_post_flush_msg_rcvd_time = Clock::now(); + auto done_time = Clock::now(); THIS_LOGDEV_LOG(TRACE, "Flush completed for logid[{} - {}]", lg->m_flush_log_idx_from, lg->m_flush_log_idx_upto); m_log_records->complete(lg->m_flush_log_idx_from, lg->m_flush_log_idx_upto); m_last_flush_idx = lg->m_flush_log_idx_upto; - const auto flush_ld_key = logdev_key{m_last_flush_idx, lg->m_log_dev_offset + lg->header()->total_size()}; m_last_crc = lg->header()->cur_grp_crc; auto from_indx = lg->m_flush_log_idx_from; @@ -491,141 +447,92 @@ void LogDev::on_flush_completion(LogGroup* lg) { auto dev_offset = lg->m_log_dev_offset; for (auto idx = from_indx; idx <= upto_indx; ++idx) { auto& record = m_log_records->at(idx); - on_io_completion(record.store_id, logdev_key{idx, dev_offset}, flush_ld_key, upto_indx - idx, record.context); + logstore_req* req = s_cast< logstore_req* >(record.context); + HomeLogStore* log_store = req->log_store; + HS_LOG_ASSERT_EQ(log_store->get_store_id(), record.store_id, + "Expecting store id in log store and flush completion to match"); + + log_store->on_write_completion(req, logdev_key{idx, dev_offset}); } - lg->m_post_flush_process_done_time = Clock::now(); + if (m_log_records_safe_truncate_to_idx < upto_indx) m_log_records_safe_truncate_to_idx = upto_indx; - HISTOGRAM_OBSERVE(logstore_service().m_metrics, logdev_flush_done_msg_time_ns, - get_elapsed_time_us(lg->m_flush_finish_time, lg->m_post_flush_msg_rcvd_time)); + HISTOGRAM_OBSERVE(logstore_service().m_metrics, logdev_flush_time_us, + get_elapsed_time_us(m_last_flush_time, done_time)); HISTOGRAM_OBSERVE(logstore_service().m_metrics, logdev_post_flush_processing_latency, - get_elapsed_time_us(lg->m_post_flush_msg_rcvd_time, lg->m_post_flush_process_done_time)); + get_elapsed_time_us(done_time)); free_log_group(lg); - unlock_flush(); -} - -bool LogDev::run_under_flush_lock(const flush_blocked_callback& cb) { - { - std::unique_lock lk{m_block_flush_q_mutex}; - if (m_stopped) { - THIS_LOGDEV_LOG(WARN, "Trying to lock a flush on a stopped logdev, not locking the flush"); - return false; - } - bool expected_flushing{false}; - if (!m_is_flushing.compare_exchange_strong(expected_flushing, true, std::memory_order_acq_rel)) { - // Flushing is blocked already, add it to the callback q - if (m_block_flush_q == nullptr) { m_block_flush_q = sisl::VectorPool< flush_blocked_callback >::alloc(); } - m_block_flush_q->emplace_back(cb); - return false; - } + // Truncate the log records, as it is no longer needed to be kept in memory. Everytime there is a read, we actually + // read from the drives. + // TODO:: we can keep this in memory to accelerate read until it is truncated + if (from_indx == m_log_records_last_truncate_to_idx + 1) { + m_log_records->truncate(m_log_records_safe_truncate_to_idx); + m_log_records_last_truncate_to_idx = m_log_records_safe_truncate_to_idx; } - - // the contract here is if cb return falses, it means it will unlock_flush by itself (in another thread); - if (cb()) { unlock_flush(); } - return true; } -void LogDev::unlock_flush(bool do_flush) { - std::vector< flush_blocked_callback >* flush_q{nullptr}; - { - std::unique_lock lk{m_block_flush_q_mutex}; - if (m_block_flush_q != nullptr) { - flush_q = m_block_flush_q; - m_block_flush_q = nullptr; +uint64_t LogDev::truncate() { + // Order of this lock has to be preserved. We take externally visible lock which is flush lock first. 
This prevents + // any further update to tail_lsn and also flushes conurrently with truncation. Then we take the store map lock, + // which is contained in this class and then meta_mutex. Reason for this is, we take meta_mutex under other + // store_map lock, so reversing could cause deadlock + std::unique_lock fg = flush_guard(); + folly::SharedMutexWritePriority::ReadHolder holder(m_store_map_mtx); + std::unique_lock mg{m_meta_mutex}; + + logdev_key min_safe_ld_key = logdev_key::out_of_bound_ld_key(); + // Walk through all the stores and find the least logdev_key that we can truncate + for (auto& [store_id, store] : m_id_logstore_map) { + auto lstore = store.log_store; + if (lstore == nullptr) { continue; } + auto const [trunc_lsn, trunc_ld_key, tail_lsn] = lstore->truncate_info(); + if (trunc_lsn == tail_lsn) { + THIS_LOGDEV_LOG(DEBUG, "Store_id={} didn't have any writes since last truncation, skipping ", store_id); + m_logdev_meta.remove_all_rollback_records(store_id, m_stopped /* persist_now */); + continue; } + HS_DBG_ASSERT_GE(trunc_ld_key.idx, m_last_truncate_idx, "Trying to truncate logid which is already truncated"); + m_logdev_meta.update_store_superblk(store_id, logstore_superblk(trunc_lsn + 1), m_stopped /* persist_now */); + + // We found a new minimum logdev_key that we can truncate to + if (trunc_ld_key.idx < min_safe_ld_key.idx) { min_safe_ld_key = trunc_ld_key; } } - if (flush_q) { - for (auto it = flush_q->begin(); it != flush_q->end(); ++it) { - auto& cb = *it; - if (m_stopped) { - THIS_LOGDEV_LOG(INFO, "Logdev is stopped and thus not processing outstanding flush_lock_q"); - return; - } - if (!cb()) { - // NOTE: Under this if condition DO NOT ASSUME flush lock is still being held. This is because - // callee is saying, I will unlock the flush lock on my own and before returning from cb to here, - // the callee could have schedule a job in other thread and unlock the flush. - std::unique_lock lk{m_block_flush_q_mutex}; - THIS_LOGDEV_LOG(DEBUG, - "flush cb wanted to hold onto the flush lock, so putting the {} remaining entries back " - "to the q at head position which already has {} entries", - std::distance(flush_q->begin(), it), m_block_flush_q ? m_block_flush_q->size() : 0); - - flush_q->erase(flush_q->begin(), it + 1); - if (m_block_flush_q == nullptr) { - m_block_flush_q = flush_q; - } else { - m_block_flush_q->insert(m_block_flush_q->begin(), flush_q->begin(), flush_q->end()); - } + // There are no writes or no truncation called for any of the store, so we can't truncate anything + if (min_safe_ld_key == logdev_key::out_of_bound_ld_key()) { return 0; } - return; - } - } - sisl::VectorPool< flush_blocked_callback >::free(flush_q); - } - m_is_flushing.store(false, std::memory_order_release); + uint64_t const num_records_to_truncate = uint64_cast(min_safe_ld_key.idx - m_last_truncate_idx); + HS_PERIODIC_LOG(INFO, logstore, "Truncating log_dev={} log_id={} vdev_offset={} truncated {} log records", + m_logdev_id, min_safe_ld_key.idx, min_safe_ld_key.dev_offset, num_records_to_truncate); - // Try to do chain flush if its really needed. 
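The rewritten LogDev::truncate() reduces device truncation to a minimum computation: every store reports its truncation point through truncate_info(), stores with no writes since their last truncation are skipped, and the journal may only be truncated up to the smallest remaining logdev index. A standalone model of that decision with made-up numbers:

```cpp
#include <cstdint>
#include <iostream>
#include <limits>
#include <vector>

struct store_trunc_info {
    int64_t trunc_lsn; // store-level lsn the user has truncated up to
    int64_t tail_lsn;  // last lsn written to the store
    int64_t trunc_idx; // logdev log-id (logdev_key.idx) corresponding to trunc_lsn
};

// Highest logdev log-id that is safe to truncate on the device, or -1 if nothing can go yet.
int64_t min_safe_truncate_idx(const std::vector<store_trunc_info>& stores) {
    int64_t min_idx = std::numeric_limits<int64_t>::max(); // "out of bound" sentinel
    for (auto const& s : stores) {
        if (s.trunc_lsn == s.tail_lsn) { continue; } // no writes since last truncation: skip
        if (s.trunc_idx < min_idx) { min_idx = s.trunc_idx; }
    }
    return (min_idx == std::numeric_limits<int64_t>::max()) ? -1 : min_idx;
}

int main() {
    // Store 0 would allow truncation up to log-id 120, store 1 only up to 80, store 2 is idle.
    std::vector<store_trunc_info> stores{{200, 250, 120}, {90, 100, 80}, {50, 50, 40}};
    std::cout << "device can truncate up to log-id " << min_safe_truncate_idx(stores) << '\n'; // 80
    return 0;
}
```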
- THIS_LOGDEV_LOG(TRACE, "Unlocked the flush, try doing chain flushing if needed"); - // send a message to see if a new flush can be triggered - if (do_flush) { flush_if_needed(); } -} + // Truncate them in vdev + m_vdev_jd->truncate(min_safe_ld_key.dev_offset); + THIS_LOGDEV_LOG(DEBUG, "LogDev::truncation done upto log_id={} ", min_safe_ld_key.idx); -uint64_t LogDev::truncate(const logdev_key& key) { - HS_DBG_ASSERT_GE(key.idx, m_last_truncate_idx); - uint64_t const num_records_to_truncate = static_cast< uint64_t >(key.idx - m_last_truncate_idx); - THIS_LOGDEV_LOG(DEBUG, "LogDev::truncate num {} idx {}", num_records_to_truncate, key.idx); - if (num_records_to_truncate > 0) { - HS_PERIODIC_LOG(INFO, logstore, - "Truncating log device upto log_dev={} log_id={} vdev_offset={} truncated {} log records", - m_logdev_id, key.idx, key.dev_offset, num_records_to_truncate); - m_log_records->truncate(key.idx); - off_t new_offset = m_vdev_jd->truncate(key.dev_offset); - THIS_LOGDEV_LOG(DEBUG, "LogDev::truncate done {} offset old {} new {}", key.idx, key.dev_offset, new_offset); - m_last_truncate_idx = key.idx; - - { - std::unique_lock< std::mutex > lk{m_meta_mutex}; - - // Update the start offset to be read upon restart - m_logdev_meta.set_start_dev_offset(new_offset, key.idx + 1, false /* persist_now */); - - // Now that store is truncated, we can reclaim the store ids which are garbaged (if any) earlier -#ifdef _PRERELEASE - bool garbage_collect = false; -#endif - for (auto it{std::cbegin(m_garbage_store_ids)}; it != std::cend(m_garbage_store_ids);) { - if (it->first > key.idx) break; - - HS_PERIODIC_LOG(DEBUG, logstore, "Garbage collecting the log_dev={} log_store={} log_idx={}", - m_logdev_id, it->second, it->first); - m_logdev_meta.unreserve_store(it->second, false /* persist_now */); - it = m_garbage_store_ids.erase(it); -#ifdef _PRERELEASE - garbage_collect = true; -#endif - } + // Update the start offset to be read upon restart + m_last_truncate_idx = min_safe_ld_key.idx; + m_logdev_meta.set_start_dev_offset(min_safe_ld_key.dev_offset, min_safe_ld_key.idx + 1, + m_stopped /* persist_now */); - // We can remove the rollback records of those upto which logid is getting truncated - m_logdev_meta.remove_rollback_record_upto(key.idx, false /* persist_now */); - THIS_LOGDEV_LOG(DEBUG, "LogDev::truncate remove rollback {}", key.idx); - m_logdev_meta.persist(); -#ifdef _PRERELEASE - if (garbage_collect && iomgr_flip::instance()->test_flip("logdev_abort_after_garbage")) { - THIS_LOGDEV_LOG(INFO, "logdev aborting after unreserving garbage ids"); - hs()->crash_simulator().crash(); - return num_records_to_truncate; - } -#endif - } + // When a logstore is removed, it unregisteres the store and keeps the store id in garbage list. We can capture + // these store_ids upto the log_idx which is truncated and then unreserve those. 
Now on we can re-use the store_id + // on new store creation + for (auto it{std::cbegin(m_garbage_store_ids)}; it != std::cend(m_garbage_store_ids);) { + if (it->first > min_safe_ld_key.idx) { break; } + + HS_PERIODIC_LOG(DEBUG, logstore, "Garbage collecting log_store={} in log_dev={} log_idx={}", it->second, + m_logdev_id, it->first); + m_logdev_meta.unreserve_store(it->second, m_stopped /* persist_now */); + it = m_garbage_store_ids.erase(it); } - return num_records_to_truncate; -} -void LogDev::update_store_superblk(logstore_id_t store_id, const logstore_superblk& lsb, bool persist_now) { - std::unique_lock lg{m_meta_mutex}; - m_logdev_meta.update_store_superblk(store_id, lsb, persist_now); + // We can remove the rollback records of those upto which logid is getting truncated + m_logdev_meta.remove_rollback_record_upto(min_safe_ld_key.idx, m_stopped /* persist_now */); + THIS_LOGDEV_LOG(DEBUG, "LogDev::truncate remove rollback {}", min_safe_ld_key.idx); + + // All logdev meta information is updated in-memory, persist now + m_logdev_meta.persist(); + return num_records_to_truncate; } void LogDev::rollback(logstore_id_t store_id, logid_range_t id_range) { @@ -634,7 +541,6 @@ void LogDev::rollback(logstore_id_t store_id, logid_range_t id_range) { } /////////////////////////////// LogStore Section /////////////////////////////////////// - void LogDev::handle_unopened_log_stores(bool format) { for (auto it{std::begin(m_unopened_store_io)}; it != std::end(m_unopened_store_io); ++it) { LOGINFO("skip log entries for store id {}-{}, ios {}", m_logdev_id, it->first, it->second); @@ -669,7 +575,7 @@ std::shared_ptr< HomeLogStore > LogDev::create_new_log_store(bool append_mode) { HS_REL_ASSERT((it == m_id_logstore_map.end()), "store_id {}-{} already exists", m_logdev_id, store_id); m_id_logstore_map.insert(std::pair(store_id, logstore_info{.log_store = lstore, .append_mode = append_mode})); } - HS_LOG(INFO, logstore, "Created log store log_dev={} log_store={}", m_logdev_id, store_id); + LOGINFO("Created log store log_dev={} log_store={}", m_logdev_id, store_id); return lstore; } @@ -698,29 +604,6 @@ void LogDev::remove_log_store(logstore_id_t store_id) { unreserve_store_id(store_id); } -void LogDev::device_truncate_under_lock(const std::shared_ptr< truncate_req > treq) { - run_under_flush_lock([this, treq]() { - iomanager.run_on_forget(logstore_service().truncate_thread(), [this, treq]() { - const logdev_key trunc_upto = do_device_truncate(treq->dry_run); - bool done{false}; - if (treq->cb || treq->wait_till_done) { - { - std::lock_guard< std::mutex > lk{treq->mtx}; - done = (--treq->trunc_outstanding == 0); - treq->m_trunc_upto_result[m_logdev_id] = trunc_upto; - } - } - if (done) { - if (treq->cb) { treq->cb(treq->m_trunc_upto_result); } - if (treq->wait_till_done) { treq->cv.notify_one(); } - } - - unlock_flush(); - }); - return false; // Do not release the flush lock yet, the scheduler will unlock it. 
- }); -} - void LogDev::on_log_store_found(logstore_id_t store_id, const logstore_superblk& sb) { folly::SharedMutexWritePriority::ReadHolder holder(m_store_map_mtx); auto it = m_id_logstore_map.find(store_id); @@ -732,7 +615,7 @@ void LogDev::on_log_store_found(logstore_id_t store_id, const logstore_superblk& return; } - LOGINFO("Found a logstore log_dev={} log_store={} with start seq_num={}, Creating a new HomeLogStore instance", + LOGINFO("Found a logstore log_dev={} log_store={} with start lsn={}, Creating a new HomeLogStore instance", m_logdev_id, store_id, sb.m_first_seq_num); logstore_info& info = it->second; info.log_store = @@ -740,23 +623,7 @@ void LogDev::on_log_store_found(logstore_id_t store_id, const logstore_superblk& info.promise.setValue(info.log_store); } -static thread_local std::vector< std::shared_ptr< HomeLogStore > > s_cur_flush_batch_stores; - -void LogDev::on_io_completion(logstore_id_t id, logdev_key ld_key, logdev_key flush_ld_key, - uint32_t nremaining_in_batch, void* ctx) { - auto* req = s_cast< logstore_req* >(ctx); - HomeLogStore* log_store = req->log_store; - - if (req->is_write) { - HS_LOG_ASSERT_EQ(log_store->get_store_id(), id, "Expecting store id in log store and io completion to match"); - log_store->on_write_completion(req, ld_key); - on_batch_completion(log_store, nremaining_in_batch, flush_ld_key); - } else { - log_store->on_read_completion(req, ld_key); - } -} - -void LogDev::on_logfound(logstore_id_t id, logstore_seq_num_t seq_num, logdev_key ld_key, logdev_key flush_ld_key, +void LogDev::on_logfound(logstore_id_t id, logstore_seq_num_t lsn, logdev_key ld_key, logdev_key flush_ld_key, log_buffer buf, uint32_t nremaining_in_batch) { HomeLogStore* log_store{nullptr}; @@ -774,95 +641,7 @@ void LogDev::on_logfound(logstore_id_t id, logstore_seq_num_t seq_num, logdev_ke log_store = it->second.log_store.get(); } if (!log_store) { return; } - log_store->on_log_found(seq_num, ld_key, flush_ld_key, buf); - on_batch_completion(log_store, nremaining_in_batch, flush_ld_key); -} - -void LogDev::on_batch_completion(HomeLogStore* log_store, uint32_t nremaining_in_batch, logdev_key flush_ld_key) { - /* check if it is a first update on this log store */ - auto id = log_store->get_store_id(); - const auto it = m_last_flush_info.find(id); - if ((it == std::end(m_last_flush_info)) || (it->second != flush_ld_key.idx)) { - // first time completion in this batch for a given store_id - m_last_flush_info.insert_or_assign(id, flush_ld_key.idx); - if (it == std::end(m_last_flush_info)) { s_cur_flush_batch_stores.push_back(log_store->shared_from_this()); } - } - if (nremaining_in_batch == 0) { - // This batch is completed, call all log stores participated in this batch about the end of batch - HS_LOG_ASSERT_GT(s_cur_flush_batch_stores.size(), 0U, "Expecting one store to be flushed in batch"); - - for (auto& l : s_cur_flush_batch_stores) { - l->on_batch_completion(flush_ld_key); - } - s_cur_flush_batch_stores.clear(); - m_last_flush_info.clear(); - } -} - -logdev_key LogDev::do_device_truncate(bool dry_run) { - static thread_local std::vector< std::shared_ptr< HomeLogStore > > m_min_trunc_stores; - static thread_local std::vector< std::shared_ptr< HomeLogStore > > m_non_participating_stores; - - m_min_trunc_stores.clear(); - m_non_participating_stores.clear(); - logdev_key min_safe_ld_key = logdev_key::out_of_bound_ld_key(); - - std::string dbg_str{"Format [store_id:trunc_lsn:logidx:dev_trunc_pending?:active_writes_in_trucate?] 
"}; - - { - folly::SharedMutexWritePriority::ReadHolder holder(m_store_map_mtx); - for (auto& id_logstore : m_id_logstore_map) { - auto& store_ptr = id_logstore.second.log_store; - const auto& trunc_info = store_ptr->pre_device_truncation(); - - if (!trunc_info.pending_dev_truncation && !trunc_info.active_writes_not_part_of_truncation) { - // This log store neither has any pending device truncation nor active logstore io going on for now. - // Ignore this log store for min safe boundary calculation. - fmt::format_to(std::back_inserter(dbg_str), "[{}:None] ", store_ptr->get_store_id()); - m_non_participating_stores.push_back(store_ptr); - continue; - } - - fmt::format_to(std::back_inserter(dbg_str), "[{}:{}:{}:{}:{}] ", store_ptr->get_store_id(), - trunc_info.seq_num.load(), trunc_info.ld_key.idx, trunc_info.pending_dev_truncation, - trunc_info.active_writes_not_part_of_truncation); - if (trunc_info.ld_key.idx > min_safe_ld_key.idx) { continue; } - - if (trunc_info.ld_key.idx < min_safe_ld_key.idx) { - // New minimum safe l entry - min_safe_ld_key = trunc_info.ld_key; - m_min_trunc_stores.clear(); - } - m_min_trunc_stores.push_back(store_ptr); - } - } - - if ((min_safe_ld_key == logdev_key::out_of_bound_ld_key()) || (min_safe_ld_key.idx < 0)) { - HS_PERIODIC_LOG(INFO, logstore, - "[log_dev={}] No log store append on any log stores, skipping device truncation, " - "all_logstore_info:<{}>", - m_logdev_id, dbg_str); - return min_safe_ld_key; - } - - // Got the safest log id to truncate and actually truncate upto the safe log idx to the log device - if (!dry_run) { truncate(min_safe_ld_key); } - HS_PERIODIC_LOG(INFO, logstore, - "[log_dev={}] LogDevice truncate, all_logstore_info:<{}> safe log dev key to truncate={}", - m_logdev_id, dbg_str, min_safe_ld_key); - - // We call post device truncation only to the log stores whose prepared truncation points are fully - // truncated or to stores which didn't particpate in this device truncation. - for (auto& store_ptr : m_min_trunc_stores) { - store_ptr->post_device_truncation(min_safe_ld_key); - } - for (auto& store_ptr : m_non_participating_stores) { - store_ptr->post_device_truncation(min_safe_ld_key); - } - m_min_trunc_stores.clear(); // Not clearing here, would cause a shared_ptr ref holding. 
- m_non_participating_stores.clear(); - - return min_safe_ld_key; + log_store->on_log_found(lsn, ld_key, flush_ld_key, buf); } nlohmann::json LogDev::dump_log_store(const log_dump_req& dump_req) { @@ -900,7 +679,6 @@ nlohmann::json LogDev::get_status(int verbosity) const { js["time_since_last_log_flush_ns"] = get_elapsed_time_ns(m_last_flush_time); if (verbosity == 2) { js["logdev_stopped?"] = m_stopped; - js["is_log_flushing_now?"] = m_is_flushing.load(std::memory_order_relaxed); js["logdev_sb_start_offset"] = m_logdev_meta.get_start_dev_offset(); js["logdev_sb_num_stores_reserved"] = m_logdev_meta.num_stores_reserved(); } @@ -934,20 +712,13 @@ logdev_superblk* LogDevMetadata::create(logdev_id_t id) { return sb; } -void LogDevMetadata::destroy() { - m_rollback_sb.destroy(); - m_sb.destroy(); -} - void LogDevMetadata::reset() { m_id_reserver.reset(); m_store_info.clear(); } void LogDevMetadata::logdev_super_blk_found(const sisl::byte_view& buf, void* meta_cookie) { - m_sb.load(buf, meta_cookie); - LOGINFO("Logdev superblk found log_dev={}", m_sb->logdev_id); HS_REL_ASSERT_EQ(m_sb->get_magic(), logdev_superblk::LOGDEV_SB_MAGIC, "Invalid logdev metablk, magic mismatch"); HS_REL_ASSERT_EQ(m_sb->get_version(), logdev_superblk::LOGDEV_SB_VERSION, "Invalid version of logdev metablk"); } @@ -1168,4 +939,9 @@ bool LogDevMetadata::resize_rollback_sb_if_needed() { return false; } } + +void LogDevMetadata::destroy() { + m_rollback_sb.destroy(); + m_sb.destroy(); +} } // namespace homestore diff --git a/src/lib/logstore/log_dev.hpp b/src/lib/logstore/log_dev.hpp index 2614ba25c..fda95fbb5 100644 --- a/src/lib/logstore/log_dev.hpp +++ b/src/lib/logstore/log_dev.hpp @@ -293,9 +293,6 @@ class LogGroup { off_t m_log_dev_offset; uint64_t m_flush_multiple_size{0}; - Clock::time_point m_flush_finish_time; // Time at which flush is completed - Clock::time_point m_post_flush_msg_rcvd_time; // Time at which flush done message delivered - Clock::time_point m_post_flush_process_done_time; // Time at which entire log group cb is called private: log_group_footer* add_and_get_footer(); @@ -386,21 +383,6 @@ std::basic_ostream< charT, traits >& operator<<(std::basic_ostream< charT, trait } // namespace homestore -namespace fmt { -template <> -struct formatter< homestore::logdev_key > { - template < typename ParseContext > - constexpr auto parse(ParseContext& ctx) { - return ctx.begin(); - } - - template < typename FormatContext > - auto format(homestore::logdev_key const& k, FormatContext& ctx) { - return format_to(ctx.out(), "[idx={} dev_offset={}]", k.idx, k.dev_offset); - } -}; -} // namespace fmt - namespace homestore { using log_buffer = sisl::byte_view; @@ -501,7 +483,6 @@ class LogDevMetadata { ~LogDevMetadata() = default; logdev_superblk* create(logdev_id_t id); - void destroy(); void reset(); std::vector< std::pair< logstore_id_t, logstore_superblk > > load(); void persist(); @@ -530,6 +511,7 @@ class LogDevMetadata { void logdev_super_blk_found(const sisl::byte_view& buf, void* meta_cookie); void rollback_super_blk_found(const sisl::byte_view& buf, void* meta_cookie); + void destroy(); private: bool resize_logdev_sb_if_needed(); @@ -540,7 +522,6 @@ class LogDevMetadata { } uint32_t store_capacity() const; - void remove_all_rollback_records(logstore_id_t id); private: superblk< logdev_superblk > m_sb; @@ -557,8 +538,8 @@ class JournalVirtualDev; class log_stream_reader { public: - log_stream_reader(off_t device_cursor, JournalVirtualDev* vdev, shared< JournalVirtualDev::Descriptor > vdev_jd, - uint64_t 
min_read_size); + log_stream_reader(off_t device_cursor, std::shared_ptr< JournalVirtualDev > vdev, + shared< JournalVirtualDev::Descriptor > vdev_jd, uint64_t min_read_size); log_stream_reader(const log_stream_reader&) = delete; log_stream_reader& operator=(const log_stream_reader&) = delete; log_stream_reader(log_stream_reader&&) noexcept = delete; @@ -572,7 +553,7 @@ class log_stream_reader { sisl::byte_view read_next_bytes(uint64_t nbytes, bool& end_of_stream); private: - JournalVirtualDev* m_vdev; + std::shared_ptr< JournalVirtualDev > m_vdev; shared< JournalVirtualDev::Descriptor > m_vdev_jd; // Journal descriptor. sisl::byte_view m_cur_log_buf; off_t m_first_group_cursor; @@ -586,36 +567,26 @@ struct logstore_info { bool append_mode; folly::SharedPromise< std::shared_ptr< HomeLogStore > > promise{}; }; -struct truncate_req { - std::mutex mtx; - std::condition_variable cv; - bool wait_till_done{false}; - bool dry_run{false}; - device_truncate_cb_t cb; - std::unordered_map< logdev_id_t, logdev_key > m_trunc_upto_result; - int trunc_outstanding{0}; -}; static std::string const logdev_sb_meta_name{"Logdev_sb"}; static std::string const logdev_rollback_sb_meta_name{"Logdev_rollback_sb"}; +VENUM(flush_mode_t, uint32_t, // Various flush modes (can be or'ed together) + INLINE = 1 << 0, // Allow flush inline with the append + TIMER = 1 << 1, // Allow timer based automatic flush + EXPLICIT = 1 << 2, // Allow explcitly user calling flush +); + class LogDev : public std::enable_shared_from_this< LogDev > { friend class HomeLogStore; public: - // NOTE: Possibly change these in future to include constant correctness - typedef std::function< void(logstore_id_t, logdev_key, logdev_key, uint32_t nremaining_in_batch, void*) > - log_append_comp_callback; - typedef std::function< void(logstore_id_t, logstore_seq_num_t, logdev_key, logdev_key, log_buffer, uint32_t) > - log_found_callback; - typedef std::function< void(logstore_id_t, const logstore_superblk&) > store_found_callback; - typedef std::function< bool(void) > flush_blocked_callback; - static inline int64_t flush_data_threshold_size() { return HS_DYNAMIC_CONFIG(logstore.flush_threshold_size) - sizeof(log_group_header); } - LogDev(logdev_id_t logdev_id, JournalVirtualDev* vdev); + LogDev(logdev_id_t logdev_id, + flush_mode_t flush_mode = static_cast< flush_mode_t >(HS_DYNAMIC_CONFIG(logstore.flush_mode))); LogDev(const LogDev&) = delete; LogDev& operator=(const LogDev&) = delete; LogDev(LogDev&&) noexcept = delete; @@ -627,8 +598,9 @@ class LogDev : public std::enable_shared_from_this< LogDev > { * to the recovery. It is expected that all callbacks are registered before calling the start. * * @param format: Do we need to format the logdev or not. + * @param blk_store: The blk_store associated to this logdev */ - void start(bool format); + void start(bool format, std::shared_ptr< JournalVirtualDev > vdev); /** * @brief Stop the logdev. It resets all the parameters it is using and thus can be started later @@ -637,22 +609,16 @@ class LogDev : public std::enable_shared_from_this< LogDev > { void stop(); /** - * @brief Destroy the logdev metablks. + * @brief return whether the logdev is stopped or not * */ - void destroy(); + bool is_stopped(); /** - * @brief Start the flush timer. - * - */ - void start_timer(); - - /** - * @brief Stop the flush timer. + * @brief Destroy the logdev metablks. * */ - void stop_timer(); + void destroy(); /** * @brief Append the data to the log device asynchronously. 
The buffer that is passed is expected to be valid, till
@@ -668,7 +634,7 @@ class LogDev : public std::enable_shared_from_this< LogDev > {
     * @return logid_t : log_idx of the log of the data.
     */
    logid_t append_async(logstore_id_t store_id, logstore_seq_num_t seq_num, const sisl::io_blob& data,
-                         void* cb_context, bool flush_wait = false);
+                         void* cb_context);
 
    /**
     * @brief Read the log id from the device offset
@@ -684,44 +650,29 @@ class LogDev : public std::enable_shared_from_this< LogDev > {
     */
    log_buffer read(const logdev_key& key, serialized_log_record& record_header);
 
-    /**
-     * @brief Load the data from the blkstore starting with offset. This method loads data in bulk and then call
-     * the registered logfound_cb with key and buffer. NOTE: This method is not thread safe. It is expected to be called
-     * during recovery
-     *
-     * @param offset Log blkstore device offset.
-     */
-    void load(uint64_t offset);
-
-    // callback from blkstore, registered at vdev creation;
-    // void process_logdev_completions(const boost::intrusive_ptr< virtualdev_req >& vd_req);
-
-    /**
-     * @brief Reserve logstore id and persist if needed. It persists the entire map about the logstore id inside the
-     *
-     * @return uint32_t : Return the reserved id
-     */
-    logstore_id_t reserve_store_id();
+    /// @brief Flush the log device if the pending data size is at least the threshold size. This is a blocking call
+    /// and hence it is required to run on a thread/fiber which can do blocking IO. If not run on such a thread, it
+    /// will redirect the flush to a flush thread and run there.
+    ///
+    /// @param threshold_size [Optional]: Size in bytes after which it will flush; if set to -1, the default size is used
+    ///
+    /// @return bool : True if it has flushed the data, false otherwise
+    bool flush_if_necessary(int64_t threshold_size = -1);
 
-    /**
-     * @brief Unreserve the logstore id. It does not immediately unregisters and persist the unregistered map, but it
-     * will add to the waiting list (garbage list) and then during truncation, it actually unreserves and persits map.
-     *
-     * @param store_id
-     */
-    void unreserve_store_id(logstore_id_t store_id);
+    /// @brief : Look at all logstores, find the safest point up to which they can be truncated, and truncate them.
+    ///
+    /// @return number of log records it has truncated
+    uint64_t truncate();
 
     /**
-     * @brief Is the given store id already reserved.
+     * @brief Rollback the logid range specific to the given store id. This method persists the information
+     * synchronously to the underlying storage. Once rolled back, the logids in this range are ignored (only for
+     * this logstore) during load.
     *
-     * @return true or false
-     */
-    bool is_reserved_store_id(logstore_id_t id);
-
-    /**
-     * @brief This method persist the store ids reserved/unreserved inside the vdev super block
+     * @param store_id : Store id whose logids are to be rolled back or invalidated
+     * @param id_range : Log id range to rollback/invalidate
     */
-    void persist_store_ids();
+    void rollback(logstore_id_t store_id, logid_range_t id_range);
 
     /**
     * @brief This method get all the store ids that are registered already and out of them which are being garbaged
@@ -732,103 +683,72 @@ class LogDev : public std::enable_shared_from_this< LogDev > {
     */
    void get_registered_store_ids(std::vector< logstore_id_t >& registered, std::vector< logstore_id_t >& garbage);
 
-    crc32_t get_prev_crc() const { return m_last_crc; }
-
-    /**
-     * @brief This method attempts to block the log flush and then make a callback cb. 
If it is already blocked, - * then after previous flush is completed, it will make the callback (while log flush is still under blocked state) - * - * @param cb Callback - * @return true or false based on if it is able to block the flush right away. - */ - bool run_under_flush_lock(const flush_blocked_callback& cb); - - /** - * @brief Unblock the flush. While unblocking if there are other requests to block or any flush pending it first - * executes them before unblocking - */ - void unlock_flush(bool do_flush = true); - - /** - * @brief Rollback the logid range specific to the given store id. This method persists the information - * synchronously to the underlying storage. Once rolledback those logids in this range are ignored (only for - * this logstore) during load. - * - * @param store_id : Store id whose logids are to be rolled back or invalidated - * @param id_range : Log id range to rollback/invalidate - */ - void rollback(logstore_id_t store_id, logid_range_t id_range); - - void update_store_superblk(logstore_id_t idx, const logstore_superblk& meta, bool persist_now); - nlohmann::json dump_log_store(const log_dump_req& dum_req); nlohmann::json get_status(int verbosity) const; - bool flush_if_needed(int64_t threshold_size = -1); - - bool is_aligned_buf_needed(size_t size) const { - return (log_record::is_size_inlineable(size, m_flush_size_multiple) == false); - } - - uint64_t get_flush_size_multiple() const { return m_flush_size_multiple; } - logdev_key get_last_flush_ld_key() const { return logdev_key{m_last_flush_idx, m_last_flush_dev_offset}; } - - LogDevMetadata& log_dev_meta() { return m_logdev_meta; } - static bool can_flush_in_this_thread(); - - // Logstore management. - std::shared_ptr< HomeLogStore > create_new_log_store(bool append_mode = false); + //////////////////// Logstore management /////////////////////// + /// @brief Create a new log store under this log device + /// @param append_mode Is this log store is append mode or not. If append mode, write_async call fails and only + /// append_async calls succeed. + /// + /// @return shared< HomeLogStore > : The newly created log store + shared< HomeLogStore > create_new_log_store(bool append_mode = false); + + /// @brief Open the log store which was created under this log device. It expects that log store id is already + /// created. Behavior of opening a log store which was never created is unknown. One can create log store in + /// non-append mode, but upon restart, it can be opened in append_mode. Log store is not usable until the future is + /// armed with logstore. It is expected that caller calls this method before LogDev::start() is called, otherwise + /// unopened log devs and log stores are removed. + /// + /// @param store_id Store id to open the log store + /// @param append_mode Is this log store is append mode or not. If append mode, write_async call fails and only + /// append_async calls succeed. 
+ /// @return future< shared< HomeLogStore > > : Future which will be set with the log store once it is opened folly::Future< shared< HomeLogStore > > open_log_store(logstore_id_t store_id, bool append_mode); - bool close_log_store(logstore_id_t store_id) { - // TODO: Implement this method - return true; - } + + /// @brief Remove the log store and its associated resources + /// @param store_id Store id that was created/opened void remove_log_store(logstore_id_t store_id); - void on_io_completion(logstore_id_t id, logdev_key ld_key, logdev_key flush_idx, uint32_t nremaining_in_batch, - void* ctx); - void on_log_store_found(logstore_id_t store_id, const logstore_superblk& sb); - void on_logfound(logstore_id_t id, logstore_seq_num_t seq_num, logdev_key ld_key, logdev_key flush_ld_key, - log_buffer buf, uint32_t nremaining_in_batch); - void on_batch_completion(HomeLogStore* log_store, uint32_t nremaining_in_batch, logdev_key flush_ld_key); - /** - * Truncates the device under lock. - * - * This function is responsible for truncating the device based on the provided truncate request. - * The truncation operation is performed under a lock to ensure thread safety. - * - * @param treq The truncate request to be processed. - */ - void device_truncate_under_lock(const std::shared_ptr< truncate_req > treq); + /// @return externally visible lock to avoid flush concurrently + auto flush_guard() { return std::unique_lock(m_flush_mtx); } - void handle_unopened_log_stores(bool format); - logdev_id_t get_id() { return m_logdev_id; } - shared< JournalVirtualDev::Descriptor > get_journal_descriptor() const { return m_vdev_jd; } - bool is_stopped() { return m_stopped; } + /// @brief do flush under the protection of flush guard + bool flush_under_guard(); - // bool ready_for_truncate() const { return m_vdev_jd->ready_for_truncate(); } + ///////////////// Getters /////////////////////// + LogDevMetadata& log_dev_meta() { return m_logdev_meta; } + logdev_id_t get_id() const { return m_logdev_id; } + uint64_t get_flush_size_multiple() const { return m_flush_size_multiple; } private: + void start_timer(); + void stop_timer(); + + bool allow_inline_flush() const { return uint32_cast(m_flush_mode) & uint32_cast(flush_mode_t::INLINE); } + bool allow_timer_flush() const { return uint32_cast(m_flush_mode) & uint32_cast(flush_mode_t::TIMER); } + bool allow_explicit_flush() const { return uint32_cast(m_flush_mode) & uint32_cast(flush_mode_t::EXPLICIT); } + /** - * @brief : truncate up to input log id; + * @brief Reserve logstore id and persist if needed. It persists the entire map about the logstore id inside the * - * @param key : the key containing log id that needs to be truncate up to; - * @return number of records to truncate + * @return uint32_t : Return the reserved id */ - uint64_t truncate(const logdev_key& key); + logstore_id_t reserve_store_id(); /** - * Truncates the device. - * - * This function truncates the device and returns the corresponding logdev_key. - * - * @param dry_run If set to true, the function performs a dry run without actually truncating the device, it only - * updates the corresponding truncation barriers, pretending the truncation happened without actually discarding the - * log entries on device. + * @brief Unreserve the logstore id. It does not immediately unregisters and persist the unregistered map, but it + * will add to the waiting list (garbage list) and then during truncation, it actually unreserves and persits map. * - * @return The logdev_key representing the truncated device. 
+ * @param store_id */ - logdev_key do_device_truncate(bool dry_run = false); + void unreserve_store_id(logstore_id_t store_id); + + void on_flush_completion(LogGroup* lg); + void on_log_store_found(logstore_id_t store_id, const logstore_superblk& sb); + void handle_unopened_log_stores(bool format); + void on_logfound(logstore_id_t id, logstore_seq_num_t seq_num, logdev_key ld_key, logdev_key flush_ld_key, + log_buffer buf, uint32_t nremaining_in_batch); LogGroup* make_log_group(uint32_t estimated_records) { m_log_group_pool[m_log_group_idx].reset(estimated_records); @@ -838,73 +758,83 @@ class LogDev : public std::enable_shared_from_this< LogDev > { void free_log_group(LogGroup* lg) { m_log_group_idx = !m_log_group_idx; } LogGroup* prepare_flush(int32_t estimated_record); - - void do_flush(LogGroup* lg); - void do_flush_write(LogGroup* lg); - void flush_by_size(uint32_t min_threshold, uint32_t new_record_size = 0, logid_t new_idx = -1); - void on_flush_completion(LogGroup* lg); void do_load(off_t offset); + void assert_next_pages(log_stream_reader& lstream); -#if 0 - log_group_header* read_validate_header(uint8_t* buf, uint32_t size, bool* read_more); - sisl::byte_array read_next_header(uint32_t max_buf_reads); -#endif + /// @brief force to flush the log device + /// @return whether real flush is done + bool flush(); - void _persist_info_block(); - void assert_next_pages(log_stream_reader& lstream); - void set_flush_status(bool flush_status); - bool get_flush_status(); + bool can_flush_in_this_thread(); private: - std::unique_ptr< sisl::StreamTracker< log_record > > - m_log_records; // The container which stores all in-memory log records - std::atomic< logid_t > m_log_idx{0}; // Generator of log idx - std::atomic< int64_t > m_pending_flush_size{0}; // How much flushable logs are pending - std::atomic< bool > m_is_flushing{false}; // Is LogDev currently flushing (so far supports one flusher at a time) - bool m_stopped{true}; // Is Logdev stopped. We don't need lock here, because it is updated under flush lock + std::unique_ptr< sisl::StreamTracker< log_record > > m_log_records; // Container stores all in-memory log records + std::atomic< logid_t > m_log_idx{0}; // Generator of log idx + std::atomic< int64_t > m_pending_flush_size{0}; // How much flushable logs are pending + bool m_stopped{false}; // Is Logdev stopped. We don't need lock here, because it is updated under flush lock logdev_id_t m_logdev_id; - JournalVirtualDev* m_vdev{nullptr}; + std::shared_ptr< JournalVirtualDev > m_vdev; shared< JournalVirtualDev::Descriptor > m_vdev_jd; // Journal descriptor. HomeStoreSafePtr m_hs; // Back pointer to homestore + flush_mode_t m_flush_mode; folly::SharedMutexWritePriority m_store_map_mtx; std::unordered_map< logstore_id_t, logstore_info > m_id_logstore_map; std::unordered_map< logstore_id_t, uint64_t > m_unopened_store_io; std::unordered_set< logstore_id_t > m_unopened_store_id; - std::unordered_map< logstore_id_t, logid_t > m_last_flush_info; - std::multimap< logid_t, logstore_id_t > m_garbage_store_ids; Clock::time_point m_last_flush_time; logid_t m_last_flush_idx{-1}; // Track last flushed, last device offset and truncated log idx - off_t m_last_flush_dev_offset{0}; - logid_t m_last_truncate_idx{-1}; + logid_t m_last_truncate_idx{std::numeric_limits< logid_t >::min()}; // logdev truncate up to this idx + + // after completing flush, we can not truncate the log records directly. 
This is because the completion callback of an
+    // append_async may itself schedule a new flush/append (the logstore test cases do exactly this). That means
+    // on_flush_completion can be invoked recursively from within another on_flush_completion. For example:
+    //   on_flush_completion_1 flushes log records 0 - 10 and wants to truncate to 10
+    //   on_flush_completion_2 flushes log records 10 - 20 and wants to truncate to 20
+    //   on_flush_completion_3 flushes log records 20 - 30 and wants to truncate to 30
+    //   on_flush_completion_1 triggers on_flush_completion_2, which in turn triggers on_flush_completion_3.
+    // In this case on_flush_completion_3 completes before on_flush_completion_2, so by the time on_flush_completion_2
+    // resumes, the log records have already been truncated to 30 and its own truncate to 20 would be invalid.
+    // What's more, a statement like `auto& record = m_log_records->at(idx);` would also be invalid, since the record
+    // at that index has already been truncated.
+
+    // So, in the recursive case, we must keep the log records from being truncated when a flush completes, until they
+    // are no longer needed by the callers lower in the stack. In the example above, on_flush_completion_3 cannot
+    // truncate to 30, since log records 10 - 20 are still needed by on_flush_completion_2; for the same reason,
+    // on_flush_completion_2 cannot truncate to 20, since log records 0 - 10 are still needed by on_flush_completion_1.
+
+    // The solution introduces the following two members:
+    // 1. m_log_records_safe_truncate_to_idx: the index up to which we can truncate safely when doing the real
+    //    truncation. It is updated by the last call in the call stack; in our example on_flush_completion_3 updates
+    //    it to 30, while on_flush_completion_1 and on_flush_completion_2 do not update it since their upto_indx is
+    //    smaller than that of on_flush_completion_3.
+    // 2. m_log_records_last_truncate_to_idx: the index from which we can truncate. It is used to keep all truncations
+    //    sequential.
+    logid_t m_log_records_safe_truncate_to_idx{std::numeric_limits< logid_t >::min()};
+    logid_t m_log_records_last_truncate_to_idx{std::numeric_limits< logid_t >::min()};
 
     crc32_t m_last_crc{INVALID_CRC32_VALUE};
-    log_append_comp_callback m_append_comp_cb{nullptr};
-    log_found_callback m_logfound_cb{nullptr};
-    store_found_callback m_store_found_cb{nullptr};
 
     // LogDev Info block related fields
     std::mutex m_meta_mutex;
     LogDevMetadata m_logdev_meta;
-
-    // Block flush Q request Q
-    std::mutex m_block_flush_q_mutex;
-    std::condition_variable m_block_flush_q_cv;
-    std::mutex m_comp_mutex;
-    std::vector< flush_blocked_callback >* m_block_flush_q{nullptr};
-
-    void* m_sb_cookie{nullptr};
     uint64_t m_flush_size_multiple{0};
 
     // Pool for creating log group
     LogGroup m_log_group_pool[max_log_group];
     uint32_t m_log_group_idx{0};
-    std::atomic< bool > m_flush_status = false;
 
     // Timer handle
     iomgr::timer_handle_t m_flush_timer_hdl{iomgr::null_timer_handle};
 
+    // If we support inline flush mode, a flush may be scheduled from the same thread (for example, the completion
+    // callback of append_async scheduling another flush), so this lock may need to be acquired multiple times on the
+    // same thread. 
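+    // Illustrative (hypothetical) sequence that motivates this, with INLINE flush mode enabled:
+    //   append_async(...) completes -> its completion callback appends more data via append_async(...)
+    //     -> flush_if_necessary() attempts another flush on the very same thread/fiber.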
+ iomgr::FiberManagerLib::mutex m_flush_mtx; }; // LogDev } // namespace homestore diff --git a/src/lib/logstore/log_store.cpp b/src/lib/logstore/log_store.cpp index 672f05558..09fae7002 100644 --- a/src/lib/logstore/log_store.cpp +++ b/src/lib/logstore/log_store.cpp @@ -38,77 +38,35 @@ HomeLogStore::HomeLogStore(std::shared_ptr< LogDev > logdev, logstore_id_t id, b m_logdev{logdev}, m_records{"HomeLogStoreRecords", start_lsn - 1}, m_append_mode{append_mode}, - m_seq_num{start_lsn}, + m_start_lsn{start_lsn}, + m_next_lsn{start_lsn}, + m_tail_lsn{start_lsn - 1}, m_fq_name{fmt::format("{} log_dev={}", id, logdev->get_id())}, - m_metrics{logstore_service().metrics()} { - m_truncation_barriers.reserve(10000); - m_safe_truncation_boundary.ld_key = m_logdev->get_last_flush_ld_key(); - m_safe_truncation_boundary.seq_num.store(start_lsn - 1, std::memory_order_release); - THIS_LOGSTORE_LOG(TRACE, "m_safe_truncation_boundary.ld_key={}", m_safe_truncation_boundary.ld_key); -} - -bool HomeLogStore::write_sync(logstore_seq_num_t seq_num, const sisl::io_blob& b) { - HS_LOG_ASSERT((!iomanager.am_i_worker_reactor()), "Sync can not be done in worker reactor thread"); - - // these should be static so that they stay in scope in the lambda in case function ends before lambda completes - struct Context { - std::mutex write_mutex; - std::condition_variable write_cv; - bool write_done{false}; - bool ret{false}; - }; - auto ctx = std::make_shared< Context >(); - this->write_async( - seq_num, b, nullptr, - [seq_num, this, ctx](homestore::logstore_seq_num_t seq_num_cb, [[maybe_unused]] const sisl::io_blob& b, - homestore::logdev_key ld_key, [[maybe_unused]] void* cb_ctx) { - HS_DBG_ASSERT((ld_key && seq_num == seq_num_cb), "Write_Async failed or corrupted"); - { - std::unique_lock< std::mutex > lk{ctx->write_mutex}; - ctx->write_done = true; - ctx->ret = true; - } - ctx->write_cv.notify_one(); - }, - true /* flush_wait */); - - { - std::unique_lock< std::mutex > lk{ctx->write_mutex}; - ctx->write_cv.wait(lk, [&ctx] { return ctx->write_done; }); - } - - return ctx->ret; -} + m_metrics{logstore_service().metrics()} {} void HomeLogStore::write_async(logstore_req* req, const log_req_comp_cb_t& cb) { HS_LOG_ASSERT((cb || m_comp_cb), "Expected either cb is not null or default cb registered"); req->cb = (cb ? 
cb : m_comp_cb); req->start_time = Clock::now(); - if (req->seq_num == 0) { - m_safe_truncation_boundary.ld_key = m_logdev->get_last_flush_ld_key(); - THIS_LOGSTORE_LOG(TRACE, "m_safe_truncation_boundary.ld_key={}", m_safe_truncation_boundary.ld_key); - } + #ifndef NDEBUG - const auto trunc_upto_lsn = truncated_upto(); - if (req->seq_num <= trunc_upto_lsn) { - THIS_LOGSTORE_LOG(ERROR, "Assert: Appending lsn={} lesser than or equal to truncated_upto_lsn={}", req->seq_num, - trunc_upto_lsn); + if (req->seq_num < start_lsn()) { + THIS_LOGSTORE_LOG(ERROR, "Assert: Writing lsn={} lesser than start_lsn={}", req->seq_num, start_lsn()); HS_DBG_ASSERT(0, "Assertion"); } #endif - + // Todo :: any read after this should get the log m_records.create(req->seq_num); COUNTER_INCREMENT(m_metrics, logstore_append_count, 1); HISTOGRAM_OBSERVE(m_metrics, logstore_record_size, req->data.size()); - m_logdev->append_async(m_store_id, req->seq_num, req->data, static_cast< void* >(req), req->flush_wait); + m_logdev->append_async(m_store_id, req->seq_num, req->data, static_cast< void* >(req)); } void HomeLogStore::write_async(logstore_seq_num_t seq_num, const sisl::io_blob& b, void* cookie, - const log_write_comp_cb_t& cb, bool flush_wait) { + const log_write_comp_cb_t& cb) { // Form an internal request and issue the write - auto* req = logstore_req::make(this, seq_num, b, true /* is_write_req */); + auto* req = logstore_req::make(this, seq_num, b); req->cookie = cookie; - req->flush_wait = flush_wait; write_async(req, [cb](logstore_req* req, logdev_key written_lkey) { if (cb) { cb(req->seq_num, req->data, written_lkey, req->cookie); } @@ -118,20 +76,30 @@ void HomeLogStore::write_async(logstore_seq_num_t seq_num, const sisl::io_blob& logstore_seq_num_t HomeLogStore::append_async(const sisl::io_blob& b, void* cookie, const log_write_comp_cb_t& cb) { HS_DBG_ASSERT_EQ(m_append_mode, true, "append_async can be called only on append only mode"); - const auto seq_num = m_seq_num.fetch_add(1, std::memory_order_acq_rel); + const auto seq_num = m_next_lsn.fetch_add(1, std::memory_order_acq_rel); write_async(seq_num, b, cookie, cb); return seq_num; } +void HomeLogStore::write_and_flush(logstore_seq_num_t seq_num, const sisl::io_blob& b) { + HS_LOG_ASSERT(iomanager.am_i_sync_io_capable(), + "Write and flush is a blocking IO, which can't run in this thread, please reschedule to a fiber"); + if (seq_num > m_next_lsn.load(std::memory_order_relaxed)) m_next_lsn.store(seq_num + 1, std::memory_order_relaxed); + write_async(seq_num, b, nullptr /* cookie */, nullptr /* cb */); + m_logdev->flush_under_guard(); +} + log_buffer HomeLogStore::read_sync(logstore_seq_num_t seq_num) { + HS_LOG_ASSERT(iomanager.am_i_sync_io_capable(), + "Read sync is a blocking IO, which can't run in this thread, reschedule to a fiber"); + // If seq_num has not been flushed yet, but issued, then we flush them before reading auto const s = m_records.status(seq_num); if (s.is_out_of_range || s.is_hole) { - // THIS_LOGSTORE_LOG(ERROR, "ld_key not valid {}", seq_num); throw std::out_of_range("key not valid"); } else if (!s.is_completed) { THIS_LOGSTORE_LOG(TRACE, "Reading lsn={}:{} before flushed, doing flush first", m_store_id, seq_num); - flush_sync(seq_num); + m_logdev->flush_under_guard(); } const auto record = m_records.at(seq_num); @@ -142,167 +110,108 @@ log_buffer HomeLogStore::read_sync(logstore_seq_num_t seq_num) { } const auto start_time = Clock::now(); - // THIS_LOGSTORE_LOG(TRACE, "Reading lsn={}:{} mapped to logdev_key=[idx={} dev_offset={}]", 
m_store_id, seq_num,
-    //                    ld_key.idx, ld_key.dev_offset);
+    THIS_LOGSTORE_LOG(TRACE, "Reading lsn={}:{} mapped to logdev_key=[idx={} dev_offset={}]", m_store_id, seq_num,
+                      ld_key.idx, ld_key.dev_offset);
     COUNTER_INCREMENT(m_metrics, logstore_read_count, 1);
     serialized_log_record header;
     const auto b = m_logdev->read(ld_key, header);
     HISTOGRAM_OBSERVE(m_metrics, logstore_read_latency, get_elapsed_time_us(start_time));
     return b;
 }
-#if 0
-void HomeLogStore::read_async(logstore_req* req, const log_found_cb_t& cb) {
-    HS_LOG_ASSERT( ((cb != nullptr) || (m_comp_cb != nullptr)),
-                  "Expected either cb is not null or default cb registered");
-    auto record = m_records.at(req->seq_num);
-    logdev_key ld_key = record.m_dev_key;
-    req->cb = cb;
-    m_logdev->read_async(ld_key, (void*)req);
-}
-
-void HomeLogStore::read_async(logstore_seq_num_t seq_num, void* cookie, const log_found_cb_t& cb) {
-    auto record = m_records.at(seq_num);
-    logdev_key ld_key = record.m_dev_key;
-    sisl::io_blob b;
-    auto* req = logstore_req::make(this, seq_num, &b, false /* not write */);
-    read_async(req, [cookie, cb](logstore_seq_num_t seq_num, log_buffer log_buf, void* cookie) {
-        cb(seq, log_buf, cookie);
-        logstore_req::free(req);
-    });
-}
-#endif
 
 void HomeLogStore::on_write_completion(logstore_req* req, const logdev_key& ld_key) {
-    std::unique_lock lk(m_sync_flush_mtx);
+    // Logstore supports out-of-order lsn writes; in that case we need to mark the truncation key for this lsn as the
+    // one which is being written by the higher lsn. This is to ensure that we don't truncate a higher lsn's logdev_key
+    // when we truncate the lower lsns.
+    //
+    // Out-of-order means we can write lsns in any order and flush them, say we have
+    //   -> Write lsn=1
+    //   -> Write lsn=4
+    //   -> Write lsn=2
+    // and then flush. When we check for contiguous completion or recovery we get upto lsn=2. The moment we have
+    // lsn=3, lsn=4 also becomes visible.
+    // This is an additional feature outside of a typical logstore, to allow an external replication engine to control
+    // the logstore. This feature isn't used by RAFT, as we start logstores in append_only mode.
+
+    // TODO: In case of out-of-order lsns, it needs to read the records of the tail_lsn and get their truncation key.
+    // This involves a read lock and an atomic operation. We can optimize this in case the ld_key is updated for the
+    // same batch.
+    logdev_key trunc_key;
+    if (m_tail_lsn < req->seq_num) {
+        m_tail_lsn = req->seq_num;
+        trunc_key = ld_key;
+    } else {
+        // This means an out-of-order write happened. For example, if lsn=1 and lsn=4 are written and flushed, they
+        // will be flushed in LogGroup1. When lsn=2 and lsn=3 are written and flushed, they will be flushed in
+        // LogGroup2, whose m_log_dev_offset is larger than that of LogGroup1. Now, if we want to truncate to lsn 3,
+        // we cannot remove LogGroup1 from the logdev, since that would remove not only lsn 1 but also lsn 4. So we
+        // set the m_trunc_key of lsn 2 and lsn 3 to the same as that of lsn 4 (tail_lsn). When truncation is
+        // scheduled, the safe truncation ld_key of this logstore will be the m_trunc_key of lsn 4, which keeps
+        // LogGroup1 from being removed. 
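+        // In short: a record that completes at or below the current tail_lsn reuses the truncation key of the record
+        // at tail_lsn, so truncating up to such an lsn can never drop a log group that still holds a live higher lsn.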
+ trunc_key = m_records.at(m_tail_lsn).m_trunc_key; + } + + atomic_update_max(m_next_lsn, req->seq_num + 1, std::memory_order_acq_rel); // Upon completion, create the mapping between seq_num and log dev key - m_records.update(req->seq_num, [&](logstore_record& rec) -> bool { + m_records.update(req->seq_num, [&ld_key, &trunc_key](logstore_record& rec) -> bool { rec.m_dev_key = ld_key; - THIS_LOGSTORE_LOG(DEBUG, "Completed write of store lsn {} logdev_key={}", req->seq_num, ld_key); + rec.m_trunc_key = trunc_key; return true; }); - // assert(flush_ld_key.idx >= m_last_flush_ldkey.idx); - // Update the maximum lsn we have seen for this batch for this store, it is needed to create truncation barrier - m_flush_batch_max_lsn = std::max(m_flush_batch_max_lsn, req->seq_num); + THIS_LOGSTORE_LOG(TRACE, "Completed write of lsn={} logdev_key:{} tail_lsn={} trunc_key:{}", req->seq_num, + ld_key.to_string(), m_tail_lsn.load(std::memory_order_relaxed), trunc_key.to_string()); HISTOGRAM_OBSERVE(m_metrics, logstore_append_latency, get_elapsed_time_us(req->start_time)); - auto lsn = req->seq_num; - (req->cb) ? req->cb(req, ld_key) : m_comp_cb(req, ld_key); - - if (m_sync_flush_waiter_lsn.load() == lsn) { - // Sync flush is waiting for this lsn to be completed, wake up the sync flush cv - m_sync_flush_cv.notify_one(); - } -} - -void HomeLogStore::on_read_completion(logstore_req* req, const logdev_key& ld_key) { (req->cb) ? req->cb(req, ld_key) : m_comp_cb(req, ld_key); } void HomeLogStore::on_log_found(logstore_seq_num_t seq_num, const logdev_key& ld_key, const logdev_key& flush_ld_key, log_buffer buf) { - THIS_LOGSTORE_LOG(DEBUG, "Found a log lsn={} logdev_key={}", seq_num, ld_key); - - // Create the mapping between seq_num and log dev key - m_records.create_and_complete(seq_num, ld_key); - atomic_update_max(m_seq_num, seq_num + 1, std::memory_order_acq_rel); - m_flush_batch_max_lsn = std::max(m_flush_batch_max_lsn, seq_num); - - if (seq_num <= m_safe_truncation_boundary.seq_num.load(std::memory_order_acquire)) { - THIS_LOGSTORE_LOG(TRACE, "Log lsn={} is already truncated on per device, ignoring", seq_num); - return; + THIS_LOGSTORE_LOG(DEBUG, "Found a log lsn={} logdev_key:{} tail_lsn={}", seq_num, ld_key.to_string(), + m_tail_lsn.load(std::memory_order_relaxed)); + logdev_key trunc_key; + if (m_tail_lsn < seq_num) { + m_tail_lsn = seq_num; + trunc_key = flush_ld_key; + } else { + trunc_key = m_records.at(m_tail_lsn).m_trunc_key; } - if (m_found_cb != nullptr) m_found_cb(seq_num, buf, nullptr); -} -void HomeLogStore::on_batch_completion(const logdev_key& flush_batch_ld_key) { - assert(m_flush_batch_max_lsn != std::numeric_limits< logstore_seq_num_t >::min()); + // Create the mapping between seq_num and log dev key + m_records.create_and_complete(seq_num, logstore_record(ld_key, trunc_key)); + atomic_update_max(m_next_lsn, seq_num + 1, std::memory_order_acq_rel); - // Create a new truncation barrier for this completion key - if (m_truncation_barriers.size() && (m_truncation_barriers.back().seq_num >= m_flush_batch_max_lsn)) { - m_truncation_barriers.back().ld_key = flush_batch_ld_key; - } else { - m_truncation_barriers.push_back({m_flush_batch_max_lsn, flush_batch_ld_key}); - } - m_flush_batch_max_lsn = std::numeric_limits< logstore_seq_num_t >::min(); // Reset the flush batch for next batch. 
+ if (m_found_cb != nullptr) { m_found_cb(seq_num, buf, nullptr); } } -void HomeLogStore::truncate(logstore_seq_num_t upto_seq_num, bool in_memory_truncate_only) { -#if 0 - if (!iomanager.is_io_thread()) { - LOGDFATAL("Expected truncate to be called from iomanager thread. Ignoring truncate"); - return; - } -#endif - +void HomeLogStore::truncate(logstore_seq_num_t upto_lsn, bool in_memory_truncate_only) { #ifndef NDEBUG - const auto s = m_safe_truncation_boundary.seq_num.load(std::memory_order_acquire); - if (s != -1) { - auto cs = get_contiguous_completed_seq_num(s); - if (upto_seq_num > cs) { - THIS_LOGSTORE_LOG(WARN, - "Truncation issued on seq_num={} outside of contiguous completions={}, " - "still proceeding to truncate", - upto_seq_num, cs); - } + auto cs = get_contiguous_completed_seq_num(0); + if (upto_lsn > cs) { + THIS_LOGSTORE_LOG(WARN, + "Truncation issued on seq_num={} outside of contiguous completions={}, " + "still proceeding to truncate", + upto_lsn, cs); } -#endif - // First try to block the flushing of logdevice and if we are successfully able to do, then - auto shared_this = shared_from_this(); - m_logdev->run_under_flush_lock([shared_this, upto_seq_num]() { - shared_this->do_truncate(upto_seq_num); - return true; - }); -} +#endif -// NOTE: This method assumes the flush lock is already acquired by the caller -void HomeLogStore::do_truncate(logstore_seq_num_t upto_seq_num) { - m_records.truncate(upto_seq_num); - m_safe_truncation_boundary.seq_num.store(upto_seq_num, std::memory_order_release); - - // Need to update the superblock with meta, we don't persist yet, will be done as part of log dev truncation - m_logdev->update_store_superblk(m_store_id, logstore_superblk{upto_seq_num + 1}, false /* persist_now */); - - const int ind = search_max_le(upto_seq_num); - if (ind < 0) { - // m_safe_truncation_boundary.pending_dev_truncation = false; - THIS_LOGSTORE_PERIODIC_LOG(DEBUG, - "Truncate upto lsn={}, possibly already truncated so ignoring. 
Current safe device " - "truncation barrier=", - upto_seq_num, m_safe_truncation_boundary.ld_key); - return; + if (upto_lsn > m_tail_lsn) { + THIS_LOGSTORE_LOG(WARN, + "Truncation issued on lsn={} which is greater than tail_lsn={}, truncating upto tail_lsn", + upto_lsn, m_tail_lsn.load(std::memory_order_relaxed)); + m_trunc_ld_key = m_records.at(m_tail_lsn).m_trunc_key; + upto_lsn = m_tail_lsn; + } else { + m_trunc_ld_key = m_records.at(upto_lsn).m_trunc_key; } - - THIS_LOGSTORE_PERIODIC_LOG( - DEBUG, "Truncate upto lsn={}, nearest safe device truncation barrier , is_last_barrier={}", - upto_seq_num, ind, m_truncation_barriers[ind].ld_key, - (ind == static_cast< int >(m_truncation_barriers.size() - 1))); - - m_safe_truncation_boundary.ld_key = m_truncation_barriers[ind].ld_key; - THIS_LOGSTORE_LOG(TRACE, "m_safe_truncation_boundary.ld_key={}", m_safe_truncation_boundary.ld_key); - m_safe_truncation_boundary.pending_dev_truncation = true; - - m_truncation_barriers.erase(m_truncation_barriers.begin(), m_truncation_barriers.begin() + ind + 1); + m_records.truncate(upto_lsn); + m_start_lsn.store(upto_lsn + 1); + if (!in_memory_truncate_only) { m_logdev->truncate(); } } -// NOTE: This method assumes the flush lock is already acquired by the caller -const truncation_info& HomeLogStore::pre_device_truncation() { - m_safe_truncation_boundary.active_writes_not_part_of_truncation = (m_truncation_barriers.size() > 0); - return m_safe_truncation_boundary; -} - -// NOTE: This method assumes the flush lock is already acquired by the caller -void HomeLogStore::post_device_truncation(const logdev_key& trunc_upto_loc) { - if (trunc_upto_loc.idx >= m_safe_truncation_boundary.ld_key.idx) { - // This method is expected to be called always with this - m_safe_truncation_boundary.pending_dev_truncation = false; - m_safe_truncation_boundary.ld_key = trunc_upto_loc; - THIS_LOGSTORE_LOG(TRACE, "m_safe_truncation_boundary.ld_key={}", m_safe_truncation_boundary.ld_key); - } else { - HS_REL_ASSERT(0, - "We expect post_device_truncation to be called only for logstores which has min of all " - "truncation boundaries"); - } +std::tuple< logstore_seq_num_t, logdev_key, logstore_seq_num_t > HomeLogStore::truncate_info() const { + auto const trunc_lsn = m_start_lsn.load(std::memory_order_relaxed) - 1; + return std::make_tuple(trunc_lsn, m_trunc_ld_key, m_tail_lsn.load(std::memory_order_relaxed)); } void HomeLogStore::fill_gap(logstore_seq_num_t seq_num) { @@ -310,58 +219,29 @@ void HomeLogStore::fill_gap(logstore_seq_num_t seq_num) { seq_num); logdev_key empty_ld_key; - m_records.create_and_complete(seq_num, empty_ld_key); -} - -int HomeLogStore::search_max_le(logstore_seq_num_t input_sn) { - int mid{0}; - int start{-1}; - int end = int_cast(m_truncation_barriers.size()); - - while ((end - start) > 1) { - mid = start + (end - start) / 2; - const auto& mid_entry = m_truncation_barriers[mid]; - - if (mid_entry.seq_num == input_sn) { - return mid; - } else if (mid_entry.seq_num > input_sn) { - end = mid; - } else { - start = mid; - } - } - - return (end - 1); + m_records.create_and_complete(seq_num, logstore_record(empty_ld_key, empty_ld_key)); } nlohmann::json HomeLogStore::dump_log_store(const log_dump_req& dump_req) { nlohmann::json json_dump{}; // create root object json_dump["store_id"] = this->m_store_id; - const auto trunc_upto = this->truncated_upto(); - std::remove_const_t< decltype(trunc_upto) > idx{trunc_upto + 1}; - if (dump_req.start_seq_num != 0) { idx = dump_req.start_seq_num; } + int64_t start_idx = 
std::max(dump_req.start_seq_num, start_lsn()); // must use move operator= operation instead of move copy constructor nlohmann::json json_records = nlohmann::json::array(); - bool end_iterate{false}; - m_records.foreach_contiguous_completed( - idx, - [&json_records, &dump_req, &end_iterate, this](int64_t cur_idx, int64_t max_idx, - const homestore::logstore_record& record) -> bool { - // do a sync read - // must use move operator= operation instead of move copy constructor + m_records.foreach_all_completed( + start_idx, [this, &dump_req, &json_records](int64_t, homestore::logstore_record const& rec) -> bool { nlohmann::json json_val = nlohmann::json::object(); serialized_log_record record_header; - const auto log_buffer{m_logdev->read(record.m_dev_key, record_header)}; - + const auto log_buffer = m_logdev->read(rec.m_dev_key, record_header); try { - json_val["size"] = static_cast< uint32_t >(record_header.size); - json_val["offset"] = static_cast< uint32_t >(record_header.offset); - json_val["is_inlined"] = static_cast< uint32_t >(record_header.get_inlined()); - json_val["store_seq_num"] = static_cast< uint64_t >(record_header.store_seq_num); - json_val["store_id"] = static_cast< logstore_id_t >(record_header.store_id); + json_val["size"] = uint32_cast(record_header.size); + json_val["offset"] = uint32_cast(record_header.offset); + json_val["is_inlined"] = uint32_cast(record_header.get_inlined()); + json_val["lsn"] = uint64_cast(record_header.store_seq_num); + json_val["store_id"] = s_cast< logstore_id_t >(record_header.store_id); } catch (const std::exception& ex) { THIS_LOGSTORE_LOG(ERROR, "Exception in json dump- {}", ex.what()); } if (dump_req.verbosity_level == homestore::log_dump_verbosity::CONTENT) { @@ -371,9 +251,7 @@ nlohmann::json HomeLogStore::dump_log_store(const log_dump_req& dump_req) { json_val["content"] = std::move(content); } json_records.emplace_back(std::move(json_val)); - int64_t end_idx = std::min(max_idx, dump_req.end_seq_num); - end_iterate = (cur_idx < end_idx) ? true : false; - return end_iterate; + return true; }); json_dump["log_records"] = std::move(json_records); @@ -397,96 +275,67 @@ logstore_seq_num_t HomeLogStore::get_contiguous_completed_seq_num(logstore_seq_n return (logstore_seq_num_t)m_records.completed_upto(from + 1); } -void HomeLogStore::flush_sync(logstore_seq_num_t upto_seq_num) { - // Logdev flush is async call and if flush_sync is called on the same thread which could potentially do logdev - // flush, waiting sync would cause deadlock. - HS_DBG_ASSERT_EQ(LogDev::can_flush_in_this_thread(), false, - "Logstore flush sync cannot be called on same thread which could do logdev flush"); - - std::unique_lock lk(m_single_sync_flush_mtx); - if (upto_seq_num == invalid_lsn()) { upto_seq_num = m_records.active_upto(); } - - // if we have flushed already, we are done - if (!m_records.status(upto_seq_num).is_active) { return; } - - { - std::unique_lock lk(m_sync_flush_mtx); - - // Step 1: Mark the waiter lsn to the seqnum we wanted to wait for. The completion of every lsn checks - // for this and if this lsn is completed, will make a callback which signals the cv. - m_sync_flush_waiter_lsn.store(upto_seq_num); - - // Step 2: After marking this lsn, we again do a check, to avoid a race where completion checked for no lsn - // and the lsn is stored in step 1 above. 
- if (!m_records.status(upto_seq_num).is_active) { return; } - - // Step 3: Force a flush (with least threshold) - m_logdev->flush_if_needed(1); +void HomeLogStore::flush(logstore_seq_num_t upto_lsn) { + if (!m_logdev->allow_explicit_flush()) { + HS_LOG_ASSERT(false, + "Explicit flush is turned off or calling flush on wrong thread for this logdev, ignoring flush"); + return; + } - // Step 4: Wait for completion - m_sync_flush_cv.wait(lk, [this, upto_seq_num] { return !m_records.status(upto_seq_num).is_active; }); + if (upto_lsn == invalid_lsn()) { upto_lsn = m_records.active_upto(); } - // NOTE: We are not resetting the lsn because same seq number should never have 2 completions and thus not - // doing it saves an atomic instruction - THIS_LOGSTORE_LOG(TRACE, "flush_sync over upto_seq_num {}", upto_seq_num); - } + // if we have flushed already, we are done, else issue a flush + if (m_records.status(upto_lsn).is_active) m_logdev->flush_under_guard(); } -uint64_t HomeLogStore::rollback_async(logstore_seq_num_t to_lsn, on_rollback_cb_t cb) { +bool HomeLogStore::rollback(logstore_seq_num_t to_lsn) { // Validate if the lsn to which it is rolledback to is not truncated. auto ret = m_records.status(to_lsn + 1); if (ret.is_out_of_range) { HS_LOG_ASSERT(false, "Attempted to rollback to {} which is already truncated", to_lsn); - return 0; + return false; } - // Ensure that there are no pending lsn to flush. If so lets flush them now. - const auto from_lsn = get_contiguous_issued_seq_num(0); - if (get_contiguous_completed_seq_num(0) < from_lsn) { flush_sync(); } - HS_DBG_ASSERT_EQ(get_contiguous_completed_seq_num(0), get_contiguous_issued_seq_num(0), - "Still some pending lsns to flush, concurrent write and rollback is not supported"); - - // Do an in-memory rollback of lsns before we persist the log ids. This is done, so that subsequent appends can - // be queued without waiting for rollback async operation completion. It is safe to do so, since before returning - // from this method, we will queue ourselves to the flush lock and thus subsequent writes are guaranteed to go after - // this rollback is completed. 
-    m_seq_num.store(to_lsn + 1, std::memory_order_release); // Rollback the next append lsn
-    logid_range_t logid_range = std::make_pair(m_records.at(to_lsn + 1).m_dev_key.idx,
-                                               m_records.at(from_lsn).m_dev_key.idx); // Get the logid range to rollback
-    m_records.rollback(to_lsn); // Rollback all bitset records and from here on, we can't access any lsns beyond to_lsn
-
-    m_logdev->run_under_flush_lock([logid_range, to_lsn, this, comp_cb = std::move(cb)]() {
-        iomanager.run_on_forget(logstore_service().truncate_thread(), [logid_range, to_lsn, this, comp_cb]() {
-            // Rollback the log_ids in the range, for this log store (which persists this info in its superblk)
-            m_logdev->rollback(m_store_id, logid_range);
-
-            // Remove all truncation barriers on rolled back lsns
-            for (auto it = std::rbegin(m_truncation_barriers); it != std::rend(m_truncation_barriers); ++it) {
-                if (it->seq_num > to_lsn) {
-                    m_truncation_barriers.erase(std::next(it).base());
-                } else {
-                    break;
-                }
+    bool do_flush{false};
+    do {
+        {
+            std::unique_lock lg = m_logdev->flush_guard();
+            if (m_tail_lsn + 1 < m_next_lsn.load()) {
+                // We should flush any outstanding writes before we proceed with rollback
+                THIS_LOGSTORE_LOG(INFO,
+                                  "Rollback is issued while there are some outstanding writes, tail_lsn={}, "
+                                  "next_lsn={}, will flush and retry rollback",
+                                  m_tail_lsn.load(std::memory_order_relaxed),
+                                  m_next_lsn.load(std::memory_order_relaxed));
+                do_flush = true;
+            } else {
+                do_flush = false;
+                logid_range_t logid_range =
+                    std::make_pair(m_records.at(to_lsn + 1).m_dev_key.idx,
+                                   m_records.at(m_tail_lsn).m_dev_key.idx); // Get the logid range to rollback
+
+                // Update the next_lsn and tail_lsn back to to_lsn and also rollback all stream records; from now on,
+                // we can't access any lsns beyond to_lsn
+                m_next_lsn.store(to_lsn + 1, std::memory_order_release); // Rollback the next append lsn
+                m_tail_lsn = to_lsn;
+                m_records.rollback(to_lsn);
+
+                // Rollback the log_ids in the range, for this log store (which persists this info in its superblk)
+                m_logdev->rollback(m_store_id, logid_range);
             }
-            m_flush_batch_max_lsn = invalid_lsn(); // Reset the flush batch for next batch. 
- if (comp_cb) { comp_cb(to_lsn); } - m_logdev->unlock_flush(); - }); - return false; - }); + } + if (do_flush) m_logdev->flush_under_guard(); + } while (do_flush); - return from_lsn - to_lsn; + return true; } nlohmann::json HomeLogStore::get_status(int verbosity) const { nlohmann::json js; js["append_mode"] = m_append_mode; - js["highest_lsn"] = m_seq_num.load(std::memory_order_relaxed); - js["max_lsn_in_prev_flush_batch"] = m_flush_batch_max_lsn; - js["truncated_upto_logdev_key"] = m_safe_truncation_boundary.ld_key.to_string(); - js["truncated_upto_lsn"] = m_safe_truncation_boundary.seq_num.load(std::memory_order_relaxed); - js["truncation_pending_on_device?"] = m_safe_truncation_boundary.pending_dev_truncation; - js["truncation_parallel_to_writes?"] = m_safe_truncation_boundary.active_writes_not_part_of_truncation; + js["start_lsn"] = m_start_lsn.load(std::memory_order_relaxed); + js["next_lsn"] = m_next_lsn.load(std::memory_order_relaxed); + js["tail_lsn"] = m_tail_lsn.load(std::memory_order_relaxed); js["logstore_records"] = m_records.get_status(verbosity); js["logstore_sb_first_lsn"] = m_logdev->log_dev_meta().store_superblk(m_store_id).m_first_seq_num; return js; diff --git a/src/lib/logstore/log_store_service.cpp b/src/lib/logstore/log_store_service.cpp index 5d9e0049f..d09d02f7c 100644 --- a/src/lib/logstore/log_store_service.cpp +++ b/src/lib/logstore/log_store_service.cpp @@ -115,7 +115,7 @@ void LogStoreService::start(bool format) { start_threads(); for (auto& [logdev_id, logdev] : m_id_logdev_map) { - logdev->start(format); + logdev->start(format, m_logdev_vdev); } } @@ -140,7 +140,7 @@ logdev_id_t LogStoreService::create_new_logdev() { folly::SharedMutexWritePriority::WriteHolder holder(m_logdev_map_mtx); logdev_id_t logdev_id = get_next_logdev_id(); auto logdev = create_new_logdev_internal(logdev_id); - logdev->start(true /* format */); + logdev->start(true /* format */, m_logdev_vdev); COUNTER_INCREMENT(m_metrics, logdevs_count, 1); HS_LOG(INFO, logstore, "Created log_dev={}", logdev_id); return logdev_id; @@ -181,7 +181,7 @@ void LogStoreService::delete_unopened_logdevs() { } std::shared_ptr< LogDev > LogStoreService::create_new_logdev_internal(logdev_id_t logdev_id) { - auto logdev = std::make_shared< LogDev >(logdev_id, m_logdev_vdev.get()); + auto logdev = std::make_shared< LogDev >(logdev_id); const auto it = m_id_logdev_map.find(logdev_id); HS_REL_ASSERT((it == m_id_logdev_map.end()), "logdev id {} already exists", logdev_id); m_id_logdev_map.insert(std::make_pair<>(logdev_id, logdev)); @@ -192,7 +192,7 @@ void LogStoreService::open_logdev(logdev_id_t logdev_id) { folly::SharedMutexWritePriority::WriteHolder holder(m_logdev_map_mtx); const auto it = m_id_logdev_map.find(logdev_id); if (it == m_id_logdev_map.end()) { - auto logdev = std::make_shared< LogDev >(logdev_id, m_logdev_vdev.get()); + auto logdev = std::make_shared< LogDev >(logdev_id); m_id_logdev_map.emplace(logdev_id, logdev); LOGDEBUGMOD(logstore, "log_dev={} does not exist, created!", logdev_id); } @@ -231,7 +231,7 @@ void LogStoreService::logdev_super_blk_found(const sisl::byte_view& buf, void* m logdev = it->second; HS_LOG(DEBUG, logstore, "Log dev superblk found log_dev={}", id); } else { - logdev = std::make_shared< LogDev >(id, m_logdev_vdev.get()); + logdev = std::make_shared< LogDev >(id); m_id_logdev_map.emplace(id, logdev); // when recover logdev meta blk, we get all the logdevs from the superblk. we put them in m_unopened_logdev // too. 
after logdev meta blks are all recovered, when a client opens a logdev, we remove it from @@ -290,27 +290,15 @@ void LogStoreService::remove_log_store(logdev_id_t logdev_id, logstore_id_t stor COUNTER_DECREMENT(m_metrics, logstores_count, 1); } -void LogStoreService::device_truncate(const device_truncate_cb_t& cb, bool wait_till_done, bool dry_run) { - const auto treq = std::make_shared< truncate_req >(); - treq->wait_till_done = wait_till_done; - treq->dry_run = dry_run; - treq->cb = cb; - if (treq->wait_till_done) { treq->trunc_outstanding = m_id_logdev_map.size(); } - +void LogStoreService::device_truncate() { // TODO: make device_truncate_under_lock return future and do collectAllFutures; - for (auto& [id, logdev] : m_id_logdev_map) { - logdev->device_truncate_under_lock(treq); - } - - if (treq->wait_till_done) { - std::unique_lock< std::mutex > lk{treq->mtx}; - treq->cv.wait(lk, [&] { return (treq->trunc_outstanding == 0); }); - } + for (auto& [id, logdev] : m_id_logdev_map) + logdev->truncate(); } -void LogStoreService::flush_if_needed() { +void LogStoreService::flush() { for (auto& [id, logdev] : m_id_logdev_map) { - logdev->flush_if_needed(); + logdev->flush_under_guard(); } } @@ -334,22 +322,9 @@ void LogStoreService::start_threads() { ctx->cv.notify_one(); } }); - - m_truncate_fiber = nullptr; - iomanager.create_reactor("logstore_truncater", iomgr::INTERRUPT_LOOP, 2 /* num_fibers */, - [this, ctx](bool is_started) { - if (is_started) { - m_truncate_fiber = iomanager.sync_io_capable_fibers()[0]; - { - std::unique_lock< std::mutex > lk{ctx->mtx}; - ++(ctx->thread_cnt); - } - ctx->cv.notify_one(); - } - }); { std::unique_lock< std::mutex > lk{ctx->mtx}; - ctx->cv.wait(lk, [ctx] { return (ctx->thread_cnt == 2); }); + ctx->cv.wait(lk, [ctx] { return (ctx->thread_cnt == 1); }); } } @@ -398,6 +373,7 @@ LogStoreServiceMetrics::LogStoreServiceMetrics() : sisl::MetricsGroup("LogStores REGISTER_HISTOGRAM(logdev_flush_done_msg_time_ns, "Logdev flush completion msg time in ns"); REGISTER_HISTOGRAM(logdev_post_flush_processing_latency, "Logdev post flush processing (including callbacks) latency"); + REGISTER_HISTOGRAM(logdev_flush_time_us, "time elapsed since last flush time in us"); REGISTER_HISTOGRAM(logdev_fsync_time_us, "Logdev fsync completion time in us"); register_me_to_farm(); diff --git a/src/lib/logstore/log_stream.cpp b/src/lib/logstore/log_stream.cpp index dd5eb5660..c5e732ccf 100644 --- a/src/lib/logstore/log_stream.cpp +++ b/src/lib/logstore/log_stream.cpp @@ -23,7 +23,7 @@ namespace homestore { SISL_LOGGING_DECL(logstore) -log_stream_reader::log_stream_reader(off_t device_cursor, JournalVirtualDev* vdev, +log_stream_reader::log_stream_reader(off_t device_cursor, shared< JournalVirtualDev > vdev, shared< JournalVirtualDev::Descriptor > vdev_jd, uint64_t read_size_multiple) : m_vdev{vdev}, m_vdev_jd{std::move(vdev_jd)}, @@ -164,7 +164,7 @@ sisl::byte_view log_stream_reader::group_in_next_page() { } sisl::byte_view log_stream_reader::read_next_bytes(uint64_t nbytes, bool& end_of_stream) { - // TO DO: Might need to address alignment based on data or fast type + // TODO: Might need to address alignment based on data or fast type const auto prev_pos = m_vdev_jd->seeked_pos(); auto sz_to_read = m_vdev_jd->sync_next_read(nullptr, nbytes); if (sz_to_read == -1) { diff --git a/src/lib/replication/log_store/home_raft_log_store.cpp b/src/lib/replication/log_store/home_raft_log_store.cpp index 2163a886c..0798187a8 100644 --- a/src/lib/replication/log_store/home_raft_log_store.cpp +++ 
b/src/lib/replication/log_store/home_raft_log_store.cpp @@ -36,8 +36,12 @@ SISL_LOGGING_DECL(replication) msg, ##__VA_ARGS__); namespace homestore { -static constexpr store_lsn_t to_store_lsn(uint64_t raft_lsn) { return s_cast< store_lsn_t >(raft_lsn) - 1; } -static constexpr store_lsn_t to_store_lsn(repl_lsn_t repl_lsn) { return repl_lsn - 1; } +static constexpr logstore_seq_num_t to_store_lsn(uint64_t raft_lsn) { + return static_cast< logstore_seq_num_t >(raft_lsn - 1); +} +static constexpr logstore_seq_num_t to_store_lsn(repl_lsn_t repl_lsn) { + return static_cast< logstore_seq_num_t >(repl_lsn - 1); +} static constexpr repl_lsn_t to_repl_lsn(store_lsn_t store_lsn) { return store_lsn + 1; } static nuraft::ptr< nuraft::log_entry > to_nuraft_log_entry(sisl::blob const& log_blob) { @@ -86,7 +90,8 @@ void HomeRaftLogStore::truncate(uint32_t num_reserved_cnt, repl_lsn_t compact_ls REPL_STORE_LOG(INFO, "LogDev={}: Truncating log entries from {} to {}, compact_lsn={}, last_lsn={}", m_logdev_id, start_lsn, truncate_lsn, compact_lsn, last_lsn); - m_log_store->truncate(truncate_lsn); + // do real truncation on log device + m_log_store->truncate(truncate_lsn, false); } } @@ -169,7 +174,7 @@ ulong HomeRaftLogStore::append(nuraft::ptr< nuraft::log_entry >& entry) { void HomeRaftLogStore::write_at(ulong index, nuraft::ptr< nuraft::log_entry >& entry) { auto buf = entry->serialize(); - m_log_store->rollback_async(to_store_lsn(index) - 1, nullptr); + m_log_store->rollback(to_store_lsn(index) - 1); // we need to reset the durable lsn, because its ok to set to lower number as it will be updated on next flush // calls, but it is dangerous to set higher number. @@ -180,8 +185,8 @@ void HomeRaftLogStore::write_at(ulong index, nuraft::ptr< nuraft::log_entry >& e } void HomeRaftLogStore::end_of_append_batch(ulong start, ulong cnt) { - store_lsn_t end_lsn = to_store_lsn(start + cnt - 1); - m_log_store->flush_sync(end_lsn); + auto end_lsn = to_store_lsn(start + cnt - 1); + m_log_store->flush(end_lsn); m_last_durable_lsn = end_lsn; } @@ -258,7 +263,7 @@ void HomeRaftLogStore::apply_pack(ulong index, nuraft::buffer& pack) { auto slot = next_slot(); if (index < slot) { // We are asked to apply/insert data behind next slot, so we must rollback before index and then append - m_log_store->rollback_async(to_store_lsn(index) - 1, nullptr); + m_log_store->rollback(to_store_lsn(index) - 1); } else if (index > slot) { // We are asked to apply/insert data after next slot, so we need to fill in with dummy entries upto the slot // before append the entries @@ -296,7 +301,7 @@ bool HomeRaftLogStore::compact(ulong compact_lsn) { } } - m_log_store->flush_sync(to_store_lsn(compact_lsn)); + m_log_store->flush(to_store_lsn(compact_lsn)); // we rely on resrouce mgr timer to trigger truncate for all log stores in system; // this will be friendly for multiple logstore on same logdev; @@ -307,7 +312,7 @@ bool HomeRaftLogStore::compact(ulong compact_lsn) { } bool HomeRaftLogStore::flush() { - m_log_store->flush_sync(); + m_log_store->flush(); return true; } diff --git a/src/tests/test_log_dev.cpp b/src/tests/test_log_dev.cpp index ada551940..52208ce89 100644 --- a/src/tests/test_log_dev.cpp +++ b/src/tests/test_log_dev.cpp @@ -149,8 +149,7 @@ class LogDevTest : public ::testing::Test { void insert_sync(std::shared_ptr< HomeLogStore > log_store, logstore_seq_num_t lsn, uint32_t fixed_size = 0) { bool io_memory{false}; auto* d = prepare_data(lsn, io_memory, fixed_size); - const bool succ = log_store->write_sync(lsn, {uintptr_cast(d), 
d->total_size(), false}); - EXPECT_TRUE(succ); + log_store->write_and_flush(lsn, {uintptr_cast(d), d->total_size(), false}); LOGINFO("Written sync data for LSN -> {}:{}", log_store->get_store_id(), lsn); if (io_memory) { iomanager.iobuf_free(uintptr_cast(d)); @@ -188,26 +187,7 @@ class LogDevTest : public ::testing::Test { try { read_verify(log_store, lsn); } catch (const std::exception& ex) { - logstore_seq_num_t trunc_upto = 0; - std::mutex mtx; - std::condition_variable cv; - bool get_trunc_upto = false; - log_store->get_logdev()->run_under_flush_lock( - [this, log_store, &trunc_upto, &get_trunc_upto, &mtx, &cv] { - // In case we run truncation in parallel to read, it is possible - // the truncated_upto accordingly. - trunc_upto = log_store->truncated_upto(); - std::unique_lock lock(mtx); - get_trunc_upto = true; - cv.notify_one(); - return true; - }); - std::unique_lock lock(mtx); - cv.wait(lock, [&get_trunc_upto] { return get_trunc_upto == true; }); - if (lsn <= trunc_upto) { - lsn = trunc_upto; - continue; - } + auto trunc_upto = log_store->truncated_upto(); LOGFATAL("Failed to read at upto {} lsn {} trunc_upto {}", upto, lsn, trunc_upto); } } @@ -215,27 +195,10 @@ class LogDevTest : public ::testing::Test { void rollback_validate(std::shared_ptr< HomeLogStore > log_store, logstore_seq_num_t& cur_lsn, uint32_t num_lsns_to_rollback) { - std::mutex mtx; - std::condition_variable cv; - bool rollback_done = false; cur_lsn -= num_lsns_to_rollback; auto const upto_lsn = cur_lsn - 1; - log_store->rollback_async(upto_lsn, [&](logstore_seq_num_t) { - ASSERT_EQ(log_store->get_contiguous_completed_seq_num(-1), upto_lsn) - << "Last completed seq num is not reset after rollback"; - ASSERT_EQ(log_store->get_contiguous_issued_seq_num(-1), upto_lsn) - << "Last issued seq num is not reset after rollback"; - read_all_verify(log_store); - { - std::unique_lock lock(mtx); - rollback_done = true; - } - cv.notify_one(); - }); - - // We wait till async rollback is finished as we do validation. - std::unique_lock lock(mtx); - cv.wait(lock, [&rollback_done] { return rollback_done == true; }); + log_store->rollback(upto_lsn); + read_all_verify(log_store); } void truncate_validate(std::shared_ptr< HomeLogStore > log_store) { @@ -243,7 +206,7 @@ class LogDevTest : public ::testing::Test { LOGINFO("truncate_validate upto {}", upto); log_store->truncate(upto); read_all_verify(log_store); - logstore_service().device_truncate(nullptr /* cb */, true /* wait_till_done */); + logstore_service().device_truncate(); } void rollback_records_validate(std::shared_ptr< HomeLogStore > log_store, uint32_t expected_count) { diff --git a/src/tests/test_log_store.cpp b/src/tests/test_log_store.cpp index d2cadd5f3..0f53a24a8 100644 --- a/src/tests/test_log_store.cpp +++ b/src/tests/test_log_store.cpp @@ -35,6 +35,7 @@ #include #include #include +#include #include #include @@ -165,7 +166,7 @@ class SampleLogStoreClient { } // Because of restart in tests, we have torce the flush of log entries. 
- m_log_store->get_logdev()->flush_if_needed(1); + m_log_store->get_logdev()->flush_if_necessary(1); } void iterate_validate(const bool expect_all_completed = false) { @@ -273,25 +274,7 @@ class SampleLogStoreClient { } } - logstore_seq_num_t get_truncated_upto() { - std::mutex mtx; - std::condition_variable cv; - bool get_trunc_upto = false; - logstore_seq_num_t trunc_upto = 0; - m_log_store->get_logdev()->run_under_flush_lock([this, &trunc_upto, &get_trunc_upto, &mtx, &cv]() { - // In case we run truncation in parallel to read, it is possible truncate moved, so adjust - // the truncated_upto accordingly. - trunc_upto = m_log_store->truncated_upto(); - std::unique_lock lock(mtx); - get_trunc_upto = true; - cv.notify_one(); - return true; - }); - - std::unique_lock lock(mtx); - cv.wait(lock, [&get_trunc_upto] { return get_trunc_upto == true; }); - return trunc_upto; - } + logstore_seq_num_t get_truncated_upto() { return m_log_store->truncated_upto(); } void fill_hole_and_validate() { const auto start = m_log_store->truncated_upto(); @@ -331,26 +314,13 @@ class SampleLogStoreClient { } void rollback_validate(uint32_t num_lsns_to_rollback) { - std::mutex mtx; - std::condition_variable cv; - bool rollback_done = false; auto const upto_lsn = m_cur_lsn.fetch_sub(num_lsns_to_rollback) - num_lsns_to_rollback - 1; - m_log_store->rollback_async(upto_lsn, [&](logstore_seq_num_t) { - ASSERT_EQ(m_log_store->get_contiguous_completed_seq_num(-1), upto_lsn) - << "Last completed seq num is not reset after rollback"; - ASSERT_EQ(m_log_store->get_contiguous_issued_seq_num(-1), upto_lsn) - << "Last issued seq num is not reset after rollback"; - read_validate(true); - { - std::unique_lock lock(mtx); - rollback_done = true; - } - cv.notify_one(); - }); - - // We wait till async rollback is finished as we do validation. 
-        std::unique_lock lock(mtx);
-        cv.wait(lock, [&rollback_done] { return rollback_done == true; });
+        m_log_store->rollback(upto_lsn);
+        ASSERT_EQ(m_log_store->get_contiguous_completed_seq_num(-1), upto_lsn)
+            << "Last completed seq num is not reset after rollback";
+        ASSERT_EQ(m_log_store->get_contiguous_issued_seq_num(-1), upto_lsn)
+            << "Last issued seq num is not reset after rollback";
+        read_validate(true);
     }

     void read(const logstore_seq_num_t lsn) {
@@ -379,7 +349,7 @@ class SampleLogStoreClient {
         m_truncated_upto_lsn = lsn;
     }

-    void flush() { m_log_store->flush_sync(); }
+    void flush() { m_log_store->flush(); }

     bool has_all_lsns_truncated() const { return (m_truncated_upto_lsn.load() == (m_cur_lsn.load() - 1)); }

@@ -508,13 +478,13 @@ class SampleDB {
         });

         std::vector< logdev_id_t > logdev_id_vec;
-        for (uint32_t i{0}; i < n_log_stores; ++i) {
+        for (uint32_t i{0}; i < n_log_devs; ++i) {
             logdev_id_vec.push_back(logstore_service().create_new_logdev());
         }

         for (uint32_t i{0}; i < n_log_stores; ++i) {
             m_log_store_clients.push_back(std::make_unique< SampleLogStoreClient >(
-                logdev_id_vec[i], bind_this(SampleDB::on_log_insert_completion, 3)));
+                logdev_id_vec[i % n_log_devs], bind_this(SampleDB::on_log_insert_completion, 3)));
         }
         SampleLogStoreClient::s_max_flush_multiple =
             logstore_service().get_logdev(logdev_id_vec[0])->get_flush_size_multiple();
@@ -553,7 +523,6 @@ class SampleDB {
     }

 private:
-    const static std::string s_fpath_root;
     std::vector< std::string > m_dev_names;
     std::function< void() > m_on_schedule_io_cb;
     test_log_store_comp_cb_t m_io_closure;
@@ -562,8 +531,6 @@ class SampleDB {
     test_common::HSTestHelper m_helper;
 };

-const std::string SampleDB::s_fpath_root{"/tmp/log_store_dev_"};
-
 class LogStoreTest : public ::testing::Test {
 public:
     LogStoreTest() = default;
@@ -574,13 +541,14 @@ class LogStoreTest : public ::testing::Test {
     virtual ~LogStoreTest() override = default;

 protected:
-    virtual void SetUp() override {};
-    virtual void TearDown() override {};
+    virtual void SetUp() override { SampleDB::instance().start_homestore(); };
+    virtual void TearDown() override { SampleDB::instance().shutdown(); };

     void init(uint64_t n_total_records, const std::vector< std::pair< size_t, int > >& inp_freqs = {}) {
         // m_nrecords_waiting_to_issue = std::lround(n_total_records / _batch_size) * _batch_size;
         m_nrecords_waiting_to_issue = n_total_records;
         m_nrecords_waiting_to_complete = 0;
+        m_schedule_more_inserts.store(true);

         SampleDB::instance().m_on_schedule_io_cb = std::bind(&LogStoreTest::do_insert, this);
         SampleDB::instance().m_io_closure = bind_this(LogStoreTest::on_insert_completion, 3);
@@ -594,6 +562,15 @@ class LogStoreTest : public ::testing::Test {
         m_batch_size = batch_size;
         m_q_depth = q_depth;
         m_holes_per_batch = holes_per_batch;
+        std::thread([this]() {
+            while (m_schedule_more_inserts.load() && m_nrecords_waiting_to_issue) {
+                launch_inserts();
+                m_pending_smph.try_acquire_for(std::chrono::seconds(1));
+            }
+        }).detach();
+    }
+
+    void launch_inserts() {
         iomanager.run_on_forget(iomgr::reactor_regex::all_io, []() {
             if (SampleDB::instance().m_on_schedule_io_cb) SampleDB::instance().m_on_schedule_io_cb();
         });
     }

     void do_insert() {
         // Randomly pick a store client and write journal entry batch.
-        for (;;) {
-            uint32_t batch_size{0};
-            {
-                std::unique_lock< std::mutex > lock{m_pending_mtx};
-                const bool insert = (m_nrecords_waiting_to_issue > 0) && (m_nrecords_waiting_to_complete < m_q_depth);
-                if (insert) {
-                    batch_size = std::min< uint32_t >(m_batch_size, m_nrecords_waiting_to_issue);
-                    m_nrecords_waiting_to_issue -= batch_size;
-                    m_nrecords_waiting_to_complete += batch_size;
-                } else {
-                    break;
-                }
+        uint32_t batch_size{0};
+        {
+            std::unique_lock< std::mutex > lock{m_pending_mtx};
+            const bool insert = (m_nrecords_waiting_to_issue > 0) && (m_nrecords_waiting_to_complete < m_q_depth);
+            if (insert) {
+                batch_size = std::min< uint32_t >(m_batch_size, m_nrecords_waiting_to_issue);
+                m_nrecords_waiting_to_issue -= batch_size;
+                m_nrecords_waiting_to_complete += batch_size;
+            } else {
+                return;
             }
-            pick_log_store()->insert_next_batch(batch_size, std::min(batch_size, m_holes_per_batch));
         }
+        pick_log_store()->insert_next_batch(batch_size, std::min(batch_size, m_holes_per_batch));
     }

     void on_insert_completion([[maybe_unused]] logdev_id_t fid, logstore_seq_num_t lsn, logdev_key ld_key) {
@@ -627,10 +602,10 @@ class LogStoreTest : public ::testing::Test {
             if ((--m_nrecords_waiting_to_complete == 0) && (waiting_to_issue == 0)) { notify = true; }
         }
         if (notify) {
+            m_schedule_more_inserts.store(false);
             m_pending_cv.notify_all();
-        } else if (waiting_to_issue > 0) {
-            do_insert();
         }
+        if (!m_nrecords_waiting_to_complete) m_pending_smph.release();
     }

     void wait_for_inserts() {
@@ -735,7 +710,7 @@ class LogStoreTest : public ::testing::Test {
                 ++skip_truncation;
                 continue;
             }
-            lsc->truncate(lsc->m_log_store->get_contiguous_completed_seq_num(-1));
+            lsc->truncate(c_seq_num);
             lsc->read_validate();
         }

@@ -744,42 +719,37 @@ class LogStoreTest : public ::testing::Test {
             return;
         }

-        bool failed{false};
-        logstore_service().device_truncate(
-            [this, is_parallel_to_write, &failed](auto& trunc_lds) {
-                bool expect_forward_progress{true};
-                uint32_t n_fully_truncated{0};
-                if (is_parallel_to_write) {
-                    for (const auto& lsc : SampleDB::instance().m_log_store_clients) {
-                        if (lsc->has_all_lsns_truncated()) ++n_fully_truncated;
-                    }
+        logstore_service().device_truncate();

-                    // While inserts are going on, truncation can guaranteed to be forward progressed if none of the
-                    // log stores are fully truncated. If all stores are fully truncated, its obvious no progress,
-                    // but even if one of the store is fully truncated, then it might be possible that logstore is
-                    // holding lowest logdev location and waiting for next flush to finish to move the safe logdev
-                    // location.
-                    expect_forward_progress = (n_fully_truncated == 0);
-                }
-
-                if (expect_forward_progress) {
-                    for (auto [fid, trunc_loc] : trunc_lds) {
-                        if (trunc_loc == logdev_key::out_of_bound_ld_key()) {
-                            LOGINFO("No forward progress for device truncation yet.");
-                        } else {
-                            // Validate the truncation is actually moving forward
-                            auto idx = m_truncate_log_idx.count(fid) ? m_truncate_log_idx[fid].load() : -1;
-                            if (trunc_loc.idx <= idx) { failed = true; }
-                            ASSERT_GT(trunc_loc.idx, idx);
-                            m_truncate_log_idx[fid].store(trunc_loc.idx);
-                        }
-                    }
-                } else {
-                    LOGINFO("Do not expect forward progress for device truncation");
-                }
-            },
-            true /* wait_till_done */);
-        ASSERT_FALSE(failed);
+        bool expect_forward_progress{true};
+        uint32_t n_fully_truncated{0};
+        if (is_parallel_to_write) {
+            for (const auto& lsc : SampleDB::instance().m_log_store_clients) {
+                if (lsc->has_all_lsns_truncated()) ++n_fully_truncated;
+            }
+
+            // While inserts are going on, truncation is guaranteed to make forward progress only if none of the
+            // log stores is fully truncated. If all stores are fully truncated, there is obviously no progress;
+            // but even if one of the stores is fully truncated, it is possible that this logstore is holding the
+            // lowest logdev location and is waiting for the next flush to finish before the safe logdev
+            // location can move.
+            expect_forward_progress = (n_fully_truncated == 0);
+        }
+
+        if (expect_forward_progress) {
+            // We may have multiple log stores on each logdev, so record the highest truncated index per logdev
+            // and then do the verification.
+            for (const auto& lsc : SampleDB::instance().m_log_store_clients) {
+                auto fid = lsc->m_log_store->get_logdev()->get_id();
+                auto trunc_loc = lsc->m_log_store->get_trunc_ld_key();
+                if (trunc_loc == logdev_key::out_of_bound_ld_key())
+                    LOGINFO("No forward progress for device truncation yet.");
+                if (m_truncate_log_idx.count(fid) && trunc_loc.idx > m_truncate_log_idx[fid].load())
+                    m_truncate_log_idx[fid].store(trunc_loc.idx);
+            }
+        } else {
+            LOGINFO("Do not expect forward progress for device truncation");
+        }

         for (auto& logdev : logstore_service().get_all_logdevs()) {
             auto fid = logdev->get_id();
@@ -790,6 +760,7 @@ class LogStoreTest : public ::testing::Test {
                 it = m_garbage_stores_upto[fid].erase(it);
             }
         }
+
         validate_num_stores();
     }

@@ -910,6 +881,9 @@ class LogStoreTest : public ::testing::Test {
     std::map< logdev_id_t, std::map< logid_t, uint32_t > > m_garbage_stores_upto;
     std::array< uint32_t, 100 > m_store_distribution;

+    std::binary_semaphore m_pending_smph{0};
+    std::atomic_bool m_schedule_more_inserts{true};
+
     uint32_t m_q_depth{64};
     uint32_t m_batch_size{1};
     uint32_t m_holes_per_batch{0};
@@ -965,8 +939,6 @@ TEST_F(LogStoreTest, BurstRandInsertThenTruncate) {
     }
 }

-// TODO evaluate this test is valid and enable after fixing the flush lock.
-#if 0
 TEST_F(LogStoreTest, BurstSeqInsertAndTruncateInParallel) {
     const auto num_records = SISL_OPTIONS["num_records"].as< uint32_t >();
     const auto iterations = SISL_OPTIONS["iterations"].as< uint32_t >();
@@ -1004,7 +976,6 @@ TEST_F(LogStoreTest, BurstSeqInsertAndTruncateInParallel) {
         this->truncate_validate();
     }
 }
-#endif

 TEST_F(LogStoreTest, RandInsertsWithHoles) {
     const auto num_records = SISL_OPTIONS["num_records"].as< uint32_t >();
@@ -1122,7 +1093,7 @@ TEST_F(LogStoreTest, ThrottleSeqInsertThenRecover) {
     this->iterate_validate(true);

     LOGINFO("Step 5: Restart homestore");
-    SampleDB::instance().start_homestore(true /* restart */);
+    SampleDB::instance().start_homestore(true);
     this->recovery_validate();
     this->init(num_records);

@@ -1130,7 +1101,7 @@ TEST_F(LogStoreTest, ThrottleSeqInsertThenRecover) {
     this->read_validate(true);

     LOGINFO("Step 6: Restart homestore again to validate recovery on consecutive restarts");
-    SampleDB::instance().start_homestore(true /* restart */);
+    SampleDB::instance().start_homestore(true);
     this->recovery_validate();
     this->init(num_records);

@@ -1147,7 +1118,7 @@ TEST_F(LogStoreTest, ThrottleSeqInsertThenRecover) {
     this->iterate_validate(true);

     LOGINFO("Step 10: Restart homestore again to validate recovery after inserts");
-    SampleDB::instance().start_homestore(true /* restart */);
+    SampleDB::instance().start_homestore(true);
     this->recovery_validate();
     this->init(num_records);

@@ -1263,8 +1234,7 @@ TEST_F(LogStoreTest, WriteSyncThenRead) {

         bool io_memory{false};
         auto* d = SampleLogStoreClient::prepare_data(i, io_memory);
-        const bool succ = tmp_log_store->write_sync(i, {uintptr_cast(d), d->total_size(), false});
-        EXPECT_TRUE(succ);
+        tmp_log_store->write_and_flush(i, {uintptr_cast(d), d->total_size(), false});
         LOGINFO("Written sync data for LSN -> {}", i);

         if (io_memory) {
@@ -1292,7 +1262,8 @@ SISL_OPTIONS_ENABLE(logging, test_log_store, iomgr, test_common_setup)
 SISL_OPTION_GROUP(test_log_store,
                   (num_logdevs, "", "num_logdevs", "number of log devs",
                    ::cxxopts::value< uint32_t >()->default_value("4"), "number"),
-                  (num_logstores, "", "num_logstores", "number of log stores",
+                  (num_logstores, "", "num_logstores",
+                   "total number of log stores; they are spread evenly across the logdevs",
                    ::cxxopts::value< uint32_t >()->default_value("16"), "number"),
                   (num_records, "", "num_records", "number of record to test",
                    ::cxxopts::value< uint32_t >()->default_value("10000"), "number"),
@@ -1307,9 +1278,5 @@ int main(int argc, char* argv[]) {
     spdlog::set_pattern("[%D %T%z] [%^%l%$] [%t] %v");
     sisl::logging::SetModuleLogLevel("logstore", spdlog::level::level_enum::trace);
     sisl::logging::SetModuleLogLevel("journalvdev", spdlog::level::level_enum::debug);
-
-    SampleDB::instance().start_homestore();
-    const int ret = RUN_ALL_TESTS();
-    SampleDB::instance().shutdown(SISL_OPTIONS["num_devs"].as< uint32_t >());
-    return ret;
+    return RUN_ALL_TESTS();
 }
diff --git a/src/tests/test_log_store_long_run.cpp b/src/tests/test_log_store_long_run.cpp
index a5e6009ba..0f4994158 100644
--- a/src/tests/test_log_store_long_run.cpp
+++ b/src/tests/test_log_store_long_run.cpp
@@ -135,7 +135,7 @@ class SampleLogStoreClient {
     }

     // Because of restart in tests, we have torce the flush of log entries.
-        m_log_store->get_logdev()->flush_if_needed(1);
+        m_log_store->get_logdev()->flush_if_necessary(1);
     }

     void read_validate(const bool expect_all_completed = false) {
@@ -164,35 +164,18 @@ class SampleLogStoreClient {
     }

     void rollback_validate(uint32_t num_lsns_to_rollback) {
-        std::mutex mtx;
-        std::condition_variable cv;
-        bool rollback_done = false;
-
         if (m_log_store->truncated_upto() == m_log_store->get_contiguous_completed_seq_num(-1)) {
             // No records to rollback.
             return;
         }
-
         if ((m_cur_lsn - num_lsns_to_rollback - 1) <= m_log_store->get_contiguous_issued_seq_num(-1)) { return; }
-
         auto const upto_lsn = m_cur_lsn.fetch_sub(num_lsns_to_rollback) - num_lsns_to_rollback - 1;
-
-        m_log_store->rollback_async(upto_lsn, [&](logstore_seq_num_t) {
-            ASSERT_EQ(m_log_store->get_contiguous_completed_seq_num(-1), upto_lsn)
-                << "Last completed seq num is not reset after rollback";
-            ASSERT_EQ(m_log_store->get_contiguous_issued_seq_num(-1), upto_lsn)
-                << "Last issued seq num is not reset after rollback";
-            read_validate(true);
-            {
-                std::unique_lock lock(mtx);
-                rollback_done = true;
-            }
-            cv.notify_one();
-        });
-
-        // We wait till async rollback is finished as we do validation.
-        std::unique_lock lock(mtx);
-        cv.wait(lock, [&rollback_done] { return rollback_done == true; });
+        m_log_store->rollback(upto_lsn);
+        ASSERT_EQ(m_log_store->get_contiguous_completed_seq_num(-1), upto_lsn)
+            << "Last completed seq num is not reset after rollback";
+        ASSERT_EQ(m_log_store->get_contiguous_issued_seq_num(-1), upto_lsn)
+            << "Last issued seq num is not reset after rollback";
+        read_validate(true);
     }

     void recovery_validate() {
@@ -225,7 +208,7 @@ class SampleLogStoreClient {
         m_truncated_upto_lsn = lsn;
     }

-    void flush() { m_log_store->flush_sync(); }
+    void flush() { m_log_store->flush(); }

     bool has_all_lsns_truncated() const { return (m_truncated_upto_lsn.load() == (m_cur_lsn.load() - 1)); }

@@ -471,13 +454,8 @@ class LogStoreLongRun : public ::testing::Test {
             return;
         }

-        logstore_service().device_truncate(
-            [this](auto& trunc_lds) {
-                for (auto [fid, trunc_loc] : trunc_lds) {
-                    m_truncate_log_idx[fid].store(trunc_loc.idx);
-                }
-            },
-            true /* wait_till_done */);
+        logstore_service().device_truncate();
+
         validate_num_stores();
     }

@@ -554,7 +532,6 @@ class LogStoreLongRun : public ::testing::Test {
     uint64_t m_nrecords_waiting_to_complete{0};
     std::mutex m_pending_mtx;
     std::condition_variable m_pending_cv;
-    std::map< logdev_id_t, std::atomic< logid_t > > m_truncate_log_idx;
     uint32_t m_q_depth{64};
     uint32_t m_batch_size{1};
     std::random_device rd{};
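Note for reviewers: the test hunks above all move from the old callback/condition-variable pattern to the new synchronous log store surface. The sketch below strings those calls together outside of gtest so the shape of the new surface is easy to see. Only calls that appear in this patch are used (write_and_flush, read_sync, rollback, truncate, flush, and LogStoreService::device_truncate without a callback or wait flag); the include paths, the helper name sync_log_roundtrip, and the assumption that HomeStore/iomgr are already started and a log store already created (as HSTestHelper and SampleDB do in the tests) are illustrative, not part of the patch.

#include <cstdint>
#include <memory>
#include <string>

#include <homestore/logstore_service.hpp>
#include <homestore/logstore/log_store.hpp>

// Illustrative sketch only: exercises the synchronous path on a log store that the
// caller has already obtained (e.g. via logstore_service().create_new_log_store(...))
// after HomeStore has been started.
inline void sync_log_roundtrip(std::shared_ptr< homestore::HomeLogStore > log_store) {
    std::string payload{"hello-logstore"};
    sisl::io_blob b{reinterpret_cast< uint8_t* >(payload.data()),
                    static_cast< uint32_t >(payload.size()), false /* is_aligned */};

    // Write at lsn 0 and flush before returning (replaces the removed write_sync()).
    log_store->write_and_flush(0 /* seq_num */, b);

    // Blocking read of the record that was just written.
    auto buf = log_store->read_sync(0);
    (void)buf;

    // Synchronous rollback (keeps lsn 0) and logical truncation up to lsn 0.
    log_store->rollback(0);
    log_store->truncate(0);

    // flush() replaces the old flush_sync(); device_truncate() no longer takes a
    // callback or wait_till_done flag.
    log_store->flush();
    homestore::logstore_service().device_truncate();
}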