Restartability of ReplDev (#285)
The following changes are done as part of this commit:

1. Because of the ordering of services, a restart of homestore did not recover the ReplDev.
Fixed the ordering and also moved logstore and data service start/stop inside
ReplService start/stop.

2. Error handling was not present; as a result, test failures were hard to comprehend.
Added error handling for propose, commit and write failures.

3. Aesthetic change to have open_log_store return a future< logstore > instead of taking a
callback.

4. Added a become_leader() call to ReplDev and fixed the test cases to use it, ensuring
writes are issued on the correct leader.

5. Added tests for restart, write-after-restart and validation of ReplDev (a hypothetical
shape of such a test is sketched below).
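
A minimal sketch of what such a restart test could look like, in gtest style. The fixture
and the helper names (write_on_leader, restart_homestore, validate_all_data) are assumptions
for illustration, not the actual test code in this commit:

TEST_F(ReplDevTest, RestartAndValidate) {
    write_on_leader(100 /* num_entries */); // uses become_leader() so writes go to the leader
    validate_all_data();                    // confirm the pre-restart state
    restart_homestore();                    // stop/start all services in the corrected order
    validate_all_data();                    // ReplDev must be recovered with data intact
    write_on_leader(50);                    // writes after restart must also succeed
    validate_all_data();
}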
hkadayam authored Jan 30, 2024
1 parent b89f275 commit 198657e
Showing 27 changed files with 548 additions and 231 deletions.
6 changes: 6 additions & 0 deletions src/include/homestore/checkpoint/cp_mgr.hpp
@@ -160,7 +160,13 @@ class CPManager {
CPManager();
virtual ~CPManager();

/// @brief Start the CPManager, which creates a first cp session.
/// @param first_time_boot
void start(bool first_time_boot);

/// @brief Start the cp timer so that periodic cps are started
void start_timer();
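
Splitting start() from start_timer() lets the caller delay periodic checkpoints until
recovery of the dependent services is complete. A rough sketch of the intended call order,
assuming a hypothetical wrapper on the HomeStore startup path (the cp_mgr() accessor name is
an assumption):

void start_homestore(bool first_time_boot) {
    cp_mgr().start(first_time_boot);  // create the first cp session up front
    // ... recover/start meta, logstore, data and replication services here ...
    cp_mgr().start_timer();           // only now arm the periodic cp timer
}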

/// @brief Shutdown the checkpoint manager services. It will not trigger a flush, but cancels any existing
/// checkpoint session abruptly. If the caller needs a clean shutdown, they need to explicitly trigger a cp flush
/// before calling shutdown.
7 changes: 3 additions & 4 deletions src/include/homestore/logstore_service.hpp
@@ -92,8 +92,7 @@ class LogStoreService {
*
* @return std::shared_ptr< HomeLogStore >
*/
std::shared_ptr< HomeLogStore > create_new_log_store(const logstore_family_id_t family_id,
const bool append_mode = false);
shared< HomeLogStore > create_new_log_store(logstore_family_id_t family_id, bool append_mode = false);

/**
* @brief Open an existing log store and does a recovery. It then creates an instance of this logstore and
@@ -102,8 +101,8 @@
* @param store_id: Store ID of the log store to open
* @return std::shared_ptr< HomeLogStore >
*/
void open_log_store(const logstore_family_id_t family_id, const logstore_id_t store_id, const bool append_mode,
const log_store_opened_cb_t& on_open_cb);
folly::Future< shared< HomeLogStore > > open_log_store(logstore_family_id_t family_id, logstore_id_t store_id,
bool append_mode);
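
With the future-based signature, recovery code can chain on the returned future instead of
passing a callback. A sketch of a possible consumer; the logstore_service() accessor is an
assumption for illustration:

folly::Future< folly::Unit > reopen_log_store(logstore_family_id_t family_id, logstore_id_t store_id) {
    return logstore_service()
        .open_log_store(family_id, store_id, true /* append_mode */)
        .thenValue([](shared< HomeLogStore > log_store) {
            // 'log_store' is fully recovered by the time this continuation runs
            LOGINFO("log store opened and recovered");
            return folly::Unit{};
        });
}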

/**
* @brief Close the log store instance and free-up the resources
35 changes: 35 additions & 0 deletions src/include/homestore/replication/repl_decls.h
@@ -3,6 +3,8 @@
#include <string>

#include <folly/small_vector.h>
#include <folly/futures/Future.h>

#include <sisl/logging/logging.h>
#include <homestore/homestore_decl.hpp>
#include <homestore/blk.h>
@@ -13,6 +15,39 @@ SISL_LOGGING_DECL(replication)
#define REPL_LOG_MODS grpc_server, HOMESTORE_LOG_MODS, nuraft_mesg, nuraft, replication

namespace homestore {
// clang-format off
VENUM(ReplServiceError, int32_t,
OK = 0, // Everything OK
CANCELLED = -1, // Request was cancelled
TIMEOUT = -2,
NOT_LEADER = -3,
BAD_REQUEST = -4,
SERVER_ALREADY_EXISTS = -5,
CONFIG_CHANGING = -6,
SERVER_IS_JOINING = -7,
SERVER_NOT_FOUND = -8,
CANNOT_REMOVE_LEADER = -9,
SERVER_IS_LEAVING = -10,
TERM_MISMATCH = -11,
RESULT_NOT_EXIST_YET = -10000,
NOT_IMPLEMENTED = -10001,
NO_SPACE_LEFT = -20000,
DRIVE_WRITE_ERROR = -20001,
FAILED = -32768);
// clang-format on

template < typename V, typename E >
using Result = folly::Expected< V, E >;

template < class V >
using ReplResult = Result< V, ReplServiceError >;

template < class V, class E >
using AsyncResult = folly::SemiFuture< Result< V, E > >;

template < class V = folly::Unit >
using AsyncReplResult = AsyncResult< V, ReplServiceError >;
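
These aliases make error propagation uniform: an AsyncReplResult<> is a folly::SemiFuture
holding either a value or a ReplServiceError. A sketch of the producer/consumer idiom
(illustrative only, not code from this commit):

AsyncReplResult<> start_replication(bool is_leader) {
    if (!is_leader) {
        return folly::makeSemiFuture(
            ReplResult< folly::Unit >{folly::makeUnexpected(ReplServiceError::NOT_LEADER)});
    }
    return folly::makeSemiFuture(ReplResult< folly::Unit >{folly::Unit{}});
}

// Consumer side (sketch):
//   start_replication(rdev.is_leader()).deferValue([](auto const& result) {
//       if (result.hasError()) { /* e.g. result.error() == ReplServiceError::NOT_LEADER */ }
//   });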

using blkid_list_t = folly::small_vector< BlkId, 4 >;

// Fully qualified domain pba, unique pba id across replica set
35 changes: 30 additions & 5 deletions src/include/homestore/replication/repl_dev.h
@@ -29,7 +29,8 @@ VENUM(repl_req_state_t, uint32_t,
DATA_RECEIVED = 1 << 1, // Data has been received and being written to the storage
DATA_WRITTEN = 1 << 2, // Data has been written to the storage
LOG_RECEIVED = 1 << 3, // Log is received and waiting for data
LOG_FLUSHED = 1 << 4 // Log has been flushed
LOG_FLUSHED = 1 << 4, // Log has been flushed
ERRORED = 1 << 5 // Error has happened and cleaned up
)

struct repl_key {
@@ -149,12 +150,25 @@ class ReplDevListener {
/// NOTE: Listener should free any resources created as part of pre-commit.
///
/// @param lsn - The log sequence number getting rolled back
/// @param header - Header originally passed with repl_dev::write() api
/// @param key - Key originally passed with repl_dev::write() api
/// @param ctx - Context passed as part of the replica_set::write() api
/// @param header - Header originally passed with ReplDev::async_alloc_write() api
/// @param key - Key originally passed with ReplDev::async_alloc_write() api
/// @param ctx - Context passed as part of the ReplDev::async_alloc_write() api
virtual void on_rollback(int64_t lsn, const sisl::blob& header, const sisl::blob& key,
cintrusive< repl_req_ctx >& ctx) = 0;

/// @brief Called when the async_alloc_write call failed to initiate replication
///
/// Called only on the node which called async_alloc_write
///
/// NOTE: Listener should free any resources created as part of pre-commit.
///
/// @param header - Header originally passed with ReplDev::async_alloc_write() api
/// @param key - Key originally passed with ReplDev::async_alloc_write() api
/// @param ctx - Context passed as part of the ReplDev::async_alloc_write() api
virtual void on_error(ReplServiceError error, const sisl::blob& header, const sisl::blob& key,
cintrusive< repl_req_ctx >& ctx) = 0;
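
A minimal sketch of a listener honoring this contract; MyListener and
release_pending_resources() are hypothetical names for illustration:

void MyListener::on_error(ReplServiceError error, const sisl::blob& header, const sisl::blob& key,
                          cintrusive< repl_req_ctx >& ctx) {
    LOGERROR("repl request failed before it could be replicated, err={}", static_cast< int32_t >(error));
    // undo anything created during pre-commit / block allocation for this request
    release_pending_resources(header, key, ctx);
}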

/// @brief Called when replication module is trying to allocate a block to write the value
///
/// This function can be called both on leader and follower when it is trying to allocate a block to write the
@@ -176,7 +190,7 @@ class ReplDev {
class ReplDev {
public:
ReplDev() = default;
virtual ~ReplDev() = default;
virtual ~ReplDev() { detach_listener(); }

/// @brief Replicate the data to the replica set. This method goes through the
/// following steps:
@@ -217,6 +231,10 @@
/// @param blkids - blkids to be freed.
virtual void async_free_blks(int64_t lsn, MultiBlkId const& blkid) = 0;

/// @brief Try to switch the current replica, on which this method is called, to become the leader.
/// @return A future which resolves to success if the leadership switch succeeded, or to a ReplServiceError otherwise.
virtual AsyncReplResult<> become_leader() = 0;
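
A sketch of how a caller (for example, a test) might drive this, assuming it is acceptable
to block on the returned SemiFuture:

bool switch_leadership(shared< ReplDev > rdev) {
    auto result = rdev->become_leader().get();  // AsyncReplResult<> resolved synchronously here
    if (result.hasError()) {
        // e.g. NOT_LEADER or CONFIG_CHANGING; the caller may retry on another replica
        LOGERROR("become_leader failed, err={}", static_cast< int32_t >(result.error()));
        return false;
    }
    return true;  // writes can now be issued from this replica
}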

/// @brief Checks if this replica is the leader in this ReplDev
/// @return true or false
virtual bool is_leader() const = 0;
@@ -231,6 +249,13 @@

virtual void attach_listener(shared< ReplDevListener > listener) { m_listener = std::move(listener); }

virtual void detach_listener() {
if (m_listener) {
m_listener->set_repl_dev(nullptr);
m_listener.reset();
}
}

protected:
shared< ReplDevListener > m_listener;
};
32 changes: 0 additions & 32 deletions src/include/homestore/replication_service.hpp
@@ -4,48 +4,16 @@
#include <string>
#include <variant>

#include <folly/futures/Future.h>
#include <sisl/utility/enum.hpp>
#include <homestore/replication/repl_decls.h>
#include <homestore/meta_service.hpp>

namespace homestore {

// clang-format off
VENUM(ReplServiceError, int32_t,
OK = 0, // Everything OK
CANCELLED = -1, // Request was cancelled
TIMEOUT = -2,
NOT_LEADER = -3,
BAD_REQUEST = -4,
SERVER_ALREADY_EXISTS = -5,
CONFIG_CHANGING = -6,
SERVER_IS_JOINING = -7,
SERVER_NOT_FOUND = -8,
CANNOT_REMOVE_LEADER = -9,
SERVER_IS_LEAVING = -10,
TERM_MISMATCH = -11,
RESULT_NOT_EXIST_YET = -10000,
NOT_IMPLEMENTED = -10001,
FAILED = -32768);
// clang-format on

class ReplDev;
class ReplDevListener;
struct hs_stats;

template < typename V, typename E >
using Result = folly::Expected< V, E >;

template < class V >
using ReplResult = Result< V, ReplServiceError >;

template < class V, class E >
using AsyncResult = folly::SemiFuture< Result< V, E > >;

template < class V = folly::Unit >
using AsyncReplResult = AsyncResult< V, ReplServiceError >;

VENUM(repl_impl_type, uint8_t,
server_side, // Completely homestore controlled replication
client_assisted, // Client assisting in replication
86 changes: 43 additions & 43 deletions src/include/homestore/superblk_handler.hpp
@@ -31,42 +31,42 @@ class superblk {
return ++s_count;
}

superblk(const std::string& meta_name = "") { set_name(meta_name); }
superblk(const std::string& sub_name = "") { set_name(sub_name); }

superblk(const superblk&) = delete;
superblk& operator=(const superblk&) = delete;

superblk(superblk&& rhs) noexcept
: m_meta_mgr_cookie(rhs.m_meta_mgr_cookie)
, m_raw_buf(std::move(rhs.m_raw_buf))
, m_sb(rhs.m_sb)
, m_metablk_name(std::move(rhs.m_metablk_name)) {
rhs.m_meta_mgr_cookie = nullptr;
rhs.m_sb = nullptr;
superblk(superblk&& rhs) noexcept :
m_meta_blk(rhs.m_meta_blk),
m_raw_buf(std::move(rhs.m_raw_buf)),
m_sb(rhs.m_sb),
m_meta_sub_name(std::move(rhs.m_meta_sub_name)) {
rhs.m_meta_blk = nullptr;
rhs.m_sb = nullptr;
}

superblk& operator=(superblk&& rhs) noexcept {
if (this != &rhs) {
m_meta_mgr_cookie = rhs.m_meta_mgr_cookie;
m_meta_blk = rhs.m_meta_blk;
m_raw_buf = std::move(rhs.m_raw_buf);
m_sb = rhs.m_sb;
m_metablk_name = std::move(rhs.m_metablk_name);
rhs.m_meta_mgr_cookie = nullptr;
m_meta_sub_name = std::move(rhs.m_meta_sub_name);
rhs.m_meta_blk = nullptr;
rhs.m_sb = nullptr;
}
return *this;
}
return *this;
}

void set_name(const std::string& meta_name) {
if (meta_name.empty()) {
m_metablk_name = "meta_blk_" + std::to_string(next_count());
void set_name(const std::string& sub_name) {
if (sub_name.empty()) {
m_meta_sub_name = "meta_blk_" + std::to_string(next_count());
} else {
m_metablk_name = meta_name;
m_meta_sub_name = sub_name;
}
}

T* load(const sisl::byte_view& buf, void* meta_cookie) {
m_meta_mgr_cookie = voidptr_cast(meta_cookie);
T* load(const sisl::byte_view& buf, void* meta_blk) {
m_meta_blk = voidptr_cast(meta_blk);
m_raw_buf = meta_service().is_aligned_buf_needed(buf.size()) ? buf.extract(meta_service().align_size())
: buf.extract(0);
m_sb = r_cast< T* >(m_raw_buf->bytes());
@@ -85,9 +85,9 @@ class superblk {
}

void destroy() {
if (m_meta_mgr_cookie) {
meta_service().remove_sub_sb(m_meta_mgr_cookie);
m_meta_mgr_cookie = nullptr;
if (m_meta_blk) {
meta_service().remove_sub_sb(m_meta_blk);
m_meta_blk = nullptr;
}
m_raw_buf.reset();
m_sb = nullptr;
@@ -97,10 +97,10 @@ class superblk {
sisl::byte_array raw_buf() { return m_raw_buf; }

void write() {
if (m_meta_mgr_cookie) {
meta_service().update_sub_sb(m_raw_buf->cbytes(), m_raw_buf->size(), m_meta_mgr_cookie);
if (m_meta_blk) {
meta_service().update_sub_sb(m_raw_buf->cbytes(), m_raw_buf->size(), m_meta_blk);
} else {
meta_service().add_sub_sb(m_metablk_name, m_raw_buf->cbytes(), m_raw_buf->size(), m_meta_mgr_cookie);
meta_service().add_sub_sb(m_meta_sub_name, m_raw_buf->cbytes(), m_raw_buf->size(), m_meta_blk);
}
}

@@ -111,42 +111,42 @@ class superblk {
T& operator*() { return *m_sb; }

private:
void* m_meta_mgr_cookie{nullptr};
void* m_meta_blk{nullptr};
sisl::byte_array m_raw_buf;
T* m_sb{nullptr};
std::string m_metablk_name;
std::string m_meta_sub_name;
};

class json_superblk {
private:
void* m_meta_mgr_cookie{nullptr};
void* m_meta_blk{nullptr};
nlohmann::json m_json_sb;
std::string m_metablk_name;
std::string m_meta_sub_name;

public:
static uint64_t next_count() {
static std::atomic< uint64_t > s_count{0};
return ++s_count;
}

json_superblk(const std::string& meta_name = "") { set_name(meta_name); }
json_superblk(const std::string& sub_name = "") { set_name(sub_name); }

void set_name(const std::string& meta_name) {
if (meta_name.empty()) {
m_metablk_name = "meta_blk_" + std::to_string(next_count());
void set_name(const std::string& sub_name) {
if (sub_name.empty()) {
m_meta_sub_name = "meta_blk_" + std::to_string(next_count());
} else {
m_metablk_name = meta_name;
m_meta_sub_name = sub_name;
}
}

nlohmann::json& load(const sisl::byte_view& buf, void* meta_cookie) {
m_meta_mgr_cookie = voidptr_cast(meta_cookie);
nlohmann::json& load(const sisl::byte_view& buf, void* meta_blk) {
m_meta_blk = voidptr_cast(meta_blk);
std::string_view const b{c_charptr_cast(buf.bytes()), buf.size()};

try {
m_json_sb = nlohmann::json::from_msgpack(b);
} catch (nlohmann::json::exception const& e) {
DEBUG_ASSERT(false, "Failed to load superblk for meta_blk={}", m_metablk_name);
DEBUG_ASSERT(false, "Failed to load superblk for meta_blk={}", m_meta_sub_name);
return m_json_sb;
}
return m_json_sb;
@@ -155,9 +155,9 @@ class json_superblk {
nlohmann::json& create() { return m_json_sb; }

void destroy() {
if (m_meta_mgr_cookie) {
meta_service().remove_sub_sb(m_meta_mgr_cookie);
m_meta_mgr_cookie = nullptr;
if (m_meta_blk) {
meta_service().remove_sub_sb(m_meta_blk);
m_meta_blk = nullptr;
}
m_json_sb = nlohmann::json{};
}
@@ -166,10 +166,10 @@ class json_superblk {

void write() {
auto do_write = [this](sisl::blob const& b) {
if (m_meta_mgr_cookie) {
meta_service().update_sub_sb(b.cbytes(), b.size(), m_meta_mgr_cookie);
if (m_meta_blk) {
meta_service().update_sub_sb(b.cbytes(), b.size(), m_meta_blk);
} else {
meta_service().add_sub_sb(m_metablk_name, b.cbytes(), b.size(), m_meta_mgr_cookie);
meta_service().add_sub_sb(m_meta_sub_name, b.cbytes(), b.size(), m_meta_blk);
}
};

2 changes: 2 additions & 0 deletions src/lib/checkpoint/cp_mgr.cpp
@@ -50,7 +50,9 @@ void CPManager::start(bool first_time_boot) {
create_first_cp();
m_sb.write();
}
}

void CPManager::start_timer() {
LOGINFO("cp timer is set to {} usec", HS_DYNAMIC_CONFIG(generic.cp_timer_us));
m_cp_timer_hdl = iomanager.schedule_global_timer(
HS_DYNAMIC_CONFIG(generic.cp_timer_us) * 1000, true, nullptr /*cookie*/, iomgr::reactor_regex::all_worker,