diff --git a/conanfile.py b/conanfile.py index d417c3fc7..f18a621f6 100644 --- a/conanfile.py +++ b/conanfile.py @@ -9,7 +9,7 @@ class HomestoreConan(ConanFile): name = "homestore" - version = "6.4.42" + version = "6.4.43" homepage = "https://github.com/eBay/Homestore" description = "HomeStore Storage Engine" topics = ("ebay", "nublox") diff --git a/src/lib/logstore/log_dev.cpp b/src/lib/logstore/log_dev.cpp index 0e22c9825..be0ad2607 100644 --- a/src/lib/logstore/log_dev.cpp +++ b/src/lib/logstore/log_dev.cpp @@ -113,6 +113,12 @@ void LogDev::stop() { { std::unique_lock lg = flush_guard(); m_stopped = true; + // waiting under lock to make sure no new flush is started + while (m_pending_callback.load() > 0) { + THIS_LOGDEV_LOG(INFO, "Waiting for pending callbacks to complete, pending callbacks {}", + m_pending_callback.load()); + std::this_thread::sleep_for(std::chrono::milliseconds{1000}); + } } // after we call stop, we need to do any pending device truncations truncate(); @@ -489,13 +495,16 @@ void LogDev::on_flush_completion(LogGroup* lg) { // since we support out-of-order lsn write, so no need to guarantee the order of logstore write completion // TODO:: add some logic to guarantee all the callback is done when stop. - for (auto const& [idx, req] : req_map) + for (auto const& [idx, req] : req_map) { + m_pending_callback++; iomanager.run_on_forget(iomgr::reactor_regex::random_worker, iomgr::fiber_regex::syncio_only, [this, dev_offset, idx, req]() { auto ld_key = logdev_key{idx, dev_offset}; auto comp_cb = req->log_store->get_comp_cb(); (req->cb) ? req->cb(req, ld_key) : comp_cb(req, ld_key); + m_pending_callback--; }); + } } uint64_t LogDev::truncate() { diff --git a/src/lib/logstore/log_dev.hpp b/src/lib/logstore/log_dev.hpp index d5a545453..cf09e57bc 100644 --- a/src/lib/logstore/log_dev.hpp +++ b/src/lib/logstore/log_dev.hpp @@ -810,6 +810,7 @@ class LogDev : public std::enable_shared_from_this< LogDev > { // callback of the append_async we schedule another flush.), so we need the lock to be locked for multitimes in the // same thread. iomgr::FiberManagerLib::mutex m_flush_mtx; + std::atomic_uint64_t m_pending_callback{0}; }; // LogDev } // namespace homestore