Skip to content

Commit

Permalink
Implement flush_sync
Browse files Browse the repository at this point in the history
  • Loading branch information
szmyd committed Dec 18, 2024
1 parent 75c50c7 commit aa7443f
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 7 deletions.
2 changes: 1 addition & 1 deletion conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ def build_requirements(self):

def requirements(self):
self.requires("iomgr/[~=8]")
self.requires("sisl/[~=8]")
self.requires("sisl/[~=8.9]")

# FOSS, rarely updated
self.requires("boost/1.79.0")
Expand Down
32 changes: 32 additions & 0 deletions src/homelogstore/log_store.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,11 @@ void HomeLogStore::on_write_completion(logstore_req* const req, const logdev_key
m_flush_batch_max_lsn = std::max(m_flush_batch_max_lsn, req->seq_num);
HISTOGRAM_OBSERVE(HomeLogStoreMgrSI().m_metrics, logstore_append_latency, get_elapsed_time_us(req->start_time));
(req->cb) ? req->cb(req, ld_key) : m_comp_cb(req, ld_key);

if (m_sync_flush_waiter_lsn.load() == req->seq_num) {
// Sync flush is waiting for this lsn to be completed, wake up the sync flush cv
m_sync_flush_cv.notify_one();
}
}

void HomeLogStore::on_read_completion(logstore_req* const req, const logdev_key ld_key) {
Expand Down Expand Up @@ -437,6 +442,33 @@ logstore_seq_num_t HomeLogStore::get_contiguous_completed_seq_num(const logstore
return (logstore_seq_num_t)m_records.completed_upto(from + 1);
}

void HomeLogStore::flush_sync(logstore_seq_num_t upto_seq_num) {
// Logdev flush is async call and if flush_sync is called on the same thread which could potentially do logdev
// flush, waiting sync would cause deadlock.
HS_DBG_ASSERT_EQ(LogDev::flush_in_current_thread(), false,
"Logstore flush sync cannot be called on same thread which could do logdev flush");

if (upto_seq_num == invalid_lsn()) { upto_seq_num = m_records.active_upto(); }

// if we have flushed already, we are done
if (m_records.completed_upto() >= upto_seq_num) { return; }
{
std::unique_lock lk(m_sync_flush_mtx);
// Step 1: Mark the waiter lsn to the seqnum we wanted to wait for. The completion of every lsn checks
// for this and if this lsn is completed, will make a callback which signals the cv.
m_sync_flush_waiter_lsn.store(upto_seq_num);
// Step 2: After marking this lsn, we again do a check, to avoid a race where completion checked for no lsn
// and the lsn is stored in step 1 above.
if (m_records.completed_upto() >= upto_seq_num) { return; }
// Step 3: Force a flush (with least threshold)
m_logdev.flush_if_needed();
// Step 4: Wait for completion
m_sync_flush_cv.wait(lk, [this, upto_seq_num] { return m_records.completed_upto() >= upto_seq_num; });
// NOTE: We are not resetting the lsn because same seq number should never have 2 completions and thus not
// doing it saves an atomic instruction
}
}

uint64_t HomeLogStore::rollback_async(logstore_seq_num_t to_lsn, on_rollback_cb_t cb) {
// Validate if the lsn to which it is rolledback to is not truncated.
auto ret = m_records.status(to_lsn + 1);
Expand Down
15 changes: 9 additions & 6 deletions src/homelogstore/log_store.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -489,15 +489,13 @@ class HomeLogStore : public std::enable_shared_from_this< HomeLogStore > {
}

/**
* @brief Sync the log store to disk
* @brief Flush this log store (write/sync to disk) up to the sequence number
*
* @param
* @param seq_num Sequence number upto which logs are to be flushed. If not provided, will wait to flush all seq
* numbers issued prior.
* @return True on success
*/
bool sync() {
// TODO: Implement this method
return true;
}
void flush_sync(logstore_seq_num_t upto_seq_num = invalid_lsn());

/**
* @brief Rollback the given instance to the given sequence number
Expand Down Expand Up @@ -540,6 +538,11 @@ class HomeLogStore : public std::enable_shared_from_this< HomeLogStore > {
// batch
logstore_seq_num_t m_flush_batch_max_lsn{std::numeric_limits< logstore_seq_num_t >::min()};

// Sync flush sections
std::atomic< logstore_seq_num_t > m_sync_flush_waiter_lsn{invalid_lsn()};
std::mutex m_sync_flush_mtx;
std::condition_variable m_sync_flush_cv;

std::vector< seq_ld_key_pair > m_truncation_barriers; // List of truncation barriers
truncation_info m_safe_truncation_boundary;
sisl::sobject_ptr m_sobject;
Expand Down

0 comments on commit aa7443f

Please sign in to comment.