Clear PG for moved-out member
When a member is moved out of a PG, clean up the PG and reclaim resources:

1. Reset all chunk allocators and return chunks for reassignment.
2. Remove the index table associated with this PG.
3. Remove this PG from the PG map.
4. Delete the PG and its related shard metadata.

Additionally, enhance the restart test scenarios to cover PG, shard, and blob management.
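As an orientation, the sketch below shows how these steps map onto the two new hooks. It is illustrative only: the wrapper function does not exist in the commit, and the actual wiring lives in ReplicationStateMachine::on_destroy (see the replication_state_machine.cpp diff below).

// Illustrative sketch, not part of the commit; names are taken from the diffs below.
void on_member_moved_out(homestore::group_id_t group_id, HSHomeObject* ho) {
    ho->on_shards_destroy(group_id); // step 4: shard super blks and open-shard chunks
    ho->on_pg_destroy(group_id);     // steps 1-4: chunks, index table, PG map entry, PG super blk
}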
Hooper9973 committed Dec 12, 2024
1 parent c8cbe36 commit 70face9
Showing 14 changed files with 197 additions and 23 deletions.
2 changes: 1 addition & 1 deletion conanfile.py
@@ -9,7 +9,7 @@

class HomeObjectConan(ConanFile):
name = "homeobject"
-version = "2.1.17"
+version = "2.1.18"

homepage = "https://github.com/eBay/HomeObject"
description = "Blob Store built on HomeReplication"
32 changes: 32 additions & 0 deletions src/lib/homestore_backend/heap_chunk_selector.cpp
@@ -118,6 +118,38 @@ bool HeapChunkSelector::release_chunk(const pg_id_t pg_id, const chunk_num_t v_c
return true;
}

bool HeapChunkSelector::release_all_chunks(const pg_id_t pg_id) {
std::unique_lock lock_guard(m_chunk_selector_mtx);
auto pg_it = m_per_pg_chunks.find(pg_id);
if (pg_it == m_per_pg_chunks.end()) {
LOGWARNMOD(homeobject, "No pg found for pg_id {}", pg_id);
return false;
}

auto pg_chunk_collection = pg_it->second;
auto pdev_id = pg_chunk_collection->m_pg_chunks[0]->get_pdev_id();
auto pdev_it = m_per_dev_heap.find(pdev_id);
RELEASE_ASSERT(pdev_it != m_per_dev_heap.end(), "pdev {} should be in per dev heap", pdev_id);
auto pdev_heap = pdev_it->second;

{
std::scoped_lock lock(pdev_heap->mtx, pg_chunk_collection->mtx);
for (auto& chunk : pg_chunk_collection->m_pg_chunks) {
chunk->reset();
if (chunk->m_state == ChunkState::INUSE) {
// an INUSE chunk still belongs to an open shard, whose cleanup should have run first
chunk->m_state = ChunkState::AVAILABLE;
}
chunk->m_pg_id = std::nullopt;
chunk->m_v_chunk_id = std::nullopt;

pdev_heap->m_heap.emplace(chunk);
pdev_heap->available_blk_count += chunk->available_blks();
}
}
m_per_pg_chunks.erase(pg_it);
return true;
}

uint32_t HeapChunkSelector::get_chunk_size() const {
const auto chunk = m_chunks.begin()->second;
return chunk->size();
12 changes: 12 additions & 0 deletions src/lib/homestore_backend/heap_chunk_selector.h
@@ -79,6 +79,18 @@ class HeapChunkSelector : public homestore::ChunkSelector {
// It is used in two scenarios: 1. seal shard 2. create shard rollback
bool release_chunk(const pg_id_t pg_id, const chunk_num_t v_chunk_id);

/**
* Releases all chunks associated with the specified pg_id.
*
* This function releases every chunk currently associated with the PG identified by
* the given pg_id. It is typically used when all of a PG's chunks must be freed at
* once, such as a PG move-out.
*
* @param pg_id The ID of the placement group whose chunks are to be released.
* @return true on success; false if no PG with this pg_id exists.
*/
bool release_all_chunks(const pg_id_t pg_id);

/**
* select chunks for pg, chunks need to be in same pdev.
*
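A minimal usage sketch for the new API (the selector pointer and pg_id value are illustrative; release_all_chunks returns false when the PG is unknown, for example if cleanup already ran):

// Hypothetical caller, assuming chunk_selector points at the HeapChunkSelector.
pg_id_t pg_id = 1; // illustrative value
if (!chunk_selector->release_all_chunks(pg_id)) {
    LOGWARNMOD(homeobject, "No chunks released, pg {} unknown", pg_id);
}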
14 changes: 14 additions & 0 deletions src/lib/homestore_backend/hs_homeobject.hpp
@@ -411,6 +411,20 @@ class HSHomeObject : public HomeObjectImpl {
void on_pg_replace_member(homestore::group_id_t group_id, const homestore::replica_member_info& member_out,
const homestore::replica_member_info& member_in);

/**
* @brief Cleans up and recycles resources for the PG located using a group ID.
*
* @param group_id The unique identifier used to locate the placement group to be destroyed.
*/
void on_pg_destroy(homestore::group_id_t group_id);

/**
* @brief Cleans up and recycles resources for shards in the PG located using a group ID.
*
* @param group_id Identifier of the PG whose shards are to be destroyed.
*/
void on_shards_destroy(homestore::group_id_t group_id);

/**
* @brief Callback function invoked when a message is committed on a shard.
*
26 changes: 26 additions & 0 deletions src/lib/homestore_backend/hs_pg_manager.cpp
@@ -250,6 +250,32 @@ void HSHomeObject::on_pg_replace_member(homestore::group_id_t group_id, const re
boost::uuids::to_string(member_in.id));
}

void HSHomeObject::on_pg_destroy(homestore::group_id_t group_id) {
auto lg = std::scoped_lock(_pg_lock);
auto iter = std::find_if(_pg_map.begin(), _pg_map.end(), [group_id](const auto& entry) {
return pg_repl_dev(*entry.second).group_id() == group_id;
});

if (iter != _pg_map.end()) {
// release all pg chunks
auto pg_id = iter->first;
bool res = chunk_selector_->release_all_chunks(pg_id);
RELEASE_ASSERT(res, "Failed to release all chunks in pg {}", pg_id);
auto& pg = iter->second;
auto hs_pg = s_cast< HS_PG* >(pg.get());

// destroy index table
auto uuid_str = boost::uuids::to_string(hs_pg->index_table_->uuid());
index_table_pg_map_.erase(uuid_str);
hs()->index_service().remove_index_table(hs_pg->index_table_);
hs_pg->index_table_->destroy();
// destroy pg super blk
hs_pg->pg_sb_.destroy();
// erase pg in pg map
_pg_map.erase(iter);
}
}

void HSHomeObject::add_pg_to_map(unique< HS_PG > hs_pg) {
RELEASE_ASSERT(hs_pg->pg_info_.replica_set_uuid == hs_pg->repl_dev_->group_id(),
"PGInfo replica set uuid mismatch with ReplDev instance for {}",
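Note that on_pg_destroy quietly no-ops when the group_id does not resolve to an entry in _pg_map, which appears intended to keep the destroy callback safe to invoke on a member whose PG state has already been cleaned up.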
28 changes: 28 additions & 0 deletions src/lib/homestore_backend/hs_shard_manager.cpp
@@ -44,6 +44,10 @@ ShardError toShardError(ReplServiceError const& e) {
}
}

[[maybe_unused]] static homestore::ReplDev& pg_repl_dev(PG const& pg) {
return *(static_cast< HSHomeObject::HS_PG const& >(pg).repl_dev_);
}

uint64_t ShardManager::max_shard_size() { return Gi; }

uint64_t ShardManager::max_shard_num_in_pg() { return ((uint64_t)0x01) << shard_width; }
@@ -526,6 +530,30 @@ bool HSHomeObject::release_chunk_based_on_create_shard_message(sisl::blob const&
}
}

void HSHomeObject::on_shards_destroy(homestore::group_id_t group_id) {
auto lg = std::scoped_lock(_pg_lock, _shard_lock);
auto iter = std::find_if(_pg_map.begin(), _pg_map.end(), [group_id](const auto& entry) {
return pg_repl_dev(*entry.second).group_id() == group_id;
});

if (iter != _pg_map.end()) {
auto& pg = iter->second;
for (auto& shard : pg->shards_) {
// release open shard v_chunk
auto hs_shard = s_cast< HS_Shard* >(shard.get());
if (shard->info.state == ShardInfo::State::OPEN) {
bool res = chunk_selector_->release_chunk(shard->info.placement_group, hs_shard->sb_->v_chunk_id);
RELEASE_ASSERT(res, "Failed to release chunk with pg_id {} v_chunk_id {}", shard->info.placement_group,
hs_shard->sb_->v_chunk_id);
}
// destroy shard super blk
hs_shard->sb_.destroy();
// erase shard in shard map
_shard_map.erase(shard->info.id);
}
}
}

HSHomeObject::HS_Shard::HS_Shard(ShardInfo shard_info, homestore::chunk_num_t p_chunk_id,
homestore::chunk_num_t v_chunk_id) :
Shard(std::move(shard_info)), sb_(_shard_meta_name) {
5 changes: 3 additions & 2 deletions src/lib/homestore_backend/replication_state_machine.cpp
@@ -193,8 +193,9 @@ void ReplicationStateMachine::on_replace_member(const homestore::replica_member_
}

void ReplicationStateMachine::on_destroy(const homestore::group_id_t& group_id) {
-// TODO:: add the logic to handle destroy
-LOGI("replica destroyed");
+home_object_->on_shards_destroy(group_id);
+home_object_->on_pg_destroy(group_id);
+LOGI("replica destroyed, cleared PG resources with group_id {}", boost::uuids::to_string(group_id));
}

homestore::AsyncReplResult<>
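A note on ordering: on_destroy invokes on_shards_destroy before on_pg_destroy, presumably because releasing an open shard's chunk via release_chunk(pg_id, v_chunk_id) still needs the PG's chunk collection, and on_pg_destroy erases that collection through release_all_chunks.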
4 changes: 2 additions & 2 deletions src/lib/homestore_backend/tests/homeobj_fixture.hpp
@@ -131,11 +131,11 @@ class HomeObjectFixture : public ::testing::Test {
run_on_pg_leader(pg_id, [&]() {
auto v_chunkID = _obj_inst->get_shard_v_chunk_id(shard_id);
RELEASE_ASSERT(v_chunkID.has_value(), "failed to get shard v_chunk_id");
-g_helper->set_v_chunk_id(v_chunkID.value());
+g_helper->set_auxiliary_uint64_id(v_chunkID.value());
});

// get v_chunk_id from IPC and compare with local
-auto leader_v_chunk_id = g_helper->get_v_chunk_id();
+auto leader_v_chunk_id = g_helper->get_auxiliary_uint64_id();
auto local_v_chunkID = _obj_inst->get_shard_v_chunk_id(shard_id);
RELEASE_ASSERT(local_v_chunkID.has_value(), "failed to get shard v_chunk_id");
RELEASE_ASSERT(leader_v_chunk_id == local_v_chunkID, "v_chunk_id supposed to be identical");
3 changes: 3 additions & 0 deletions src/lib/homestore_backend/tests/hs_blob_tests.cpp
@@ -14,6 +14,9 @@ TEST_F(HomeObjectFixture, BasicEquivalence) {
}

TEST_F(HomeObjectFixture, BasicPutGetDelBlobWRestart) {
// test recovery with pristine state firstly
restart();

auto num_pgs = SISL_OPTIONS["num_pgs"].as< uint64_t >();
auto num_shards_per_pg = SISL_OPTIONS["num_shards"].as< uint64_t >() / num_pgs;

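The same pristine-state restart() preamble also appears in PGRecoveryTest and SealedShardRecovery below, so each recovery path is first exercised with no PG, shard, or blob metadata present.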
9 changes: 6 additions & 3 deletions src/lib/homestore_backend/tests/hs_pg_tests.cpp
@@ -124,19 +124,22 @@ TEST_F(HomeObjectFixture, PGSizeLessThanChunkTest) {
}

TEST_F(HomeObjectFixture, PGRecoveryTest) {
+HSHomeObject* ho = dynamic_cast< HSHomeObject* >(_obj_inst.get());
+auto id = ho->our_uuid();
+// test recovery with pristine state firstly
+restart();
+EXPECT_EQ(id, _obj_inst->our_uuid());

// create 10 pg
for (pg_id_t i = 1; i < 11; i++) {
pg_id_t pg_id{i};
create_pg(pg_id);
}

// get pg map
-HSHomeObject* ho = dynamic_cast< HSHomeObject* >(_obj_inst.get());
std::map< pg_id_t, std::unique_ptr< PG > > pg_map;
pg_map.swap(ho->_pg_map);

-// get uuid
-auto id = ho->our_uuid();

// restart
restart();
25 changes: 11 additions & 14 deletions src/lib/homestore_backend/tests/hs_repl_test_helper.hpp
@@ -43,7 +43,6 @@ namespace bip = boost::interprocess;
using namespace homeobject;

#define INVALID_UINT64_ID UINT64_MAX
-#define INVALID_CHUNK_NUM UINT16_MAX

namespace test_common {

@@ -57,7 +56,7 @@ class HSReplTestHelper {
sync_point_num_ = sync_point;
homeobject_replica_count_ = 0;
uint64_id_ = INVALID_UINT64_ID;
-v_chunk_id_ = INVALID_CHUNK_NUM;
+auxiliary_uint64_id_ = UINT64_MAX;
cv_.notify_all();
} else {
cv_.wait(lg, [this, sync_point]() { return sync_point_num_ == sync_point; });
@@ -74,26 +73,23 @@
return uint64_id_;
}

-void set_v_chunk_id(homestore::chunk_num_t input_v_chunk_id) {
+void set_auxiliary_uint64_id(uint64_t input_auxiliary_uint64_id) {
std::unique_lock< bip::interprocess_mutex > lg(mtx_);
-v_chunk_id_ = input_v_chunk_id;
+auxiliary_uint64_id_ = input_auxiliary_uint64_id;
}

-homestore::chunk_num_t get_v_chunk_id() {
+uint64_t get_auxiliary_uint64_id() {
std::unique_lock< bip::interprocess_mutex > lg(mtx_);
-return v_chunk_id_;
+return auxiliary_uint64_id_;
}

private:
bip::interprocess_mutex mtx_;
bip::interprocess_condition cv_;
uint8_t homeobject_replica_count_{0};

-// the following variables are used to share shard_id and blob_id among different replicas
+// the following variables are used to share shard_id, blob_id and others among different replicas
uint64_t uint64_id_{0};

-// used to verify identical layout
-homestore::chunk_num_t v_chunk_id_{0};
+uint64_t auxiliary_uint64_id_{0};

// the nth synchronization point, that is how many times different replicas have synced
uint64_t sync_point_num_{UINT64_MAX};
@@ -271,9 +267,10 @@ class HSReplTestHelper {
void sync() { ipc_data_->sync(sync_point_num++, total_replicas_nums_); }
void set_uint64_id(uint64_t uint64_id) { ipc_data_->set_uint64_id(uint64_id); }
uint64_t get_uint64_id() { return ipc_data_->get_uint64_id(); }
-void set_v_chunk_id(homestore::chunk_num_t v_chunk_id) { ipc_data_->set_v_chunk_id(v_chunk_id); }
-homestore::chunk_num_t get_v_chunk_id() { return ipc_data_->get_v_chunk_id(); }
+void set_auxiliary_uint64_id(uint64_t input_auxiliary_uint64_id) {
+ipc_data_->set_auxiliary_uint64_id(input_auxiliary_uint64_id);
+}
+uint64_t get_auxiliary_uint64_id() { return ipc_data_->get_auxiliary_uint64_id(); }
void check_and_kill(int port) {
std::string command = "lsof -t -i:" + std::to_string(port);
if (::system(command.c_str())) {
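A short usage sketch of the generalized helper (the leader choice and chunk id value are illustrative; the fixtures actually select the leader via run_on_pg_leader):

// Illustrative: share any 64-bit value (here a chunk id) across replica processes.
homestore::chunk_num_t v_chunk_id = 3;    // hypothetical value to publish
auto leader_id = g_helper->replica_id(0); // hypothetical leader pick
if (g_helper->my_replica_id() == leader_id) {
    g_helper->set_auxiliary_uint64_id(static_cast< uint64_t >(v_chunk_id));
}
g_helper->sync(); // rendezvous so followers read only after the leader has written
auto shared_value = g_helper->get_auxiliary_uint64_id();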
3 changes: 3 additions & 0 deletions src/lib/homestore_backend/tests/hs_shard_tests.cpp
@@ -171,6 +171,9 @@ TEST_F(HomeObjectFixture, ShardManagerRecovery) {
}

TEST_F(HomeObjectFixture, SealedShardRecovery) {
// test recovery with pristine state firstly
restart();

pg_id_t pg_id{1};
create_pg(pg_id);

14 changes: 13 additions & 1 deletion src/lib/homestore_backend/tests/test_heap_chunk_selector.cpp
@@ -73,7 +73,7 @@ uint16_t VChunk::get_chunk_id() const { return m_internal_chunk->get_chunk_id();
blk_num_t VChunk::get_total_blks() const { return m_internal_chunk->get_total_blks(); }

uint64_t VChunk::size() const { return m_internal_chunk->size(); }
-
+void VChunk::reset() {}
cshared< Chunk > VChunk::get_internal_chunk() const { return m_internal_chunk; }

} // namespace homestore
@@ -276,6 +276,18 @@ TEST_F(HeapChunkSelectorTest, test_select_specific_chunk_and_release_chunk) {
}
}

TEST_F(HeapChunkSelectorTest, test_release_all_chunks) {
for (uint16_t pg_id = 1; pg_id < 4; ++pg_id) {
ASSERT_TRUE(HCS.release_all_chunks(pg_id));
ASSERT_EQ(HCS.m_per_pg_chunks.find(pg_id), HCS.m_per_pg_chunks.end());
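// NB: this fixture appears to place each PG on its own pdev with a matching numeric id,
// so m_per_dev_heap can be indexed by pg_id here; 1 + 2 + 3 sums the available blks of that pdev's three chunks.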
ASSERT_EQ(HCS.m_per_dev_heap[pg_id]->available_blk_count, 1 + 2 + 3);
ASSERT_EQ(HCS.m_per_dev_heap[pg_id]->available_blk_count, HCS.m_per_dev_heap[pg_id]->m_total_blks);
}
for (const auto& [_, chunk] : HCS.m_chunks) {
ASSERT_EQ(chunk->m_state, ChunkState::AVAILABLE);
}
}

TEST_F(HeapChunkSelectorTest, test_recovery) {
HeapChunkSelector HCS_recovery;
HCS_recovery.add_chunk(std::make_shared< Chunk >(1, 1, 1, 9));
43 changes: 43 additions & 0 deletions src/lib/homestore_backend/tests/test_homestore_backend_dynamic.cpp
@@ -66,6 +66,15 @@ TEST_F(HomeObjectFixture, ReplaceMember) {
auto out_member_id = g_helper->replica_id(num_replicas - 1);
auto in_member_id = g_helper->replica_id(num_replicas); /*spare replica*/

// get out_member's index_table_uuid with pg_id
string index_table_uuid_str;
if (out_member_id == g_helper->my_replica_id()) {
auto iter = _obj_inst->_pg_map.find(pg_id);
RELEASE_ASSERT(iter != _obj_inst->_pg_map.end(), "PG not found");
auto hs_pg = static_cast< homeobject::HSHomeObject::HS_PG* >(iter->second.get());
index_table_uuid_str = boost::uuids::to_string(hs_pg->pg_sb_->index_table_uuid);
}

run_on_pg_leader(pg_id, [&]() {
auto r = _obj_inst->pg_manager()
->replace_member(pg_id, out_member_id, PGMember{in_member_id, "new_member", 0})
@@ -90,6 +99,40 @@
verify_get_blob(pg_shard_id_vec, num_blobs_per_shard);
verify_obj_count(1, num_blobs_per_shard, num_shards_per_pg, false);
});

// step 5: Verify no pg related data in out_member
if (out_member_id == g_helper->my_replica_id()) {
while (am_i_in_pg(pg_id)) {
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
LOGINFO("old member is waiting to leave pg {}", pg_id);
}

// check no pg related data in this member
ASSERT_FALSE(pg_exist(pg_id));
for (const auto& shard_id : pg_shard_id_vec[pg_id]) {
ASSERT_FALSE(shard_exist(shard_id));
}
ASSERT_EQ(_obj_inst->index_table_pg_map_.find(index_table_uuid_str), _obj_inst->index_table_pg_map_.end());
LOGINFO("check no pg related data in out member successfully");
}

// Step 6: restart, verify the blobs again on all members, including the new spare replica, and out_member
restart();
run_if_in_pg(pg_id, [&]() {
verify_get_blob(pg_shard_id_vec, num_blobs_per_shard);
verify_obj_count(1, num_blobs_per_shard, num_shards_per_pg, false);
LOGINFO("After restart, check pg related data in pg members successfully");
});

if (out_member_id == g_helper->my_replica_id()) {
// Verify no pg related data in out_member
ASSERT_FALSE(pg_exist(pg_id));
for (const auto& shard_id : pg_shard_id_vec[pg_id]) {
ASSERT_FALSE(shard_exist(shard_id));
}
ASSERT_EQ(_obj_inst->index_table_pg_map_.find(index_table_uuid_str), _obj_inst->index_table_pg_map_.end());
LOGINFO("After restart, check no pg related data in out member successfully");
}
}

SISL_OPTION_GROUP(
