diff --git a/conanfile.py b/conanfile.py index 99e129017..bc914e16c 100644 --- a/conanfile.py +++ b/conanfile.py @@ -9,7 +9,7 @@ class HomestoreConan(ConanFile): name = "homestore" - version = "6.5.18" + version = "6.5.19" homepage = "https://github.com/eBay/Homestore" description = "HomeStore Storage Engine" diff --git a/src/include/homestore/replication/repl_dev.h b/src/include/homestore/replication/repl_dev.h index 20e9a170f..335cda834 100644 --- a/src/include/homestore/replication/repl_dev.h +++ b/src/include/homestore/replication/repl_dev.h @@ -46,6 +46,10 @@ VENUM(journal_type_t, uint16_t, HS_CTRL_REPLACE = 3, // Control message to replace a member ) +// Magic number: the first 8 bytes of 'echo homestore_resync_data | md5sum'. +static constexpr uint64_t HOMESTORE_RESYNC_DATA_MAGIC = 0xa65dbd27c213f327; +static constexpr uint32_t HOMESTORE_RESYNC_DATA_PROTOCOL_VERSION_V1 = 0x01; + struct repl_key { int32_t server_id{0}; // Server Id which this req is originated from uint64_t term; // RAFT term number @@ -112,14 +116,23 @@ class nuraft_snapshot_context : public snapshot_context { nuraft::ptr< nuraft::snapshot > snapshot_; }; -struct snapshot_data { +struct snapshot_obj { void* user_ctx{nullptr}; - int64_t offset{0}; + uint64_t offset{0}; sisl::io_blob_safe blob; bool is_first_obj{false}; bool is_last_obj{false}; }; +// HomeStore meta information transmitted during the baseline resync. Only the dsn is synced for now, but this is a +// general message so fields can be added later. crc covers the whole struct, computed with the crc field set to 0.
+struct snp_repl_dev_data { + uint64_t magic_num{HOMESTORE_RESYNC_DATA_MAGIC}; + uint32_t protocol_version{HOMESTORE_RESYNC_DATA_PROTOCOL_VERSION_V1}; + uint32_t crc{0}; + uint64_t dsn{0}; +}; + struct repl_journal_entry; struct repl_req_ctx : public boost::intrusive_ref_counter< repl_req_ctx, boost::thread_safe_counter >, sisl::ObjLifeCounter< repl_req_ctx > { @@ -368,16 +381,16 @@ class ReplDevListener { /// uses offset given by the follower to the know the current state of the follower. /// Leader sends the snapshot data to the follower in batch. This callback is called multiple /// times on the leader till all the data is transferred to the follower. is_last_obj in - /// snapshot_data will be true once all the data has been trasnferred. After this the raft on + /// snapshot_obj will be true once all the data has been transferred. After this the raft on /// the follower side can do the incremental resync. - virtual int read_snapshot_data(shared< snapshot_context > context, shared< snapshot_data > snp_data) = 0; + virtual int read_snapshot_obj(shared< snapshot_context > context, shared< snapshot_obj > snp_obj) = 0; /// @brief Called on the follower when the leader sends the data during the baseline resyc. - /// is_last_obj in in snapshot_data will be true once all the data has been transfered. + /// is_last_obj in snapshot_obj will be true once all the data has been transferred. /// After this the raft on the follower side can do the incremental resync. - virtual void write_snapshot_data(shared< snapshot_context > context, shared< snapshot_data > snp_data) = 0; + virtual void write_snapshot_obj(shared< snapshot_context > context, shared< snapshot_obj > snp_obj) = 0; - /// @brief Free up user-defined context inside the snapshot_data that is allocated during read_snapshot_data. + /// @brief Free up user-defined context inside the snapshot_obj that is allocated during read_snapshot_obj.
virtual void free_user_snp_ctx(void*& user_snp_ctx) = 0; private: diff --git a/src/lib/replication/repl_dev/raft_repl_dev.cpp b/src/lib/replication/repl_dev/raft_repl_dev.cpp index 2d93c4070..72a39a27a 100644 --- a/src/lib/replication/repl_dev/raft_repl_dev.cpp +++ b/src/lib/replication/repl_dev/raft_repl_dev.cpp @@ -1491,6 +1491,49 @@ void RaftReplDev::on_log_found(logstore_seq_num_t lsn, log_buffer buf, void* ctx handle_commit(rreq, true /* recovery */); } +// Build the HomeStore-level resync message (current next dsn, crc computed while the crc field is 0) into data_out. +void RaftReplDev::create_snp_resync_data(raft_buf_ptr_t& data_out) { + snp_repl_dev_data msg; + auto msg_size = sizeof(snp_repl_dev_data); + msg.dsn = m_next_dsn; + auto crc = crc32_ieee(init_crc32, reinterpret_cast< const unsigned char* >(&msg), msg_size); + RD_LOGD("create snapshot resync msg, dsn={}, crc={}", msg.dsn, crc); + msg.crc = crc; + data_out = nuraft::buffer::alloc(msg_size); + std::memcpy(data_out->data_begin(), &msg, msg_size); +} + +// Validate a HomeStore-level resync message (size, magic, protocol version, crc) and raise m_next_dsn to the +// received dsn when that is larger. Returns false when validation fails, true otherwise. +bool RaftReplDev::apply_snp_resync_data(nuraft::buffer& data) { + // Reject short payloads before reinterpreting the buffer as snp_repl_dev_data. + if (data.size() < sizeof(snp_repl_dev_data)) { + RD_LOGE("Snapshot resync data too short, size={}, expected={}", data.size(), sizeof(snp_repl_dev_data)); + return false; + } + auto msg = r_cast< snp_repl_dev_data* >(data.data_begin()); + if (msg->magic_num != HOMESTORE_RESYNC_DATA_MAGIC || msg->protocol_version != + HOMESTORE_RESYNC_DATA_PROTOCOL_VERSION_V1) { + RD_LOGE("Snapshot resync data validation failed, magic={}, version={}", msg->magic_num, msg->protocol_version); + return false; + } + auto received_crc = msg->crc; + RD_LOGD("received snapshot resync msg, dsn={}, crc={}, received crc={}", msg->dsn, msg->crc, received_crc); + // Clear the crc field before verification, because the crc value computed by leader doesn't contain it.
+ msg->crc = 0; + auto computed_crc = crc32_ieee(init_crc32, reinterpret_cast< const unsigned char* >(msg), + sizeof(snp_repl_dev_data)); + if (received_crc != computed_crc) { + RD_LOGE("Snapshot resync data crc mismatch, received_crc={}, computed_crc={}", received_crc, computed_crc); + return false; + } + if (msg->dsn > m_next_dsn) { + RD_LOGD("Update next_dsn from {} to {}", m_next_dsn.load(), msg->dsn); + m_next_dsn = msg->dsn; + } + return true; +} + void RaftReplDev::on_restart() { m_listener->on_restart(); } bool RaftReplDev::is_resync_mode() { diff --git a/src/lib/replication/repl_dev/raft_repl_dev.h b/src/lib/replication/repl_dev/raft_repl_dev.h index 2bf7cc52c..0550858cf 100644 --- a/src/lib/replication/repl_dev/raft_repl_dev.h +++ b/src/lib/replication/repl_dev/raft_repl_dev.h @@ -285,6 +285,8 @@ class RaftReplDev : public ReplDev, void commit_blk(repl_req_ptr_t rreq); void replace_member(repl_req_ptr_t rreq); void reset_quorum_size(uint32_t commit_quorum); + void create_snp_resync_data(raft_buf_ptr_t& data_out); + bool apply_snp_resync_data(nuraft::buffer& data); }; } // namespace homestore diff --git a/src/lib/replication/repl_dev/raft_state_machine.cpp b/src/lib/replication/repl_dev/raft_state_machine.cpp index 2047a3b28..b64a32c24 100644 --- a/src/lib/replication/repl_dev/raft_state_machine.cpp +++ b/src/lib/replication/repl_dev/raft_state_machine.cpp @@ -297,14 +297,22 @@ void RaftStateMachine::create_snapshot(nuraft::snapshot& s, nuraft::async_result int RaftStateMachine::read_logical_snp_obj(nuraft::snapshot& s, void*& user_ctx, ulong obj_id, raft_buf_ptr_t& data_out, bool& is_last_obj) { + // For Nuraft baseline resync, we separate the process into two layers: HomeStore layer and Application layer. + // We use the highest bit of the obj_id to indicate the message type: 0 is for HS, 1 is for Application.
+ if (is_hs_snp_obj(obj_id)) { + // obj_id has the HS type (highest bit clear): return HomeStore's own resync message instead of calling the listener. + m_rd.create_snp_resync_data(data_out); + is_last_obj = false; + return 0; + } auto snp_ctx = std::make_shared< nuraft_snapshot_context >(s); - auto snp_data = std::make_shared< snapshot_data >(); + auto snp_data = std::make_shared< snapshot_obj >(); snp_data->user_ctx = user_ctx; snp_data->offset = obj_id; snp_data->is_last_obj = is_last_obj; // Listener will read the snapshot data and we pass through the same. - int ret = m_rd.m_listener->read_snapshot_data(snp_ctx, snp_data); + int ret = m_rd.m_listener->read_snapshot_obj(snp_ctx, snp_data); if (ret < 0) return ret; // Update user_ctx and whether is_last_obj @@ -320,8 +328,16 @@ int RaftStateMachine::read_logical_snp_obj(nuraft::snapshot& s, void*& user_ctx, void RaftStateMachine::save_logical_snp_obj(nuraft::snapshot& s, ulong& obj_id, nuraft::buffer& data, bool is_first_obj, bool is_last_obj) { + if (is_hs_snp_obj(obj_id)) { + // HomeStore reserved message: on success set obj_id to snp_obj_id_type_app as the next obj_id; on failure obj_id is left unchanged. + if (m_rd.apply_snp_resync_data(data)) { + obj_id = snp_obj_id_type_app; + LOGDEBUG("apply_snp_resync_data success, next obj_id={}", obj_id); + } + return; + } auto snp_ctx = std::make_shared< nuraft_snapshot_context >(s); - auto snp_data = std::make_shared< snapshot_data >(); + auto snp_data = std::make_shared< snapshot_obj >(); snp_data->offset = obj_id; snp_data->is_first_obj = is_first_obj; snp_data->is_last_obj = is_last_obj; @@ -331,7 +347,7 @@ void RaftStateMachine::save_logical_snp_obj(nuraft::snapshot& s, ulong& obj_id, std::memcpy(blob.bytes(), data.data_begin(), data.size()); snp_data->blob = std::move(blob); - m_rd.m_listener->write_snapshot_data(snp_ctx, snp_data); + m_rd.m_listener->write_snapshot_obj(snp_ctx, snp_data); // Update the object offset.
obj_id = snp_data->offset; @@ -349,7 +365,10 @@ bool RaftStateMachine::apply_snapshot(nuraft::snapshot& s) { m_rd.set_last_commit_lsn(s.get_last_log_idx()); m_rd.m_data_journal->set_last_durable_lsn(s.get_last_log_idx()); auto snp_ctx = std::make_shared< nuraft_snapshot_context >(s); - return m_rd.m_listener->apply_snapshot(snp_ctx); + auto res = m_rd.m_listener->apply_snapshot(snp_ctx); + // Force a cp flush and wait for it, to make sure the changes are flushed before returning. + hs()->cp_mgr().trigger_cp_flush(true /* force */).get(); + return res; } nuraft::ptr< nuraft::snapshot > RaftStateMachine::last_snapshot() { diff --git a/src/lib/replication/repl_dev/raft_state_machine.h b/src/lib/replication/repl_dev/raft_state_machine.h index 6bf4faf5a..8f00cec43 100644 --- a/src/lib/replication/repl_dev/raft_state_machine.h +++ b/src/lib/replication/repl_dev/raft_state_machine.h @@ -86,6 +86,10 @@ class StateMachineStore; #define RD_LOGE(...) RD_LOG(ERROR, ##__VA_ARGS__) #define RD_LOGC(...) RD_LOG(CRITICAL, ##__VA_ARGS__) +// For the logical snapshot obj_id, we use the highest bit to indicate the type of the snapshot message. +// 0 is for HS, 1 is for Application.
+static constexpr uint64_t snp_obj_id_type_app = 1ULL << 63; + using AsyncNotify = folly::SemiFuture< folly::Unit >; using AsyncNotifier = folly::Promise< folly::Unit >; @@ -135,6 +139,8 @@ class RaftStateMachine : public nuraft::state_machine { std::string rdev_name() const; + static bool is_hs_snp_obj(uint64_t obj_id) { return (obj_id & snp_obj_id_type_app) == 0; } + private: void after_precommit_in_leader(const nuraft::raft_server::req_ext_cb_params& params); }; diff --git a/src/tests/test_common/raft_repl_test_base.hpp b/src/tests/test_common/raft_repl_test_base.hpp index 889ab72bb..7445568b8 100644 --- a/src/tests/test_common/raft_repl_test_base.hpp +++ b/src/tests/test_common/raft_repl_test_base.hpp @@ -182,10 +182,24 @@ class TestReplicatedDB : public homestore::ReplDevListener { return make_async_success<>(); } - int read_snapshot_data(shared< snapshot_context > context, shared< snapshot_data > snp_data) override { + // Strip the message-type bit (snp_obj_id_type_app) to recover the lsn carried in obj_id. + static int64_t get_next_lsn(uint64_t obj_id) { + return obj_id & (snp_obj_id_type_app - 1); + } + // Set the message-type bit (snp_obj_id_type_app) so that obj_id is not treated as a HomeStore object. + static void set_resync_msg_type_bit(uint64_t& obj_id) { + obj_id |= snp_obj_id_type_app; + } + + int read_snapshot_obj(shared< snapshot_context > context, shared< snapshot_obj > snp_data) override { auto s = std::dynamic_pointer_cast< nuraft_snapshot_context >(context)->nuraft_snapshot(); + if (RaftStateMachine::is_hs_snp_obj(snp_data->offset)) { + LOGERRORMOD(replication, "invalid snapshot offset={}", snp_data->offset); + return -1; + } - if (snp_data->offset == 0) { + int64_t next_lsn = get_next_lsn(snp_data->offset); + if (next_lsn == 0) { snp_data->is_last_obj = false; snp_data->blob = sisl::io_blob_safe(sizeof(ulong)); LOGINFOMOD(replication, @@ -194,38 +208,37 @@ class TestReplicatedDB : public homestore::ReplDevListener { return 0; } - int64_t next_lsn = snp_data->offset; - std::vector< KeyValuePair >
kv_snapshot_data; + std::vector< KeyValuePair > kv_snapshot_obj; // we can not use find to get the next element, since if the next lsn is a config lsn , it will not be put into // lsn_index_ and as a result, the find will return the end of the map. so here we use lower_bound to get the // first element to be read and transfered. for (auto iter = lsn_index_.lower_bound(next_lsn); iter != lsn_index_.end(); iter++) { auto& v = iter->second; - kv_snapshot_data.emplace_back(Key{v.id_}, v); + kv_snapshot_obj.emplace_back(Key{v.id_}, v); LOGTRACEMOD(replication, "[Replica={}] Read logical snapshot callback fetching lsn={} size={} pattern={}", g_helper->replica_num(), v.lsn_, v.data_size_, v.data_pattern_); - if (kv_snapshot_data.size() >= 10) { break; } + if (kv_snapshot_obj.size() >= 10) { break; } } - if (kv_snapshot_data.size() == 0) { + if (kv_snapshot_obj.size() == 0) { snp_data->is_last_obj = true; LOGINFOMOD(replication, "Snapshot is_last_obj is true"); return 0; } - int64_t kv_snapshot_data_size = sizeof(KeyValuePair) * kv_snapshot_data.size(); - sisl::io_blob_safe blob{static_cast< uint32_t >(kv_snapshot_data_size)}; - std::memcpy(blob.bytes(), kv_snapshot_data.data(), kv_snapshot_data_size); + int64_t kv_snapshot_obj_size = sizeof(KeyValuePair) * kv_snapshot_obj.size(); + sisl::io_blob_safe blob{static_cast< uint32_t >(kv_snapshot_obj_size)}; + std::memcpy(blob.bytes(), kv_snapshot_obj.data(), kv_snapshot_obj_size); snp_data->blob = std::move(blob); snp_data->is_last_obj = false; LOGINFOMOD(replication, "[Replica={}] Read logical snapshot callback obj_id={} term={} idx={} num_items={}", g_helper->replica_num(), snp_data->offset, s->get_last_log_term(), s->get_last_log_idx(), - kv_snapshot_data.size()); + kv_snapshot_obj.size()); return 0; } - void snapshot_data_write(uint64_t data_size, uint64_t data_pattern, MultiBlkId& out_blkids) { + void snapshot_obj_write(uint64_t data_size, uint64_t data_pattern, MultiBlkId& out_blkids) { auto block_size =
SISL_OPTIONS["block_size"].as< uint32_t >(); auto write_sgs = test_common::HSTestHelper::create_sgs(data_size, block_size, data_pattern); auto fut = homestore::data_service().async_alloc_write(write_sgs, blk_alloc_hints{}, out_blkids); @@ -235,21 +248,27 @@ class TestReplicatedDB : public homestore::ReplDevListener { } } - void write_snapshot_data(shared< snapshot_context > context, shared< snapshot_data > snp_data) override { + void write_snapshot_obj(shared< snapshot_context > context, shared< snapshot_obj > snp_data) override { + if (RaftStateMachine::is_hs_snp_obj(snp_data->offset)) { + LOGERRORMOD(replication, "invalid snapshot offset={}", snp_data->offset); + return; + } + int64_t next_lsn = get_next_lsn(snp_data->offset); auto s = std::dynamic_pointer_cast< nuraft_snapshot_context >(context)->nuraft_snapshot(); auto last_committed_idx = std::dynamic_pointer_cast< RaftReplDev >(repl_dev())->raft_server()->get_committed_log_idx(); - if (snp_data->offset == 0) { + if (next_lsn == 0) { snp_data->offset = last_committed_lsn + 1; + set_resync_msg_type_bit(snp_data->offset); LOGINFOMOD(replication, "[Replica={}] Save logical snapshot callback return obj_id={}", g_helper->replica_num(), snp_data->offset); return; } - size_t kv_snapshot_data_size = snp_data->blob.size(); - if (kv_snapshot_data_size == 0) return; + size_t kv_snapshot_obj_size = snp_data->blob.size(); + if (kv_snapshot_obj_size == 0) return; - size_t num_items = kv_snapshot_data_size / sizeof(KeyValuePair); + size_t num_items = kv_snapshot_obj_size / sizeof(KeyValuePair); std::unique_lock lk(db_mtx_); auto ptr = r_cast< const KeyValuePair* >(snp_data->blob.bytes()); for (size_t i = 0; i < num_items; i++) { @@ -261,7 +280,7 @@ class TestReplicatedDB : public homestore::ReplDevListener { // Write to data service and inmem map.
MultiBlkId out_blkids; if (value.data_size_ != 0) { - snapshot_data_write(value.data_size_, value.data_pattern_, out_blkids); + snapshot_obj_write(value.data_size_, value.data_pattern_, out_blkids); value.blkid_ = out_blkids; } inmem_db_.insert_or_assign(key, value); @@ -271,6 +290,7 @@ class TestReplicatedDB : public homestore::ReplDevListener { } snp_data->offset = last_committed_lsn + 1; + set_resync_msg_type_bit(snp_data->offset); LOGINFOMOD(replication, "[Replica={}] Save logical snapshot callback obj_id={} term={} idx={} is_last={} num_items={}", g_helper->replica_num(), snp_data->offset, s->get_last_log_term(), s->get_last_log_idx(), diff --git a/src/tests/test_solo_repl_dev.cpp b/src/tests/test_solo_repl_dev.cpp index 1b990d592..eaec0ff1e 100644 --- a/src/tests/test_solo_repl_dev.cpp +++ b/src/tests/test_solo_repl_dev.cpp @@ -111,10 +111,10 @@ class SoloReplDevTest : public testing::Test { AsyncReplResult<> create_snapshot(shared< snapshot_context > context) override { return make_async_success<>(); } - int read_snapshot_data(shared< snapshot_context > context, shared< snapshot_data > snp_data) override { + int read_snapshot_obj(shared< snapshot_context > context, shared< snapshot_obj > snp_data) override { return 0; } - void write_snapshot_data(shared< snapshot_context > context, shared< snapshot_data > snp_data) override {} + void write_snapshot_obj(shared< snapshot_context > context, shared< snapshot_obj > snp_data) override {} bool apply_snapshot(shared< snapshot_context > context) override { return true; } shared< snapshot_context > last_snapshot() override { return nullptr; } void free_user_snp_ctx(void*& user_snp_ctx) override {}