Skip to content

Commit

Permalink
handle RemovedFromCluster event (#594)
Browse files Browse the repository at this point in the history
1 consume nuraft::cb_func::Type::RemovedFromCluster callback
2 add reset function to allocator/vchunk as a preparation for implementing m_listener->on_destroy()
  • Loading branch information
JacksonYao287 authored Nov 25, 2024
1 parent da19fe4 commit 1a87f71
Show file tree
Hide file tree
Showing 16 changed files with 101 additions and 35 deletions.
2 changes: 1 addition & 1 deletion conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

class HomestoreConan(ConanFile):
name = "homestore"
version = "6.5.14"
version = "6.5.16"

homepage = "https://github.com/eBay/Homestore"
description = "HomeStore Storage Engine"
Expand Down
9 changes: 7 additions & 2 deletions src/include/homestore/replication/repl_dev.h
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,8 @@ struct repl_req_ctx : public boost::intrusive_ref_counter< repl_req_ctx, boost::
MultiBlkId const& local_blkid() const { return m_local_blkid; }
RemoteBlkId const& remote_blkid() const { return m_remote_blkid; }
const char* data() const {
DEBUG_ASSERT(m_data != nullptr, "m_data is nullptr, use before save_pushed/fetched_data or after release_data()");
DEBUG_ASSERT(m_data != nullptr,
"m_data is nullptr, use before save_pushed/fetched_data or after release_data()");
return r_cast< const char* >(m_data);
}
repl_req_state_t state() const { return repl_req_state_t(m_state.load()); }
Expand Down Expand Up @@ -349,7 +350,7 @@ class ReplDevListener {
/// @brief Called when the repl_dev is being destroyed. The consumer is expected to clean up any related resources.
/// This call is expected to be idempotent: in rare scenarios it can be invoked again after a restart, if a crash
/// happened during the destroy.
/// @param group_id Group id of the repl_dev that is being destroyed.
virtual void on_destroy() = 0;
virtual void on_destroy(const group_id_t& group_id) = 0;

/// @brief Called when replace member is performed.
virtual void on_replace_member(const replica_member_info& member_out, const replica_member_info& member_in) = 0;
Expand Down Expand Up @@ -450,6 +451,10 @@ class ReplDev {
/// @return Block size
virtual uint32_t get_blk_size() const = 0;

/// @brief Gets the last commit lsn of this repldev
/// @return last_commit_lsn
virtual repl_lsn_t get_last_commit_lsn() const = 0;

virtual void attach_listener(shared< ReplDevListener > listener) { m_listener = std::move(listener); }

virtual void detach_listener() {
Expand Down
1 change: 1 addition & 0 deletions src/include/homestore/vchunk.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ class VChunk {
uint16_t get_chunk_id() const;
cshared< Chunk > get_internal_chunk() const;
uint64_t size() const;
void reset();

private:
shared< Chunk > m_internal_chunk;
Expand Down
7 changes: 7 additions & 0 deletions src/lib/blkalloc/append_blk_allocator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,13 @@ bool AppendBlkAllocator::is_blk_alloced(const BlkId& in_bid, bool) const {
return in_bid.blk_num() < get_used_blks();
}

// Puts the allocator back into its initial state, so that every blk in this chunk is considered free again.
void AppendBlkAllocator::reset() {
    // Zero the in-memory append position and the bookkeeping counters (atomic assignment == store()).
    m_last_append_offset = 0;
    m_freeable_nblks = 0;
    m_commit_offset = 0;

    // Flag the in-memory state as modified; presumably the next cp_flush() persists it — TODO confirm.
    m_is_dirty = true;
}

// Reports whether the given blk lies below the commit offset recorded in the superblk, i.e. whether the on-disk
// version already counts it as allocated. The second (unnamed) flag is ignored by this allocator.
bool AppendBlkAllocator::is_blk_alloced_on_disk(BlkId const& bid, bool) const {
    auto const queried_blk = bid.blk_num();
    auto const committed_upto = m_sb->commit_offset;
    return queried_blk < committed_upto;
}
Expand Down
33 changes: 19 additions & 14 deletions src/lib/blkalloc/append_blk_allocator.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,21 +38,21 @@ struct append_blk_sb_t {
};
#pragma pack()

//class AppendBlkAllocMetrics : public sisl::MetricsGroup {
//public:
// explicit AppendBlkAllocMetrics(const char* inst_name) : sisl::MetricsGroup("AppendBlkAlloc", inst_name) {
// REGISTER_COUNTER(num_alloc, "Number of blks alloc attempts");
// REGISTER_COUNTER(num_alloc_failure, "Number of blk alloc failures");
// class AppendBlkAllocMetrics : public sisl::MetricsGroup {
// public:
// explicit AppendBlkAllocMetrics(const char* inst_name) : sisl::MetricsGroup("AppendBlkAlloc", inst_name) {
// REGISTER_COUNTER(num_alloc, "Number of blks alloc attempts");
// REGISTER_COUNTER(num_alloc_failure, "Number of blk alloc failures");
//
// register_me_to_farm();
// }
// register_me_to_farm();
// }
//
// AppendBlkAllocMetrics(const AppendBlkAllocMetrics&) = delete;
// AppendBlkAllocMetrics(AppendBlkAllocMetrics&&) noexcept = delete;
// AppendBlkAllocMetrics& operator=(const AppendBlkAllocMetrics&) = delete;
// AppendBlkAllocMetrics& operator=(AppendBlkAllocMetrics&&) noexcept = delete;
// ~AppendBlkAllocMetrics() { deregister_me_from_farm(); }
//};
// AppendBlkAllocMetrics(const AppendBlkAllocMetrics&) = delete;
// AppendBlkAllocMetrics(AppendBlkAllocMetrics&&) noexcept = delete;
// AppendBlkAllocMetrics& operator=(const AppendBlkAllocMetrics&) = delete;
// AppendBlkAllocMetrics& operator=(AppendBlkAllocMetrics&&) noexcept = delete;
// ~AppendBlkAllocMetrics() { deregister_me_from_farm(); }
// };

//
// The assumption for AppendBlkAllocator:
Expand Down Expand Up @@ -108,6 +108,11 @@ class AppendBlkAllocator : public BlkAllocator {

std::string to_string() const override;

/**
* @brief : reset the allocator to initial state, so all the blks in this chunk are free.
*/
void reset() override;

void cp_flush(CP* cp) override;
void recovery_completed() override {}
nlohmann::json get_status(int log_level) const override;
Expand All @@ -121,7 +126,7 @@ class AppendBlkAllocator : public BlkAllocator {
std::atomic< blk_num_t > m_freeable_nblks{0}; // count of blks fragmentedly freed (both on-disk and in-memory)
std::atomic< blk_num_t > m_commit_offset{0}; // offset in on-disk version
std::atomic< bool > m_is_dirty{false};
//AppendBlkAllocMetrics m_metrics;
// AppendBlkAllocMetrics m_metrics;
superblk< append_blk_sb_t > m_sb; // only cp will be writing to this disk
};

Expand Down
1 change: 1 addition & 0 deletions src/lib/blkalloc/bitmap_blk_allocator.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ class BitmapBlkAllocator : public BlkAllocator {
void cp_flush(CP* cp) override;

void recovery_completed() override {}
void reset() override {}
// Number of portions needed to cover m_num_blks blks at m_blks_per_portion blks per portion (ceiling division).
// NOTE(review): if m_num_blks can be 0 and blk_num_t is unsigned, `m_num_blks - 1` would wrap around — confirm
// that callers guarantee at least one blk.
blk_num_t get_num_portions() const { return (m_num_blks - 1) / m_blks_per_portion + 1; }
blk_num_t get_blks_per_portion() const { return m_blks_per_portion; }

Expand Down
1 change: 1 addition & 0 deletions src/lib/blkalloc/blk_allocator.h
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ class BlkAllocator {
virtual bool is_blk_alloced(BlkId const& b, bool use_lock = false) const = 0;
virtual bool is_blk_alloced_on_disk(BlkId const& b, bool use_lock = false) const = 0;
virtual void recovery_completed() = 0;
virtual void reset() = 0;

virtual std::string to_string() const = 0;
virtual void cp_flush(CP* cp) = 0;
Expand Down
1 change: 1 addition & 0 deletions src/lib/blkalloc/fixed_blk_allocator.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ class FixedBlkAllocator : public BitmapBlkAllocator {
blk_num_t available_blks() const override;
blk_num_t get_used_blks() const override;
blk_num_t get_defrag_nblks() const override;
void reset() override{};
bool is_blk_alloced(BlkId const& in_bid, bool use_lock = false) const override;
std::string to_string() const override;

Expand Down
1 change: 1 addition & 0 deletions src/lib/blkalloc/varsize_blk_allocator.h
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,7 @@ class VarsizeBlkAllocator : public BitmapBlkAllocator {
blk_num_t get_used_blks() const override;
bool is_blk_alloced(BlkId const& in_bid, bool use_lock = false) const override;
std::string to_string() const override;
void reset() override{};
nlohmann::json get_metrics_in_json();

private:
Expand Down
2 changes: 2 additions & 0 deletions src/lib/device/vchunk.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ const uint8_t* VChunk::get_user_private() const { return m_internal_chunk->user_

blk_num_t VChunk::get_total_blks() const { return m_internal_chunk->blk_allocator()->get_total_blks(); }

// Resets the blk allocator of the underlying internal chunk by forwarding to the allocator's reset().
void VChunk::reset() { m_internal_chunk->blk_allocator_mutable()->reset(); }

blk_num_t VChunk::available_blks() const { return m_internal_chunk->blk_allocator()->available_blks(); }

blk_num_t VChunk::get_defrag_nblks() const { return m_internal_chunk->blk_allocator()->get_defrag_nblks(); }
Expand Down
32 changes: 26 additions & 6 deletions src/lib/replication/repl_dev/raft_repl_dev.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -914,8 +914,7 @@ void RaftReplDev::handle_rollback(repl_req_ptr_t rreq) {
if (rreq->has_state(repl_req_state_t::BLK_ALLOCATED)) {
auto blkid = rreq->local_blkid();
data_service().async_free_blk(blkid).thenValue([this, blkid](auto&& err) {
HS_LOG_ASSERT(!err, "freeing blkid={} upon error failed, potential to cause blk leak",
blkid.to_string());
HS_LOG_ASSERT(!err, "freeing blkid={} upon error failed, potential to cause blk leak", blkid.to_string());
RD_LOGD("Rollback rreq: Releasing blkid={} freed successfully", blkid.to_string());
});
}
Expand Down Expand Up @@ -1212,7 +1211,7 @@ void RaftReplDev::leave() {

// We let the listener know right away, so that they can cleanup persistent structures soonest. This will
// reduce the time window of leaked resources if any
m_listener->on_destroy();
m_listener->on_destroy(group_id());

// Persist that destroy pending in superblk, so that in case of crash before cleanup of resources, it can be done
// post restart.
Expand All @@ -1227,7 +1226,8 @@ std::pair< bool, nuraft::cb_func::ReturnCode > RaftReplDev::handle_raft_event(nu
nuraft::cb_func::Param* param) {
auto ret = nuraft::cb_func::ReturnCode::Ok;

if (type == nuraft::cb_func::Type::GotAppendEntryReqFromLeader) {
switch (type) {
case nuraft::cb_func::Type::GotAppendEntryReqFromLeader: {
auto raft_req = r_cast< nuraft::req_msg* >(param->ctx);
auto const& entries = raft_req->log_entries();

Expand Down Expand Up @@ -1276,9 +1276,29 @@ std::pair< bool, nuraft::cb_func::ReturnCode > RaftReplDev::handle_raft_event(nu
sisl::VectorPool< repl_req_ptr_t >::free(reqs);
}
return {true, ret};
} else {
return {false, ret};
}

case nuraft::cb_func::Type::RemovedFromCluster: {
// A node reaches here when:
// 1. It is removed from the cluster and the new config (excluding this node) is being committed on this node.
// 2. It is removed from the cluster, but the node was down and the new config log (excluding this node) was not
// replicated to it. When the node restarts, the leader will not send any append entries to it, since it is
// no longer a member of the raft group. The node will become a candidate and send request-vote requests to the
// other members of the raft group. A member will send RemovedFromCluster to the node once it finds that the
// node is no longer a member of the raft group.

// this will lazily cleanup the group
// TODO:cleanup this repl dev ASAP if necessary.
leave();

return {true, ret};
}

// TODO: Add more type handler if necessary
default:
break;
}
return {false, ret};
}

void RaftReplDev::flush_durable_commit_lsn() {
Expand Down
6 changes: 2 additions & 4 deletions src/lib/replication/repl_dev/raft_repl_dev.h
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ class RaftReplDev : public ReplDev,
std::string rdev_name() const { return m_rdev_name; }
std::string my_replica_id_str() const { return boost::uuids::to_string(m_my_repl_id); }
uint32_t get_blk_size() const override;
repl_lsn_t get_last_commit_lsn() const { return m_commit_upto_lsn.load(); }
repl_lsn_t get_last_commit_lsn() const override { return m_commit_upto_lsn.load(); }
void set_last_commit_lsn(repl_lsn_t lsn) { m_commit_upto_lsn.store(lsn); }
bool is_destroy_pending() const;
bool is_destroyed() const;
Expand Down Expand Up @@ -229,9 +229,7 @@ class RaftReplDev : public ReplDev,
*
* @param num_reserved_entries The number of reserved entries of the replication log.
*/
void truncate(uint32_t num_reserved_entries) {
m_data_journal->truncate(num_reserved_entries, m_compact_lsn.load());
}
void truncate(uint32_t num_reserved_entries) { m_data_journal->truncate(num_reserved_entries, m_compact_lsn.load()); }

void wait_for_logstore_ready() { m_data_journal->wait_for_log_store_ready(); }

Expand Down
30 changes: 26 additions & 4 deletions src/lib/replication/repl_dev/raft_state_machine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -204,8 +204,32 @@ raft_buf_ptr_t RaftStateMachine::commit_ext(nuraft::state_machine::ext_op_params
}

// Callback for a committed cluster-config change. Logs the commit and, in pre-release builds, also logs the
// server ids contained in the new config together with this replica's server id and group id.
void RaftStateMachine::commit_config(const ulong log_idx, raft_cluster_config_ptr_t& new_conf) {
    // By the time we get here the config change log is already committed and the new config has been applied to
    // the cluster.
    RD_LOGD("Raft channel: Commit new cluster conf , log_idx = {}", log_idx);
    // TODO:add more logic here if necessary

#ifdef _PRERELEASE
    // Render the server ids of the new config as a comma separated list (diagnostics only).
    std::ostringstream id_list;
    bool is_first = true;
    for (auto& srv : new_conf->get_servers()) {
        if (!is_first) { id_list << ","; }
        id_list << static_cast< int32_t >(srv->get_id());
        is_first = false;
    }

    auto self_id = m_rd.server_id();
    RD_LOG(INFO, "Raft channel: server ids in new cluster conf : {}, my_id {}, group_id {}", id_list.str(), self_id,
           m_rd.group_id_str());
#endif
}

void RaftStateMachine::rollback_config(const ulong log_idx, raft_cluster_config_ptr_t& conf) {
Expand Down Expand Up @@ -242,9 +266,7 @@ void RaftStateMachine::unlink_lsn_to_req(int64_t lsn, repl_req_ptr_t rreq) {
// it is possible a LSN mapped to different rreq in history
// due to log overwritten. Verify the rreq before removing
auto deleted = m_lsn_req_map.erase_if_equal(lsn, rreq);
if (deleted) {
RD_LOG(DEBUG, "Raft channel: erase lsn {}, rreq {}", lsn, rreq->to_string());
}
if (deleted) { RD_LOG(DEBUG, "Raft channel: erase lsn {}, rreq {}", lsn, rreq->to_string()); }
}

void RaftStateMachine::link_lsn_to_req(repl_req_ptr_t rreq, int64_t lsn) {
Expand Down
2 changes: 2 additions & 0 deletions src/lib/replication/repl_dev/solo_repl_dev.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ class SoloReplDev : public ReplDev {

uuid_t group_id() const override { return m_group_id; }

repl_lsn_t get_last_commit_lsn() const override { return 0; }

uint32_t get_blk_size() const override;

void cp_flush(CP* cp);
Expand Down
6 changes: 3 additions & 3 deletions src/tests/test_common/raft_repl_test_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -306,10 +306,10 @@ class TestReplicatedDB : public homestore::ReplDevListener {
boost::uuids::to_string(member_out.id), boost::uuids::to_string(member_in.id));
}

void on_destroy() override {
void on_destroy(const group_id_t& group_id) override {
LOGINFOMOD(replication, "[Replica={}] Group={} is being destroyed", g_helper->replica_num(),
boost::uuids::to_string(repl_dev()->group_id()));
g_helper->unregister_listener(repl_dev()->group_id());
boost::uuids::to_string(group_id));
g_helper->unregister_listener(group_id);
}

void db_write(uint64_t data_size, uint32_t max_size_per_iov) {
Expand Down
2 changes: 1 addition & 1 deletion src/tests/test_solo_repl_dev.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ class SoloReplDevTest : public testing::Test {
LOGINFO("Received error={} on repl_dev", enum_name(error));
}
void on_replace_member(const replica_member_info& member_out, const replica_member_info& member_in) override {}
void on_destroy() override {}
void on_destroy(const group_id_t& group_id) override {}
};

class Application : public ReplApplication {
Expand Down

0 comments on commit 1a87f71

Please sign in to comment.