Skip to content

Commit

Permalink
Add baseline resync to support read and write snapshots.
Browse files Browse the repository at this point in the history
Add nuraft based snapshot sync across leader and followers.
Implement the read and write snapshot callback. Add a generic snapshot
structure to support different implemenations. Listener
implements necessary function to load/save snapshot state.
Listener also implements on how to read and write actual
snapshot data. Added test to do basic baseline test.
  • Loading branch information
sanebay committed Aug 8, 2024
1 parent 837dba1 commit 5ce38e6
Show file tree
Hide file tree
Showing 12 changed files with 416 additions and 36 deletions.
2 changes: 1 addition & 1 deletion conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

class HomestoreConan(ConanFile):
name = "homestore"
version = "6.4.38"
version = "6.4.39"

homepage = "https://github.com/eBay/Homestore"
description = "HomeStore Storage Engine"
Expand Down
83 changes: 78 additions & 5 deletions src/include/homestore/replication/repl_dev.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,60 @@ struct repl_key {
std::string to_string() const { return fmt::format("server={}, term={}, dsn={}", server_id, term, dsn); }
};

struct repl_snapshot {
uint64_t last_log_idx_{0};
uint64_t last_log_term_{0};
using repl_snapshot = nuraft::snapshot;
using repl_snapshot_ptr = nuraft::ptr< nuraft::snapshot >;

// Consumers of the ReplDevListener dont have to know what underlying
// snapshot implementation is used. Consumers can export and save the state
// of the snapshot using serialize and load the state using deserialize.
class snapshot_context {
public:
snapshot_context(int64_t lsn) : lsn_(lsn) {}
virtual ~snapshot_context() = default;
virtual void deserialize(const sisl::io_blob_safe& snp_ctx) = 0;
virtual sisl::io_blob_safe serialize() = 0;
int64_t get_lsn() { return lsn_; }

protected:
int64_t lsn_;
};

class nuraft_snapshot_context : public snapshot_context {
public:
nuraft_snapshot_context(nuraft::snapshot& snp) : snapshot_context(snp.get_last_log_idx()) {
auto snp_buf = snp.serialize();
snapshot_ = nuraft::snapshot::deserialize(*snp_buf);
}

void deserialize(const sisl::io_blob_safe& snp_ctx) override {
// Load the context from the io blob to nuraft buffer.
auto snp_buf = nuraft::buffer::alloc(snp_ctx.size());
nuraft::buffer_serializer bs(snp_buf);
bs.put_raw(snp_ctx.cbytes(), snp_ctx.size());
snapshot_ = nuraft::snapshot::deserialize(bs);
lsn_ = snapshot_->get_last_log_idx();
}

sisl::io_blob_safe serialize() override {
// Dump the context from nuraft buffer to the io blob.
auto snp_buf = snapshot_->serialize();
sisl::io_blob_safe blob{s_cast< size_t >(snp_buf->size())};
std::memcpy(blob.bytes(), snp_buf->data_begin(), snp_buf->size());
return blob;
}

nuraft::ptr< nuraft::snapshot > nuraft_snapshot() { return snapshot_; }

private:
nuraft::ptr< nuraft::snapshot > snapshot_;
};

struct snapshot_data {
void* user_ctx{nullptr};
int64_t offset{0};
sisl::io_blob_safe blob;
bool is_first_obj{false};
bool is_last_obj{false};
};

struct repl_journal_entry;
Expand Down Expand Up @@ -285,8 +336,30 @@ class ReplDevListener {
/// after restart in case crash happened during the destroy.
virtual void on_destroy() = 0;

/// @brief Called when the snapshot is being created by nuraft;
virtual AsyncReplResult<> create_snapshot(repl_snapshot& s) = 0;
/// @brief Called when the snapshot is being created by nuraft
virtual AsyncReplResult<> create_snapshot(shared< snapshot_context > context) = 0;

/// @brief Called when nuraft does the baseline resync and in the end apply snapshot.
virtual bool apply_snapshot(shared< snapshot_context > context) = 0;

/// @brief Get the last snapshot saved.
virtual shared< snapshot_context > last_snapshot() = 0;

/// @brief Called on the leader side when the follower wants to do baseline resync and leader
/// uses offset given by the follower to the know the current state of the follower.
/// Leader sends the snapshot data to the follower in batch. This callback is called multiple
/// times on the leader till all the data is transferred to the follower. is_last_obj in
/// snapshot_data will be true once all the data has been trasnferred. After this the raft on
/// the follower side can do the incremental resync.
virtual int read_snapshot_data(shared< snapshot_context > context, shared< snapshot_data > snp_data) = 0;

/// @brief Called on the follower when the leader sends the data during the baseline resyc.
/// is_last_obj in in snapshot_data will be true once all the data has been transfered.
/// After this the raft on the follower side can do the incremental resync.
virtual void write_snapshot_data(shared< snapshot_context > context, shared< snapshot_data > snp_data) = 0;

/// @brief Free up user-defined context inside the snapshot_data that is allocated during read_snapshot_data.
virtual void free_user_snp_ctx(void*& user_snp_ctx) = 0;

private:
std::weak_ptr< ReplDev > m_repl_dev;
Expand Down
5 changes: 5 additions & 0 deletions src/lib/logstore/log_dev.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,11 @@ bool LogDev::can_flush_in_this_thread() {
// our threshold. If so, it first flushes whats accumulated so far and then add the pending flush size counter with
// the new record size
bool LogDev::flush_if_needed(int64_t threshold_size) {
{
std::unique_lock< std::mutex > lk{m_block_flush_q_mutex};
if (m_stopped) { return false; }
}

// If after adding the record size, if we have enough to flush or if its been too much time before we actually
// flushed, attempt to flush by setting the atomic bool variable.
if (threshold_size < 0) { threshold_size = LogDev::flush_data_threshold_size(); }
Expand Down
24 changes: 19 additions & 5 deletions src/lib/replication/log_store/home_raft_log_store.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <sisl/fds/utils.hpp>
#include "common/homestore_assert.hpp"
#include <homestore/homestore.hpp>
#include <iomgr/iomgr_flip.hpp>

using namespace homestore;

Expand Down Expand Up @@ -165,16 +166,21 @@ void HomeRaftLogStore::write_at(ulong index, nuraft::ptr< nuraft::log_entry >& e
void HomeRaftLogStore::end_of_append_batch(ulong start, ulong cnt) {
store_lsn_t end_lsn = to_store_lsn(start + cnt - 1);
m_log_store->flush_sync(end_lsn);
REPL_STORE_LOG(DEBUG, "end_of_append_batch flushed upto start={} cnt={} lsn={}", start, cnt, start + cnt - 1);
m_last_durable_lsn = end_lsn;
}

nuraft::ptr< std::vector< nuraft::ptr< nuraft::log_entry > > > HomeRaftLogStore::log_entries(ulong start, ulong end) {
auto out_vec = std::make_shared< std::vector< nuraft::ptr< nuraft::log_entry > > >();
m_log_store->foreach (to_store_lsn(start), [end, &out_vec](store_lsn_t cur, const log_buffer& entry) -> bool {
bool ret = (cur < to_store_lsn(end) - 1);
if (cur < to_store_lsn(end)) { out_vec->emplace_back(to_nuraft_log_entry(entry)); }
if (cur < to_store_lsn(end)) {
// REPL_STORE_LOG(TRACE, "log_entries lsn={}", cur + 1);
out_vec->emplace_back(to_nuraft_log_entry(entry));
}
return ret;
});
REPL_STORE_LOG(TRACE, "Num log entries start={} end={} num_entries={}", start, end, out_vec->size());
return out_vec;
}

Expand All @@ -184,7 +190,7 @@ nuraft::ptr< nuraft::log_entry > HomeRaftLogStore::entry_at(ulong index) {
auto log_bytes = m_log_store->read_sync(to_store_lsn(index));
nle = to_nuraft_log_entry(log_bytes);
} catch (const std::exception& e) {
REPL_STORE_LOG(ERROR, "entry_at({}) index out_of_range", index);
REPL_STORE_LOG(ERROR, "entry_at({}) index out_of_range start {} end {}", index, start_index(), last_index());
throw e;
}
return nle;
Expand All @@ -196,7 +202,7 @@ ulong HomeRaftLogStore::term_at(ulong index) {
auto log_bytes = m_log_store->read_sync(to_store_lsn(index));
term = extract_term(log_bytes);
} catch (const std::exception& e) {
REPL_STORE_LOG(ERROR, "term_at({}) index out_of_range", index);
REPL_STORE_LOG(ERROR, "term_at({}) index out_of_range start {} end {}", index, start_index(), last_index());
throw e;
}
return term;
Expand Down Expand Up @@ -271,7 +277,7 @@ bool HomeRaftLogStore::compact(ulong compact_lsn) {
if (cur_max_lsn < to_store_lsn(compact_lsn)) {
// release this assert if for some use case, we should tolorant this case;
// for now, don't expect this case to happen.
RELEASE_ASSERT(false, "compact_lsn={} is beyond the current max_lsn={}", compact_lsn, cur_max_lsn);
// RELEASE_ASSERT(false, "compact_lsn={} is beyond the current max_lsn={}", compact_lsn, cur_max_lsn);

// We need to fill the remaining entries with dummy data.
for (auto lsn{cur_max_lsn + 1}; lsn <= to_store_lsn(compact_lsn); ++lsn) {
Expand All @@ -284,7 +290,13 @@ bool HomeRaftLogStore::compact(ulong compact_lsn) {
// we rely on resrouce mgr timer to trigger truncate for all log stores in system;
// this will be friendly for multiple logstore on same logdev;

// m_log_store->truncate(to_store_lsn(compact_lsn));
#ifdef _PRERELEASE
if (iomgr_flip::instance()->test_flip("force_home_raft_log_truncate")) {
REPL_STORE_LOG(TRACE, "Flip force_home_raft_log_truncate is enabled, force truncation, compact_lsn={}",
compact_lsn);
m_log_store->truncate(to_store_lsn(compact_lsn));
}
#endif

return true;
}
Expand All @@ -301,4 +313,6 @@ ulong HomeRaftLogStore::last_durable_index() {

void HomeRaftLogStore::wait_for_log_store_ready() { m_log_store_future.wait(); }

void HomeRaftLogStore::set_last_durable_lsn(repl_lsn_t lsn) { m_last_durable_lsn = to_store_lsn(lsn); }

} // namespace homestore
1 change: 1 addition & 0 deletions src/lib/replication/log_store/home_raft_log_store.h
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ class HomeRaftLogStore : public nuraft::log_store {
void truncate(uint32_t num_reserved_cnt, repl_lsn_t compact_lsn);

void wait_for_log_store_ready();
void set_last_durable_lsn(repl_lsn_t lsn);

private:
logstore_id_t m_logstore_id;
Expand Down
6 changes: 3 additions & 3 deletions src/lib/replication/log_store/repl_log_store.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ namespace homestore {

uint64_t ReplLogStore::append(nuraft::ptr< nuraft::log_entry >& entry) {
// We don't want to transform anything that is not an app log
if (entry->get_val_type() != nuraft::log_val_type::app_log) {
if (entry->get_val_type() != nuraft::log_val_type::app_log || entry->get_buf_ptr()->size() == 0) {
ulong lsn = HomeRaftLogStore::append(entry);
RD_LOGD("append entry term={}, log_val_type={} lsn={} size={}", entry->get_term(),
static_cast< uint32_t >(entry->get_val_type()), lsn, entry->get_buf().size());
Expand Down Expand Up @@ -57,8 +57,8 @@ void ReplLogStore::end_of_append_batch(ulong start_lsn, ulong count) {
}
}

RD_LOGT("Raft Channel: end_of_append_batch start_lsn={} count={} num_data_to_be_written={}", start_lsn, count,
reqs->size());
RD_LOGT("Raft Channel: end_of_append_batch start_lsn={} count={} num_data_to_be_written={} {}", start_lsn, count,
reqs->size(), proposer_reqs->size());

// All requests are from proposer for data write, so as mentioned above we can skip the flush for now
if (!reqs->empty()) {
Expand Down
9 changes: 5 additions & 4 deletions src/lib/replication/repl_dev/raft_repl_dev.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -127,19 +127,19 @@ folly::SemiFuture< ReplServiceError > RaftReplDev::destroy_group() {
LOGERROR("RaftReplDev::destroy_group failed {}", err);
}

LOGINFO("Raft repl dev destroy_group={}", boost::uuids::to_string(m_group_id));
return m_destroy_promise.getSemiFuture();
}

void RaftReplDev::use_config(json_superblk raft_config_sb) { m_raft_config_sb = std::move(raft_config_sb); }

void RaftReplDev::on_create_snapshot(nuraft::snapshot& s, nuraft::async_result< bool >::handler_type& when_done) {
RD_LOG(DEBUG, "create_snapshot last_idx={}/term={}", s.get_last_log_idx(), s.get_last_log_term());
repl_snapshot snapshot{.last_log_idx_ = s.get_last_log_idx(), .last_log_term_ = s.get_last_log_term()};
auto result = m_listener->create_snapshot(snapshot).get();
auto snp_ctx = std::make_shared< nuraft_snapshot_context >(s);
auto result = m_listener->create_snapshot(snp_ctx).get();
auto null_except = std::shared_ptr< std::exception >();
HS_REL_ASSERT(result.hasError() == false, "Not expecting creating snapshot to return false. ");
m_last_snapshot = nuraft::cs_new< nuraft::snapshot >(s.get_last_log_idx(), s.get_last_log_term(),
s.get_last_config(), s.size(), s.get_type());

auto ret_val{true};
if (when_done) { when_done(ret_val, null_except); }
}
Expand Down Expand Up @@ -1032,6 +1032,7 @@ std::pair< bool, nuraft::cb_func::ReturnCode > RaftReplDev::handle_raft_event(nu
auto reqs = sisl::VectorPool< repl_req_ptr_t >::alloc();
for (auto& entry : entries) {
if (entry->get_val_type() != nuraft::log_val_type::app_log) { continue; }
if (entry->get_buf_ptr()->size() == 0) { continue; }
auto req = m_state_machine->localize_journal_entry_prepare(*entry);
if (req == nullptr) {
sisl::VectorPool< repl_req_ptr_t >::free(reqs);
Expand Down
5 changes: 1 addition & 4 deletions src/lib/replication/repl_dev/raft_repl_dev.h
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,6 @@ class RaftReplDev : public ReplDev,
folly::Promise< ReplServiceError > m_destroy_promise;
RaftReplDevMetrics m_metrics;

nuraft::ptr< nuraft::snapshot > m_last_snapshot{nullptr};

static std::atomic< uint64_t > s_next_group_ordinal;
bool m_log_store_replay_done{false};

Expand Down Expand Up @@ -170,6 +168,7 @@ class RaftReplDev : public ReplDev,
std::string my_replica_id_str() const { return boost::uuids::to_string(m_my_repl_id); }
uint32_t get_blk_size() const override;
repl_lsn_t get_last_commit_lsn() const { return m_commit_upto_lsn.load(); }
void set_last_commit_lsn(repl_lsn_t lsn) { m_commit_upto_lsn.store(lsn); }
bool is_destroy_pending() const;
bool is_destroyed() const;
Clock::time_point destroyed_time() const { return m_destroyed_time; }
Expand Down Expand Up @@ -217,8 +216,6 @@ class RaftReplDev : public ReplDev,
m_data_journal->truncate(num_reserved_entries, m_compact_lsn.load());
}

nuraft::ptr< nuraft::snapshot > get_last_snapshot() { return m_last_snapshot; }

void wait_for_logstore_ready() { m_data_journal->wait_for_log_store_ready(); }

void gc_repl_reqs();
Expand Down
63 changes: 61 additions & 2 deletions src/lib/replication/repl_dev/raft_state_machine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,10 @@ void RaftStateMachine::iterate_repl_reqs(std::function< void(int64_t, repl_req_p
}
}

uint64_t RaftStateMachine::last_commit_index() { return uint64_cast(m_rd.get_last_commit_lsn()); }
uint64_t RaftStateMachine::last_commit_index() {
RD_LOG(DEBUG, "Raft channel: last_commit_index {}", uint64_cast(m_rd.get_last_commit_lsn()));
return uint64_cast(m_rd.get_last_commit_lsn());
}

void RaftStateMachine::become_ready() { m_rd.become_ready(); }

Expand Down Expand Up @@ -240,7 +243,63 @@ void RaftStateMachine::create_snapshot(nuraft::snapshot& s, nuraft::async_result
m_rd.on_create_snapshot(s, when_done);
}

int RaftStateMachine::read_logical_snp_obj(nuraft::snapshot& s, void*& user_ctx, ulong obj_id, raft_buf_ptr_t& data_out,
bool& is_last_obj) {
auto snp_ctx = std::make_shared< nuraft_snapshot_context >(s);
auto snp_data = std::make_shared< snapshot_data >();
snp_data->user_ctx = user_ctx;
snp_data->offset = obj_id;
snp_data->is_last_obj = is_last_obj;

// Listener will read the snapshot data and we pass through the same.
int ret = m_rd.m_listener->read_snapshot_data(snp_ctx, snp_data);
if (ret < 0) return ret;

// Update user_ctx and whether is_last_obj
user_ctx = snp_data->user_ctx;
is_last_obj = snp_data->is_last_obj;

// We are doing a copy here.
data_out = nuraft::buffer::alloc(snp_data->blob.size());
nuraft::buffer_serializer bs(data_out);
bs.put_raw(snp_data->blob.cbytes(), snp_data->blob.size());
return ret;
}

void RaftStateMachine::save_logical_snp_obj(nuraft::snapshot& s, ulong& obj_id, nuraft::buffer& data, bool is_first_obj,
bool is_last_obj) {
auto snp_ctx = std::make_shared< nuraft_snapshot_context >(s);
auto snp_data = std::make_shared< snapshot_data >();
snp_data->offset = obj_id;
snp_data->is_first_obj = is_first_obj;
snp_data->is_last_obj = is_last_obj;

// We are doing a copy here.
sisl::io_blob_safe blob{s_cast< size_t >(data.size())};
std::memcpy(blob.bytes(), data.data_begin(), data.size());
snp_data->blob = std::move(blob);

m_rd.m_listener->write_snapshot_data(snp_ctx, snp_data);

// Update the object offset.
obj_id = snp_data->offset;
}

bool RaftStateMachine::apply_snapshot(nuraft::snapshot& s) {
m_rd.set_last_commit_lsn(s.get_last_log_idx());
m_rd.m_data_journal->set_last_durable_lsn(s.get_last_log_idx());
auto snp_ctx = std::make_shared< nuraft_snapshot_context >(s);
return m_rd.m_listener->apply_snapshot(snp_ctx);
}

nuraft::ptr< nuraft::snapshot > RaftStateMachine::last_snapshot() {
auto s = std::dynamic_pointer_cast< nuraft_snapshot_context >(m_rd.m_listener->last_snapshot());
if (s == nullptr) return nullptr;
return s->nuraft_snapshot();
}

void RaftStateMachine::free_user_snp_ctx(void*& user_snp_ctx) { m_rd.m_listener->free_user_snp_ctx(user_snp_ctx); }

std::string RaftStateMachine::rdev_name() const { return m_rd.rdev_name(); }

nuraft::ptr< nuraft::snapshot > RaftStateMachine::last_snapshot() { return m_rd.get_last_snapshot(); }
} // namespace homestore
8 changes: 6 additions & 2 deletions src/lib/replication/repl_dev/raft_state_machine.h
Original file line number Diff line number Diff line change
Expand Up @@ -111,10 +111,14 @@ class RaftStateMachine : public nuraft::state_machine {
void rollback(uint64_t lsn, nuraft::buffer&) override { LOGCRITICAL("Unimplemented rollback on: [{}]", lsn); }
void become_ready();

bool apply_snapshot(nuraft::snapshot&) override { return false; }

void create_snapshot(nuraft::snapshot& s, nuraft::async_result< bool >::handler_type& when_done) override;
int read_logical_snp_obj(nuraft::snapshot& s, void*& user_ctx, ulong obj_id, raft_buf_ptr_t& data_out,
bool& is_last_obj) override;
void save_logical_snp_obj(nuraft::snapshot& s, ulong& obj_id, nuraft::buffer& data, bool is_first_obj,
bool is_last_obj) override;
bool apply_snapshot(nuraft::snapshot& s) override;
nuraft::ptr< nuraft::snapshot > last_snapshot() override;
void free_user_snp_ctx(void*& user_snp_ctx) override;

////////// APIs outside of nuraft::state_machine requirements ////////////////////
ReplServiceError propose_to_raft(repl_req_ptr_t rreq);
Expand Down
Loading

0 comments on commit 5ce38e6

Please sign in to comment.