Skip to content

Commit

Permalink
Support Baseline Resync (#596)
Browse files Browse the repository at this point in the history
* Support Baseline resync

For Nuraft baseline resync, we separate the process into two layers: HomeStore layer and Application layer.
We use the highest bit of the obj_id to indicate the message type: 0 is for HS, 1 is for Application.

In the HomeStore layer, leader needs to transmit the DSN to the follower, this is intended to handle the following case:

1. Leader sends snapshot at LSN T1 to follower F1.
2. F1 fully receives the snapshot and now at T1.
3. Leader yields its leadership, and F1 is elected as leader.
In this sequence the incremental resync will not kick in to update the m_next_dsn and, as a result, duplication may occur.
  • Loading branch information
yuwmao authored Dec 4, 2024
1 parent 7811855 commit 89b86ff
Show file tree
Hide file tree
Showing 8 changed files with 131 additions and 33 deletions.
2 changes: 1 addition & 1 deletion conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

class HomestoreConan(ConanFile):
name = "homestore"
version = "6.5.18"
version = "6.5.19"

homepage = "https://github.com/eBay/Homestore"
description = "HomeStore Storage Engine"
Expand Down
27 changes: 20 additions & 7 deletions src/include/homestore/replication/repl_dev.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ VENUM(journal_type_t, uint16_t,
HS_CTRL_REPLACE = 3, // Control message to replace a member
)

// Magic number identifying a HomeStore baseline resync message (snp_repl_dev_data).
// The magic num comes from the first 8 bytes of 'echo homestore_resync_data | md5sum'
static constexpr uint64_t HOMESTORE_RESYNC_DATA_MAGIC = 0xa65dbd27c213f327;
// Version of the baseline resync message format; a received message carrying a different version is rejected.
static constexpr uint32_t HOMESTORE_RESYNC_DATA_PROTOCOL_VERSION_V1 = 0x01;

struct repl_key {
int32_t server_id{0}; // Server Id which this req is originated from
uint64_t term; // RAFT term number
Expand Down Expand Up @@ -112,14 +116,23 @@ class nuraft_snapshot_context : public snapshot_context {
nuraft::ptr< nuraft::snapshot > snapshot_;
};

struct snapshot_data {
struct snapshot_obj {
void* user_ctx{nullptr};
int64_t offset{0};
uint64_t offset{0};
sisl::io_blob_safe blob;
bool is_first_obj{false};
bool is_last_obj{false};
};

// HomeStore has some meta information to be transmitted during the baseline resync.
// Although only the dsn needs to be synced for now, this structure is defined as a general message, so that more
// data can easily be added if needed in the future.
struct snp_repl_dev_data {
// Identifies the message as HomeStore resync data; validated by the receiver.
uint64_t magic_num{HOMESTORE_RESYNC_DATA_MAGIC};
// Format version of this message; validated by the receiver.
uint32_t protocol_version{HOMESTORE_RESYNC_DATA_PROTOCOL_VERSION_V1};
// CRC of the whole structure, computed while this field is still 0.
uint32_t crc{0};
// The sender's next dsn; the receiver raises its own next dsn to this value when it is larger.
uint64_t dsn{0};
};

struct repl_journal_entry;
struct repl_req_ctx : public boost::intrusive_ref_counter< repl_req_ctx, boost::thread_safe_counter >,
sisl::ObjLifeCounter< repl_req_ctx > {
Expand Down Expand Up @@ -368,16 +381,16 @@ class ReplDevListener {
/// uses offset given by the follower to the know the current state of the follower.
/// Leader sends the snapshot data to the follower in batch. This callback is called multiple
/// times on the leader till all the data is transferred to the follower. is_last_obj in
/// snapshot_data will be true once all the data has been trasnferred. After this the raft on
/// snapshot_obj will be true once all the data has been transferred. After this the raft on
/// the follower side can do the incremental resync.
virtual int read_snapshot_data(shared< snapshot_context > context, shared< snapshot_data > snp_data) = 0;
virtual int read_snapshot_obj(shared< snapshot_context > context, shared< snapshot_obj > snp_obj) = 0;

/// @brief Called on the follower when the leader sends the data during the baseline resync.
/// is_last_obj in in snapshot_data will be true once all the data has been transfered.
/// is_last_obj in snapshot_obj will be true once all the data has been transferred.
/// After this the raft on the follower side can do the incremental resync.
virtual void write_snapshot_data(shared< snapshot_context > context, shared< snapshot_data > snp_data) = 0;
virtual void write_snapshot_obj(shared< snapshot_context > context, shared< snapshot_obj > snp_obj) = 0;

/// @brief Free up user-defined context inside the snapshot_data that is allocated during read_snapshot_data.
/// @brief Free up user-defined context inside the snapshot_obj that is allocated during read_snapshot_obj.
virtual void free_user_snp_ctx(void*& user_snp_ctx) = 0;

private:
Expand Down
36 changes: 36 additions & 0 deletions src/lib/replication/repl_dev/raft_repl_dev.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1491,6 +1491,42 @@ void RaftReplDev::on_log_found(logstore_seq_num_t lsn, log_buffer buf, void* ctx
handle_commit(rreq, true /* recovery */);
}

/// @brief Builds the HomeStore-layer baseline resync message and hands it back as a freshly allocated raft buffer.
///
/// The message carries the current m_next_dsn together with a crc that is computed over the whole structure
/// while its crc field is still zero.
///
/// @param data_out Receives the newly allocated buffer containing the serialized snp_repl_dev_data.
void RaftReplDev::create_snp_resync_data(raft_buf_ptr_t& data_out) {
    auto payload_len = sizeof(snp_repl_dev_data);

    snp_repl_dev_data payload;
    payload.dsn = m_next_dsn;

    // The crc field still holds its default of 0 here, so the checksum does not cover a real crc value.
    auto checksum = crc32_ieee(init_crc32, reinterpret_cast< const unsigned char* >(&payload), payload_len);
    RD_LOGD("create snapshot resync msg, dsn={}, crc={}", payload.dsn, checksum);
    payload.crc = checksum;

    data_out = nuraft::buffer::alloc(payload_len);
    std::memcpy(data_out->data_begin(), &payload, payload_len);
}

/// @brief Validates the HomeStore-layer baseline resync message and applies it to this repl dev.
///
/// The message is rejected when the buffer is smaller than snp_repl_dev_data, when the magic number or the
/// protocol version does not match, or when the received crc differs from the crc computed locally with the
/// crc field zeroed. On success, m_next_dsn is raised to the received dsn if the received dsn is larger.
///
/// @param data Buffer holding a serialized snp_repl_dev_data. The buffer content is not modified.
/// @return true if the message passed validation (whether or not m_next_dsn was changed), false otherwise.
bool RaftReplDev::apply_snp_resync_data(nuraft::buffer& data) {
    // Never interpret a truncated buffer as a full message; that would read past the end of the buffer.
    if (data.size() < sizeof(snp_repl_dev_data)) {
        RD_LOGE("Snapshot resync data size mismatch, expected_size={}, received_size={}", sizeof(snp_repl_dev_data),
                data.size());
        return false;
    }

    // Work on a local copy so that the caller's buffer stays intact (it can be validated again) and so that no
    // alignment assumption is made about the raw bytes.
    snp_repl_dev_data msg;
    std::memcpy(&msg, data.data_begin(), sizeof(snp_repl_dev_data));

    if (msg.magic_num != HOMESTORE_RESYNC_DATA_MAGIC ||
        msg.protocol_version != HOMESTORE_RESYNC_DATA_PROTOCOL_VERSION_V1) {
        RD_LOGE("Snapshot resync data validation failed, magic={}, version={}", msg.magic_num, msg.protocol_version);
        return false;
    }

    const auto received_crc = msg.crc;
    RD_LOGD("received snapshot resync msg, dsn={}, crc={}, received crc={}", msg.dsn, msg.crc, received_crc);
    // Clear the crc field before verification, because the crc value computed by leader doesn't contain it.
    msg.crc = 0;
    const auto computed_crc =
        crc32_ieee(init_crc32, reinterpret_cast< const unsigned char* >(&msg), sizeof(snp_repl_dev_data));
    if (received_crc != computed_crc) {
        RD_LOGE("Snapshot resync data crc mismatch, received_crc={}, computed_crc={}", received_crc, computed_crc);
        return false;
    }

    // Capture the old value before the update so that the log reports the actual transition.
    const auto prev_dsn = m_next_dsn.load();
    if (msg.dsn > prev_dsn) {
        m_next_dsn = msg.dsn;
        RD_LOGD("Update next_dsn from {} to {}", prev_dsn, msg.dsn);
    }
    return true;
}

// Forwards the restart notification to the registered listener.
void RaftReplDev::on_restart() { m_listener->on_restart(); }

bool RaftReplDev::is_resync_mode() {
Expand Down
2 changes: 2 additions & 0 deletions src/lib/replication/repl_dev/raft_repl_dev.h
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,8 @@ class RaftReplDev : public ReplDev,
void commit_blk(repl_req_ptr_t rreq);
void replace_member(repl_req_ptr_t rreq);
void reset_quorum_size(uint32_t commit_quorum);
void create_snp_resync_data(raft_buf_ptr_t& data_out);
bool apply_snp_resync_data(nuraft::buffer& data);
};

} // namespace homestore
29 changes: 24 additions & 5 deletions src/lib/replication/repl_dev/raft_state_machine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -297,14 +297,22 @@ void RaftStateMachine::create_snapshot(nuraft::snapshot& s, nuraft::async_result

int RaftStateMachine::read_logical_snp_obj(nuraft::snapshot& s, void*& user_ctx, ulong obj_id, raft_buf_ptr_t& data_out,
bool& is_last_obj) {
// For Nuraft baseline resync, we separate the process into two layers: HomeStore layer and Application layer.
// We use the highest bit of the obj_id to indicate the message type: 0 is for HS, 1 is for Application.
if (is_hs_snp_obj(obj_id)) {
// This is the preserved msg for homestore to resync data
m_rd.create_snp_resync_data(data_out);
is_last_obj = false;
return 0;
}
auto snp_ctx = std::make_shared< nuraft_snapshot_context >(s);
auto snp_data = std::make_shared< snapshot_data >();
auto snp_data = std::make_shared< snapshot_obj >();
snp_data->user_ctx = user_ctx;
snp_data->offset = obj_id;
snp_data->is_last_obj = is_last_obj;

// Listener will read the snapshot data and we pass through the same.
int ret = m_rd.m_listener->read_snapshot_data(snp_ctx, snp_data);
int ret = m_rd.m_listener->read_snapshot_obj(snp_ctx, snp_data);
if (ret < 0) return ret;

// Update user_ctx and whether is_last_obj
Expand All @@ -320,8 +328,16 @@ int RaftStateMachine::read_logical_snp_obj(nuraft::snapshot& s, void*& user_ctx,

void RaftStateMachine::save_logical_snp_obj(nuraft::snapshot& s, ulong& obj_id, nuraft::buffer& data, bool is_first_obj,
bool is_last_obj) {
if (is_hs_snp_obj(obj_id)) {
// Homestore preserved msg
if (m_rd.apply_snp_resync_data(data)) {
obj_id = snp_obj_id_type_app;
LOGDEBUG("apply_snp_resync_data success, next obj_id={}", obj_id);
}
return;
}
auto snp_ctx = std::make_shared< nuraft_snapshot_context >(s);
auto snp_data = std::make_shared< snapshot_data >();
auto snp_data = std::make_shared< snapshot_obj >();
snp_data->offset = obj_id;
snp_data->is_first_obj = is_first_obj;
snp_data->is_last_obj = is_last_obj;
Expand All @@ -331,7 +347,7 @@ void RaftStateMachine::save_logical_snp_obj(nuraft::snapshot& s, ulong& obj_id,
std::memcpy(blob.bytes(), data.data_begin(), data.size());
snp_data->blob = std::move(blob);

m_rd.m_listener->write_snapshot_data(snp_ctx, snp_data);
m_rd.m_listener->write_snapshot_obj(snp_ctx, snp_data);

// Update the object offset.
obj_id = snp_data->offset;
Expand All @@ -349,7 +365,10 @@ bool RaftStateMachine::apply_snapshot(nuraft::snapshot& s) {
m_rd.set_last_commit_lsn(s.get_last_log_idx());
m_rd.m_data_journal->set_last_durable_lsn(s.get_last_log_idx());
auto snp_ctx = std::make_shared< nuraft_snapshot_context >(s);
return m_rd.m_listener->apply_snapshot(snp_ctx);
auto res = m_rd.m_listener->apply_snapshot(snp_ctx);
//make sure the changes are flushed.
hs()->cp_mgr().trigger_cp_flush(true /* force */).get();
return res;
}

nuraft::ptr< nuraft::snapshot > RaftStateMachine::last_snapshot() {
Expand Down
6 changes: 6 additions & 0 deletions src/lib/replication/repl_dev/raft_state_machine.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,10 @@ class StateMachineStore;
#define RD_LOGE(...) RD_LOG(ERROR, ##__VA_ARGS__)
#define RD_LOGC(...) RD_LOG(CRITICAL, ##__VA_ARGS__)

// For the logical snapshot obj_id, we use the highest bit to indicate the type of the snapshot message.
// 0 is for HS (HomeStore), 1 is for Application.
static constexpr uint64_t snp_obj_id_type_app = 1ULL << 63;

using AsyncNotify = folly::SemiFuture< folly::Unit >;
using AsyncNotifier = folly::Promise< folly::Unit >;

Expand Down Expand Up @@ -135,6 +139,8 @@ class RaftStateMachine : public nuraft::state_machine {

std::string rdev_name() const;

/// @brief Tells whether a logical snapshot obj_id is a HomeStore-layer message.
/// @param obj_id Logical snapshot object id.
/// @return true when the application type bit (snp_obj_id_type_app) is not set in obj_id.
static bool is_hs_snp_obj(uint64_t obj_id) {
    const bool app_bit_set = (obj_id & snp_obj_id_type_app) != 0;
    return !app_bit_set;
}

private:
void after_precommit_in_leader(const nuraft::raft_server::req_ext_cb_params& params);
};
Expand Down
58 changes: 40 additions & 18 deletions src/tests/test_common/raft_repl_test_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -182,10 +182,26 @@ class TestReplicatedDB : public homestore::ReplDevListener {
return make_async_success<>();
}

int read_snapshot_data(shared< snapshot_context > context, shared< snapshot_data > snp_data) override {
/// @brief Extracts the LSN carried in a snapshot obj_id by masking off the highest (message type) bit.
/// @param obj_id Snapshot object id. It is only read, so it is taken by value; const values and temporaries
///               are accepted as well as the lvalues existing callers pass.
/// @return The lower 63 bits of obj_id as a signed LSN.
static int64_t get_next_lsn(uint64_t obj_id) {
    constexpr uint64_t lsn_mask = (1ULL << 63) - 1;
    // The masked value always fits in 63 bits, so the conversion to int64_t cannot overflow.
    return static_cast< int64_t >(obj_id & lsn_mask);
}
/// @brief Marks a snapshot obj_id as an application-layer message by setting its highest bit in place.
/// @param obj_id Snapshot object id to update; all other bits are left untouched.
static void set_resync_msg_type_bit(uint64_t& obj_id) {
    constexpr uint64_t type_bit_mask = 1ULL << 63;
    obj_id = obj_id | type_bit_mask;
}

int read_snapshot_obj(shared< snapshot_context > context, shared< snapshot_obj > snp_data) override {
auto s = std::dynamic_pointer_cast< nuraft_snapshot_context >(context)->nuraft_snapshot();
if(RaftStateMachine::is_hs_snp_obj(snp_data->offset)) {
LOGERRORMOD(replication, "invalid snapshot offset={}", snp_data->offset);
return -1;
}
if ((snp_data->offset & snp_obj_id_type_app) == 0) {
LOGERRORMOD(replication, "invalid snapshot offset={}", snp_data->offset);
return -1;
}

if (snp_data->offset == 0) {
int64_t next_lsn = get_next_lsn(snp_data->offset);
if (next_lsn == 0) {
snp_data->is_last_obj = false;
snp_data->blob = sisl::io_blob_safe(sizeof(ulong));
LOGINFOMOD(replication,
Expand All @@ -194,38 +210,37 @@ class TestReplicatedDB : public homestore::ReplDevListener {
return 0;
}

int64_t next_lsn = snp_data->offset;
std::vector< KeyValuePair > kv_snapshot_data;
std::vector< KeyValuePair > kv_snapshot_obj;
// we can not use find to get the next element, since if the next lsn is a config lsn, it will not be put into
// lsn_index_ and as a result, the find will return the end of the map. So here we use lower_bound to get the
// first element to be read and transferred.
for (auto iter = lsn_index_.lower_bound(next_lsn); iter != lsn_index_.end(); iter++) {
auto& v = iter->second;
kv_snapshot_data.emplace_back(Key{v.id_}, v);
kv_snapshot_obj.emplace_back(Key{v.id_}, v);
LOGTRACEMOD(replication, "[Replica={}] Read logical snapshot callback fetching lsn={} size={} pattern={}",
g_helper->replica_num(), v.lsn_, v.data_size_, v.data_pattern_);
if (kv_snapshot_data.size() >= 10) { break; }
if (kv_snapshot_obj.size() >= 10) { break; }
}

if (kv_snapshot_data.size() == 0) {
if (kv_snapshot_obj.size() == 0) {
snp_data->is_last_obj = true;
LOGINFOMOD(replication, "Snapshot is_last_obj is true");
return 0;
}

int64_t kv_snapshot_data_size = sizeof(KeyValuePair) * kv_snapshot_data.size();
sisl::io_blob_safe blob{static_cast< uint32_t >(kv_snapshot_data_size)};
std::memcpy(blob.bytes(), kv_snapshot_data.data(), kv_snapshot_data_size);
int64_t kv_snapshot_obj_size = sizeof(KeyValuePair) * kv_snapshot_obj.size();
sisl::io_blob_safe blob{static_cast< uint32_t >(kv_snapshot_obj_size)};
std::memcpy(blob.bytes(), kv_snapshot_obj.data(), kv_snapshot_obj_size);
snp_data->blob = std::move(blob);
snp_data->is_last_obj = false;
LOGINFOMOD(replication, "[Replica={}] Read logical snapshot callback obj_id={} term={} idx={} num_items={}",
g_helper->replica_num(), snp_data->offset, s->get_last_log_term(), s->get_last_log_idx(),
kv_snapshot_data.size());
kv_snapshot_obj.size());

return 0;
}

void snapshot_data_write(uint64_t data_size, uint64_t data_pattern, MultiBlkId& out_blkids) {
void snapshot_obj_write(uint64_t data_size, uint64_t data_pattern, MultiBlkId& out_blkids) {
auto block_size = SISL_OPTIONS["block_size"].as< uint32_t >();
auto write_sgs = test_common::HSTestHelper::create_sgs(data_size, block_size, data_pattern);
auto fut = homestore::data_service().async_alloc_write(write_sgs, blk_alloc_hints{}, out_blkids);
Expand All @@ -235,21 +250,27 @@ class TestReplicatedDB : public homestore::ReplDevListener {
}
}

void write_snapshot_data(shared< snapshot_context > context, shared< snapshot_data > snp_data) override {
void write_snapshot_obj(shared< snapshot_context > context, shared< snapshot_obj > snp_data) override {
if (RaftStateMachine::is_hs_snp_obj(snp_data->offset)) {
LOGERRORMOD(replication, "invalid snapshot offset={}", snp_data->offset);
return;
}
int64_t next_lsn = get_next_lsn(snp_data->offset);
auto s = std::dynamic_pointer_cast< nuraft_snapshot_context >(context)->nuraft_snapshot();
auto last_committed_idx =
std::dynamic_pointer_cast< RaftReplDev >(repl_dev())->raft_server()->get_committed_log_idx();
if (snp_data->offset == 0) {
if (next_lsn == 0) {
snp_data->offset = last_committed_lsn + 1;
set_resync_msg_type_bit(snp_data->offset);
LOGINFOMOD(replication, "[Replica={}] Save logical snapshot callback return obj_id={}",
g_helper->replica_num(), snp_data->offset);
return;
}

size_t kv_snapshot_data_size = snp_data->blob.size();
if (kv_snapshot_data_size == 0) return;
size_t kv_snapshot_obj_size = snp_data->blob.size();
if (kv_snapshot_obj_size == 0) return;

size_t num_items = kv_snapshot_data_size / sizeof(KeyValuePair);
size_t num_items = kv_snapshot_obj_size / sizeof(KeyValuePair);
std::unique_lock lk(db_mtx_);
auto ptr = r_cast< const KeyValuePair* >(snp_data->blob.bytes());
for (size_t i = 0; i < num_items; i++) {
Expand All @@ -261,7 +282,7 @@ class TestReplicatedDB : public homestore::ReplDevListener {
// Write to data service and inmem map.
MultiBlkId out_blkids;
if (value.data_size_ != 0) {
snapshot_data_write(value.data_size_, value.data_pattern_, out_blkids);
snapshot_obj_write(value.data_size_, value.data_pattern_, out_blkids);
value.blkid_ = out_blkids;
}
inmem_db_.insert_or_assign(key, value);
Expand All @@ -271,6 +292,7 @@ class TestReplicatedDB : public homestore::ReplDevListener {
}

snp_data->offset = last_committed_lsn + 1;
set_resync_msg_type_bit(snp_data->offset);
LOGINFOMOD(replication,
"[Replica={}] Save logical snapshot callback obj_id={} term={} idx={} is_last={} num_items={}",
g_helper->replica_num(), snp_data->offset, s->get_last_log_term(), s->get_last_log_idx(),
Expand Down
4 changes: 2 additions & 2 deletions src/tests/test_solo_repl_dev.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -111,10 +111,10 @@ class SoloReplDevTest : public testing::Test {
AsyncReplResult<> create_snapshot(shared< snapshot_context > context) override {
return make_async_success<>();
}
int read_snapshot_data(shared< snapshot_context > context, shared< snapshot_data > snp_data) override {
int read_snapshot_obj(shared< snapshot_context > context, shared< snapshot_obj > snp_data) override {
return 0;
}
void write_snapshot_data(shared< snapshot_context > context, shared< snapshot_data > snp_data) override {}
void write_snapshot_obj(shared< snapshot_context > context, shared< snapshot_obj > snp_data) override {}
bool apply_snapshot(shared< snapshot_context > context) override { return true; }
shared< snapshot_context > last_snapshot() override { return nullptr; }
void free_user_snp_ctx(void*& user_snp_ctx) override {}
Expand Down

0 comments on commit 89b86ff

Please sign in to comment.