From aa7443fe3e7a9b32dc8dd0c73eedf06be0bef9cd Mon Sep 17 00:00:00 2001 From: Brian Szmyd Date: Tue, 17 Dec 2024 21:08:24 -0700 Subject: [PATCH] Implement flush_sync --- conanfile.py | 2 +- src/homelogstore/log_store.cpp | 32 ++++++++++++++++++++++++++++++++ src/homelogstore/log_store.hpp | 15 +++++++++------ 3 files changed, 42 insertions(+), 7 deletions(-) diff --git a/conanfile.py b/conanfile.py index 71c6ffc40..101616b31 100644 --- a/conanfile.py +++ b/conanfile.py @@ -55,7 +55,7 @@ def build_requirements(self): def requirements(self): self.requires("iomgr/[~=8]") - self.requires("sisl/[~=8]") + self.requires("sisl/[~=8.9]") # FOSS, rarely updated self.requires("boost/1.79.0") diff --git a/src/homelogstore/log_store.cpp b/src/homelogstore/log_store.cpp index 9a30359f8..feafefa9f 100644 --- a/src/homelogstore/log_store.cpp +++ b/src/homelogstore/log_store.cpp @@ -173,6 +173,11 @@ void HomeLogStore::on_write_completion(logstore_req* const req, const logdev_key m_flush_batch_max_lsn = std::max(m_flush_batch_max_lsn, req->seq_num); HISTOGRAM_OBSERVE(HomeLogStoreMgrSI().m_metrics, logstore_append_latency, get_elapsed_time_us(req->start_time)); (req->cb) ? req->cb(req, ld_key) : m_comp_cb(req, ld_key); + + if (m_sync_flush_waiter_lsn.load() == req->seq_num) { + // Sync flush is waiting for this lsn to be completed, wake up the sync flush cv + m_sync_flush_cv.notify_one(); + } } void HomeLogStore::on_read_completion(logstore_req* const req, const logdev_key ld_key) { @@ -437,6 +442,33 @@ logstore_seq_num_t HomeLogStore::get_contiguous_completed_seq_num(const logstore return (logstore_seq_num_t)m_records.completed_upto(from + 1); } +void HomeLogStore::flush_sync(logstore_seq_num_t upto_seq_num) { + // Logdev flush is async call and if flush_sync is called on the same thread which could potentially do logdev + // flush, waiting sync would cause deadlock. + HS_DBG_ASSERT_EQ(LogDev::flush_in_current_thread(), false, + "Logstore flush sync cannot be called on same thread which could do logdev flush"); + + if (upto_seq_num == invalid_lsn()) { upto_seq_num = m_records.active_upto(); } + + // if we have flushed already, we are done + if (m_records.completed_upto() >= upto_seq_num) { return; } + { + std::unique_lock lk(m_sync_flush_mtx); + // Step 1: Mark the waiter lsn to the seqnum we wanted to wait for. The completion of every lsn checks + // for this and if this lsn is completed, will make a callback which signals the cv. + m_sync_flush_waiter_lsn.store(upto_seq_num); + // Step 2: After marking this lsn, we again do a check, to avoid a race where completion checked for no lsn + // and the lsn is stored in step 1 above. + if (m_records.completed_upto() >= upto_seq_num) { return; } + // Step 3: Force a flush (with least threshold) + m_logdev.flush_if_needed(); + // Step 4: Wait for completion + m_sync_flush_cv.wait(lk, [this, upto_seq_num] { return m_records.completed_upto() >= upto_seq_num; }); + // NOTE: We are not resetting the lsn because same seq number should never have 2 completions and thus not + // doing it saves an atomic instruction + } +} + uint64_t HomeLogStore::rollback_async(logstore_seq_num_t to_lsn, on_rollback_cb_t cb) { // Validate if the lsn to which it is rolledback to is not truncated. auto ret = m_records.status(to_lsn + 1); diff --git a/src/homelogstore/log_store.hpp b/src/homelogstore/log_store.hpp index ec44f09cd..492e1d420 100644 --- a/src/homelogstore/log_store.hpp +++ b/src/homelogstore/log_store.hpp @@ -489,15 +489,13 @@ class HomeLogStore : public std::enable_shared_from_this< HomeLogStore > { } /** - * @brief Sync the log store to disk + * @brief Flush this log store (write/sync to disk) up to the sequence number * - * @param + * @param seq_num Sequence number upto which logs are to be flushed. If not provided, will wait to flush all seq + * numbers issued prior. * @return True on success */ - bool sync() { - // TODO: Implement this method - return true; - } + void flush_sync(logstore_seq_num_t upto_seq_num = invalid_lsn()); /** * @brief Rollback the given instance to the given sequence number @@ -540,6 +538,11 @@ class HomeLogStore : public std::enable_shared_from_this< HomeLogStore > { // batch logstore_seq_num_t m_flush_batch_max_lsn{std::numeric_limits< logstore_seq_num_t >::min()}; + // Sync flush sections + std::atomic< logstore_seq_num_t > m_sync_flush_waiter_lsn{invalid_lsn()}; + std::mutex m_sync_flush_mtx; + std::condition_variable m_sync_flush_cv; + std::vector< seq_ld_key_pair > m_truncation_barriers; // List of truncation barriers truncation_info m_safe_truncation_boundary; sisl::sobject_ptr m_sobject;