Skip to content

Commit

Permalink
add new dsn for destroy group (#535)
Browse files Browse the repository at this point in the history
  • Loading branch information
JacksonYao287 authored Sep 2, 2024
1 parent 7cd1eab commit 0fa6a0c
Show file tree
Hide file tree
Showing 5 changed files with 25 additions and 14 deletions.
2 changes: 1 addition & 1 deletion conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

class HomestoreConan(ConanFile):
name = "homestore"
version = "6.4.54"
version = "6.4.55"

homepage = "https://github.com/eBay/Homestore"
description = "HomeStore Storage Engine"
Expand Down
6 changes: 4 additions & 2 deletions src/lib/replication/log_store/home_raft_log_store.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -262,10 +262,12 @@ raft_buf_ptr_t HomeRaftLogStore::pack(ulong index, int32_t cnt) {
m_log_store->foreach (
to_store_lsn(index),
[this, &out_buf, &remain_cnt]([[maybe_unused]] store_lsn_t cur, const log_buffer& entry) mutable -> bool {
size_t const total_entry_size = entry.size() + sizeof(uint32_t);
if (remain_cnt-- > 0) {
size_t avail_size = out_buf->size() - out_buf->pos();
if (avail_size < entry.size() + sizeof(uint32_t)) {
avail_size += std::max(out_buf->size() * 2, (size_t)entry.size() + sizeof(uint32_t));
// the available size of the packing buffer must be able to hold entry.size() plus the uint32_t length field of this entry
if (avail_size < total_entry_size) {
avail_size += std::max(out_buf->size() * 2, total_entry_size);
out_buf = nuraft::buffer::expand(*out_buf, avail_size);
}
REPL_STORE_LOG(TRACE, "packing lsn={} of size={}, avail_size in buffer={}", to_repl_lsn(cur),
Expand Down
24 changes: 15 additions & 9 deletions src/lib/replication/repl_dev/raft_repl_dev.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,19 @@ folly::SemiFuture< ReplServiceError > RaftReplDev::destroy_group() {

// Propose to the group to destroy
auto rreq = repl_req_ptr_t(new repl_req_ctx{});
rreq->init(repl_key{}, journal_type_t::HS_CTRL_DESTROY, true, sisl::blob{}, sisl::blob{}, 0);

// If a follower holds a rreq {originator=1, term=1, dsn=0, lsn=7} and a baseline resync is triggered before that
// rreq is committed in the follower, then the on_commit of the rreq will not be called and, as a result, this rreq
// will become a garbage rreq in this follower. Now, if we trigger a destroy_group, a new rreq {originator=1, term=1,
// dsn=0} will be created in the follower, since the default dsn of a repl_key is 0. After the log of this rreq is
// appended to the log store and gets a new lsn, if we link the new lsn to the old rreq (a rreq is identified by
// {originator, term, dsn}), which already has an lsn, then an assert will be thrown. Please refer to
// repl_req_ctx::set_lsn.

// Here, we set the dsn to a new one, which is definitely unique in the follower, so that the new rreq will not
// conflict with the old rreq.
rreq->init(repl_key{.server_id = server_id(), .term = raft_server()->get_term(), .dsn = m_next_dsn.fetch_add(1)},
journal_type_t::HS_CTRL_DESTROY, true, sisl::blob{}, sisl::blob{}, 0);

auto err = m_state_machine->propose_to_raft(std::move(rreq));
if (err != ReplServiceError::OK) {
Expand Down Expand Up @@ -526,7 +538,7 @@ void RaftReplDev::check_and_fetch_remote_data(std::vector< repl_req_ptr_t > rreq
void RaftReplDev::fetch_data_from_remote(std::vector< repl_req_ptr_t > rreqs) {
if (rreqs.size() == 0) { return; }

std::vector<::flatbuffers::Offset< RequestEntry > > entries;
std::vector< ::flatbuffers::Offset< RequestEntry > > entries;
entries.reserve(rreqs.size());

shared< flatbuffers::FlatBufferBuilder > builder = std::make_shared< flatbuffers::FlatBufferBuilder >();
Expand Down Expand Up @@ -1050,13 +1062,7 @@ std::pair< bool, nuraft::cb_func::ReturnCode > RaftReplDev::handle_raft_event(nu
if (entry->get_val_type() != nuraft::log_val_type::app_log) { continue; }
if (entry->get_buf_ptr()->size() == 0) { continue; }
auto req = m_state_machine->localize_journal_entry_prepare(*entry);
// TODO :: we need to indentify whether this log entry should be appended to log store.
// 1 for lsn, if the req#lsn is not -1, it means this log has been localized and apeneded before, we
// should skip it.
// 2 for dsn, if the req#dsn is less than the next_dsn, it means this log has been
// committed, we should skip it.
// here, we only check the first condition for now. revisit here if we need to check the second
if (req == nullptr || req->lsn() != -1) {
if (req == nullptr) {
sisl::VectorPool< repl_req_ptr_t >::free(reqs);
return {true, nuraft::cb_func::ReturnCode::ReturnNull};
}
Expand Down
5 changes: 4 additions & 1 deletion src/lib/replication/repl_dev/raft_state_machine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,10 @@ void RaftStateMachine::become_ready() { m_rd.become_ready(); }

void RaftStateMachine::unlink_lsn_to_req(int64_t lsn) {
auto const it = m_lsn_req_map.find(lsn);
if (it != m_lsn_req_map.cend()) { m_lsn_req_map.erase(lsn); }
if (it != m_lsn_req_map.cend()) {
RD_LOG(DEBUG, "Raft channel: erase lsn {}, rreq {}", lsn, it->second->to_string());
m_lsn_req_map.erase(lsn);
}
}

void RaftStateMachine::link_lsn_to_req(repl_req_ptr_t rreq, int64_t lsn) {
Expand Down
2 changes: 1 addition & 1 deletion src/tests/test_raft_repl_dev.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -432,7 +432,7 @@ class RaftReplDevTest : public testing::Test {
// destroyed forever. We need to handle this in raft_repl_dev. Revisit here after making changes on the
// raft_repl_dev side to handle this case. This is a workaround to avoid the infinite loop for now.
if (i++ > 10 && !force_leave) {
LOGWARN("Waiting for repl dev to get destroyed and it is leader, so do a force leave");
LOGWARN("has already waited for repl dev to get destroyed for 10 times, so do a force leave");
repl_dev->force_leave();
force_leave = true;
}
Expand Down

0 comments on commit 0fa6a0c

Please sign in to comment.