Skip to content

Commit

Permalink
sync_flush should wait is_completed not is_active.
Browse files Browse the repository at this point in the history
Log entry added into StreamTracker in write_async , through
m_records.create().  Once create the status is active.

Later on in HomeLogStore::on_write_completion it update the tracker
and in the `m_records.update`, is_completed set to true.

We should wait for is_completed == true to indicate a log entry has
been persisted.

Signed-off-by: Xiaoxi Chen <[email protected]>
  • Loading branch information
xiaoxichen committed Jul 10, 2024
1 parent 1a0cef8 commit d84c2ad
Showing 1 changed file with 8 additions and 5 deletions.
13 changes: 8 additions & 5 deletions src/lib/logstore/log_store.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -125,14 +125,17 @@ logstore_seq_num_t HomeLogStore::append_async(const sisl::io_blob& b, void* cook

log_buffer HomeLogStore::read_sync(logstore_seq_num_t seq_num) {
// If seq_num has not been flushed yet, but issued, then we flush them before reading
auto const s = m_records.status(seq_num);
auto s = m_records.status(seq_num);
THIS_LOGSTORE_LOG(DEBUG, "read_sync seq_num {}, is_active {}, is_complete {}, is_hole {}, is_out_of_range {}", seq_num, s.is_active, s.is_completed, s.is_hole, s.is_out_of_range);
if (s.is_out_of_range || s.is_hole) {
// THIS_LOGSTORE_LOG(ERROR, "ld_key not valid {}", seq_num);
throw std::out_of_range("key not valid");
} else if (!s.is_completed) {
THIS_LOGSTORE_LOG(TRACE, "Reading lsn={}:{} before flushed, doing flush first", m_store_id, seq_num);
THIS_LOGSTORE_LOG(DEBUG, "Reading lsn={}:{} before flushed, doing flush first", m_store_id, seq_num);
flush_sync(seq_num);
THIS_LOGSTORE_LOG(DEBUG, "After flushing, seq_num {}, is_active {}, is_complete {}, is_hole {}, is_out_of_range {}", seq_num, s.is_active, s.is_completed, s.is_hole, s.is_out_of_range);
}
s = m_records.status(seq_num);

const auto record = m_records.at(seq_num);
const logdev_key ld_key = record.m_dev_key;
Expand Down Expand Up @@ -405,7 +408,7 @@ void HomeLogStore::flush_sync(logstore_seq_num_t upto_seq_num) {
if (upto_seq_num == invalid_lsn()) { upto_seq_num = m_records.active_upto(); }

// if we have flushed already, we are done
if (!m_records.status(upto_seq_num).is_active) { return; }
if (m_records.status(upto_seq_num).is_completed) { return; }

{
std::unique_lock lk(m_sync_flush_mtx);
Expand All @@ -416,13 +419,13 @@ void HomeLogStore::flush_sync(logstore_seq_num_t upto_seq_num) {

// Step 2: After marking this lsn, we again do a check, to avoid a race where completion checked for no lsn
// and the lsn is stored in step 1 above.
if (!m_records.status(upto_seq_num).is_active) { return; }
if (m_records.status(upto_seq_num).is_completed) { return; }

// Step 3: Force a flush (with least threshold)
m_logdev->flush_if_needed(1);

// Step 4: Wait for completion
m_sync_flush_cv.wait(lk, [this, upto_seq_num] { return !m_records.status(upto_seq_num).is_active; });
m_sync_flush_cv.wait_for(lk, std::chrono::milliseconds(10), [this, upto_seq_num] { return m_records.status(upto_seq_num).is_completed; });

// NOTE: We are not resetting the lsn because same seq number should never have 2 completions and thus not
// doing it saves an atomic instruction
Expand Down

0 comments on commit d84c2ad

Please sign in to comment.