Skip to content

Commit

Permalink
Merge branch 'master' into yk_repl_fetch_remote
Browse files Browse the repository at this point in the history
  • Loading branch information
yamingk authored Jan 30, 2024
2 parents 161cbe4 + 198657e commit b456c15
Show file tree
Hide file tree
Showing 27 changed files with 554 additions and 235 deletions.
6 changes: 6 additions & 0 deletions src/include/homestore/checkpoint/cp_mgr.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,13 @@ class CPManager {
CPManager();
virtual ~CPManager();

/// @brief Start the CPManager, which creates a first cp session.
/// @param first_time_boot
void start(bool first_time_boot);

/// @brief Start the cp timer so that periodic cps are started
void start_timer();

/// @brief Shutdown the checkpoint manager services. It will not trigger a flush, but cancels any existing
/// checkpoint session abruptly. If caller needs clean shutdown, then they explicitly needs to trigger cp flush
/// before calling shutdown.
Expand Down
7 changes: 3 additions & 4 deletions src/include/homestore/logstore_service.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,7 @@ class LogStoreService {
*
* @return std::shared_ptr< HomeLogStore >
*/
std::shared_ptr< HomeLogStore > create_new_log_store(const logstore_family_id_t family_id,
const bool append_mode = false);
shared< HomeLogStore > create_new_log_store(logstore_family_id_t family_id, bool append_mode = false);

/**
* @brief Open an existing log store and does a recovery. It then creates an instance of this logstore and
Expand All @@ -102,8 +101,8 @@ class LogStoreService {
* @param store_id: Store ID of the log store to open
* @return std::shared_ptr< HomeLogStore >
*/
void open_log_store(const logstore_family_id_t family_id, const logstore_id_t store_id, const bool append_mode,
const log_store_opened_cb_t& on_open_cb);
folly::Future< shared< HomeLogStore > > open_log_store(logstore_family_id_t family_id, logstore_id_t store_id,
bool append_mode);

/**
* @brief Close the log store instance and free-up the resources
Expand Down
35 changes: 35 additions & 0 deletions src/include/homestore/replication/repl_decls.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
#include <string>

#include <folly/small_vector.h>
#include <folly/futures/Future.h>

#include <sisl/logging/logging.h>
#include <homestore/homestore_decl.hpp>
#include <homestore/blk.h>
Expand All @@ -13,6 +15,39 @@ SISL_LOGGING_DECL(replication)
#define REPL_LOG_MODS grpc_server, HOMESTORE_LOG_MODS, nuraft_mesg, nuraft, replication

namespace homestore {
// clang-format off
VENUM(ReplServiceError, int32_t,
OK = 0, // Everything OK
CANCELLED = -1, // Request was cancelled
TIMEOUT = -2,
NOT_LEADER = -3,
BAD_REQUEST = -4,
SERVER_ALREADY_EXISTS = -5,
CONFIG_CHANGING = -6,
SERVER_IS_JOINING = -7,
SERVER_NOT_FOUND = -8,
CANNOT_REMOVE_LEADER = -9,
SERVER_IS_LEAVING = -10,
TERM_MISMATCH = -11,
RESULT_NOT_EXIST_YET = -10000,
NOT_IMPLEMENTED = -10001,
NO_SPACE_LEFT = -20000,
DRIVE_WRITE_ERROR = -20001,
FAILED = -32768);
// clang-format on

template < typename V, typename E >
using Result = folly::Expected< V, E >;

template < class V >
using ReplResult = Result< V, ReplServiceError >;

template < class V, class E >
using AsyncResult = folly::SemiFuture< Result< V, E > >;

template < class V = folly::Unit >
using AsyncReplResult = AsyncResult< V, ReplServiceError >;

using blkid_list_t = folly::small_vector< BlkId, 4 >;

// Fully qualified domain pba, unique pba id across replica set
Expand Down
35 changes: 30 additions & 5 deletions src/include/homestore/replication/repl_dev.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ VENUM(repl_req_state_t, uint32_t,
DATA_RECEIVED = 1 << 1, // Data has been received and being written to the storage
DATA_WRITTEN = 1 << 2, // Data has been written to the storage
LOG_RECEIVED = 1 << 3, // Log is received and waiting for data
LOG_FLUSHED = 1 << 4 // Log has been flushed
LOG_FLUSHED = 1 << 4, // Log has been flushed
ERRORED = 1 << 5 // Error has happened and cleaned up
)

struct repl_key {
Expand Down Expand Up @@ -149,12 +150,25 @@ class ReplDevListener {
/// NOTE: Listener should do the free any resources created as part of pre-commit.
///
/// @param lsn - The log sequence number getting rolled back
/// @param header - Header originally passed with repl_dev::write() api
/// @param key - Key originally passed with repl_dev::write() api
/// @param ctx - Context passed as part of the replica_set::write() api
/// @param header - Header originally passed with ReplDev::async_alloc_write() api
/// @param key - Key originally passed with ReplDev::async_alloc_write() api
/// @param ctx - Context passed as part of the ReplDev::async_alloc_write() api
virtual void on_rollback(int64_t lsn, const sisl::blob& header, const sisl::blob& key,
cintrusive< repl_req_ctx >& ctx) = 0;

/// @brief Called when the async_alloc_write call failed to initiate replication
///
/// Called only on the node which called async_alloc_write
///
///
/// NOTE: Listener should do the free any resources created as part of pre-commit.
///
/// @param header - Header originally passed with ReplDev::async_alloc_write() api
/// @param key - Key originally passed with ReplDev::async_alloc_write() api
/// @param ctx - Context passed as part of the ReplDev::async_alloc_write() api
virtual void on_error(ReplServiceError error, const sisl::blob& header, const sisl::blob& key,
cintrusive< repl_req_ctx >& ctx) = 0;

/// @brief Called when replication module is trying to allocate a block to write the value
///
/// This function can be called both on leader and follower when it is trying to allocate a block to write the
Expand All @@ -176,7 +190,7 @@ class ReplDevListener {
class ReplDev {
public:
ReplDev() = default;
virtual ~ReplDev() = default;
virtual ~ReplDev() { detach_listener(); }

/// @brief Replicate the data to the replica set. This method goes through the
/// following steps:
Expand Down Expand Up @@ -217,6 +231,10 @@ class ReplDev {
/// @param blkids - blkids to be freed.
virtual void async_free_blks(int64_t lsn, MultiBlkId const& blkid) = 0;

/// @brief Try to switch the current replica where this method called to become a leader.
/// @return True if it is successful, false otherwise.
virtual AsyncReplResult<> become_leader() = 0;

/// @brief Checks if this replica is the leader in this ReplDev
/// @return true or false
virtual bool is_leader() const = 0;
Expand All @@ -231,6 +249,13 @@ class ReplDev {

virtual void attach_listener(shared< ReplDevListener > listener) { m_listener = std::move(listener); }

virtual void detach_listener() {
if (m_listener) {
m_listener->set_repl_dev(nullptr);
m_listener.reset();
}
}

protected:
shared< ReplDevListener > m_listener;
};
Expand Down
32 changes: 0 additions & 32 deletions src/include/homestore/replication_service.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,48 +4,16 @@
#include <string>
#include <variant>

#include <folly/futures/Future.h>
#include <sisl/utility/enum.hpp>
#include <homestore/replication/repl_decls.h>
#include <homestore/meta_service.hpp>

namespace homestore {

// clang-format off
VENUM(ReplServiceError, int32_t,
OK = 0, // Everything OK
CANCELLED = -1, // Request was cancelled
TIMEOUT = -2,
NOT_LEADER = -3,
BAD_REQUEST = -4,
SERVER_ALREADY_EXISTS = -5,
CONFIG_CHANGING = -6,
SERVER_IS_JOINING = -7,
SERVER_NOT_FOUND = -8,
CANNOT_REMOVE_LEADER = -9,
SERVER_IS_LEAVING = -10,
TERM_MISMATCH = -11,
RESULT_NOT_EXIST_YET = -10000,
NOT_IMPLEMENTED = -10001,
FAILED = -32768);
// clang-format on

class ReplDev;
class ReplDevListener;
struct hs_stats;

template < typename V, typename E >
using Result = folly::Expected< V, E >;

template < class V >
using ReplResult = Result< V, ReplServiceError >;

template < class V, class E >
using AsyncResult = folly::SemiFuture< Result< V, E > >;

template < class V = folly::Unit >
using AsyncReplResult = AsyncResult< V, ReplServiceError >;

VENUM(repl_impl_type, uint8_t,
server_side, // Completely homestore controlled replication
client_assisted, // Client assisting in replication
Expand Down
86 changes: 43 additions & 43 deletions src/include/homestore/superblk_handler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,42 +31,42 @@ class superblk {
return ++s_count;
}

superblk(const std::string& meta_name = "") { set_name(meta_name); }
superblk(const std::string& sub_name = "") { set_name(sub_name); }

superblk(const superblk&) = delete;
superblk& operator=(const superblk&) = delete;

superblk(superblk&& rhs) noexcept
: m_meta_mgr_cookie(rhs.m_meta_mgr_cookie)
, m_raw_buf(std::move(rhs.m_raw_buf))
, m_sb(rhs.m_sb)
, m_metablk_name(std::move(rhs.m_metablk_name)) {
rhs.m_meta_mgr_cookie = nullptr;
rhs.m_sb = nullptr;
superblk(superblk&& rhs) noexcept :
m_meta_blk(rhs.m_meta_blk),
m_raw_buf(std::move(rhs.m_raw_buf)),
m_sb(rhs.m_sb),
m_meta_sub_name(std::move(rhs.m_meta_sub_name)) {
rhs.m_meta_blk = nullptr;
rhs.m_sb = nullptr;
}

superblk& operator=(superblk&& rhs) noexcept {
if (this != &rhs) {
m_meta_mgr_cookie = rhs.m_meta_mgr_cookie;
m_meta_blk = rhs.m_meta_blk;
m_raw_buf = std::move(rhs.m_raw_buf);
m_sb = rhs.m_sb;
m_metablk_name = std::move(rhs.m_metablk_name);
rhs.m_meta_mgr_cookie = nullptr;
m_meta_sub_name = std::move(rhs.m_meta_sub_name);
rhs.m_meta_blk = nullptr;
rhs.m_sb = nullptr;
}
return *this;
}
return *this;
}

void set_name(const std::string& meta_name) {
if (meta_name.empty()) {
m_metablk_name = "meta_blk_" + std::to_string(next_count());
void set_name(const std::string& sub_name) {
if (sub_name.empty()) {
m_meta_sub_name = "meta_blk_" + std::to_string(next_count());
} else {
m_metablk_name = meta_name;
m_meta_sub_name = sub_name;
}
}

T* load(const sisl::byte_view& buf, void* meta_cookie) {
m_meta_mgr_cookie = voidptr_cast(meta_cookie);
T* load(const sisl::byte_view& buf, void* meta_blk) {
m_meta_blk = voidptr_cast(meta_blk);
m_raw_buf = meta_service().is_aligned_buf_needed(buf.size()) ? buf.extract(meta_service().align_size())
: buf.extract(0);
m_sb = r_cast< T* >(m_raw_buf->bytes());
Expand All @@ -85,9 +85,9 @@ class superblk {
}

void destroy() {
if (m_meta_mgr_cookie) {
meta_service().remove_sub_sb(m_meta_mgr_cookie);
m_meta_mgr_cookie = nullptr;
if (m_meta_blk) {
meta_service().remove_sub_sb(m_meta_blk);
m_meta_blk = nullptr;
}
m_raw_buf.reset();
m_sb = nullptr;
Expand All @@ -97,10 +97,10 @@ class superblk {
sisl::byte_array raw_buf() { return m_raw_buf; }

void write() {
if (m_meta_mgr_cookie) {
meta_service().update_sub_sb(m_raw_buf->cbytes(), m_raw_buf->size(), m_meta_mgr_cookie);
if (m_meta_blk) {
meta_service().update_sub_sb(m_raw_buf->cbytes(), m_raw_buf->size(), m_meta_blk);
} else {
meta_service().add_sub_sb(m_metablk_name, m_raw_buf->cbytes(), m_raw_buf->size(), m_meta_mgr_cookie);
meta_service().add_sub_sb(m_meta_sub_name, m_raw_buf->cbytes(), m_raw_buf->size(), m_meta_blk);
}
}

Expand All @@ -111,42 +111,42 @@ class superblk {
T& operator*() { return *m_sb; }

private:
void* m_meta_mgr_cookie{nullptr};
void* m_meta_blk{nullptr};
sisl::byte_array m_raw_buf;
T* m_sb{nullptr};
std::string m_metablk_name;
std::string m_meta_sub_name;
};

class json_superblk {
private:
void* m_meta_mgr_cookie{nullptr};
void* m_meta_blk{nullptr};
nlohmann::json m_json_sb;
std::string m_metablk_name;
std::string m_meta_sub_name;

public:
static uint64_t next_count() {
static std::atomic< uint64_t > s_count{0};
return ++s_count;
}

json_superblk(const std::string& meta_name = "") { set_name(meta_name); }
json_superblk(const std::string& sub_name = "") { set_name(sub_name); }

void set_name(const std::string& meta_name) {
if (meta_name.empty()) {
m_metablk_name = "meta_blk_" + std::to_string(next_count());
void set_name(const std::string& sub_name) {
if (sub_name.empty()) {
m_meta_sub_name = "meta_blk_" + std::to_string(next_count());
} else {
m_metablk_name = meta_name;
m_meta_sub_name = sub_name;
}
}

nlohmann::json& load(const sisl::byte_view& buf, void* meta_cookie) {
m_meta_mgr_cookie = voidptr_cast(meta_cookie);
nlohmann::json& load(const sisl::byte_view& buf, void* meta_blk) {
m_meta_blk = voidptr_cast(meta_blk);
std::string_view const b{c_charptr_cast(buf.bytes()), buf.size()};

try {
m_json_sb = nlohmann::json::from_msgpack(b);
} catch (nlohmann::json::exception const& e) {
DEBUG_ASSERT(false, "Failed to load superblk for meta_blk={}", m_metablk_name);
DEBUG_ASSERT(false, "Failed to load superblk for meta_blk={}", m_meta_sub_name);
return m_json_sb;
}
return m_json_sb;
Expand All @@ -155,9 +155,9 @@ class json_superblk {
nlohmann::json& create() { return m_json_sb; }

void destroy() {
if (m_meta_mgr_cookie) {
meta_service().remove_sub_sb(m_meta_mgr_cookie);
m_meta_mgr_cookie = nullptr;
if (m_meta_blk) {
meta_service().remove_sub_sb(m_meta_blk);
m_meta_blk = nullptr;
}
m_json_sb = nlohmann::json{};
}
Expand All @@ -166,10 +166,10 @@ class json_superblk {

void write() {
auto do_write = [this](sisl::blob const& b) {
if (m_meta_mgr_cookie) {
meta_service().update_sub_sb(b.cbytes(), b.size(), m_meta_mgr_cookie);
if (m_meta_blk) {
meta_service().update_sub_sb(b.cbytes(), b.size(), m_meta_blk);
} else {
meta_service().add_sub_sb(m_metablk_name, b.cbytes(), b.size(), m_meta_mgr_cookie);
meta_service().add_sub_sb(m_meta_sub_name, b.cbytes(), b.size(), m_meta_blk);
}
};

Expand Down
2 changes: 2 additions & 0 deletions src/lib/checkpoint/cp_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,9 @@ void CPManager::start(bool first_time_boot) {
create_first_cp();
m_sb.write();
}
}

void CPManager::start_timer() {
LOGINFO("cp timer is set to {} usec", HS_DYNAMIC_CONFIG(generic.cp_timer_us));
m_cp_timer_hdl = iomanager.schedule_global_timer(
HS_DYNAMIC_CONFIG(generic.cp_timer_us) * 1000, true, nullptr /*cookie*/, iomgr::reactor_regex::all_worker,
Expand Down
Loading

0 comments on commit b456c15

Please sign in to comment.