Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PG cleanup for moved out member #242

Merged
merged 1 commit into from
Dec 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

class HomeObjectConan(ConanFile):
name = "homeobject"
version = "2.1.17"
version = "2.1.18"

homepage = "https://github.com/eBay/HomeObject"
description = "Blob Store built on HomeReplication"
Expand Down
48 changes: 48 additions & 0 deletions src/lib/homestore_backend/heap_chunk_selector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,54 @@ bool HeapChunkSelector::release_chunk(const pg_id_t pg_id, const chunk_num_t v_c
return true;
}

/**
 * Calls reset() on every chunk recorded for the given pg.
 *
 * The selector-wide mutex is held in shared mode for the whole call; the pg's own
 * chunk-collection mutex is held only while its chunks are being reset.
 *
 * @param pg_id The pg whose chunks are to be reset.
 * @return false (after logging a warning) when no entry exists for pg_id, true otherwise.
 */
bool HeapChunkSelector::reset_pg_chunks(pg_id_t pg_id) {
    std::shared_lock selector_guard(m_chunk_selector_mtx);

    auto entry = m_per_pg_chunks.find(pg_id);
    if (entry != m_per_pg_chunks.end()) {
        {
            // keep the collection lock scoped to the reset loop only
            auto collection = entry->second;
            std::scoped_lock collection_guard(collection->mtx);
            for (auto& pg_chunk : collection->m_pg_chunks) {
                pg_chunk->reset();
            }
        }
        return true;
    }

    LOGWARNMOD(homeobject, "No pg found for pg_id {}", pg_id);
    return false;
}

bool HeapChunkSelector::return_pg_chunks_to_dev_heap(const pg_id_t pg_id) {
std::unique_lock lock_guard(m_chunk_selector_mtx);
auto pg_it = m_per_pg_chunks.find(pg_id);
if (pg_it == m_per_pg_chunks.end()) {
LOGWARNMOD(homeobject, "No pg found for pg_id {}", pg_id);
return false;
}

auto pg_chunk_collection = pg_it->second;
auto pdev_id = pg_chunk_collection->m_pg_chunks[0]->get_pdev_id();
auto pdev_it = m_per_dev_heap.find(pdev_id);
RELEASE_ASSERT(pdev_it != m_per_dev_heap.end(), "pdev {} should in per dev heap", pdev_id);
auto pdev_heap = pdev_it->second;

{
std::scoped_lock lock(pdev_heap->mtx, pg_chunk_collection->mtx);
for (auto& chunk : pg_chunk_collection->m_pg_chunks) {
if (chunk->m_state == ChunkState::INUSE) {
chunk->m_state = ChunkState::AVAILABLE;
} // with shard which should be first
chunk->m_pg_id = std::nullopt;
chunk->m_v_chunk_id = std::nullopt;

pdev_heap->m_heap.emplace(chunk);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

are we fine to open this chunk for re-use by other PG at this point??

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe fine, another PG can pick these chunks but commit has to wait as commit is sequential

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a problem: at this point another create-pg request may come in and reuse some chunks that belong to the destroyed pg. If a crash then happens and HomeObject recovers, two pgs could end up using some of the same chunks.

fixed it by resetting the chunks before destroying the pg super block, and only returning the chunks to the heap once the pg super block has been destroyed.

pdev_heap->available_blk_count += chunk->available_blks();
}
}
m_per_pg_chunks.erase(pg_it);
return true;
}

uint32_t HeapChunkSelector::get_chunk_size() const {
const auto chunk = m_chunks.begin()->second;
return chunk->size();
Expand Down
14 changes: 14 additions & 0 deletions src/lib/homestore_backend/heap_chunk_selector.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,20 @@ class HeapChunkSelector : public homestore::ChunkSelector {
// It is used in two scenarios: 1. seal shard 2. create shard rollback
bool release_chunk(const pg_id_t pg_id, const chunk_num_t v_chunk_id);

bool reset_pg_chunks(pg_id_t pg_id);

/**
* Releases all chunks associated with the specified pg_id.
*
* This function is used to return all chunks that are currently associated with a particular
* pg identified by the given pg_id. It is typically used in scenarios where
* all chunks associated with a pg need to be freed, such as pg move out.
*
* @param pg_id The ID of the protection group whose chunks are to be released.
* @return A boolean value indicating whether the operation was successful.
*/
bool return_pg_chunks_to_dev_heap(pg_id_t pg_id);

/**
* select chunks for pg, chunks need to be in same pdev.
*
Expand Down
48 changes: 48 additions & 0 deletions src/lib/homestore_backend/hs_homeobject.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ static constexpr uint64_t io_align{512};
PGError toPgError(homestore::ReplServiceError const&);
BlobError toBlobError(homestore::ReplServiceError const&);
ShardError toShardError(homestore::ReplServiceError const&);
ENUM(PGState, uint8_t, ALIVE = 0, DESTROYED);

class HSHomeObject : public HomeObjectImpl {
private:
Expand Down Expand Up @@ -80,6 +81,7 @@ class HSHomeObject : public HomeObjectImpl {

struct pg_info_superblk {
pg_id_t id;
PGState state;
uint32_t num_members;
uint32_t num_chunks;
peer_id_t replica_set_uuid;
Expand Down Expand Up @@ -109,6 +111,7 @@ class HSHomeObject : public HomeObjectImpl {

pg_info_superblk& operator=(pg_info_superblk const& rhs) {
id = rhs.id;
state = rhs.state;
num_members = rhs.num_members;
num_chunks = rhs.num_chunks;
pg_size = rhs.pg_size;
Expand Down Expand Up @@ -411,6 +414,20 @@ class HSHomeObject : public HomeObjectImpl {
void on_pg_replace_member(homestore::group_id_t group_id, const homestore::replica_member_info& member_out,
const homestore::replica_member_info& member_in);

/**
* @brief Cleans up and recycles resources for the PG identified by the given pg_id on the current node.
*
* This function is called when the replication device leaves or when a specific PG is destroyed on the current
* node. Note that this function does not perform Raft synchronization with other nodes.
*
* Possible scenarios for calling this function include:
* - A member-out node cleaning up resources for a specified PG.
* - During baseline rsync to clean up PG resources on the current node.
*
* @param pg_id The ID of the PG to be destroyed.
*/
void pg_destroy(pg_id_t pg_id);

/**
* @brief Callback function invoked when a message is committed on a shard.
*
Expand Down Expand Up @@ -488,6 +505,7 @@ class HSHomeObject : public HomeObjectImpl {
size_t hash_len) const;

std::shared_ptr< BlobIndexTable > recover_index_table(homestore::superblk< homestore::index_table_sb >&& sb);
std::optional< pg_id_t > get_pg_id_with_group_id(homestore::group_id_t group_id) const;

private:
std::shared_ptr< BlobIndexTable > create_index_table();
Expand All @@ -512,6 +530,36 @@ class HSHomeObject : public HomeObjectImpl {
sisl::io_blob_safe& get_pad_buf(uint32_t pad_len);

// void trigger_timed_events();

/**
* @brief Marks the PG as destroyed.
*
* Updates the internal state to indicate that the specified PG is destroyed and ensures its state is persisted.
*
* @param pg_id The ID of the PG to be marked as destroyed.
*/
void mark_pg_destroyed(pg_id_t pg_id);

/**
* @brief Cleans up and recycles resources for shards in the PG located using a PG ID.
*
* @param pg_id The ID of the PG whose shards are to be destroyed.
*/
void destroy_shards(pg_id_t pg_id);

/**
* @brief Resets the chunks for the given PG ID and triggers a checkpoint flush.
*
* @param pg_id The ID of the PG whose chunks are to be reset.
*/
void reset_pg_chunks(pg_id_t pg_id);

/**
* @brief Cleans up and recycles resources for the PG located using a pg_id.
*
* @param pg_id The ID of the PG to be cleaned.
*/
void cleanup_pg_resources(pg_id_t pg_id);
};

class BlobIndexServiceCallbacks : public homestore::IndexServiceCallbacks {
Expand Down
85 changes: 82 additions & 3 deletions src/lib/homestore_backend/hs_pg_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,79 @@ void HSHomeObject::on_pg_replace_member(homestore::group_id_t group_id, const re
boost::uuids::to_string(member_in.id));
}

/**
 * Looks up the pg whose replication device reports the given group id.
 *
 * The pg map is scanned under a shared hold of _pg_lock; the first matching entry wins.
 *
 * @param group_id The replication group id to search for.
 * @return The id of the matching pg, or std::nullopt when no pg in the map matches.
 */
std::optional< pg_id_t > HSHomeObject::get_pg_id_with_group_id(homestore::group_id_t group_id) const {
    std::shared_lock pg_map_guard(_pg_lock);
    for (const auto& [candidate_id, candidate_pg] : _pg_map) {
        if (pg_repl_dev(*candidate_pg).group_id() == group_id) { return candidate_id; }
    }
    return std::nullopt;
}

void HSHomeObject::pg_destroy(pg_id_t pg_id) {
mark_pg_destroyed(pg_id);
destroy_shards(pg_id);
reset_pg_chunks(pg_id);
cleanup_pg_resources(pg_id);
LOGI("pg {} is destroyed", pg_id);
JacksonYao287 marked this conversation as resolved.
Show resolved Hide resolved
}
/**
 * Flags the pg's super block as DESTROYED and writes the super block out.
 *
 * Runs under _pg_lock. When pg_id is not present in the pg map, a warning is logged
 * and nothing is modified.
 *
 * @param pg_id The pg to mark as destroyed.
 */
void HSHomeObject::mark_pg_destroyed(pg_id_t pg_id) {
    std::scoped_lock pg_map_guard(_pg_lock);

    auto found = _pg_map.find(pg_id);
    if (found == _pg_map.end()) {
        LOGW("on pg destroy with unknown pg_id {}", pg_id);
        return;
    }

    auto* target_pg = s_cast< HS_PG* >(found->second.get());
    target_pg->pg_sb_->state = PGState::DESTROYED;
    target_pg->pg_sb_.write();
}

void HSHomeObject::reset_pg_chunks(pg_id_t pg_id) {
bool res = chunk_selector_->reset_pg_chunks(pg_id);
RELEASE_ASSERT(res, "Failed to reset all chunks in pg {}", pg_id);
auto fut = homestore::hs()->cp_mgr().trigger_cp_flush(true /* force */);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why triggering a cp flush only after reset chunks, should we trigger flush when all the cleanup is done?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The other cleanups are metaService-only and do not rely on cp. I think the intent is to do the cp as early as possible, but I am fine with moving the cp anywhere before the pg superblk is deleted.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because only resetting the chunks relies on cp; the other operations only destroy metaService blocks.
The reason we need to do the cp before deleting the pg superblk is this: if the cp came after the deletion and a crash happened before the cp, recovery could no longer find the pg superblk, and we would have no chance to reset the chunks again.

All in all, I think doing the cp right after resetting the chunks is a good choice.

auto on_complete = [&](auto success) {
RELEASE_ASSERT(success, "Failed to trigger CP flush");
LOGI("CP Flush completed");
};
on_complete(std::move(fut).get());
}

void HSHomeObject::cleanup_pg_resources(pg_id_t pg_id) {
auto lg = std::scoped_lock(_pg_lock);
auto iter = _pg_map.find(pg_id);
if (iter == _pg_map.end()) {
LOGW("on pg resource release with unknown pg_id {}", pg_id);
return;
}

// destroy index table
auto& pg = iter->second;
auto hs_pg = s_cast< HS_PG* >(pg.get());
if (nullptr != hs_pg->index_table_) {
auto uuid_str = boost::uuids::to_string(hs_pg->index_table_->uuid());
index_table_pg_map_.erase(uuid_str);
hs()->index_service().remove_index_table(hs_pg->index_table_);
hs_pg->index_table_->destroy();
}

// destroy pg super blk
hs_pg->pg_sb_.destroy();
xiaoxichen marked this conversation as resolved.
Show resolved Hide resolved

// return pg chunks to dev heap
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you also explain: if a crash happened here, what would be the process of recovering these chunks to the heap?

Is that because we dont have pg superblock so these chunks are available by default during recovery?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, you are right. If a crash happens here and we then recover, the pg superblk no longer exists, so these chunks belong to the dev heap by default.

// which must be done after destroying pg super blk to avoid multiple pg use same chunks
bool res = chunk_selector_->return_pg_chunks_to_dev_heap(pg_id);
RELEASE_ASSERT(res, "Failed to return pg {} chunks to dev_heap", pg_id);

// erase pg in pg map
_pg_map.erase(iter);
}

void HSHomeObject::add_pg_to_map(unique< HS_PG > hs_pg) {
RELEASE_ASSERT(hs_pg->pg_info_.replica_set_uuid == hs_pg->repl_dev_->group_id(),
"PGInfo replica set uuid mismatch with ReplDev instance for {}",
Expand Down Expand Up @@ -317,9 +390,14 @@ void HSHomeObject::on_pg_meta_blk_found(sisl::byte_view const& buf, void* meta_c
// add entry in map, so that index recovery can update the PG.
std::scoped_lock lg(index_lock_);
auto it = index_table_pg_map_.find(uuid_str);
RELEASE_ASSERT(it != index_table_pg_map_.end(), "IndexTable should be recovered before PG");
hs_pg->index_table_ = it->second.index_table;
it->second.pg_id = pg_id;
if (it != index_table_pg_map_.end()) {
hs_pg->index_table_ = it->second.index_table;
it->second.pg_id = pg_id;
} else {
RELEASE_ASSERT(hs_pg->pg_sb_->state == PGState::DESTROYED, "IndexTable should be recovered before PG");
hs_pg->index_table_ = nullptr;
LOGI("Index table not found for destroyed pg_id={}, index_table_uuid={}", pg_id, uuid_str);
}

add_pg_to_map(std::move(hs_pg));
}
Expand Down Expand Up @@ -349,6 +427,7 @@ HSHomeObject::HS_PG::HS_PG(PGInfo info, shared< homestore::ReplDev > rdev, share
pg_sb_.create(sizeof(pg_info_superblk) - sizeof(char) + pg_info_.members.size() * sizeof(pg_members) +
num_chunks * sizeof(homestore::chunk_num_t));
pg_sb_->id = pg_info_.id;
pg_sb_->state = PGState::ALIVE;
pg_sb_->num_members = pg_info_.members.size();
pg_sb_->num_chunks = num_chunks;
pg_sb_->pg_size = pg_info_.size;
Expand Down
20 changes: 20 additions & 0 deletions src/lib/homestore_backend/hs_shard_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ ShardError toShardError(ReplServiceError const& e) {
}
}


uint64_t ShardManager::max_shard_size() { return Gi; }

uint64_t ShardManager::max_shard_num_in_pg() { return ((uint64_t)0x01) << shard_width; }
Expand Down Expand Up @@ -526,6 +527,25 @@ bool HSHomeObject::release_chunk_based_on_create_shard_message(sisl::blob const&
}
}

void HSHomeObject::destroy_shards(pg_id_t pg_id) {
auto lg = std::scoped_lock(_pg_lock, _shard_lock);
auto iter = _pg_map.find(pg_id);
if (iter == _pg_map.end()) {
LOGW("on shards destroy with unknown pg_id {}", pg_id);
return;
}

auto& pg = iter->second;
for (auto& shard : pg->shards_) {
// release open shard v_chunk
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NIT: remove this comment, we do not care about v_chunk here

auto hs_shard = s_cast< HS_Shard* >(shard.get());
// destroy shard super blk
hs_shard->sb_.destroy();
xiaoxichen marked this conversation as resolved.
Show resolved Hide resolved
// erase shard in shard map
_shard_map.erase(shard->info.id);
}
}

HSHomeObject::HS_Shard::HS_Shard(ShardInfo shard_info, homestore::chunk_num_t p_chunk_id,
homestore::chunk_num_t v_chunk_id) :
Shard(std::move(shard_info)), sb_(_shard_meta_name) {
Expand Down
10 changes: 8 additions & 2 deletions src/lib/homestore_backend/replication_state_machine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -193,8 +193,14 @@ void ReplicationStateMachine::on_replace_member(const homestore::replica_member_
}

// Handles destruction of the replica identified by `group_id`.
// Resolves the group id to a pg id through home_object_; when a pg is found, pg_destroy() is
// called on it and the outcome is logged. When no pg maps to the group id, a warning is logged
// and the function returns without destroying anything.
void ReplicationStateMachine::on_destroy(const homestore::group_id_t& group_id) {
// TODO:: add the logic to handle destroy
// NOTE(review): the TODO above and the log line below look like the pre-change side of the
// diff shown next to the new implementation — confirm they are absent from the merged file.
LOGI("replica destroyed");
// Look up which pg (if any) is backed by this replication group.
auto PG_ID = home_object_->get_pg_id_with_group_id(group_id);
if (!PG_ID.has_value()) {
LOGW("do not have pg mapped by group_id {}", boost::uuids::to_string(group_id));
return;
}
// Clean up the pg locally, then record which pg/group pair was cleared.
home_object_->pg_destroy(PG_ID.value());
LOGI("replica destroyed, cleared PG {} resources with group_id {}", PG_ID.value(),
boost::uuids::to_string(group_id));
}

homestore::AsyncReplResult<>
Expand Down
20 changes: 18 additions & 2 deletions src/lib/homestore_backend/tests/homeobj_fixture.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -131,11 +131,11 @@ class HomeObjectFixture : public ::testing::Test {
run_on_pg_leader(pg_id, [&]() {
auto v_chunkID = _obj_inst->get_shard_v_chunk_id(shard_id);
RELEASE_ASSERT(v_chunkID.has_value(), "failed to get shard v_chunk_id");
g_helper->set_v_chunk_id(v_chunkID.value());
g_helper->set_auxiliary_uint64_id(v_chunkID.value());
});

// get v_chunk_id from IPC and compare with local
auto leader_v_chunk_id = g_helper->get_v_chunk_id();
auto leader_v_chunk_id = g_helper->get_auxiliary_uint64_id();
auto local_v_chunkID = _obj_inst->get_shard_v_chunk_id(shard_id);
RELEASE_ASSERT(local_v_chunkID.has_value(), "failed to get shard v_chunk_id");
RELEASE_ASSERT(leader_v_chunk_id == local_v_chunkID, "v_chunk_id supposed to be identical");
Expand Down Expand Up @@ -318,6 +318,22 @@ class HomeObjectFixture : public ::testing::Test {
}
}

// Asserts that a destroyed pg has left nothing behind: the pg itself, its index-table mapping,
// each of the listed shards, and the pg's entry in the chunk selector must all be absent.
void verify_pg_destroy(pg_id_t pg_id, const string& index_table_uuid_str,
                       const std::vector< shard_id_t >& shard_id_vec) {
    // the pg and its index table entry are gone
    ASSERT_FALSE(pg_exist(pg_id));
    auto& index_table_map = _obj_inst->index_table_pg_map_;
    ASSERT_EQ(index_table_map.find(index_table_uuid_str), index_table_map.end());

    // listing shards must fail with UNKNOWN_PG, and no individual shard may remain
    auto list_result = _obj_inst->shard_manager()->list_shards(pg_id).get();
    ASSERT_EQ(list_result.error(), ShardError::UNKNOWN_PG);
    for (const auto& destroyed_shard_id : shard_id_vec) {
        ASSERT_FALSE(shard_exist(destroyed_shard_id));
    }

    // the chunk selector no longer tracks chunks for this pg
    const auto& selector = _obj_inst->chunk_selector();
    auto& per_pg_chunks = selector->m_per_pg_chunks;
    ASSERT_EQ(per_pg_chunks.find(pg_id), per_pg_chunks.end());
}

void verify_hs_pg(HSHomeObject::HS_PG* lhs_pg, HSHomeObject::HS_PG* rhs_pg) {
// verify index table
EXPECT_EQ(lhs_pg->index_table_->uuid(), rhs_pg->index_table_->uuid());
Expand Down
3 changes: 3 additions & 0 deletions src/lib/homestore_backend/tests/hs_blob_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ TEST_F(HomeObjectFixture, BasicEquivalence) {
}

TEST_F(HomeObjectFixture, BasicPutGetDelBlobWRestart) {
// test recovery with pristine state firstly
restart();

auto num_pgs = SISL_OPTIONS["num_pgs"].as< uint64_t >();
auto num_shards_per_pg = SISL_OPTIONS["num_shards"].as< uint64_t >() / num_pgs;

Expand Down
Loading
Loading