Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add new dsn for destroy group #535

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

class HomestoreConan(ConanFile):
name = "homestore"
version = "6.4.54"
version = "6.4.55"

homepage = "https://github.com/eBay/Homestore"
description = "HomeStore Storage Engine"
Expand Down
6 changes: 4 additions & 2 deletions src/lib/replication/log_store/home_raft_log_store.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -262,10 +262,12 @@ raft_buf_ptr_t HomeRaftLogStore::pack(ulong index, int32_t cnt) {
m_log_store->foreach (
to_store_lsn(index),
[this, &out_buf, &remain_cnt]([[maybe_unused]] store_lsn_t cur, const log_buffer& entry) mutable -> bool {
size_t const total_entry_size = entry.size() + sizeof(uint32_t);
if (remain_cnt-- > 0) {
size_t avail_size = out_buf->size() - out_buf->pos();
if (avail_size < entry.size() + sizeof(uint32_t)) {
avail_size += std::max(out_buf->size() * 2, (size_t)entry.size() + sizeof(uint32_t));
// the available size of the packing buffer should be able to hold entry.size() plus the length field of this entry
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@yamingk I address your comments for the last PR here, PATL

if (avail_size < total_entry_size) {
avail_size += std::max(out_buf->size() * 2, total_entry_size);
out_buf = nuraft::buffer::expand(*out_buf, avail_size);
}
REPL_STORE_LOG(TRACE, "packing lsn={} of size={}, avail_size in buffer={}", to_repl_lsn(cur),
Expand Down
24 changes: 15 additions & 9 deletions src/lib/replication/repl_dev/raft_repl_dev.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,19 @@ folly::SemiFuture< ReplServiceError > RaftReplDev::destroy_group() {

// Propose to the group to destroy
auto rreq = repl_req_ptr_t(new repl_req_ctx{});
rreq->init(repl_key{}, journal_type_t::HS_CTRL_DESTROY, true, sisl::blob{}, sisl::blob{}, 0);

// If we have an rreq {originator=1, term=1, dsn=0, lsn=7} in a follower and a baseline resync is triggered before the
// rreq is committed in the follower, then the on_commit of the rreq will not be called, and as a result this rreq
// will become a garbage rreq in this follower. Now if we trigger a destroy_group, a new rreq {originator=1, term=1,
// dsn=0} will be created in the follower, since the default dsn of a repl_key is 0. After the log of this rreq is
// appended to the log store and gets a new lsn, if we link the new lsn to the old rreq (an rreq is identified by
// {originator, term, dsn}) which already has an lsn, then an assert will be thrown. Please refer to
// repl_req_ctx::set_lsn

// Here, we set the dsn to a new one, which is definitely unique in the follower, so that the new rreq will not
// conflict with the old rreq.
rreq->init(repl_key{.server_id = server_id(), .term = raft_server()->get_term(), .dsn = m_next_dsn.fetch_add(1)},
journal_type_t::HS_CTRL_DESTROY, true, sisl::blob{}, sisl::blob{}, 0);

auto err = m_state_machine->propose_to_raft(std::move(rreq));
if (err != ReplServiceError::OK) {
Expand Down Expand Up @@ -526,7 +538,7 @@ void RaftReplDev::check_and_fetch_remote_data(std::vector< repl_req_ptr_t > rreq
void RaftReplDev::fetch_data_from_remote(std::vector< repl_req_ptr_t > rreqs) {
if (rreqs.size() == 0) { return; }

std::vector<::flatbuffers::Offset< RequestEntry > > entries;
std::vector< ::flatbuffers::Offset< RequestEntry > > entries;
entries.reserve(rreqs.size());

shared< flatbuffers::FlatBufferBuilder > builder = std::make_shared< flatbuffers::FlatBufferBuilder >();
Expand Down Expand Up @@ -1050,13 +1062,7 @@ std::pair< bool, nuraft::cb_func::ReturnCode > RaftReplDev::handle_raft_event(nu
if (entry->get_val_type() != nuraft::log_val_type::app_log) { continue; }
if (entry->get_buf_ptr()->size() == 0) { continue; }
auto req = m_state_machine->localize_journal_entry_prepare(*entry);
// TODO :: we need to indentify whether this log entry should be appended to log store.
// 1 for lsn, if the req#lsn is not -1, it means this log has been localized and apeneded before, we
// should skip it.
// 2 for dsn, if the req#dsn is less than the next_dsn, it means this log has been
// committed, we should skip it.
// here, we only check the first condition for now. revisit here if we need to check the second
if (req == nullptr || req->lsn() != -1) {
if (req == nullptr) {
sisl::VectorPool< repl_req_ptr_t >::free(reqs);
return {true, nuraft::cb_func::ReturnCode::ReturnNull};
}
Expand Down
5 changes: 4 additions & 1 deletion src/lib/replication/repl_dev/raft_state_machine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,10 @@ void RaftStateMachine::become_ready() { m_rd.become_ready(); }

void RaftStateMachine::unlink_lsn_to_req(int64_t lsn) {
auto const it = m_lsn_req_map.find(lsn);
if (it != m_lsn_req_map.cend()) { m_lsn_req_map.erase(lsn); }
if (it != m_lsn_req_map.cend()) {
RD_LOG(DEBUG, "Raft channel: erase lsn {}, rreq {}", lsn, it->second->to_string());
m_lsn_req_map.erase(lsn);
}
}

void RaftStateMachine::link_lsn_to_req(repl_req_ptr_t rreq, int64_t lsn) {
Expand Down
2 changes: 1 addition & 1 deletion src/tests/test_raft_repl_dev.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -432,7 +432,7 @@ class RaftReplDevTest : public testing::Test {
// destroyed forever. We need to handle this in raft_repl_dev. Revisit here after making changes at the
// raft_repl_dev side to handle this case. This is a workaround to avoid the infinite loop for now.
if (i++ > 10 && !force_leave) {
LOGWARN("Waiting for repl dev to get destroyed and it is leader, so do a force leave");
LOGWARN("has already waited for repl dev to get destroyed for 10 times, so do a force leave");
repl_dev->force_leave();
force_leave = true;
}
Expand Down
Loading