diff --git a/conanfile.py b/conanfile.py index d417c3fc7..f18a621f6 100644 --- a/conanfile.py +++ b/conanfile.py @@ -9,7 +9,7 @@ class HomestoreConan(ConanFile): name = "homestore" - version = "6.4.42" + version = "6.4.43" homepage = "https://github.com/eBay/Homestore" description = "HomeStore Storage Engine" topics = ("ebay", "nublox") diff --git a/src/include/homestore/logstore/log_store.hpp b/src/include/homestore/logstore/log_store.hpp index 8a570757c..71a537756 100644 --- a/src/include/homestore/logstore/log_store.hpp +++ b/src/include/homestore/logstore/log_store.hpp @@ -107,6 +107,14 @@ class HomeLogStore : public std::enable_shared_from_this< HomeLogStore > { * @param b Blob of data to append * @param cookie Passed as is to the completion callback * @param completion_cb Completion callback which contains the seqnum, status and cookie + * + * Note that completion_cb will be executed on background fibers, so different completion_cbs may be executed + * concurrently. Also, logstore does not guarantee the order of completion_cb execution. That means the caller + * should: + * + * 1. add a lock in the completion_cb if it needs to be safe under concurrent execution. + * 2. keep in mind that the completion_cb may be executed in a different order than the append order. 
+ * * @return internally generated sequence number */ logstore_seq_num_t append_async(const sisl::io_blob& b, void* cookie, const log_write_comp_cb_t& completion_cb); diff --git a/src/lib/logstore/log_dev.cpp b/src/lib/logstore/log_dev.cpp index 0e22c9825..507f02f19 100644 --- a/src/lib/logstore/log_dev.cpp +++ b/src/lib/logstore/log_dev.cpp @@ -113,6 +113,12 @@ void LogDev::stop() { { std::unique_lock lg = flush_guard(); m_stopped = true; + // waiting under lock to make sure no new flush is started + while (m_pending_callback.load() > 0) { + THIS_LOGDEV_LOG(INFO, "Waiting for pending callbacks to complete, pending callbacks {}", + m_pending_callback.load()); + std::this_thread::sleep_for(std::chrono::milliseconds{1000}); + } } // after we call stop, we need to do any pending device truncations truncate(); @@ -488,14 +494,16 @@ void LogDev::on_flush_completion(LogGroup* lg) { m_last_flush_idx = upto_indx; // since we support out-of-order lsn write, so no need to guarantee the order of logstore write completion - // TODO:: add some logic to guarantee all the callback is done when stop. - for (auto const& [idx, req] : req_map) + for (auto const& [idx, req] : req_map) { + m_pending_callback++; iomanager.run_on_forget(iomgr::reactor_regex::random_worker, iomgr::fiber_regex::syncio_only, [this, dev_offset, idx, req]() { auto ld_key = logdev_key{idx, dev_offset}; auto comp_cb = req->log_store->get_comp_cb(); (req->cb) ? req->cb(req, ld_key) : comp_cb(req, ld_key); + m_pending_callback--; }); + } } uint64_t LogDev::truncate() { diff --git a/src/lib/logstore/log_dev.hpp b/src/lib/logstore/log_dev.hpp index d5a545453..cf09e57bc 100644 --- a/src/lib/logstore/log_dev.hpp +++ b/src/lib/logstore/log_dev.hpp @@ -810,6 +810,7 @@ class LogDev : public std::enable_shared_from_this< LogDev > { // callback of the append_async we schedule another flush.), so we need the lock to be locked for multitimes in the // same thread. 
iomgr::FiberManagerLib::mutex m_flush_mtx; + std::atomic_uint64_t m_pending_callback{0}; }; // LogDev } // namespace homestore diff --git a/src/lib/logstore/log_stream.cpp b/src/lib/logstore/log_stream.cpp index c5e732ccf..d67121dd1 100644 --- a/src/lib/logstore/log_stream.cpp +++ b/src/lib/logstore/log_stream.cpp @@ -61,8 +61,8 @@ sisl::byte_view log_stream_reader::next_group(off_t* out_dev_offset) { HS_REL_ASSERT_GE(m_cur_log_buf.size(), m_read_size_multiple); const auto* header = r_cast< log_group_header const* >(m_cur_log_buf.bytes()); if (header->magic_word() != LOG_GROUP_HDR_MAGIC) { - LOGERROR("Logdev data not seeing magic at pos {}, must have come to end of log_dev={}", - m_vdev_jd->dev_offset(m_cur_read_bytes), m_vdev_jd->logdev_id()); + LOGDEBUGMOD(logstore, "Logdev data not seeing magic at pos {}, must have come to end of log_dev={}", + m_vdev_jd->dev_offset(m_cur_read_bytes), m_vdev_jd->logdev_id()); *out_dev_offset = m_vdev_jd->dev_offset(m_cur_read_bytes); // move it by dma boundary if header is not valid m_prev_crc = 0; @@ -86,8 +86,9 @@ sisl::byte_view log_stream_reader::next_group(off_t* out_dev_offset) { // compare it with prev crc if (m_prev_crc != 0 && m_prev_crc != header->prev_grp_crc) { // we reached at the end - LOGERROR("we have reached the end. crc doesn't match offset {} prev crc {} header prev crc {} log_dev={}", - m_vdev_jd->dev_offset(m_cur_read_bytes), header->prev_grp_crc, m_prev_crc, m_vdev_jd->logdev_id()); + LOGDEBUGMOD(logstore, + "we have reached the end. 
crc doesn't match offset {} prev crc {} header prev crc {} log_dev={}", + m_vdev_jd->dev_offset(m_cur_read_bytes), header->prev_grp_crc, m_prev_crc, m_vdev_jd->logdev_id()); *out_dev_offset = m_vdev_jd->dev_offset(m_cur_read_bytes); if (!m_vdev_jd->is_offset_at_last_chunk(*out_dev_offset)) { HS_REL_ASSERT(0, "data is corrupted {}", m_vdev_jd->logdev_id()); diff --git a/src/tests/test_log_store.cpp b/src/tests/test_log_store.cpp index e76244bff..6e1b6234c 100644 --- a/src/tests/test_log_store.cpp +++ b/src/tests/test_log_store.cpp @@ -495,13 +495,18 @@ class SampleDB { m_helper.shutdown_homestore(cleanup); if (cleanup) { m_log_store_clients.clear(); + std::unique_lock lock{m_completion_mtx}; m_highest_log_idx.clear(); } } void on_log_insert_completion(logdev_id_t fid, logstore_seq_num_t lsn, logdev_key ld_key) { - if (m_highest_log_idx.count(fid) == 0) { m_highest_log_idx[fid] = std::atomic{-1}; } - atomic_update_max(m_highest_log_idx[fid], ld_key.idx); + { + std::unique_lock lock{m_completion_mtx}; + if (m_highest_log_idx.count(fid) == 0) { m_highest_log_idx[fid] = std::atomic{-1}; } + atomic_update_max(m_highest_log_idx[fid], ld_key.idx); + } + if (m_io_closure) m_io_closure(fid, lsn, ld_key); } @@ -519,6 +524,7 @@ class SampleDB { } logid_t highest_log_idx(logdev_id_t fid) { + std::unique_lock lock{m_completion_mtx}; return m_highest_log_idx.count(fid) ? m_highest_log_idx[fid].load() : -1; } @@ -529,6 +535,7 @@ class SampleDB { std::vector< std::unique_ptr< SampleLogStoreClient > > m_log_store_clients; std::map< logdev_id_t, std::atomic< logid_t > > m_highest_log_idx; test_common::HSTestHelper m_helper; + std::mutex m_completion_mtx; }; class LogStoreTest : public ::testing::Test {