From d84c2adb8b4ebdc3341e86fe218b481cb31e30a6 Mon Sep 17 00:00:00 2001 From: Xiaoxi Chen Date: Wed, 10 Jul 2024 09:16:30 -0700 Subject: [PATCH] sync_flush should wait is_completed not is_active. Log entry added into StreamTracker in write_async , through m_records.create(). Once create the status is active. Later on in HomeLogStore::on_write_completion it update the tracker and in the `m_records.update`, is_completed set to true. We should wait for is_completed == true to indicate a log entry has been persisted. Signed-off-by: Xiaoxi Chen --- src/lib/logstore/log_store.cpp | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/src/lib/logstore/log_store.cpp b/src/lib/logstore/log_store.cpp index bd60291c6..2efe84298 100644 --- a/src/lib/logstore/log_store.cpp +++ b/src/lib/logstore/log_store.cpp @@ -125,14 +125,17 @@ logstore_seq_num_t HomeLogStore::append_async(const sisl::io_blob& b, void* cook log_buffer HomeLogStore::read_sync(logstore_seq_num_t seq_num) { // If seq_num has not been flushed yet, but issued, then we flush them before reading - auto const s = m_records.status(seq_num); + auto s = m_records.status(seq_num); + THIS_LOGSTORE_LOG(DEBUG, "read_sync seq_num {}, is_active {}, is_complete {}, is_hole {}, is_out_of_range {}", seq_num, s.is_active, s.is_completed, s.is_hole, s.is_out_of_range); if (s.is_out_of_range || s.is_hole) { // THIS_LOGSTORE_LOG(ERROR, "ld_key not valid {}", seq_num); throw std::out_of_range("key not valid"); } else if (!s.is_completed) { - THIS_LOGSTORE_LOG(TRACE, "Reading lsn={}:{} before flushed, doing flush first", m_store_id, seq_num); + THIS_LOGSTORE_LOG(DEBUG, "Reading lsn={}:{} before flushed, doing flush first", m_store_id, seq_num); flush_sync(seq_num); + THIS_LOGSTORE_LOG(DEBUG, "After flushing, seq_num {}, is_active {}, is_complete {}, is_hole {}, is_out_of_range {}", seq_num, s.is_active, s.is_completed, s.is_hole, s.is_out_of_range); } + s = m_records.status(seq_num); const auto record = m_records.at(seq_num); const logdev_key ld_key = record.m_dev_key; @@ -405,7 +408,7 @@ void HomeLogStore::flush_sync(logstore_seq_num_t upto_seq_num) { if (upto_seq_num == invalid_lsn()) { upto_seq_num = m_records.active_upto(); } // if we have flushed already, we are done - if (!m_records.status(upto_seq_num).is_active) { return; } + if (m_records.status(upto_seq_num).is_completed) { return; } { std::unique_lock lk(m_sync_flush_mtx); @@ -416,13 +419,13 @@ void HomeLogStore::flush_sync(logstore_seq_num_t upto_seq_num) { // Step 2: After marking this lsn, we again do a check, to avoid a race where completion checked for no lsn // and the lsn is stored in step 1 above. - if (!m_records.status(upto_seq_num).is_active) { return; } + if (m_records.status(upto_seq_num).is_completed) { return; } // Step 3: Force a flush (with least threshold) m_logdev->flush_if_needed(1); // Step 4: Wait for completion - m_sync_flush_cv.wait(lk, [this, upto_seq_num] { return !m_records.status(upto_seq_num).is_active; }); + m_sync_flush_cv.wait_for(lk, std::chrono::milliseconds(10), [this, upto_seq_num] { return m_records.status(upto_seq_num).is_completed; }); // NOTE: We are not resetting the lsn because same seq number should never have 2 completions and thus not // doing it saves an atomic instruction