From 77e49089ce782d888fae2889e9aecea38e3cde6d Mon Sep 17 00:00:00 2001 From: Hooper Date: Tue, 29 Oct 2024 03:21:42 -0700 Subject: [PATCH] implement fix pg size --- conanfile.py | 2 +- .../homestore_backend/heap_chunk_selector.cpp | 239 ++++++++++++++++-- .../homestore_backend/heap_chunk_selector.h | 25 +- src/lib/homestore_backend/hs_homeobject.cpp | 2 +- src/lib/homestore_backend/hs_homeobject.hpp | 21 +- src/lib/homestore_backend/hs_pg_manager.cpp | 58 ++++- .../homestore_backend/hs_shard_manager.cpp | 12 +- .../replication_state_machine.cpp | 15 +- .../tests/homeobj_fixture.hpp | 11 +- 9 files changed, 325 insertions(+), 60 deletions(-) diff --git a/conanfile.py b/conanfile.py index 46a7e08d..d3da8e56 100644 --- a/conanfile.py +++ b/conanfile.py @@ -49,7 +49,7 @@ def build_requirements(self): def requirements(self): self.requires("sisl/[^12.2]@oss/master", transitive_headers=True) - self.requires("homestore/[^6.4]@oss/master") + self.requires("homestore/[^6.5]@oss/master") self.requires("iomgr/[^11.3]@oss/master") self.requires("lz4/1.9.4", override=True) self.requires("openssl/3.3.1", override=True) diff --git a/src/lib/homestore_backend/heap_chunk_selector.cpp b/src/lib/homestore_backend/heap_chunk_selector.cpp index 897ab2ad..57f92485 100644 --- a/src/lib/homestore_backend/heap_chunk_selector.cpp +++ b/src/lib/homestore_backend/heap_chunk_selector.cpp @@ -59,31 +59,19 @@ csharedChunk HeapChunkSelector::select_chunk(homestore::blk_count_t count, const return nullptr; } - // shardid -> chunkid map is maintained by ShardManager - // pg_id->pdev_id map is maintained by PgManager - // chunselector will not take care of the two maps for now. 
- uint32_t pdevID = 0; - auto& pdevIdHint = hint.pdev_id_hint; - if (!pdevIdHint.has_value()) { - // this is the first shard of this pg, select a pdev with the most available blocks for it - auto&& it = - std::max_element(m_per_dev_heap.begin(), m_per_dev_heap.end(), - [](const std::pair< const uint32_t, std::shared_ptr< PerDevHeap > >& lhs, - const std::pair< const uint32_t, std::shared_ptr< PerDevHeap > >& rhs) { - return lhs.second->available_blk_count.load() < rhs.second->available_blk_count.load(); - }); - if (it == m_per_dev_heap.end()) { - LOGWARNMOD(homeobject, "No pdev found for new pg"); - return nullptr; - } - pdevID = it->first; + // Temporary bypass using pdev_id_hint to represent pg_id_hint + pg_id_t pg_id = 0; + auto& pg_id_hint = hint.pdev_id_hint; + if (!pg_id_hint.has_value()) { + LOGWARNMOD(homeobject, "should not allocate a chunk without existing pg_id {} in hint!", chunkIdHint.value()); + return nullptr; } else { - pdevID = pdevIdHint.value(); + pg_id = pg_id_hint.value(); } - auto it = m_per_dev_heap.find(pdevID); - if (it == m_per_dev_heap.end()) { - LOGWARNMOD(homeobject, "No pdev found for pdev {}", pdevID); + auto it = m_per_pg_heap.find(pg_id); + if (it == m_per_pg_heap.end()) { + LOGWARNMOD(homeobject, "No pg found for pg_id {}", pg_id); return nullptr; } @@ -99,7 +87,7 @@ csharedChunk HeapChunkSelector::select_chunk(homestore::blk_count_t count, const avalableBlkCounter.fetch_sub(vchunk.available_blks()); remove_chunk_from_defrag_heap(vchunk.get_chunk_id()); } else { - LOGWARNMOD(homeobject, "No pdev found for pdev {}", pdevID); + LOGWARNMOD(homeobject, "No pg found for pg_id {}", pg_id); } return vchunk.get_internal_chunk(); @@ -148,6 +136,48 @@ csharedChunk HeapChunkSelector::select_specific_chunk(const chunk_num_t chunkID) return vchunk.get_internal_chunk(); } +csharedChunk HeapChunkSelector::select_specific_chunk(const pg_id_t pg_id, const chunk_num_t chunkID) { + if (m_chunks.find(chunkID) == m_chunks.end()) { + // sanity check + 
LOGWARNMOD(homeobject, "No chunk found for ChunkID {}", chunkID); + return nullptr; + } + auto pg_it = m_per_pg_heap.find(pg_id); + if (pg_it == m_per_pg_heap.end()) { + LOGWARNMOD(homeobject, "No pg found for pg_id {}", pg_id); + return nullptr; + } + + + VChunk vchunk(nullptr); + auto& heap = pg_it->second->m_heap; + if (auto lock_guard = std::lock_guard< std::mutex >(pg_it->second->mtx); !heap.empty()) { + std::vector< VChunk > chunks; + chunks.reserve(heap.size()); + while (!heap.empty()) { + auto c = heap.top(); + heap.pop(); + if (c.get_chunk_id() == chunkID) { + vchunk = c; + break; + } + chunks.push_back(std::move(c)); + } + + for (auto& c : chunks) { + heap.emplace(c); + } + } + + if (vchunk.get_internal_chunk()) { + auto& avalableBlkCounter = pg_it->second->available_blk_count; + avalableBlkCounter.fetch_sub(vchunk.available_blks()); + remove_chunk_from_defrag_heap(vchunk.get_chunk_id()); + } + + return vchunk.get_internal_chunk(); +} + // most_defrag_chunk will only be called when GC is triggered, and will return the chunk with the most // defrag blocks csharedChunk HeapChunkSelector::most_defrag_chunk() { @@ -196,6 +226,30 @@ void HeapChunkSelector::release_chunk(const chunk_num_t chunkID) { } } +void HeapChunkSelector::release_chunk(const pg_id_t pg_id, const chunk_num_t chunkID) { + if (m_chunks.find(chunkID) == m_chunks.end()) { + // sanity check + LOGWARNMOD(homeobject, "No chunk found for ChunkID {}", chunkID); + return; + } + auto pg_it = m_per_pg_heap.find(pg_id); + if (pg_it == m_per_pg_heap.end()) { + LOGWARNMOD(homeobject, "No pg found for pg_id {}", pg_id); + return; + } + + const auto& chunk = m_chunks[chunkID]; + VChunk vchunk(chunk); + { + std::lock_guard< std::mutex > l(pg_it->second->mtx); + auto& pg_heap = pg_it->second->m_heap; + pg_heap.emplace(chunk); + } + auto& avalableBlkCounter = pg_it->second->available_blk_count; + avalableBlkCounter.fetch_add(vchunk.available_blks()); + +} + void 
HeapChunkSelector::build_per_dev_chunk_heap(const std::unordered_set< chunk_num_t >& excludingChunks) { for (const auto& p : m_chunks) { bool add_to_heap = true; @@ -204,6 +258,145 @@ void HeapChunkSelector::build_per_dev_chunk_heap(const std::unordered_set< chunk }; } +uint32_t HeapChunkSelector::get_chunk_size() const { + const auto& chunk = m_chunks.begin()->second; + auto vchunk = VChunk(chunk); + return vchunk.size(); +} + +void HeapChunkSelector::select_chunks_for_pg(pg_id_t pg_id, chunk_num_t num_chunk) { + LOGINFOMOD(homeobject, "[Hooper] start created pg {}, num_chunk {} ", pg_id, num_chunk); + std::unique_lock lock_guard(m_chunk_selector_mtx); + if (m_per_pg_heap.find(pg_id) != m_per_pg_heap.end()) { + LOGWARNMOD(homeobject, "Pg {} had been created", pg_id); + return; + } + + //Select a pdev with the most available blocks for it referenced from HeapChunkSelector::select_chunk" + auto &&most_avail_dev_it = + std::max_element(m_per_dev_heap.begin(), m_per_dev_heap.end(), + [](const std::pair< const uint32_t, std::shared_ptr< PerDevHeap > >& lhs, + const std::pair< const uint32_t, std::shared_ptr< PerDevHeap > >& rhs) { + return lhs.second->available_blk_count.load() < rhs.second->available_blk_count.load(); + }); + auto& pdev_heap = most_avail_dev_it->second; + if (num_chunk > pdev_heap->size()) { + LOGWARNMOD(homeobject, "Pdev has no enough space to create pg {} with num_chunk {}", pg_id, num_chunk); + return; + } + auto vchunk = VChunk(nullptr); + auto it = m_per_pg_heap.emplace(pg_id, std::make_shared< PerDevHeap >()).first; + auto v2r_vector = m_v2r_chunk_map.emplace(pg_id, std::make_shared< std::vector < chunk_num_t > >()).first->second; + auto r2v_map = m_r2v_chunk_map.emplace(pg_id, std::make_shared< ChunkIdMap >()).first->second; + + auto& pg_heap = it->second; + std::scoped_lock lock(pdev_heap->mtx, pg_heap->mtx); + v2r_vector->reserve(num_chunk); + for (chunk_num_t i = 0; i < num_chunk; ++i) { + vchunk = pdev_heap->m_heap.top(); + 
pdev_heap->m_heap.pop(); + pdev_heap->available_blk_count.fetch_sub(vchunk.available_blks()); + + pg_heap->m_heap.emplace(vchunk); + pg_heap->m_total_blks += vchunk.get_total_blks(); + pg_heap->available_blk_count.fetch_add(vchunk.available_blks()); + // v_chunk_id start from 0. + chunk_num_t v_chunk_id = i; + chunk_num_t r_chunk_id = vchunk.get_chunk_id(); + v2r_vector->emplace_back(r_chunk_id); + r2v_map->emplace(r_chunk_id, v_chunk_id); + } + LOGINFOMOD(homeobject, "[Hooper] Successfully created pg {}, num_chunk {} ", pg_id, num_chunk); + + return; +} + +void HeapChunkSelector::persist_chunk_ids_into_pg_sb(pg_id_t pg_id, chunk_num_t num_chunk, homestore::chunk_num_t* pb_sb_chunk_ids) const { + std::shared_lock lock_guard(m_chunk_selector_mtx); + auto it = m_v2r_chunk_map.find(pg_id); + if (it == m_v2r_chunk_map.end()) { + LOGWARNMOD(homeobject, "Pg {} had never been created", pg_id); + return; + } + auto chunk_ids = it->second; + for (chunk_num_t i = 0; i < num_chunk; ++i) { + pb_sb_chunk_ids[i] = chunk_ids->at(i); + } +} + +void HeapChunkSelector::recover_pg_chunk_map_from_pg_sb(pg_id_t pg_id, chunk_num_t num_chunk, homestore::chunk_num_t* chunk_ids) { + std::unique_lock lock_guard(m_chunk_selector_mtx); + if (m_v2r_chunk_map.find(pg_id) != m_v2r_chunk_map.end()) { + LOGWARNMOD(homeobject, "Pg {} had been recovered", pg_id); + return; + } + + auto v2r_vector = m_v2r_chunk_map.emplace(pg_id, std::make_shared< std::vector < chunk_num_t > >()).first->second; + auto r2v_map = m_r2v_chunk_map.emplace(pg_id, std::make_shared< ChunkIdMap >()).first->second; + + v2r_vector->reserve(num_chunk); + for (chunk_num_t i = 0; i < num_chunk; ++i) { + // v_chunk_id start from 0. 
+ chunk_num_t v_chunk_id = i; + chunk_num_t r_chunk_id = chunk_ids[i]; + v2r_vector->emplace_back(r_chunk_id); + r2v_map->emplace(r_chunk_id, v_chunk_id); + } +} + +void HeapChunkSelector::recover_per_dev_chunk_heap() { + std::unique_lock lock_guard(m_chunk_selector_mtx); + for (const auto& [chunk_id, _] : m_chunks) { + bool add_to_heap = true; + for (const auto& [_, chunk_map] : m_r2v_chunk_map) { + if (chunk_map->find(chunk_id) != chunk_map->end()) { + add_to_heap = false; + break; + } + } + add_chunk_internal(chunk_id, add_to_heap); + + } + LOGINFOMOD(homeobject, "[Hooper] Successfully recover pdev_heap"); +} + +void HeapChunkSelector::recover_pg_chunk_heap(pg_id_t pg_id, const std::unordered_set< chunk_num_t >& excludingChunks) +{ + std::unique_lock lock_guard(m_chunk_selector_mtx); + if (m_per_pg_heap.find(pg_id) != m_per_pg_heap.end()) { + LOGWARNMOD(homeobject, "Pg_heap {} had been recovered", pg_id); + return; + } + auto it = m_v2r_chunk_map.find(pg_id); + if (it == m_v2r_chunk_map.end()) { + LOGWARNMOD(homeobject, "Pg_chunk_map {} had never been recovered", pg_id); + return; + } + const auto& chunk_ids = it->second; + auto& pg_heap = m_per_pg_heap.emplace(pg_id, std::make_shared< PerDevHeap >()).first->second; + for (const auto& chunk_id : *chunk_ids) { + if (excludingChunks.find(chunk_id) == excludingChunks.end()) { + const auto& chunk = m_chunks[chunk_id]; + auto vchunk = VChunk(chunk); + pg_heap->m_heap.emplace(vchunk); + pg_heap->m_total_blks += vchunk.get_total_blks(); + pg_heap->available_blk_count.fetch_add(vchunk.available_blks()); + } + } + LOGINFOMOD(homeobject, "[Hooper] Successfully recover pg_heap {} ", pg_id); +} + +std::shared_ptr< const std::vector > HeapChunkSelector::get_v2r_chunk_id_vector(pg_id_t pg_id) const { + std::shared_lock lock_guard(m_chunk_selector_mtx); + auto it = m_v2r_chunk_map.find(pg_id); + if (it != m_v2r_chunk_map.end()) { + return it->second; + } else { + LOGWARNMOD(homeobject, "get_v2r_chunk_id_vector had never been 
created, pg {}", pg_id); + return nullptr; + } +} + homestore::blk_alloc_hints HeapChunkSelector::chunk_to_hints(chunk_num_t chunk_id) const { auto iter = m_chunks.find(chunk_id); if (iter == m_chunks.end()) { diff --git a/src/lib/homestore_backend/heap_chunk_selector.h b/src/lib/homestore_backend/heap_chunk_selector.h index 1ccf5d15..5644efa2 100644 --- a/src/lib/homestore_backend/heap_chunk_selector.h +++ b/src/lib/homestore_backend/heap_chunk_selector.h @@ -16,6 +16,8 @@ namespace homeobject { using csharedChunk = homestore::cshared< homestore::Chunk >; +using ChunkIdMap = std::unordered_map < homestore::chunk_num_t, homestore::chunk_num_t >; +using chunk_num_t = homestore::chunk_num_t; class HeapChunkSelector : public homestore::ChunkSelector { public: @@ -35,7 +37,6 @@ class HeapChunkSelector : public homestore::ChunkSelector { using VChunkHeap = std::priority_queue< VChunk, std::vector< VChunk >, VChunkComparator >; using VChunkDefragHeap = std::priority_queue< VChunk, std::vector< VChunk >, VChunkDefragComparator >; - using chunk_num_t = homestore::chunk_num_t; struct PerDevHeap { std::mutex mtx; @@ -52,6 +53,7 @@ class HeapChunkSelector : public homestore::ChunkSelector { // this function will be used by GC flow or recovery flow to mark one specific chunk to be busy, caller should be // responsible to use release_chunk() interface to release it when no longer to use the chunk anymore. csharedChunk select_specific_chunk(const chunk_num_t); + csharedChunk select_specific_chunk(const pg_id_t pg_id, const chunk_num_t); // this function will be used by GC flow to select a chunk for GC csharedChunk most_defrag_chunk(); @@ -59,10 +61,25 @@ class HeapChunkSelector : public homestore::ChunkSelector { // this function is used to return a chunk back to ChunkSelector when sealing a shard, and will only be used by // Homeobject. 
void release_chunk(const chunk_num_t); + void release_chunk(const pg_id_t pg_id, const chunk_num_t); // this should be called after ShardManager is initialized and get all the open shards void build_per_dev_chunk_heap(const std::unordered_set< chunk_num_t >& excludingChunks); + void recover_per_dev_chunk_heap(); + void recover_pg_chunk_heap(pg_id_t pg_id, const std::unordered_set< chunk_num_t >& excludingChunks); + /** + * select chunks for pg, chunks need to be in same pdev. + * + * @param pg_id The ID of the pg. + * @param num_chunk The num of chunk which be decided by fix pg size. + */ + void select_chunks_for_pg(pg_id_t pg_id, chunk_num_t num_chunk); + + void persist_chunk_ids_into_pg_sb(pg_id_t pg_id, chunk_num_t num_chunk, homestore::chunk_num_t* chunk_ids) const; + void recover_pg_chunk_map_from_pg_sb(pg_id_t pg_id, chunk_num_t num_chunk, homestore::chunk_num_t* chunk_ids); + + std::shared_ptr< const std::vector > get_v2r_chunk_id_vector(pg_id_t pg_id) const; /** * Retrieves the block allocation hints for a given chunk. 
* @@ -112,12 +129,18 @@ class HeapChunkSelector : public homestore::ChunkSelector { */ uint32_t total_chunks() const; + uint32_t get_chunk_size() const; + private: std::unordered_map< uint32_t, std::shared_ptr< PerDevHeap > > m_per_dev_heap; + std::unordered_map< pg_id_t, std::shared_ptr< PerDevHeap > > m_per_pg_heap; + std::unordered_map< pg_id_t, std::shared_ptr< std::vector > > m_v2r_chunk_map; + std::unordered_map< pg_id_t, std::shared_ptr< ChunkIdMap > > m_r2v_chunk_map; // hold all the chunks , selected or not std::unordered_map< chunk_num_t, csharedChunk > m_chunks; + mutable std::shared_mutex m_chunk_selector_mtx; void add_chunk_internal(const chunk_num_t, bool add_to_heap = true); VChunkDefragHeap m_defrag_heap; diff --git a/src/lib/homestore_backend/hs_homeobject.cpp b/src/lib/homestore_backend/hs_homeobject.cpp index 3bdfde90..fa87879f 100644 --- a/src/lib/homestore_backend/hs_homeobject.cpp +++ b/src/lib/homestore_backend/hs_homeobject.cpp @@ -225,7 +225,7 @@ void HSHomeObject::on_replica_restart() { [this](homestore::meta_blk* mblk, sisl::byte_view buf, size_t size) { on_pg_meta_blk_found(std::move(buf), voidptr_cast(mblk)); }, - nullptr, true); + [this](bool success) { on_pg_meta_blk_recover_completed(success); }, true); HomeStore::instance()->meta_service().read_sub_sb(_pg_meta_name); // recover shard diff --git a/src/lib/homestore_backend/hs_homeobject.hpp b/src/lib/homestore_backend/hs_homeobject.hpp index 357f0317..2a4c4c76 100644 --- a/src/lib/homestore_backend/hs_homeobject.hpp +++ b/src/lib/homestore_backend/hs_homeobject.hpp @@ -78,15 +78,17 @@ class HSHomeObject : public HomeObjectImpl { struct pg_info_superblk { pg_id_t id; uint32_t num_members; + homestore::chunk_num_t num_chunks; peer_id_t replica_set_uuid; + u_int64_t pg_size; homestore::uuid_t index_table_uuid; blob_id_t blob_sequence_num; uint64_t active_blob_count; // Total number of active blobs uint64_t tombstone_blob_count; // Total number of tombstones uint64_t 
total_occupied_blk_count; // Total number of occupied blocks - pg_members members[1]; // ISO C++ forbids zero-size array + char data[1]; // ISO C++ forbids zero-size array - uint32_t size() const { return sizeof(pg_info_superblk) + ((num_members - 1) * sizeof(pg_members)); } + uint32_t size() const { return sizeof(pg_info_superblk) - sizeof(char) + num_members * sizeof(pg_members) + num_chunks * sizeof(homestore::chunk_num_t); } static std::string name() { return _pg_meta_name; } pg_info_superblk() = default; @@ -95,14 +97,24 @@ class HSHomeObject : public HomeObjectImpl { pg_info_superblk& operator=(pg_info_superblk const& rhs) { id = rhs.id; num_members = rhs.num_members; + num_chunks = rhs.num_chunks; + pg_size = rhs.pg_size; replica_set_uuid = rhs.replica_set_uuid; index_table_uuid = rhs.index_table_uuid; blob_sequence_num = rhs.blob_sequence_num; - memcpy(members, rhs.members, sizeof(pg_members) * num_members); + + memcpy(get_pg_members(), rhs.get_pg_members(), sizeof(pg_members) * num_members); + memcpy(get_chunk_ids(), rhs.get_chunk_ids(), sizeof(homestore::chunk_num_t) * num_chunks); return *this; } void copy(pg_info_superblk const& rhs) { *this = rhs; } + + pg_members* get_pg_members() { return reinterpret_cast(data); } + const pg_members* get_pg_members() const { return reinterpret_cast(data); } + + homestore::chunk_num_t* get_chunk_ids() { return reinterpret_cast(data + num_members * sizeof(pg_members)); } + const homestore::chunk_num_t* get_chunk_ids() const { return reinterpret_cast(data + num_members * sizeof(pg_members)); } }; struct DataHeader { @@ -195,7 +207,7 @@ class HSHomeObject : public HomeObjectImpl { std::shared_ptr< BlobIndexTable > index_table_; PGMetrics metrics_; - HS_PG(PGInfo info, shared< homestore::ReplDev > rdev, shared< BlobIndexTable > index_table); + HS_PG(PGInfo info, shared< homestore::ReplDev > rdev, shared< BlobIndexTable > index_table, cshared< HeapChunkSelector > chunk_selector); HS_PG(homestore::superblk< pg_info_superblk 
>&& sb, shared< homestore::ReplDev > rdev); ~HS_PG() override = default; @@ -335,6 +347,7 @@ class HSHomeObject : public HomeObjectImpl { // recover part void register_homestore_metablk_callback(); void on_pg_meta_blk_found(sisl::byte_view const& buf, void* meta_cookie); + void on_pg_meta_blk_recover_completed(bool success); void on_shard_meta_blk_found(homestore::meta_blk* mblk, sisl::byte_view buf); void on_shard_meta_blk_recover_completed(bool success); diff --git a/src/lib/homestore_backend/hs_pg_manager.cpp b/src/lib/homestore_backend/hs_pg_manager.cpp index 5a90bb93..049bc583 100644 --- a/src/lib/homestore_backend/hs_pg_manager.cpp +++ b/src/lib/homestore_backend/hs_pg_manager.cpp @@ -60,6 +60,15 @@ PGManager::NullAsyncResult HSHomeObject::_create_pg(PGInfo&& pg_info, std::set< auto pg_id = pg_info.id; if (auto lg = std::shared_lock(_pg_lock); _pg_map.end() != _pg_map.find(pg_id)) return folly::Unit(); + const auto most_avail_num_chunks = chunk_selector()->most_avail_num_chunks(); + const auto chunk_size = chunk_selector()->get_chunk_size(); + const auto needed_num_chunks = sisl::round_up(pg_info.size, chunk_size) / chunk_size; + if (needed_num_chunks > most_avail_num_chunks) { + LOGW("No enough space to create pg, pg_id {}, needed_num_chunks {}, most_avail_num_chunks {}", pg_id, + needed_num_chunks, most_avail_num_chunks); + return folly::makeUnexpected(PGError::NO_SPACE_LEFT); + } + LOGINFOMOD(homeobject, "[Hooper] have enough chunk created pg {}, num_chunk {}, most_avail {}, pg_info_size {} ", pg_id, needed_num_chunks, most_avail_num_chunks, pg_info.size); pg_info.replica_set_uuid = boost::uuids::random_generator()(); return hs_repl_service() .create_repl_dev(pg_info.replica_set_uuid, peers) @@ -127,12 +136,19 @@ void HSHomeObject::on_create_pg_message_commit(int64_t lsn, sisl::blob const& he return; } + // select chunks for pg + const auto chunk_size = chunk_selector()->get_chunk_size(); + const chunk_num_t num_chunk = sisl::round_up(pg_info.size, 
chunk_size) / chunk_size; + LOGINFOMOD(homeobject, "[Hooper] on create commit create pg {}, num_chunk {}, pg_info_size {}", pg_id, num_chunk, pg_info.size); + + chunk_selector()->select_chunks_for_pg(pg_id, num_chunk); + // create index table and pg // TODO create index table during create shard. auto index_table = create_index_table(); auto uuid_str = boost::uuids::to_string(index_table->uuid()); - auto hs_pg = std::make_unique< HS_PG >(std::move(pg_info), std::move(repl_dev), index_table); + auto hs_pg = std::make_unique< HS_PG >(std::move(pg_info), std::move(repl_dev), index_table, chunk_selector()); std::scoped_lock lock_guard(index_lock_); RELEASE_ASSERT(index_table_pg_map_.count(uuid_str) == 0, "duplicate index table found"); index_table_pg_map_[uuid_str] = PgIndexTable{pg_id, index_table}; @@ -193,11 +209,11 @@ void HSHomeObject::on_pg_replace_member(homestore::group_id_t group_id, const re pg->pg_info_.members.emplace(PGMember(member_in.id, member_in.name, member_in.priority)); uint32_t i{0}; + pg_members* sb_members = hs_pg->pg_sb_->get_pg_members(); for (auto const& m : pg->pg_info_.members) { - hs_pg->pg_sb_->members[i].id = m.id; - std::strncpy(hs_pg->pg_sb_->members[i].name, m.name.c_str(), - std::min(m.name.size(), pg_members::max_name_len)); - hs_pg->pg_sb_->members[i].priority = m.priority; + sb_members[i].id = m.id; + std::strncpy(sb_members[i].name, m.name.c_str(), std::min(m.name.size(), pg_members::max_name_len)); + sb_members[i].priority = m.priority; ++i; } @@ -226,6 +242,7 @@ void HSHomeObject::add_pg_to_map(unique< HS_PG > hs_pg) { std::string HSHomeObject::serialize_pg_info(const PGInfo& pginfo) { nlohmann::json j; j["pg_info"]["pg_id_t"] = pginfo.id; + j["pg_info"]["pg_size"] = pginfo.size; j["pg_info"]["repl_uuid"] = boost::uuids::to_string(pginfo.replica_set_uuid); nlohmann::json members_j{}; @@ -244,6 +261,7 @@ PGInfo HSHomeObject::deserialize_pg_info(const unsigned char* json_str, size_t s auto pg_json = nlohmann::json::parse(json_str, 
json_str + size); PGInfo pg_info(pg_json["pg_info"]["pg_id_t"].get< pg_id_t >()); + pg_info.size = pg_json["pg_info"]["pg_size"].get< u_int64_t >(); pg_info.replica_set_uuid = boost::uuids::string_generator()(pg_json["pg_info"]["repl_uuid"].get< std::string >()); for (auto const& m : pg_json["pg_info"]["members"]) { @@ -267,6 +285,7 @@ void HSHomeObject::on_pg_meta_blk_found(sisl::byte_view const& buf, void* meta_c return; } auto pg_id = pg_sb->id; + chunk_selector_->recover_pg_chunk_map_from_pg_sb(pg_id, pg_sb->num_chunks, pg_sb->get_chunk_ids()); auto uuid_str = boost::uuids::to_string(pg_sb->index_table_uuid); auto hs_pg = std::make_unique< HS_PG >(std::move(pg_sb), std::move(v.value())); // During PG recovery check if index is already recoverd else @@ -280,38 +299,53 @@ void HSHomeObject::on_pg_meta_blk_found(sisl::byte_view const& buf, void* meta_c add_pg_to_map(std::move(hs_pg)); } +void HSHomeObject::on_pg_meta_blk_recover_completed(bool success) { + chunk_selector_->recover_per_dev_chunk_heap(); +} + PGInfo HSHomeObject::HS_PG::pg_info_from_sb(homestore::superblk< pg_info_superblk > const& sb) { PGInfo pginfo{sb->id}; + const pg_members* sb_members = sb->get_pg_members(); for (uint32_t i{0}; i < sb->num_members; ++i) { - pginfo.members.emplace(sb->members[i].id, std::string(sb->members[i].name), sb->members[i].priority); + pginfo.members.emplace(sb_members[i].id, std::string(sb_members[i].name), sb_members[i].priority); } + pginfo.size = sb->pg_size; pginfo.replica_set_uuid = sb->replica_set_uuid; + LOGI("[Hooper] do pg_info_from_sb num_chunks {}", pginfo.size); return pginfo; } -HSHomeObject::HS_PG::HS_PG(PGInfo info, shared< homestore::ReplDev > rdev, shared< BlobIndexTable > index_table) : +HSHomeObject::HS_PG::HS_PG(PGInfo info, shared< homestore::ReplDev > rdev, shared< BlobIndexTable > index_table, cshared< HeapChunkSelector > chunk_selector) : PG{std::move(info)}, pg_sb_{_pg_meta_name}, repl_dev_{std::move(rdev)}, 
index_table_{std::move(index_table)}, metrics_{*this} { - pg_sb_.create(sizeof(pg_info_superblk) + ((pg_info_.members.size() - 1) * sizeof(pg_members))); + auto chunk_ids = chunk_selector->get_v2r_chunk_id_vector(pg_info_.id); + chunk_num_t num_chunks = chunk_ids->size(); + pg_sb_.create(sizeof(pg_info_superblk) - sizeof(char) + pg_info_.members.size() * sizeof(pg_members)+ num_chunks * sizeof(homestore::chunk_num_t)); pg_sb_->id = pg_info_.id; pg_sb_->num_members = pg_info_.members.size(); + pg_sb_->num_chunks = num_chunks; + pg_sb_->pg_size = pg_info_.size; pg_sb_->replica_set_uuid = repl_dev_->group_id(); pg_sb_->index_table_uuid = index_table_->uuid(); pg_sb_->active_blob_count = 0; pg_sb_->tombstone_blob_count = 0; pg_sb_->total_occupied_blk_count = 0; - uint32_t i{0}; + pg_members* pg_sb_members = pg_sb_->get_pg_members(); for (auto const& m : pg_info_.members) { - pg_sb_->members[i].id = m.id; - std::strncpy(pg_sb_->members[i].name, m.name.c_str(), std::min(m.name.size(), pg_members::max_name_len)); - pg_sb_->members[i].priority = m.priority; + pg_sb_members[i].id = m.id; + std::strncpy(pg_sb_members[i].name, m.name.c_str(), std::min(m.name.size(), pg_members::max_name_len)); + pg_sb_members[i].priority = m.priority; ++i; } + + chunk_selector->persist_chunk_ids_into_pg_sb(pg_sb_->id, num_chunks, pg_sb_->get_chunk_ids()); pg_sb_.write(); + LOGI("[Hooper] do persist pg_sb {}", pg_info_.id); + } HSHomeObject::HS_PG::HS_PG(homestore::superblk< HSHomeObject::pg_info_superblk >&& sb, diff --git a/src/lib/homestore_backend/hs_shard_manager.cpp b/src/lib/homestore_backend/hs_shard_manager.cpp index 8378087b..86f86af4 100644 --- a/src/lib/homestore_backend/hs_shard_manager.cpp +++ b/src/lib/homestore_backend/hs_shard_manager.cpp @@ -324,12 +324,12 @@ void HSHomeObject::on_shard_message_commit(int64_t lsn, sisl::blob const& h, hom std::scoped_lock lock_guard(_shard_lock); shard_exist = (_shard_map.find(shard_info.id) != _shard_map.end()); } - if (!shard_exist) { 
add_new_shard_to_map(std::make_unique< HS_Shard >(shard_info, blkids.chunk_num())); // select_specific_chunk() will do something only when we are relaying journal after restart, during the // runtime flow chunk is already been be mark busy when we write the shard info to the repldev. - chunk_selector_->select_specific_chunk(blkids.chunk_num()); + auto pg_id = shard_info.placement_group; + chunk_selector_->select_specific_chunk(pg_id, blkids.chunk_num()); } if (ctx) { ctx->promise_.setValue(ShardManager::Result< ShardInfo >(shard_info)); } @@ -362,9 +362,10 @@ void HSHomeObject::on_shard_message_commit(int64_t lsn, sisl::blob const& h, hom } if (state == ShardInfo::State::SEALED) { + auto pg_id = shard_info.placement_group; auto chunk_id = get_shard_chunk(shard_info.id); RELEASE_ASSERT(chunk_id.has_value(), "Chunk id not found"); - chunk_selector()->release_chunk(chunk_id.value()); + chunk_selector()->release_chunk(pg_id, chunk_id.value()); update_shard_in_map(shard_info); } else LOGW("try to commit SEAL_SHARD_MSG but shard state is not sealed, shard_id: {}", shard_info.id); @@ -387,13 +388,16 @@ void HSHomeObject::on_shard_meta_blk_recover_completed(bool success) { std::unordered_set< homestore::chunk_num_t > excluding_chunks; std::scoped_lock lock_guard(_pg_lock); for (auto& pair : _pg_map) { + excluding_chunks.clear(); + excluding_chunks.reserve(pair.second->shards_.size()); for (auto& shard : pair.second->shards_) { if (shard->info.state == ShardInfo::State::OPEN) { excluding_chunks.emplace(d_cast< HS_Shard* >(shard.get())->sb_->chunk_id); } } + chunk_selector_->recover_pg_chunk_heap(pair.first, excluding_chunks); } - chunk_selector_->build_per_dev_chunk_heap(excluding_chunks); + LOGI("[Hooper] successfully on_shard_meta_blk_recover_completed"); } void HSHomeObject::add_new_shard_to_map(ShardPtr&& shard) { diff --git a/src/lib/homestore_backend/replication_state_machine.cpp b/src/lib/homestore_backend/replication_state_machine.cpp index 7f9164f5..83d5f24a 100644 
--- a/src/lib/homestore_backend/replication_state_machine.cpp +++ b/src/lib/homestore_backend/replication_state_machine.cpp @@ -121,17 +121,10 @@ ReplicationStateMachine::get_blk_alloc_hints(sisl::blob const& header, uint32_t const ReplicationMessageHeader* msg_header = r_cast< const ReplicationMessageHeader* >(header.cbytes()); switch (msg_header->msg_type) { case ReplicationMessageType::CREATE_SHARD_MSG: { - auto const [pg_found, shards_found, chunk_id] = home_object_->get_any_chunk_id(msg_header->pg_id); - if (!pg_found) { - LOGW("Requesting a chunk for an unknown pg={}, letting the caller retry after sometime", msg_header->pg_id); - return folly::makeUnexpected(homestore::ReplServiceError::RESULT_NOT_EXIST_YET); - } else if (!shards_found) { - // pg is empty without any shards, we leave the decision the HeapChunkSelector to select a pdev - // with most available space and then select one chunk based on that pdev - } else { - return home_object_->chunk_selector()->chunk_to_hints(chunk_id); - } - break; + // Since chunks are selected when a pg is created, the chunkselector selects one of the chunks owned by the pg + homestore::blk_alloc_hints hints; + hints.pdev_id_hint = msg_header->pg_id; // Temporary bypass using pdev_id_hint to represent pg_id_hint + return hints; } case ReplicationMessageType::SEAL_SHARD_MSG: { diff --git a/src/lib/homestore_backend/tests/homeobj_fixture.hpp b/src/lib/homestore_backend/tests/homeobj_fixture.hpp index a3e2e61f..86d15824 100644 --- a/src/lib/homestore_backend/tests/homeobj_fixture.hpp +++ b/src/lib/homestore_backend/tests/homeobj_fixture.hpp @@ -171,6 +171,8 @@ class HomeObjectFixture : public ::testing::Test { EXPECT_EQ(lhs->id, rhs->id); EXPECT_EQ(lhs->num_members, rhs->num_members); + EXPECT_EQ(lhs->num_chunks, rhs->num_chunks); + EXPECT_EQ(lhs->pg_size, rhs->pg_size); EXPECT_EQ(lhs->replica_set_uuid, rhs->replica_set_uuid); EXPECT_EQ(lhs->index_table_uuid, rhs->index_table_uuid); EXPECT_EQ(lhs->blob_sequence_num, 
rhs->blob_sequence_num); @@ -179,9 +181,12 @@ class HomeObjectFixture : public ::testing::Test { EXPECT_EQ(lhs->total_occupied_blk_count, rhs->total_occupied_blk_count); EXPECT_EQ(lhs->tombstone_blob_count, rhs->tombstone_blob_count); for (uint32_t i = 0; i < lhs->num_members; i++) { - EXPECT_EQ(lhs->members[i].id, rhs->members[i].id); - EXPECT_EQ(lhs->members[i].priority, rhs->members[i].priority); - EXPECT_EQ(0, std::strcmp(lhs->members[i].name, rhs->members[i].name)); + EXPECT_EQ(lhs->get_pg_members()[i].id, rhs->get_pg_members()[i].id); + EXPECT_EQ(lhs->get_pg_members()[i].priority, rhs->get_pg_members()[i].priority); + EXPECT_EQ(0, std::strcmp(lhs->get_pg_members()[i].name, rhs->get_pg_members()[i].name)); + } + for (chunk_num_t i = 0; i < lhs->num_chunks; ++i) { + EXPECT_EQ(lhs->get_chunk_ids()[i], rhs->get_chunk_ids()[i]); } }