Skip to content

Commit

Permalink
log store sync flush
Browse files Browse the repository at this point in the history
  • Loading branch information
JacksonYao287 committed Jul 20, 2024
1 parent 39781c3 commit 4535916
Show file tree
Hide file tree
Showing 13 changed files with 548 additions and 1,242 deletions.
2 changes: 1 addition & 1 deletion conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

class HomestoreConan(ConanFile):
name = "homestore"
version = "6.4.26"
version = "6.4.27"
homepage = "https://github.com/eBay/Homestore"
description = "HomeStore Storage Engine"
topics = ("ebay", "nublox")
Expand Down
117 changes: 24 additions & 93 deletions src/include/homestore/logstore/log_store.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include <set>
#include <unordered_map>
#include <vector>
#include <tuple>

#include <sisl/fds/buffer.hpp>
#include <sisl/fds/stream_tracker.hpp>
Expand Down Expand Up @@ -81,18 +82,6 @@ class HomeLogStore : public std::enable_shared_from_this< HomeLogStore > {
*/
log_replay_done_cb_t get_log_replay_done_cb() const { return m_replay_done_cb; }

/**
* @brief Write the blob at the user-specified seq number synchronously. Under the covers it calls the async
* write and then waits for its completion. As such, it performs considerably worse than the async version,
* since it involves a mutex/cv combination.
*
* @param seq_num : Sequence number to insert data
* @param b : Data blob to write to log
*
* @return is write completed successfully.
*/
bool write_sync(logstore_seq_num_t seq_num, const sisl::io_blob& b);

/**
* @brief Write the blob at the user specified seq number - prepared as a request in async fashion.
*
Expand All @@ -110,17 +99,7 @@ class HomeLogStore : public std::enable_shared_from_this< HomeLogStore > {
* @param cookie : Any cookie or context which will passed back in the callback
* @param cb Callback upon completion which is called with the status, seq_num and cookie that was passed.
*/
void write_async(logstore_seq_num_t seq_num, const sisl::io_blob& b, void* cookie, const log_write_comp_cb_t& cb,
bool flush_wait = false);

/**
* @brief This method appends the blob into the log and it returns the generated seq number
*
* @param b Blob of data to append
* @return logstore_seq_num_t Returns the seqnum generated by the log
*/
// This method is not implemented yet
logstore_seq_num_t append_sync(const sisl::io_blob& b);
void write_async(logstore_seq_num_t seq_num, const sisl::io_blob& b, void* cookie, const log_write_comp_cb_t& cb);

/**
* @brief This method appends the blob into the log and makes a callback at the end of the append.
Expand All @@ -132,6 +111,14 @@ class HomeLogStore : public std::enable_shared_from_this< HomeLogStore > {
*/
logstore_seq_num_t append_async(const sisl::io_blob& b, void* cookie, const log_write_comp_cb_t& completion_cb);

/**
* @brief Write the blob at the user specified seq number and flush, just like write_sync
*
* @param seq_num: Seq number to write to
* @param b : Blob of data
*/
void write_and_flush(logstore_seq_num_t seq_num, const sisl::io_blob& b);

/**
* @brief Read the log provided the sequence number synchronously. This is not the most efficient way to read
* as reader will be blocked until read is completed. In addition, it is built on-top of async system by doing
Expand Down Expand Up @@ -208,10 +195,11 @@ class HomeLogStore : public std::enable_shared_from_this< HomeLogStore > {
*
* @return logstore_seq_num_t
*/
logstore_seq_num_t truncated_upto() const {
const auto ts{m_safe_truncation_boundary.seq_num.load(std::memory_order_acquire)};
return (ts == std::numeric_limits< logstore_seq_num_t >::max()) ? -1 : ts;
}
logstore_seq_num_t truncated_upto() const { return m_start_lsn.load(std::memory_order_acquire) - 1; }

logdev_key get_trunc_ld_key() const { return m_trunc_ld_key; }

std::tuple< logstore_seq_num_t, logdev_key, logstore_seq_num_t > truncate_info() const;

sisl::StreamTracker< logstore_record >& log_records() { return m_records; }

Expand Down Expand Up @@ -258,40 +246,22 @@ class HomeLogStore : public std::enable_shared_from_this< HomeLogStore > {
* numbers issued prior.
* @return True on success
*/
void flush_sync(logstore_seq_num_t upto_seq_num = invalid_lsn());
void flush(logstore_seq_num_t upto_seq_num = invalid_lsn());

/**
* @brief Rollback the given instance to the given sequence number
*
* @param seq_num Sequence number back which logs are to be rollbacked
* @param to_lsn Sequence number back to which the logs are to be rolled back
* @return True on success
*/
uint64_t rollback_async(logstore_seq_num_t to_lsn, on_rollback_cb_t cb);
bool rollback(logstore_seq_num_t to_lsn);

auto seq_num() const { return m_seq_num.load(std::memory_order_acquire); }

std::shared_ptr< LogDev > get_logdev() { return m_logdev; }
auto start_lsn() const { return m_start_lsn.load(std::memory_order_acquire); }

nlohmann::json dump_log_store(const log_dump_req& dump_req = log_dump_req());

nlohmann::json get_status(int verbosity) const;

/**
* Retrieves the truncation information before device truncation.
*
* @return A constant reference to the truncation_info object representing the truncation information.
*/
const truncation_info& pre_device_truncation();

/**
* \brief post device truncation processing.
*
* This function is used to update safe truncation boundary to the specified `trunc_upto_key`.
*
* \param trunc_upto_key The key indicating the log entry up to which truncation has been performed.
*/
void post_device_truncation(const logdev_key& trunc_upto_key);

/**
* Handles the completion of a write operation in the log store.
*
Expand All @@ -300,17 +270,6 @@ class HomeLogStore : public std::enable_shared_from_this< HomeLogStore > {
*/
void on_write_completion(logstore_req* req, const logdev_key& ld_key);

/**
* \brief Handles the completion of a read operation in the log store.
*
* This function is called when a read operation in the log store has completed.
* It takes a pointer to a logstore_req object and a logdev_key object as parameters.
*
* \param req The pointer to the logstore_req object representing the read request.
* \param ld_key The logdev_key object representing the key used for the read operation.
*/
void on_read_completion(logstore_req* req, const logdev_key& ld_key);

/**
* @brief Handles the event when a log is found.
*
Expand All @@ -327,27 +286,8 @@ class HomeLogStore : public std::enable_shared_from_this< HomeLogStore > {
*/
void on_log_found(logstore_seq_num_t seq_num, const logdev_key& ld_key, const logdev_key& flush_ld_key,
log_buffer buf);
/**
* @brief Handles the completion of a batch flush operation to update internal state.
*
* This function is called when a batch flush operation is completed.
* It takes a `logdev_key` parameter that represents the key of the flushed batch.
*
* This function is also called during log store recovery;
*
* @param flush_batch_ld_key The key of the flushed batch.
*/
void on_batch_completion(const logdev_key& flush_batch_ld_key);

private:
/**
* Truncates the log store up to the specified sequence number.
*
* @param upto_seq_num The sequence number up to which the log store should be truncated.
*/
void do_truncate(logstore_seq_num_t upto_seq_num);

int search_max_le(logstore_seq_num_t input_sn);
std::shared_ptr< LogDev > get_logdev() { return m_logdev; }

private:
logstore_id_t m_store_id;
Expand All @@ -357,21 +297,12 @@ class HomeLogStore : public std::enable_shared_from_this< HomeLogStore > {
log_req_comp_cb_t m_comp_cb;
log_found_cb_t m_found_cb;
log_replay_done_cb_t m_replay_done_cb;
std::atomic< logstore_seq_num_t > m_seq_num;
std::atomic< logstore_seq_num_t > m_start_lsn;
std::atomic< logstore_seq_num_t > m_next_lsn;
std::atomic< logstore_seq_num_t > m_tail_lsn;
std::string m_fq_name;
LogStoreServiceMetrics& m_metrics;

// seq_ld_key_pair m_flush_batch_max = {-1, {0, 0}}; // The maximum seqnum we have seen in the prev flushed
// batch
logstore_seq_num_t m_flush_batch_max_lsn{std::numeric_limits< logstore_seq_num_t >::min()};

// Sync flush sections
std::atomic< logstore_seq_num_t > m_sync_flush_waiter_lsn{invalid_lsn()};
std::mutex m_sync_flush_mtx;
std::mutex m_single_sync_flush_mtx;
std::condition_variable m_sync_flush_cv;

std::vector< seq_ld_key_pair > m_truncation_barriers; // List of truncation barriers
truncation_info m_safe_truncation_boundary;
logdev_key m_trunc_ld_key;
};
} // namespace homestore
33 changes: 6 additions & 27 deletions src/include/homestore/logstore/log_store_internal.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
#include <nlohmann/json.hpp>

namespace homestore {

///////////////////// All typedefs ///////////////////////////////
class logstore_req;
class HomeLogStore;
Expand All @@ -50,16 +49,14 @@ typedef std::function< void(logstore_seq_num_t, sisl::io_blob&, logdev_key, void
typedef std::function< void(logstore_seq_num_t, log_buffer, void*) > log_found_cb_t;
typedef std::function< void(std::shared_ptr< HomeLogStore >) > log_store_opened_cb_t;
typedef std::function< void(std::shared_ptr< HomeLogStore >, logstore_seq_num_t) > log_replay_done_cb_t;
typedef std::function< void(const std::unordered_map< logdev_id_t, logdev_key >&) > device_truncate_cb_t;

typedef int64_t logid_t;

struct logdev_key {
logid_t idx;
off_t dev_offset;

constexpr logdev_key(const logid_t idx = std::numeric_limits< logid_t >::min(),
const off_t dev_offset = std::numeric_limits< uint64_t >::min()) :
constexpr logdev_key(const logid_t idx = -1, const off_t dev_offset = std::numeric_limits< uint64_t >::min()) :
idx{idx}, dev_offset{dev_offset} {}
logdev_key(const logdev_key&) = default;
logdev_key& operator=(const logdev_key&) = default;
Expand All @@ -72,11 +69,11 @@ struct logdev_key {
operator bool() const { return is_valid(); }
bool is_valid() const { return !is_lowest() && !is_highest(); }

bool is_lowest() const { return (idx == std::numeric_limits< logid_t >::min()); }
bool is_lowest() const { return (idx == -1); }
bool is_highest() const { return (idx == std::numeric_limits< logid_t >::max()); }

void set_lowest() {
idx = std::numeric_limits< logid_t >::min();
idx = -1;
dev_offset = std::numeric_limits< uint64_t >::min();
}

Expand Down Expand Up @@ -109,9 +106,10 @@ struct log_dump_req {

// Record type kept in HomeLogStore's record tracker (see HomeLogStore::log_records()). It bundles the logdev keys
// associated with one log entry.
struct logstore_record {
// NOTE(review): presumably the logdev key (log id + device offset) at which this entry was written - confirm
logdev_key m_dev_key;
// NOTE(review): presumably the logdev key up to which the device may be truncated once this entry is truncated -
// confirm against the implementation
logdev_key m_trunc_key;

logstore_record() = default;
// Builds a record from the device key only; m_trunc_key keeps its default-constructed value
logstore_record(const logdev_key& key) : m_dev_key{key} {}
// Builds a record from both the device key and the truncation key
logstore_record(const logdev_key& key, const logdev_key& trunc_key) : m_dev_key{key}, m_trunc_key{trunc_key} {}
};

class HomeLogStore;
Expand Down Expand Up @@ -158,25 +156,6 @@ struct logstore_req {
logstore_req() = default;
};

// Associates a log store sequence number with a logdev_key.
struct seq_ld_key_pair {
// Sequence number of the pair; defaults to -1
logstore_seq_num_t seq_num{-1};
// Logdev key paired with seq_num; default-constructed unless explicitly set
logdev_key ld_key;
};

// Truncation state of a single log store: how far the store has been truncated and the logdev location up to which
// truncation is considered safe.
struct truncation_info {
// Safe logdev location up to which the device is truncatable; starts at the lowest possible log id with offset 0
logdev_key ld_key{std::numeric_limits< logid_t >::min(), 0};

// LSN of this log store up to which it has been truncated; -1 by default. Atomic.
std::atomic< logstore_seq_num_t > seq_num{-1};

// True if there is any entry which has already been truncated in the store but is still waiting for device
// truncation
bool pending_dev_truncation{false};

// True if there are truncation entries/barriers which are not part of this truncation
bool active_writes_not_part_of_truncation{false};
};

#pragma pack(1)
struct logstore_superblk {
logstore_superblk(const logstore_seq_num_t seq_num = 0) : m_first_seq_num{seq_num} {}
Expand All @@ -194,4 +173,4 @@ struct logstore_superblk {
logstore_seq_num_t m_first_seq_num{0};
};
#pragma pack()
} // namespace homestore
} // namespace homestore
17 changes: 2 additions & 15 deletions src/include/homestore/logstore_service.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -155,13 +155,8 @@ class LogStoreService {

/**
* @brief Schedule a truncation of all the log stores physically on the device.
*
* @param cb [OPTIONAL] Callback once truncation is completed, if provided (Default no callback)
* @param wait_till_done [OPTIONAL] Wait for the truncation to complete before returning from this method.
* Default to false
* @param dry_run: If the truncate is a real one or just dry run to simulate the truncation
*/
void device_truncate(const device_truncate_cb_t& cb = nullptr, bool wait_till_done = false, bool dry_run = false);
void device_truncate();

folly::Future< std::error_code > create_vdev(uint64_t size, HSDevType devType, uint32_t chunk_size);
std::shared_ptr< VirtualDev > open_vdev(const vdev_info& vinfo, bool load_existing);
Expand All @@ -178,13 +173,6 @@ class LogStoreService {
uint32_t total_size() const;
iomgr::io_fiber_t flush_thread() { return m_flush_fiber; }

/**
* This is used when the actual LogDev truncate is triggered;
*
* @return The IO fiber associated with the truncate thread.
*/
iomgr::io_fiber_t truncate_thread() { return m_truncate_fiber; }

void delete_unopened_logdevs();

private:
Expand All @@ -194,14 +182,13 @@ class LogStoreService {
void logdev_super_blk_found(const sisl::byte_view& buf, void* meta_cookie);
void rollback_super_blk_found(const sisl::byte_view& buf, void* meta_cookie);
void start_threads();
void flush_if_needed();
void flush();

private:
std::unordered_map< logdev_id_t, std::shared_ptr< LogDev > > m_id_logdev_map;
folly::SharedMutexWritePriority m_logdev_map_mtx;

std::shared_ptr< JournalVirtualDev > m_logdev_vdev;
iomgr::io_fiber_t m_truncate_fiber;
iomgr::io_fiber_t m_flush_fiber;
LogStoreServiceMetrics m_metrics;
std::unordered_set< logdev_id_t > m_unopened_logdev;
Expand Down
Loading

0 comments on commit 4535916

Please sign in to comment.