Skip to content

Commit

Permalink
log store sync flush (#469)
Browse files Browse the repository at this point in the history
  • Loading branch information
JacksonYao287 authored Aug 13, 2024
1 parent 4e803a0 commit 2080ad4
Show file tree
Hide file tree
Showing 18 changed files with 726 additions and 1,397 deletions.
3 changes: 1 addition & 2 deletions conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,7 @@

class HomestoreConan(ConanFile):
name = "homestore"
version = "6.4.40"

version = "6.4.41"
homepage = "https://github.com/eBay/Homestore"
description = "HomeStore Storage Engine"
topics = ("ebay", "nublox")
Expand Down
160 changes: 31 additions & 129 deletions src/include/homestore/logstore/log_store.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include <set>
#include <unordered_map>
#include <vector>
#include <tuple>

#include <sisl/fds/buffer.hpp>
#include <sisl/fds/stream_tracker.hpp>
Expand Down Expand Up @@ -81,18 +82,6 @@ class HomeLogStore : public std::enable_shared_from_this< HomeLogStore > {
*/
log_replay_done_cb_t get_log_replay_done_cb() const { return m_replay_done_cb; }

/**
* @brief Write the blob at the user specified seq number in sync manner. Under the covers it will call async
* write and then wait for its completion. As such this is much lesser performing than async version since it
* involves mutex/cv combination
*
* @param seq_num : Sequence number to insert data
* @param b : Data blob to write to log
*
* @return is write completed successfully.
*/
bool write_sync(logstore_seq_num_t seq_num, const sisl::io_blob& b);

/**
* @brief Write the blob at the user specified seq number - prepared as a request in async fashion.
*
Expand All @@ -110,17 +99,7 @@ class HomeLogStore : public std::enable_shared_from_this< HomeLogStore > {
* @param cookie : Any cookie or context which will passed back in the callback
* @param cb Callback upon completion which is called with the status, seq_num and cookie that was passed.
*/
void write_async(logstore_seq_num_t seq_num, const sisl::io_blob& b, void* cookie, const log_write_comp_cb_t& cb,
bool flush_wait = false);

/**
* @brief This method appends the blob into the log and it returns the generated seq number
*
* @param b Blob of data to append
* @return logstore_seq_num_t Returns the seqnum generated by the log
*/
// This method is not implemented yet
logstore_seq_num_t append_sync(const sisl::io_blob& b);
void write_async(logstore_seq_num_t seq_num, const sisl::io_blob& b, void* cookie, const log_write_comp_cb_t& cb);

/**
* @brief This method appends the blob into the log and makes a callback at the end of the append.
Expand All @@ -132,6 +111,14 @@ class HomeLogStore : public std::enable_shared_from_this< HomeLogStore > {
*/
logstore_seq_num_t append_async(const sisl::io_blob& b, void* cookie, const log_write_comp_cb_t& completion_cb);

/**
* @brief Write the blob at the user specified seq number and flush, just like write_sync
*
* @param seq_num: Seq number to write to
* @param b : Blob of data
*/
void write_and_flush(logstore_seq_num_t seq_num, const sisl::io_blob& b);

/**
* @brief Read the log provided the sequence number synchronously. This is not the most efficient way to read
* as reader will be blocked until read is completed. In addition, it is built on-top of async system by doing
Expand All @@ -144,25 +131,6 @@ class HomeLogStore : public std::enable_shared_from_this< HomeLogStore > {
*/
log_buffer read_sync(logstore_seq_num_t seq_num);

/**
* @brief Read the log based on the logstore_req prepared. In case callback is supplied, it uses the callback
* to provide the data it has read. If not overridden, use default callback registered during initialization.
*
* @param req Request containing seq_num
* @param cb [OPTIONAL] Callback to get the data back, if it needs to be different from the default registered
* one.
*/
void read_async(logstore_req* req, const log_found_cb_t& cb = nullptr);

/**
* @brief Read the log for the seq_num and make the callback with the data
*
* @param seq_num Seqnumber to read the log from
* @param cookie Any cookie or context which will passed back in the callback
* @param cb Callback which contains seq_num, cookie and
*/
void read_async(logstore_seq_num_t seq_num, void* cookie, const log_found_cb_t& cb);

/**
* @brief Truncate the logs for this log store upto the seq_num provided (inclusive). Once truncated, the reads
* on seq_num <= upto_seq_num will return an error. The truncation in general is a 2 step process, where first
Expand All @@ -175,7 +143,6 @@ class HomeLogStore : public std::enable_shared_from_this< HomeLogStore > {
* expensive and grouping them together yields better results.
*
* Note: this flag currently is not used, meaning all truncate is in memory only;
* @return number of records to truncate
*/
void truncate(logstore_seq_num_t upto_seq_num, bool in_memory_truncate_only = true);

Expand All @@ -188,30 +155,17 @@ class HomeLogStore : public std::enable_shared_from_this< HomeLogStore > {
*/
void fill_gap(logstore_seq_num_t seq_num);

/**
* @brief Get the safe truncation log dev key from this log store perspective. Please note that the safe idx is
* not globally safe, but it is safe from this log store perspective only. To get global safe id, one should
* access all log stores and get the minimum of them before truncating.
*
* It could return invalid logdev_key which indicates that this log store does not have any valid logdev key
* to truncate. This could happen when there were no ios on this logstore since last truncation or at least no
* ios are flushed yet. The caller should simply ignore this return value.
*
* @return truncation_entry_t: Which contains the logdev key and its corresponding seq_num to truncate and also
* is that entry represents the entire log store.
*/
// truncation_entry_t get_safe_truncation_boundary() const;

/**
* @brief Get the last truncated seqnum upto which we have truncated. If called after recovery, it returns the
* first seq_num it has seen-1.
*
* @return logstore_seq_num_t
* @return the last truncated seqnum upto which we have truncated
*/
logstore_seq_num_t truncated_upto() const {
const auto ts{m_safe_truncation_boundary.seq_num.load(std::memory_order_acquire)};
return (ts == std::numeric_limits< logstore_seq_num_t >::max()) ? -1 : ts;
}
logstore_seq_num_t truncated_upto() const { return m_start_lsn.load(std::memory_order_acquire) - 1; }

logdev_key get_trunc_ld_key() const { return m_trunc_ld_key; }

std::tuple< logstore_seq_num_t, logdev_key, logstore_seq_num_t > truncate_info() const;

sisl::StreamTracker< logstore_record >& log_records() { return m_records; }

Expand Down Expand Up @@ -256,60 +210,31 @@ class HomeLogStore : public std::enable_shared_from_this< HomeLogStore > {
*
* @param seq_num Sequence number upto which logs are to be flushed. If not provided, will wait to flush all seq
* numbers issued prior.
* @return True on success
*/
void flush_sync(logstore_seq_num_t upto_seq_num = invalid_lsn());
void flush(logstore_seq_num_t upto_seq_num = invalid_lsn());

/**
* @brief Rollback the given instance to the given sequence number
*
* @param seq_num Sequence number back which logs are to be rollbacked
* @param to_lsn Sequence number back which logs are to be rollbacked
* @return True on success
*/
uint64_t rollback_async(logstore_seq_num_t to_lsn, on_rollback_cb_t cb);
bool rollback(logstore_seq_num_t to_lsn);

auto seq_num() const { return m_seq_num.load(std::memory_order_acquire); }

std::shared_ptr< LogDev > get_logdev() { return m_logdev; }
auto start_lsn() const { return m_start_lsn.load(std::memory_order_acquire); }

nlohmann::json dump_log_store(const log_dump_req& dump_req = log_dump_req());

nlohmann::json get_status(int verbosity) const;

/**
* Retrieves the truncation information before device truncation.
*
* @return A constant reference to the truncation_info object representing the truncation information.
*/
const truncation_info& pre_device_truncation();

/**
* \brief post device truncation processing.
*
* This function is used to update safe truncation boundary to the specified `trunc_upto_key`.
*
* \param trunc_upto_key The key indicating the log entry up to which truncation has been performed.
*/
void post_device_truncation(const logdev_key& trunc_upto_key);

/**
* Handles the completion of a write operation in the log store.
*
* @param req The logstore_req object representing the completed write operation.
* @param ld_key The logdev_key associated with the completed write operation.
* @param flush_ld_key when we truncate to req, which position we should truncate in the logdev.
*/
void on_write_completion(logstore_req* req, const logdev_key& ld_key);

/**
* \brief Handles the completion of a read operation in the log store.
*
* This function is called when a read operation in the log store has completed.
* It takes a pointer to a logstore_req object and a logdev_key object as parameters.
*
* \param req The pointer to the logstore_req object representing the read request.
* \param ld_key The logdev_key object representing the key used for the read operation.
*/
void on_read_completion(logstore_req* req, const logdev_key& ld_key);
void on_write_completion(logstore_req* req, const logdev_key& ld_key, const logdev_key& flush_ld_key);

/**
* @brief Handles the event when a log is found.
Expand All @@ -327,27 +252,10 @@ class HomeLogStore : public std::enable_shared_from_this< HomeLogStore > {
*/
void on_log_found(logstore_seq_num_t seq_num, const logdev_key& ld_key, const logdev_key& flush_ld_key,
log_buffer buf);
/**
* @brief Handles the completion of a batch flush operation to update internal state.
*
* This function is called when a batch flush operation is completed.
* It takes a `logdev_key` parameter that represents the key of the flushed batch.
*
* This function is also called during log store recovery;
*
* @param flush_batch_ld_key The key of the flushed batch.
*/
void on_batch_completion(const logdev_key& flush_batch_ld_key);

private:
/**
* Truncates the log store up to the specified sequence number.
*
* @param upto_seq_num The sequence number up to which the log store should be truncated.
*/
void do_truncate(logstore_seq_num_t upto_seq_num);
std::shared_ptr< LogDev > get_logdev() { return m_logdev; }

int search_max_le(logstore_seq_num_t input_sn);
auto get_comp_cb() const { return m_comp_cb; }

private:
logstore_id_t m_store_id;
Expand All @@ -357,21 +265,15 @@ class HomeLogStore : public std::enable_shared_from_this< HomeLogStore > {
log_req_comp_cb_t m_comp_cb;
log_found_cb_t m_found_cb;
log_replay_done_cb_t m_replay_done_cb;
std::atomic< logstore_seq_num_t > m_seq_num;
// the first seq_num that is in the log store
std::atomic< logstore_seq_num_t > m_start_lsn;
// the next seq_num that will be put into the log store
std::atomic< logstore_seq_num_t > m_next_lsn;
// the last seq_num that is in the log store
std::atomic< logstore_seq_num_t > m_tail_lsn;
std::string m_fq_name;
LogStoreServiceMetrics& m_metrics;

// seq_ld_key_pair m_flush_batch_max = {-1, {0, 0}}; // The maximum seqnum we have seen in the prev flushed
// batch
logstore_seq_num_t m_flush_batch_max_lsn{std::numeric_limits< logstore_seq_num_t >::min()};

// Sync flush sections
std::atomic< logstore_seq_num_t > m_sync_flush_waiter_lsn{invalid_lsn()};
std::mutex m_sync_flush_mtx;
std::mutex m_single_sync_flush_mtx;
std::condition_variable m_sync_flush_cv;

std::vector< seq_ld_key_pair > m_truncation_barriers; // List of truncation barriers
truncation_info m_safe_truncation_boundary;
logdev_key m_trunc_ld_key{0, 0};
};
} // namespace homestore
39 changes: 8 additions & 31 deletions src/include/homestore/logstore/log_store_internal.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
#include <nlohmann/json.hpp>

namespace homestore {

///////////////////// All typedefs ///////////////////////////////
class logstore_req;
class HomeLogStore;
Expand All @@ -50,16 +49,14 @@ typedef std::function< void(logstore_seq_num_t, sisl::io_blob&, logdev_key, void
typedef std::function< void(logstore_seq_num_t, log_buffer, void*) > log_found_cb_t;
typedef std::function< void(std::shared_ptr< HomeLogStore >) > log_store_opened_cb_t;
typedef std::function< void(std::shared_ptr< HomeLogStore >, logstore_seq_num_t) > log_replay_done_cb_t;
typedef std::function< void(const std::unordered_map< logdev_id_t, logdev_key >&) > device_truncate_cb_t;

typedef int64_t logid_t;

struct logdev_key {
logid_t idx;
off_t dev_offset;

constexpr logdev_key(const logid_t idx = std::numeric_limits< logid_t >::min(),
const off_t dev_offset = std::numeric_limits< uint64_t >::min()) :
constexpr logdev_key(const logid_t idx = -1, const off_t dev_offset = std::numeric_limits< uint64_t >::min()) :
idx{idx}, dev_offset{dev_offset} {}
logdev_key(const logdev_key&) = default;
logdev_key& operator=(const logdev_key&) = default;
Expand All @@ -72,11 +69,11 @@ struct logdev_key {
operator bool() const { return is_valid(); }
bool is_valid() const { return !is_lowest() && !is_highest(); }

bool is_lowest() const { return (idx == std::numeric_limits< logid_t >::min()); }
bool is_lowest() const { return (idx == -1); }
bool is_highest() const { return (idx == std::numeric_limits< logid_t >::max()); }

void set_lowest() {
idx = std::numeric_limits< logid_t >::min();
idx = -1;
dev_offset = std::numeric_limits< uint64_t >::min();
}

Expand Down Expand Up @@ -109,9 +106,11 @@ struct log_dump_req {

struct logstore_record {
logdev_key m_dev_key;
// indicates the safe truncation point of the log store
logdev_key m_trunc_key;

logstore_record() = default;
logstore_record(const logdev_key& key) : m_dev_key{key} {}
logstore_record(const logdev_key& key, const logdev_key& trunc_key) : m_dev_key{key}, m_trunc_key{trunc_key} {}
};

class HomeLogStore;
Expand All @@ -121,7 +120,6 @@ struct logstore_req {
logstore_seq_num_t seq_num; // Log store specific seq_num (which could be monotonically increaseing with logstore)
sisl::io_blob data; // Data blob containing data
void* cookie; // User generated cookie (considered as opaque)
bool is_write; // Directon of IO
bool is_internal_req; // If the req is created internally by HomeLogStore itself
log_req_comp_cb_t cb; // Callback upon completion of write (overridden than default)
Clock::time_point start_time;
Expand All @@ -138,13 +136,11 @@ struct logstore_req {
// TODO: Implement this method
return 0;
}
static logstore_req* make(HomeLogStore* store, logstore_seq_num_t seq_num, const sisl::io_blob& data,
bool is_write_req = true) {
static logstore_req* make(HomeLogStore* store, logstore_seq_num_t seq_num, const sisl::io_blob& data) {
logstore_req* req = new logstore_req();
req->log_store = store;
req->seq_num = seq_num;
req->data = data;
req->is_write = is_write_req;
req->is_internal_req = true;
req->cb = nullptr;

Expand All @@ -158,25 +154,6 @@ struct logstore_req {
logstore_req() = default;
};

struct seq_ld_key_pair {
logstore_seq_num_t seq_num{-1};
logdev_key ld_key;
};

struct truncation_info {
// Safe log dev location upto which it is truncatable
logdev_key ld_key{std::numeric_limits< logid_t >::min(), 0};

// LSN of this log store upto which it is truncated
std::atomic< logstore_seq_num_t > seq_num{-1};

// Is there any entry which is already store truncated but waiting for device truncation
bool pending_dev_truncation{false};

// Any truncation entries/barriers which are not part of this truncation
bool active_writes_not_part_of_truncation{false};
};

#pragma pack(1)
struct logstore_superblk {
logstore_superblk(const logstore_seq_num_t seq_num = 0) : m_first_seq_num{seq_num} {}
Expand All @@ -194,4 +171,4 @@ struct logstore_superblk {
logstore_seq_num_t m_first_seq_num{0};
};
#pragma pack()
} // namespace homestore
} // namespace homestore
Loading

0 comments on commit 2080ad4

Please sign in to comment.