Skip to content

Commit

Permalink
Clear PG for moved out member
Browse files Browse the repository at this point in the history
When a member is moved out of a PG, clean up the PG and reclaim resources:

1. Reset all chunk allocators and return chunks for reassignment.
2. Remove the index table associated with this PG.
3. Remove this PG from the PG map.
4. Delete the PG and its related shard metadata.

Additionally, enhance restart test scenarios to include PG, shard, and block management.
  • Loading branch information
Hooper9973 committed Dec 12, 2024
1 parent c8cbe36 commit 1b2870d
Show file tree
Hide file tree
Showing 14 changed files with 230 additions and 23 deletions.
2 changes: 1 addition & 1 deletion conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

class HomeObjectConan(ConanFile):
name = "homeobject"
version = "2.1.17"
version = "2.1.18"

homepage = "https://github.com/eBay/HomeObject"
description = "Blob Store built on HomeReplication"
Expand Down
32 changes: 32 additions & 0 deletions src/lib/homestore_backend/heap_chunk_selector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,38 @@ bool HeapChunkSelector::release_chunk(const pg_id_t pg_id, const chunk_num_t v_c
return true;
}

bool HeapChunkSelector::release_all_chunks(const pg_id_t pg_id) {
std::unique_lock lock_guard(m_chunk_selector_mtx);
auto pg_it = m_per_pg_chunks.find(pg_id);
if (pg_it == m_per_pg_chunks.end()) {
LOGWARNMOD(homeobject, "No pg found for pg_id {}", pg_id);
return false;
}

auto pg_chunk_collection = pg_it->second;
auto pdev_id = pg_chunk_collection->m_pg_chunks[0]->get_pdev_id();
auto pdev_it = m_per_dev_heap.find(pdev_id);
RELEASE_ASSERT(pdev_it != m_per_dev_heap.end(), "pdev {} should in per dev heap", pdev_id);
auto pdev_heap = pdev_it->second;

{
std::scoped_lock lock(pdev_heap->mtx, pg_chunk_collection->mtx);
for (auto& chunk : pg_chunk_collection->m_pg_chunks) {
chunk->reset();
if (chunk->m_state == ChunkState::INUSE) {
chunk->m_state = ChunkState::AVAILABLE;
} // with shard which should be first
chunk->m_pg_id = std::nullopt;
chunk->m_v_chunk_id = std::nullopt;

pdev_heap->m_heap.emplace(chunk);
pdev_heap->available_blk_count += chunk->available_blks();
}
}
m_per_pg_chunks.erase(pg_it);
return true;
}

uint32_t HeapChunkSelector::get_chunk_size() const {
const auto chunk = m_chunks.begin()->second;
return chunk->size();
Expand Down
12 changes: 12 additions & 0 deletions src/lib/homestore_backend/heap_chunk_selector.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,18 @@ class HeapChunkSelector : public homestore::ChunkSelector {
// It is used in two scenarios: 1. seal shard 2. create shard rollback
bool release_chunk(const pg_id_t pg_id, const chunk_num_t v_chunk_id);

/**
* Releases all chunks associated with the specified pg_id.
*
* This function is used to release all chunks that are currently associated with a particular
* pg identified by the given pg_id. It is typically used in scenarios where
* all chunks associated with a pg need to be freed, such as pg move out.
*
* @param pg_id The ID of the protection group whose chunks are to be released.
* @return A boolean value indicating whether the operation was successful.
*/
bool release_all_chunks(const pg_id_t pg_id);

/**
* select chunks for pg, chunks need to be in same pdev.
*
Expand Down
28 changes: 28 additions & 0 deletions src/lib/homestore_backend/hs_homeobject.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,20 @@ class HSHomeObject : public HomeObjectImpl {
void on_pg_replace_member(homestore::group_id_t group_id, const homestore::replica_member_info& member_out,
const homestore::replica_member_info& member_in);

/**
* @brief Cleans up and recycles resources for the PG identified by the given pg_id on the current node.
*
* This function is called when the replication device leaves or when a specific PG is destroyed on the current
* node. Note that this function does not perform Raft synchronization with other nodes.
*
* Possible scenarios for calling this function include:
* - A member-out node cleaning up resources for a specified PG.
* - During baseline rsync to clean up PG resources on the current node.
*
* @param pg_id The ID of the PG to be destroyed.
*/
void pg_destroy(pg_id_t pg_id);

/**
* @brief Callback function invoked when a message is committed on a shard.
*
Expand Down Expand Up @@ -488,6 +502,7 @@ class HSHomeObject : public HomeObjectImpl {
size_t hash_len) const;

std::shared_ptr< BlobIndexTable > recover_index_table(homestore::superblk< homestore::index_table_sb >&& sb);
std::optional< pg_id_t > get_pg_id_with_group_id(homestore::group_id_t group_id) const;

private:
std::shared_ptr< BlobIndexTable > create_index_table();
Expand All @@ -512,6 +527,19 @@ class HSHomeObject : public HomeObjectImpl {
sisl::io_blob_safe& get_pad_buf(uint32_t pad_len);

// void trigger_timed_events();

/**
* @brief Cleans up and recycles resources for shards in the PG located using a PG ID.
*
* @param pg_id The ID of the PG whose shards are to be destroyed.
*/
void on_shards_destroy(pg_id_t pg_id);
/**
* @brief Cleans up and recycles resources for the PG located using a pg_id.
*
* @param pg_id The ID of the PG to be cleaned.
*/
void on_pg_resources_cleanup(pg_id_t pg_id);
};

class BlobIndexServiceCallbacks : public homestore::IndexServiceCallbacks {
Expand Down
43 changes: 43 additions & 0 deletions src/lib/homestore_backend/hs_pg_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,49 @@ void HSHomeObject::on_pg_replace_member(homestore::group_id_t group_id, const re
boost::uuids::to_string(member_in.id));
}

std::optional< pg_id_t > HSHomeObject::get_pg_id_with_group_id(homestore::group_id_t group_id) const {
auto lg = std::shared_lock(_pg_lock);
auto iter = std::find_if(_pg_map.begin(), _pg_map.end(), [group_id](const auto& entry) {
return pg_repl_dev(*entry.second).group_id() == group_id;
});
if (iter != _pg_map.end()) {
return iter->first;
} else {
return std::nullopt;
}
}

void HSHomeObject::pg_destroy(pg_id_t pg_id) {
on_shards_destroy(pg_id);
on_pg_resources_cleanup(pg_id);
LOGI("pg {} is destroyed", pg_id);
}

void HSHomeObject::on_pg_resources_cleanup(pg_id_t pg_id) {
auto lg = std::scoped_lock(_pg_lock);
auto iter = _pg_map.find(pg_id);
if (iter == _pg_map.end()) {
LOGW("on pg resource release with unknown pg_id {}", pg_id);
return;
}

// release all pg chunks
bool res = chunk_selector_->release_all_chunks(pg_id);
RELEASE_ASSERT(res, "Failed to release all chunks in pg {}", pg_id);
auto& pg = iter->second;
auto hs_pg = s_cast< HS_PG* >(pg.get());

// destroy index table
auto uuid_str = boost::uuids::to_string(hs_pg->index_table_->uuid());
index_table_pg_map_.erase(uuid_str);
hs()->index_service().remove_index_table(hs_pg->index_table_);
hs_pg->index_table_->destroy();
// destroy pg super blk
hs_pg->pg_sb_.destroy();
// erase pg in pg map
_pg_map.erase(iter);
}

void HSHomeObject::add_pg_to_map(unique< HS_PG > hs_pg) {
RELEASE_ASSERT(hs_pg->pg_info_.replica_set_uuid == hs_pg->repl_dev_->group_id(),
"PGInfo replica set uuid mismatch with ReplDev instance for {}",
Expand Down
25 changes: 25 additions & 0 deletions src/lib/homestore_backend/hs_shard_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ ShardError toShardError(ReplServiceError const& e) {
}
}


uint64_t ShardManager::max_shard_size() { return Gi; }

uint64_t ShardManager::max_shard_num_in_pg() { return ((uint64_t)0x01) << shard_width; }
Expand Down Expand Up @@ -526,6 +527,30 @@ bool HSHomeObject::release_chunk_based_on_create_shard_message(sisl::blob const&
}
}

void HSHomeObject::on_shards_destroy(pg_id_t pg_id) {
auto lg = std::scoped_lock(_pg_lock, _shard_lock);
auto iter = _pg_map.find(pg_id);
if (iter == _pg_map.end()) {
LOGW("on shards destroy with unknown pg_id {}", pg_id);
return;
}

auto& pg = iter->second;
for (auto& shard : pg->shards_) {
// release open shard v_chunk
auto hs_shard = s_cast< HS_Shard* >(shard.get());
if (shard->info.state == ShardInfo::State::OPEN) {
bool res = chunk_selector_->release_chunk(shard->info.placement_group, hs_shard->sb_->v_chunk_id);
RELEASE_ASSERT(res, "Failed to release chunk with pg_id {} v_chunk_id {}", shard->info.placement_group,
hs_shard->sb_->v_chunk_id)
}
// destroy shard super blk
hs_shard->sb_.destroy();
// erase shard in shard map
_shard_map.erase(shard->info.id);
}
}

HSHomeObject::HS_Shard::HS_Shard(ShardInfo shard_info, homestore::chunk_num_t p_chunk_id,
homestore::chunk_num_t v_chunk_id) :
Shard(std::move(shard_info)), sb_(_shard_meta_name) {
Expand Down
10 changes: 8 additions & 2 deletions src/lib/homestore_backend/replication_state_machine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -193,8 +193,14 @@ void ReplicationStateMachine::on_replace_member(const homestore::replica_member_
}

void ReplicationStateMachine::on_destroy(const homestore::group_id_t& group_id) {
// TODO:: add the logic to handle destroy
LOGI("replica destroyed");
auto PG_ID = home_object_->get_pg_id_with_group_id(group_id);
if (!PG_ID.has_value()) {
LOGW("do not have pg mapped by group_id {}", boost::uuids::to_string(group_id));
return;
}
home_object_->pg_destroy(PG_ID.value());
LOGI("replica destroyed, cleared PG {} resources with group_id {}", PG_ID.value(),
boost::uuids::to_string(group_id));
}

homestore::AsyncReplResult<>
Expand Down
4 changes: 2 additions & 2 deletions src/lib/homestore_backend/tests/homeobj_fixture.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -131,11 +131,11 @@ class HomeObjectFixture : public ::testing::Test {
run_on_pg_leader(pg_id, [&]() {
auto v_chunkID = _obj_inst->get_shard_v_chunk_id(shard_id);
RELEASE_ASSERT(v_chunkID.has_value(), "failed to get shard v_chunk_id");
g_helper->set_v_chunk_id(v_chunkID.value());
g_helper->set_auxiliary_uint64_id(v_chunkID.value());
});

// get v_chunk_id from IPC and compare with local
auto leader_v_chunk_id = g_helper->get_v_chunk_id();
auto leader_v_chunk_id = g_helper->get_auxiliary_uint64_id();
auto local_v_chunkID = _obj_inst->get_shard_v_chunk_id(shard_id);
RELEASE_ASSERT(local_v_chunkID.has_value(), "failed to get shard v_chunk_id");
RELEASE_ASSERT(leader_v_chunk_id == local_v_chunkID, "v_chunk_id supposed to be identical");
Expand Down
3 changes: 3 additions & 0 deletions src/lib/homestore_backend/tests/hs_blob_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ TEST_F(HomeObjectFixture, BasicEquivalence) {
}

TEST_F(HomeObjectFixture, BasicPutGetDelBlobWRestart) {
// test recovery with pristine state firstly
restart();

auto num_pgs = SISL_OPTIONS["num_pgs"].as< uint64_t >();
auto num_shards_per_pg = SISL_OPTIONS["num_shards"].as< uint64_t >() / num_pgs;

Expand Down
9 changes: 6 additions & 3 deletions src/lib/homestore_backend/tests/hs_pg_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -124,19 +124,22 @@ TEST_F(HomeObjectFixture, PGSizeLessThanChunkTest) {
}

TEST_F(HomeObjectFixture, PGRecoveryTest) {
HSHomeObject* ho = dynamic_cast< HSHomeObject* >(_obj_inst.get());
auto id = ho->our_uuid();
// test recovery with pristine state firstly
restart();
EXPECT_EQ(id, _obj_inst->our_uuid());

// create 10 pg
for (pg_id_t i = 1; i < 11; i++) {
pg_id_t pg_id{i};
create_pg(pg_id);
}

// get pg map
HSHomeObject* ho = dynamic_cast< HSHomeObject* >(_obj_inst.get());
std::map< pg_id_t, std::unique_ptr< PG > > pg_map;
pg_map.swap(ho->_pg_map);

// get uuid
auto id = ho->our_uuid();

// restart
restart();
Expand Down
25 changes: 11 additions & 14 deletions src/lib/homestore_backend/tests/hs_repl_test_helper.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ namespace bip = boost::interprocess;
using namespace homeobject;

#define INVALID_UINT64_ID UINT64_MAX
#define INVALID_CHUNK_NUM UINT16_MAX

namespace test_common {

Expand All @@ -57,7 +56,7 @@ class HSReplTestHelper {
sync_point_num_ = sync_point;
homeobject_replica_count_ = 0;
uint64_id_ = INVALID_UINT64_ID;
v_chunk_id_ = INVALID_CHUNK_NUM;
auxiliary_uint64_id_ = UINT64_MAX;
cv_.notify_all();
} else {
cv_.wait(lg, [this, sync_point]() { return sync_point_num_ == sync_point; });
Expand All @@ -74,26 +73,23 @@ class HSReplTestHelper {
return uint64_id_;
}

void set_v_chunk_id(homestore::chunk_num_t input_v_chunk_id) {
void set_auxiliary_uint64_id(uint64_t input_auxiliary_uint64_id) {
std::unique_lock< bip::interprocess_mutex > lg(mtx_);
v_chunk_id_ = input_v_chunk_id;
auxiliary_uint64_id_ = input_auxiliary_uint64_id;
}

homestore::chunk_num_t get_v_chunk_id() {
uint64_t get_auxiliary_uint64_id() {
std::unique_lock< bip::interprocess_mutex > lg(mtx_);
return v_chunk_id_;
return auxiliary_uint64_id_;
}

private:
bip::interprocess_mutex mtx_;
bip::interprocess_condition cv_;
uint8_t homeobject_replica_count_{0};

// the following variables are used to share shard_id and blob_id among different replicas
// the following variables are used to share shard_id, blob_id and others among different replicas
uint64_t uint64_id_{0};

// used to verify identical layout
homestore::chunk_num_t v_chunk_id_{0};
uint64_t auxiliary_uint64_id_{0};

// the nth synchronization point, that is how many times different replicas have synced
uint64_t sync_point_num_{UINT64_MAX};
Expand Down Expand Up @@ -271,9 +267,10 @@ class HSReplTestHelper {
void sync() { ipc_data_->sync(sync_point_num++, total_replicas_nums_); }
void set_uint64_id(uint64_t uint64_id) { ipc_data_->set_uint64_id(uint64_id); }
uint64_t get_uint64_id() { return ipc_data_->get_uint64_id(); }
void set_v_chunk_id(homestore::chunk_num_t v_chunk_id) { ipc_data_->set_v_chunk_id(v_chunk_id); }
homestore::chunk_num_t get_v_chunk_id() { return ipc_data_->get_v_chunk_id(); }

void set_auxiliary_uint64_id(uint64_t input_auxiliary_uint64_id) {
ipc_data_->set_auxiliary_uint64_id(input_auxiliary_uint64_id);
}
uint64_t get_auxiliary_uint64_id() { return ipc_data_->get_auxiliary_uint64_id(); }
void check_and_kill(int port) {
std::string command = "lsof -t -i:" + std::to_string(port);
if (::system(command.c_str())) {
Expand Down
3 changes: 3 additions & 0 deletions src/lib/homestore_backend/tests/hs_shard_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,9 @@ TEST_F(HomeObjectFixture, ShardManagerRecovery) {
}

TEST_F(HomeObjectFixture, SealedShardRecovery) {
// test recovery with pristine state firstly
restart();

pg_id_t pg_id{1};
create_pg(pg_id);

Expand Down
14 changes: 13 additions & 1 deletion src/lib/homestore_backend/tests/test_heap_chunk_selector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ uint16_t VChunk::get_chunk_id() const { return m_internal_chunk->get_chunk_id();
blk_num_t VChunk::get_total_blks() const { return m_internal_chunk->get_total_blks(); }

uint64_t VChunk::size() const { return m_internal_chunk->size(); }

void VChunk::reset() {}
cshared< Chunk > VChunk::get_internal_chunk() const { return m_internal_chunk; }

} // namespace homestore
Expand Down Expand Up @@ -276,6 +276,18 @@ TEST_F(HeapChunkSelectorTest, test_select_specific_chunk_and_release_chunk) {
}
}

TEST_F(HeapChunkSelectorTest, test_release_all_chunks) {
for (uint16_t pg_id = 1; pg_id < 4; ++pg_id) {
ASSERT_TRUE(HCS.release_all_chunks(pg_id));
ASSERT_EQ(HCS.m_per_pg_chunks.find(pg_id), HCS.m_per_pg_chunks.end());
ASSERT_EQ(HCS.m_per_dev_heap[pg_id]->available_blk_count, 1 + 2 + 3);
ASSERT_EQ(HCS.m_per_dev_heap[pg_id]->available_blk_count, HCS.m_per_dev_heap[pg_id]->m_total_blks);
}
for (const auto& [_, chunk] : HCS.m_chunks) {
ASSERT_EQ(chunk->m_state, ChunkState::AVAILABLE);
}
}

TEST_F(HeapChunkSelectorTest, test_recovery) {
HeapChunkSelector HCS_recovery;
HCS_recovery.add_chunk(std::make_shared< Chunk >(1, 1, 1, 9));
Expand Down
Loading

0 comments on commit 1b2870d

Please sign in to comment.