Add baseline resync to support read and write snapshots. (#387)
Add nuraft-based snapshot sync across the leader and followers.
Implement the read and write snapshot callbacks. Add a generic snapshot
context structure to support different implementations. The listener
implements the functions needed to load/save snapshot state, and also
defines how the actual snapshot data is read and written. Added a test
for a basic baseline resync.
sanebay authored Aug 9, 2024
1 parent 837dba1 commit e371106
Showing 12 changed files with 416 additions and 36 deletions.
2 changes: 1 addition & 1 deletion conanfile.py
@@ -9,7 +9,7 @@

class HomestoreConan(ConanFile):
name = "homestore"
version = "6.4.38"
version = "6.4.39"

homepage = "https://github.com/eBay/Homestore"
description = "HomeStore Storage Engine"
83 changes: 78 additions & 5 deletions src/include/homestore/replication/repl_dev.h
@@ -59,9 +59,60 @@ struct repl_key {
std::string to_string() const { return fmt::format("server={}, term={}, dsn={}", server_id, term, dsn); }
};

struct repl_snapshot {
uint64_t last_log_idx_{0};
uint64_t last_log_term_{0};
using repl_snapshot = nuraft::snapshot;
using repl_snapshot_ptr = nuraft::ptr< nuraft::snapshot >;

// Consumers of the ReplDevListener don't have to know what underlying
// snapshot implementation is used. Consumers can export and save the state
// of the snapshot using serialize and load the state using deserialize.
class snapshot_context {
public:
snapshot_context(int64_t lsn) : lsn_(lsn) {}
virtual ~snapshot_context() = default;
virtual void deserialize(const sisl::io_blob_safe& snp_ctx) = 0;
virtual sisl::io_blob_safe serialize() = 0;
int64_t get_lsn() { return lsn_; }

protected:
int64_t lsn_;
};

class nuraft_snapshot_context : public snapshot_context {
public:
nuraft_snapshot_context(nuraft::snapshot& snp) : snapshot_context(snp.get_last_log_idx()) {
auto snp_buf = snp.serialize();
snapshot_ = nuraft::snapshot::deserialize(*snp_buf);
}

void deserialize(const sisl::io_blob_safe& snp_ctx) override {
// Load the context from the io blob to nuraft buffer.
auto snp_buf = nuraft::buffer::alloc(snp_ctx.size());
nuraft::buffer_serializer bs(snp_buf);
bs.put_raw(snp_ctx.cbytes(), snp_ctx.size());
snapshot_ = nuraft::snapshot::deserialize(bs);
lsn_ = snapshot_->get_last_log_idx();
}

sisl::io_blob_safe serialize() override {
// Dump the context from nuraft buffer to the io blob.
auto snp_buf = snapshot_->serialize();
sisl::io_blob_safe blob{s_cast< size_t >(snp_buf->size())};
std::memcpy(blob.bytes(), snp_buf->data_begin(), snp_buf->size());
return blob;
}

nuraft::ptr< nuraft::snapshot > nuraft_snapshot() { return snapshot_; }

private:
nuraft::ptr< nuraft::snapshot > snapshot_;
};

struct snapshot_data {
void* user_ctx{nullptr};
int64_t offset{0};
sisl::io_blob_safe blob;
bool is_first_obj{false};
bool is_last_obj{false};
};

struct repl_journal_entry;
@@ -285,8 +336,30 @@ class ReplDevListener {
/// after restart in case crash happened during the destroy.
virtual void on_destroy() = 0;

/// @brief Called when the snapshot is being created by nuraft;
virtual AsyncReplResult<> create_snapshot(repl_snapshot& s) = 0;
/// @brief Called when the snapshot is being created by nuraft
virtual AsyncReplResult<> create_snapshot(shared< snapshot_context > context) = 0;

/// @brief Called when nuraft does the baseline resync and, at the end, applies the snapshot.
virtual bool apply_snapshot(shared< snapshot_context > context) = 0;

/// @brief Get the last snapshot saved.
virtual shared< snapshot_context > last_snapshot() = 0;

/// @brief Called on the leader side when the follower wants to do a baseline resync and the leader
/// uses the offset given by the follower to know the current state of the follower.
/// The leader sends the snapshot data to the follower in batches. This callback is called multiple
/// times on the leader till all the data is transferred to the follower. is_last_obj in
/// snapshot_data will be true once all the data has been transferred. After this the raft on
/// the follower side can do the incremental resync.
virtual int read_snapshot_data(shared< snapshot_context > context, shared< snapshot_data > snp_data) = 0;

/// @brief Called on the follower when the leader sends the data during the baseline resync.
/// is_last_obj in snapshot_data will be true once all the data has been transferred.
/// After this the raft on the follower side can do the incremental resync.
virtual void write_snapshot_data(shared< snapshot_context > context, shared< snapshot_data > snp_data) = 0;

/// @brief Free up user-defined context inside the snapshot_data that is allocated during read_snapshot_data.
virtual void free_user_snp_ctx(void*& user_snp_ctx) = 0;

private:
std::weak_ptr< ReplDev > m_repl_dev;
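For readers of this change, here is a minimal sketch of how a consumer might implement the new snapshot callbacks on ReplDevListener. It is not part of the commit: `KVListener`, its flat `m_state_image` staging buffer, and the 64 KiB batch size are hypothetical; `shared< T >` is assumed to be homestore's alias for `std::shared_ptr< T >`, the public include path is assumed, and the remaining pure-virtual ReplDevListener methods (on_commit, create_snapshot, on_destroy, etc.) are omitted, so the class stays abstract.

#include <algorithm>
#include <cstring>
#include <memory>
#include <mutex>
#include <string>
#include <homestore/replication/repl_dev.h>  // assumed public include path of the header above

// Hypothetical listener; only the snapshot-related callbacks added by this commit are shown.
class KVListener : public homestore::ReplDevListener {
    static constexpr int64_t batch_size = 64 * 1024;  // illustrative transfer batch size

public:
    // Follower: remember (and persist) the snapshot we were baseline-resynced to.
    bool apply_snapshot(std::shared_ptr< homestore::snapshot_context > context) override {
        std::lock_guard lg(m_lock);
        m_last_snp_ctx = context;
        m_saved_ctx = context->serialize();  // bytes a real consumer would write to durable storage
        return true;
    }

    std::shared_ptr< homestore::snapshot_context > last_snapshot() override {
        std::lock_guard lg(m_lock);
        return m_last_snp_ctx;  // nullptr until a snapshot has been created or applied
    }

    // Leader: fill snp_data->blob with the bytes starting at snp_data->offset.
    int read_snapshot_data(std::shared_ptr< homestore::snapshot_context > context,
                           std::shared_ptr< homestore::snapshot_data > snp_data) override {
        std::lock_guard lg(m_lock);
        auto const total = static_cast< int64_t >(m_state_image.size());
        auto const start = snp_data->offset;
        if (start >= total) {  // nothing left (or empty state); tell nuraft we are done
            snp_data->is_last_obj = true;
            return 0;
        }
        auto const len = std::min(batch_size, total - start);
        sisl::io_blob_safe blob{static_cast< size_t >(len)};
        std::memcpy(blob.bytes(), m_state_image.data() + start, static_cast< size_t >(len));
        snp_data->blob = std::move(blob);
        snp_data->is_last_obj = (start + len >= total);
        return 0;
    }

    // Follower: apply the received bytes at snp_data->offset and ask for the next batch.
    void write_snapshot_data(std::shared_ptr< homestore::snapshot_context > context,
                             std::shared_ptr< homestore::snapshot_data > snp_data) override {
        std::lock_guard lg(m_lock);
        auto const end = snp_data->offset + static_cast< int64_t >(snp_data->blob.size());
        if (static_cast< int64_t >(m_state_image.size()) < end) { m_state_image.resize(end); }
        std::memcpy(m_state_image.data() + snp_data->offset, snp_data->blob.cbytes(), snp_data->blob.size());
        snp_data->offset = end;  // becomes the next obj_id requested from the leader
        if (snp_data->is_last_obj) { /* rebuild the in-memory state from m_state_image */ }
    }

    // This sketch never allocates a user context in read_snapshot_data, so there is nothing to free.
    void free_user_snp_ctx(void*& user_snp_ctx) override { user_snp_ctx = nullptr; }

private:
    std::mutex m_lock;
    std::string m_state_image;                                      // hypothetical flat snapshot image
    std::shared_ptr< homestore::snapshot_context > m_last_snp_ctx;  // last created/applied snapshot
    sisl::io_blob_safe m_saved_ctx;                                 // serialized form kept for restarts
};

The offset handshake mirrors the doc comments above: the leader keeps getting called until it reports is_last_obj, and the follower steers the next obj_id by updating snp_data->offset.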
5 changes: 5 additions & 0 deletions src/lib/logstore/log_dev.cpp
@@ -390,6 +390,11 @@ bool LogDev::can_flush_in_this_thread() {
// our threshold. If so, it first flushes what's accumulated so far and then adds the pending flush size counter with
// the new record size
bool LogDev::flush_if_needed(int64_t threshold_size) {
{
std::unique_lock< std::mutex > lk{m_block_flush_q_mutex};
if (m_stopped) { return false; }
}

// If, after adding the record size, we have enough to flush or it's been too much time since we actually
// flushed, attempt to flush by setting the atomic bool variable.
if (threshold_size < 0) { threshold_size = LogDev::flush_data_threshold_size(); }
24 changes: 19 additions & 5 deletions src/lib/replication/log_store/home_raft_log_store.cpp
@@ -18,6 +18,7 @@
#include <sisl/fds/utils.hpp>
#include "common/homestore_assert.hpp"
#include <homestore/homestore.hpp>
#include <iomgr/iomgr_flip.hpp>

using namespace homestore;

@@ -165,16 +166,21 @@ void HomeRaftLogStore::write_at(ulong index, nuraft::ptr< nuraft::log_entry >& e
void HomeRaftLogStore::end_of_append_batch(ulong start, ulong cnt) {
store_lsn_t end_lsn = to_store_lsn(start + cnt - 1);
m_log_store->flush_sync(end_lsn);
REPL_STORE_LOG(DEBUG, "end_of_append_batch flushed upto start={} cnt={} lsn={}", start, cnt, start + cnt - 1);
m_last_durable_lsn = end_lsn;
}

nuraft::ptr< std::vector< nuraft::ptr< nuraft::log_entry > > > HomeRaftLogStore::log_entries(ulong start, ulong end) {
auto out_vec = std::make_shared< std::vector< nuraft::ptr< nuraft::log_entry > > >();
m_log_store->foreach (to_store_lsn(start), [end, &out_vec](store_lsn_t cur, const log_buffer& entry) -> bool {
bool ret = (cur < to_store_lsn(end) - 1);
if (cur < to_store_lsn(end)) { out_vec->emplace_back(to_nuraft_log_entry(entry)); }
if (cur < to_store_lsn(end)) {
// REPL_STORE_LOG(TRACE, "log_entries lsn={}", cur + 1);
out_vec->emplace_back(to_nuraft_log_entry(entry));
}
return ret;
});
REPL_STORE_LOG(TRACE, "Num log entries start={} end={} num_entries={}", start, end, out_vec->size());
return out_vec;
}

@@ -184,7 +190,7 @@ nuraft::ptr< nuraft::log_entry > HomeRaftLogStore::entry_at(ulong index) {
auto log_bytes = m_log_store->read_sync(to_store_lsn(index));
nle = to_nuraft_log_entry(log_bytes);
} catch (const std::exception& e) {
REPL_STORE_LOG(ERROR, "entry_at({}) index out_of_range", index);
REPL_STORE_LOG(ERROR, "entry_at({}) index out_of_range start {} end {}", index, start_index(), last_index());
throw e;
}
return nle;
@@ -196,7 +202,7 @@ ulong HomeRaftLogStore::term_at(ulong index) {
auto log_bytes = m_log_store->read_sync(to_store_lsn(index));
term = extract_term(log_bytes);
} catch (const std::exception& e) {
REPL_STORE_LOG(ERROR, "term_at({}) index out_of_range", index);
REPL_STORE_LOG(ERROR, "term_at({}) index out_of_range start {} end {}", index, start_index(), last_index());
throw e;
}
return term;
@@ -271,7 +277,7 @@ bool HomeRaftLogStore::compact(ulong compact_lsn) {
if (cur_max_lsn < to_store_lsn(compact_lsn)) {
// release this assert if, for some use case, we should tolerate this case;
// for now, don't expect this case to happen.
RELEASE_ASSERT(false, "compact_lsn={} is beyond the current max_lsn={}", compact_lsn, cur_max_lsn);
// RELEASE_ASSERT(false, "compact_lsn={} is beyond the current max_lsn={}", compact_lsn, cur_max_lsn);

// We need to fill the remaining entries with dummy data.
for (auto lsn{cur_max_lsn + 1}; lsn <= to_store_lsn(compact_lsn); ++lsn) {
@@ -284,7 +290,13 @@ bool HomeRaftLogStore::compact(ulong compact_lsn) {
// we rely on the resource mgr timer to trigger truncate for all log stores in the system;
// this is friendlier for multiple logstores on the same logdev;

// m_log_store->truncate(to_store_lsn(compact_lsn));
#ifdef _PRERELEASE
if (iomgr_flip::instance()->test_flip("force_home_raft_log_truncate")) {
REPL_STORE_LOG(TRACE, "Flip force_home_raft_log_truncate is enabled, force truncation, compact_lsn={}",
compact_lsn);
m_log_store->truncate(to_store_lsn(compact_lsn));
}
#endif

return true;
}
@@ -301,4 +313,6 @@ ulong HomeRaftLogStore::last_durable_index() {

void HomeRaftLogStore::wait_for_log_store_ready() { m_log_store_future.wait(); }

void HomeRaftLogStore::set_last_durable_lsn(repl_lsn_t lsn) { m_last_durable_lsn = to_store_lsn(lsn); }

} // namespace homestore
1 change: 1 addition & 0 deletions src/lib/replication/log_store/home_raft_log_store.h
@@ -191,6 +191,7 @@ class HomeRaftLogStore : public nuraft::log_store {
void truncate(uint32_t num_reserved_cnt, repl_lsn_t compact_lsn);

void wait_for_log_store_ready();
void set_last_durable_lsn(repl_lsn_t lsn);

private:
logstore_id_t m_logstore_id;
6 changes: 3 additions & 3 deletions src/lib/replication/log_store/repl_log_store.cpp
@@ -8,7 +8,7 @@ namespace homestore {

uint64_t ReplLogStore::append(nuraft::ptr< nuraft::log_entry >& entry) {
// We don't want to transform anything that is not an app log
if (entry->get_val_type() != nuraft::log_val_type::app_log) {
if (entry->get_val_type() != nuraft::log_val_type::app_log || entry->get_buf_ptr()->size() == 0) {
ulong lsn = HomeRaftLogStore::append(entry);
RD_LOGD("append entry term={}, log_val_type={} lsn={} size={}", entry->get_term(),
static_cast< uint32_t >(entry->get_val_type()), lsn, entry->get_buf().size());
@@ -57,8 +57,8 @@ void ReplLogStore::end_of_append_batch(ulong start_lsn, ulong count) {
}
}

RD_LOGT("Raft Channel: end_of_append_batch start_lsn={} count={} num_data_to_be_written={}", start_lsn, count,
reqs->size());
RD_LOGT("Raft Channel: end_of_append_batch start_lsn={} count={} num_data_to_be_written={} {}", start_lsn, count,
reqs->size(), proposer_reqs->size());

// All requests are from proposer for data write, so as mentioned above we can skip the flush for now
if (!reqs->empty()) {
9 changes: 5 additions & 4 deletions src/lib/replication/repl_dev/raft_repl_dev.cpp
@@ -127,19 +127,19 @@ folly::SemiFuture< ReplServiceError > RaftReplDev::destroy_group() {
LOGERROR("RaftReplDev::destroy_group failed {}", err);
}

LOGINFO("Raft repl dev destroy_group={}", boost::uuids::to_string(m_group_id));
return m_destroy_promise.getSemiFuture();
}

void RaftReplDev::use_config(json_superblk raft_config_sb) { m_raft_config_sb = std::move(raft_config_sb); }

void RaftReplDev::on_create_snapshot(nuraft::snapshot& s, nuraft::async_result< bool >::handler_type& when_done) {
RD_LOG(DEBUG, "create_snapshot last_idx={}/term={}", s.get_last_log_idx(), s.get_last_log_term());
repl_snapshot snapshot{.last_log_idx_ = s.get_last_log_idx(), .last_log_term_ = s.get_last_log_term()};
auto result = m_listener->create_snapshot(snapshot).get();
auto snp_ctx = std::make_shared< nuraft_snapshot_context >(s);
auto result = m_listener->create_snapshot(snp_ctx).get();
auto null_except = std::shared_ptr< std::exception >();
HS_REL_ASSERT(result.hasError() == false, "Not expecting creating snapshot to return false. ");
m_last_snapshot = nuraft::cs_new< nuraft::snapshot >(s.get_last_log_idx(), s.get_last_log_term(),
s.get_last_config(), s.size(), s.get_type());

auto ret_val{true};
if (when_done) { when_done(ret_val, null_except); }
}
@@ -1032,6 +1032,7 @@ std::pair< bool, nuraft::cb_func::ReturnCode > RaftReplDev::handle_raft_event(nu
auto reqs = sisl::VectorPool< repl_req_ptr_t >::alloc();
for (auto& entry : entries) {
if (entry->get_val_type() != nuraft::log_val_type::app_log) { continue; }
if (entry->get_buf_ptr()->size() == 0) { continue; }
auto req = m_state_machine->localize_journal_entry_prepare(*entry);
if (req == nullptr) {
sisl::VectorPool< repl_req_ptr_t >::free(reqs);
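As a side note on on_create_snapshot above: the listener receives a nuraft_snapshot_context and will typically persist context->serialize() so that last_snapshot() can be answered after a restart. Below is a hedged sketch of that serialize()/deserialize() round trip; demo_snapshot_context_roundtrip is an illustrative name and the include path is assumed.

#include <cassert>
#include <memory>
#include <homestore/replication/repl_dev.h>  // assumed public include path for the header changed above

// The nuraft::snapshot handed to on_create_snapshot() is wrapped into a nuraft_snapshot_context;
// a listener would typically persist ctx->serialize() and later restore it via deserialize().
void demo_snapshot_context_roundtrip(nuraft::snapshot& s) {
    auto ctx = std::make_shared< homestore::nuraft_snapshot_context >(s);
    sisl::io_blob_safe persisted = ctx->serialize();  // raw bytes a consumer could write to disk

    // nuraft_snapshot_context only exposes a constructor from nuraft::snapshot, so this sketch
    // reuses `s` for construction and then overwrites the state from the persisted bytes.
    auto restored = std::make_shared< homestore::nuraft_snapshot_context >(s);
    restored->deserialize(persisted);

    assert(restored->get_lsn() == static_cast< int64_t >(s.get_last_log_idx()));
    assert(restored->nuraft_snapshot()->get_last_log_term() == s.get_last_log_term());
}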
5 changes: 1 addition & 4 deletions src/lib/replication/repl_dev/raft_repl_dev.h
@@ -140,8 +140,6 @@ class RaftReplDev : public ReplDev,
folly::Promise< ReplServiceError > m_destroy_promise;
RaftReplDevMetrics m_metrics;

nuraft::ptr< nuraft::snapshot > m_last_snapshot{nullptr};

static std::atomic< uint64_t > s_next_group_ordinal;
bool m_log_store_replay_done{false};

@@ -170,6 +168,7 @@ class RaftReplDev : public ReplDev,
std::string my_replica_id_str() const { return boost::uuids::to_string(m_my_repl_id); }
uint32_t get_blk_size() const override;
repl_lsn_t get_last_commit_lsn() const { return m_commit_upto_lsn.load(); }
void set_last_commit_lsn(repl_lsn_t lsn) { m_commit_upto_lsn.store(lsn); }
bool is_destroy_pending() const;
bool is_destroyed() const;
Clock::time_point destroyed_time() const { return m_destroyed_time; }
@@ -217,8 +216,6 @@ class RaftReplDev : public ReplDev,
m_data_journal->truncate(num_reserved_entries, m_compact_lsn.load());
}

nuraft::ptr< nuraft::snapshot > get_last_snapshot() { return m_last_snapshot; }

void wait_for_logstore_ready() { m_data_journal->wait_for_log_store_ready(); }

void gc_repl_reqs();
63 changes: 61 additions & 2 deletions src/lib/replication/repl_dev/raft_state_machine.cpp
@@ -207,7 +207,10 @@ void RaftStateMachine::iterate_repl_reqs(std::function< void(int64_t, repl_req_p
}
}

uint64_t RaftStateMachine::last_commit_index() { return uint64_cast(m_rd.get_last_commit_lsn()); }
uint64_t RaftStateMachine::last_commit_index() {
RD_LOG(DEBUG, "Raft channel: last_commit_index {}", uint64_cast(m_rd.get_last_commit_lsn()));
return uint64_cast(m_rd.get_last_commit_lsn());
}

void RaftStateMachine::become_ready() { m_rd.become_ready(); }

@@ -240,7 +243,63 @@ void RaftStateMachine::create_snapshot(nuraft::snapshot& s, nuraft::async_result
m_rd.on_create_snapshot(s, when_done);
}

int RaftStateMachine::read_logical_snp_obj(nuraft::snapshot& s, void*& user_ctx, ulong obj_id, raft_buf_ptr_t& data_out,
bool& is_last_obj) {
auto snp_ctx = std::make_shared< nuraft_snapshot_context >(s);
auto snp_data = std::make_shared< snapshot_data >();
snp_data->user_ctx = user_ctx;
snp_data->offset = obj_id;
snp_data->is_last_obj = is_last_obj;

// The listener reads the snapshot data; we pass the same structures through unchanged.
int ret = m_rd.m_listener->read_snapshot_data(snp_ctx, snp_data);
if (ret < 0) return ret;

// Update user_ctx and whether is_last_obj
user_ctx = snp_data->user_ctx;
is_last_obj = snp_data->is_last_obj;

// We are doing a copy here.
data_out = nuraft::buffer::alloc(snp_data->blob.size());
nuraft::buffer_serializer bs(data_out);
bs.put_raw(snp_data->blob.cbytes(), snp_data->blob.size());
return ret;
}

void RaftStateMachine::save_logical_snp_obj(nuraft::snapshot& s, ulong& obj_id, nuraft::buffer& data, bool is_first_obj,
bool is_last_obj) {
auto snp_ctx = std::make_shared< nuraft_snapshot_context >(s);
auto snp_data = std::make_shared< snapshot_data >();
snp_data->offset = obj_id;
snp_data->is_first_obj = is_first_obj;
snp_data->is_last_obj = is_last_obj;

// We are doing a copy here.
sisl::io_blob_safe blob{s_cast< size_t >(data.size())};
std::memcpy(blob.bytes(), data.data_begin(), data.size());
snp_data->blob = std::move(blob);

m_rd.m_listener->write_snapshot_data(snp_ctx, snp_data);

// Update the object offset.
obj_id = snp_data->offset;
}

bool RaftStateMachine::apply_snapshot(nuraft::snapshot& s) {
m_rd.set_last_commit_lsn(s.get_last_log_idx());
m_rd.m_data_journal->set_last_durable_lsn(s.get_last_log_idx());
auto snp_ctx = std::make_shared< nuraft_snapshot_context >(s);
return m_rd.m_listener->apply_snapshot(snp_ctx);
}

nuraft::ptr< nuraft::snapshot > RaftStateMachine::last_snapshot() {
auto s = std::dynamic_pointer_cast< nuraft_snapshot_context >(m_rd.m_listener->last_snapshot());
if (s == nullptr) return nullptr;
return s->nuraft_snapshot();
}

void RaftStateMachine::free_user_snp_ctx(void*& user_snp_ctx) { m_rd.m_listener->free_user_snp_ctx(user_snp_ctx); }

std::string RaftStateMachine::rdev_name() const { return m_rd.rdev_name(); }

nuraft::ptr< nuraft::snapshot > RaftStateMachine::last_snapshot() { return m_rd.get_last_snapshot(); }
} // namespace homestore
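To make the obj_id handshake between read_logical_snp_obj and save_logical_snp_obj concrete, here is an illustrative, single-process driver loop. It is not how nuraft actually transfers the data (that happens over install-snapshot RPCs with retries and flow control), and it assumes access to the internal raft_state_machine.h header, that raft_buf_ptr_t aliases nuraft::ptr< nuraft::buffer >, and that leader_sm, follower_sm, and snp are pre-built objects.

// Illustrative only: shows the obj_id / is_last_obj contract driven by the two hooks above.
void demo_baseline_resync(homestore::RaftStateMachine& leader_sm,
                          homestore::RaftStateMachine& follower_sm, nuraft::snapshot& snp) {
    void* user_ctx{nullptr};
    ulong obj_id{0};  // starts at 0; the follower advances it via save_logical_snp_obj()
    bool is_last_obj{false};
    bool is_first_obj{true};

    while (!is_last_obj) {
        homestore::raft_buf_ptr_t data_out;
        // Leader: the listener fills one batch starting at obj_id (read_snapshot_data callback).
        if (leader_sm.read_logical_snp_obj(snp, user_ctx, obj_id, data_out, is_last_obj) < 0) { break; }

        // Follower: the listener applies the batch (write_snapshot_data callback) and sets
        // obj_id to the next offset it wants from the leader.
        follower_sm.save_logical_snp_obj(snp, obj_id, *data_out, is_first_obj, is_last_obj);
        is_first_obj = false;
    }

    // Leader: release any user context allocated by read_snapshot_data.
    leader_sm.free_user_snp_ctx(user_ctx);

    // Follower: with all objects received, apply the snapshot; this advances the last commit
    // and durable lsn as shown in apply_snapshot() above.
    follower_sm.apply_snapshot(snp);
}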
8 changes: 6 additions & 2 deletions src/lib/replication/repl_dev/raft_state_machine.h
@@ -111,10 +111,14 @@ class RaftStateMachine : public nuraft::state_machine {
void rollback(uint64_t lsn, nuraft::buffer&) override { LOGCRITICAL("Unimplemented rollback on: [{}]", lsn); }
void become_ready();

bool apply_snapshot(nuraft::snapshot&) override { return false; }

void create_snapshot(nuraft::snapshot& s, nuraft::async_result< bool >::handler_type& when_done) override;
int read_logical_snp_obj(nuraft::snapshot& s, void*& user_ctx, ulong obj_id, raft_buf_ptr_t& data_out,
bool& is_last_obj) override;
void save_logical_snp_obj(nuraft::snapshot& s, ulong& obj_id, nuraft::buffer& data, bool is_first_obj,
bool is_last_obj) override;
bool apply_snapshot(nuraft::snapshot& s) override;
nuraft::ptr< nuraft::snapshot > last_snapshot() override;
void free_user_snp_ctx(void*& user_snp_ctx) override;

////////// APIs outside of nuraft::state_machine requirements ////////////////////
ReplServiceError propose_to_raft(repl_req_ptr_t rreq);