From 514efcd66be688cc2834db9dde88b86d8af11516 Mon Sep 17 00:00:00 2001 From: zilai Date: Thu, 28 Sep 2023 08:52:30 -0700 Subject: [PATCH] refactor to use HS replication context --- src/lib/homeobject_impl.hpp | 11 +- src/lib/homestore_backend/hs_homeobject.cpp | 45 ++-- src/lib/homestore_backend/hs_homeobject.hpp | 60 +++-- src/lib/homestore_backend/hs_pg_manager.cpp | 14 +- .../homestore_backend/hs_shard_manager.cpp | 235 ++++++++++-------- .../homestore_backend/replication_message.hpp | 13 +- .../replication_state_machine.cpp | 27 +- .../replication_state_machine.hpp | 5 +- .../tests/test_shard_manager.cpp | 197 +++++---------- src/lib/memory_backend/mem_shard_manager.cpp | 7 +- src/lib/shard_manager.cpp | 4 +- 11 files changed, 312 insertions(+), 306 deletions(-) diff --git a/src/lib/homeobject_impl.hpp b/src/lib/homeobject_impl.hpp index 7f018d1e..7675e616 100644 --- a/src/lib/homeobject_impl.hpp +++ b/src/lib/homeobject_impl.hpp @@ -38,13 +38,12 @@ inline shard_id_t make_new_shard_id(pg_id_t pg, shard_id_t next_shard) { struct Shard { explicit Shard(ShardInfo info) : info(std::move(info)) {} + virtual ~Shard() = default; ShardInfo info; - uint16_t chunk_id; - void* metablk_cookie{nullptr}; }; -using ShardList = std::list< Shard >; -using ShardIterator = ShardList::iterator; +using ShardPtr = shared< Shard >; +using ShardPtrList = std::list< ShardPtr >; struct PG { explicit PG(PGInfo info) : pg_info_(std::move(info)) {} @@ -56,7 +55,7 @@ struct PG { PGInfo pg_info_; uint64_t shard_sequence_num_{0}; - ShardList shards_; + ShardPtrList shards_; }; class HomeObjectImpl : public HomeObject, @@ -92,7 +91,7 @@ class HomeObjectImpl : public HomeObject, std::map< pg_id_t, shared< PG > > _pg_map; mutable std::shared_mutex _shard_lock; - std::map< shard_id_t, ShardIterator > _shard_map; + std::map< shard_id_t, ShardPtr > _shard_map; /// public: diff --git a/src/lib/homestore_backend/hs_homeobject.cpp b/src/lib/homestore_backend/hs_homeobject.cpp index 3301686a..d40e8db9 100644 --- a/src/lib/homestore_backend/hs_homeobject.cpp +++ b/src/lib/homestore_backend/hs_homeobject.cpp @@ -10,8 +10,6 @@ namespace homeobject { -const std::string HSHomeObject::s_shard_info_sub_type = "shard_info"; - extern std::shared_ptr< HomeObject > init_homeobject(std::weak_ptr< HomeObjectApplication >&& application) { LOGINFOMOD(homeobject, "Initializing HomeObject"); auto instance = std::make_shared< HSHomeObject >(std::move(application)); @@ -41,7 +39,6 @@ void HSHomeObject::init_homestore() { } chunk_selector_ = std::make_shared< HeapChunkSelector >(); - using namespace homestore; bool need_format = HomeStore::instance() ->with_index_service(nullptr) @@ -63,19 +60,17 @@ void HSHomeObject::init_homestore() { {HS_SERVICE::INDEX, hs_format_params{.size_pct = 5.0}}, }); } + LOGINFO("Initialize and start HomeStore is successfully"); } void HSHomeObject::register_homestore_metablk_callback() { // register some callbacks for metadata recovery; using namespace homestore; HomeStore::instance()->meta_service().register_handler( - HSHomeObject::s_shard_info_sub_type, - [this](homestore::meta_blk* mblk, sisl::byte_view buf, size_t size) { - on_shard_meta_blk_found(mblk, buf, size); - }, + "ShardManager", + [this](homestore::meta_blk* mblk, sisl::byte_view buf, size_t size) { on_shard_meta_blk_found(mblk, buf); }, - [this](bool success) { on_shard_meta_blk_recover_completed(success); }, - true); + [this](bool success) { on_shard_meta_blk_recover_completed(success); }, true); HomeStore::instance()->meta_service().register_handler( "PGManager", @@ -91,23 +86,37 @@ HSHomeObject::~HSHomeObject() { iomanager.stop(); } -void HSHomeObject::on_shard_meta_blk_found(homestore::meta_blk* mblk, sisl::byte_view buf, size_t size) { - auto shard = deserialize_shard(r_cast< const char* >(buf.bytes()), size); - shard.metablk_cookie = mblk; - // As shard info in the homestore metablk is always the latest state(OPEN or SEALED), - // we can always create a shard from this shard info and once shard is deleted, the associated metablk will be - // deleted too. - do_commit_new_shard(shard); -} +void HSHomeObject::on_shard_meta_blk_found(homestore::meta_blk* mblk, sisl::byte_view buf) { + homestore::superblk< shard_info_superblk > sb; + sb.load(buf, mblk); + bool pg_is_recovery = false; + { + std::scoped_lock lock_guard(_pg_lock); + pg_is_recovery = _pg_map.find(sb->placement_group) != _pg_map.end(); + } + + if (pg_is_recovery) { + auto hs_shard = std::make_shared< HS_Shard >(sb); + add_new_shard_to_map(hs_shard); + return; + } + + // There is no guarantee that pg info will be recovery before shard recovery + std::scoped_lock lock_guard(recovery_mutex_); + pending_recovery_shards_[sb->placement_group].push_back(std::move(sb)); +} void HSHomeObject::on_shard_meta_blk_recover_completed(bool success) { // Find all shard with opening state and excluede their binding chunks from the HeapChunkSelector; + RELEASE_ASSERT(pending_recovery_shards_.empty(), "some shards is still pending on recovery"); std::unordered_set< homestore::chunk_num_t > excluding_chunks; std::scoped_lock lock_guard(_pg_lock); for (auto& pair : _pg_map) { for (auto& shard : pair.second->shards_) { - if (shard.info.state == ShardInfo::State::OPEN) { excluding_chunks.emplace(shard.chunk_id); } + if (shard->info.state == ShardInfo::State::OPEN) { + excluding_chunks.emplace(dp_cast< HS_Shard >(shard)->sb_->chunk_id); + } } } diff --git a/src/lib/homestore_backend/hs_homeobject.hpp b/src/lib/homestore_backend/hs_homeobject.hpp index 8b0c5684..c80e57a2 100644 --- a/src/lib/homestore_backend/hs_homeobject.hpp +++ b/src/lib/homestore_backend/hs_homeobject.hpp @@ -9,6 +9,7 @@ #include "heap_chunk_selector.h" #include "lib/homeobject_impl.hpp" +#include "replication_message.hpp" namespace homestore { struct meta_blk; @@ -17,8 +18,6 @@ struct meta_blk; namespace homeobject { class HSHomeObject : public HomeObjectImpl { - std::shared_ptr< HeapChunkSelector > chunk_selector_; -private: /// Overridable Helpers ShardManager::Result< ShardInfo > _create_shard(pg_id_t, uint64_t size_bytes) override; ShardManager::Result< ShardInfo > _seal_shard(shard_id_t) override; @@ -46,6 +45,18 @@ class HSHomeObject : public HomeObjectImpl { peer_id_t replica_set_uuid; pg_members members[1]; // ISO C++ forbids zero-size array }; + + struct shard_info_superblk { + shard_id_t id; + pg_id_t placement_group; + ShardInfo::State state; + uint64_t created_time; + uint64_t last_modified_time; + uint64_t available_capacity_bytes; + uint64_t total_capacity_bytes; + uint64_t deleted_capacity_bytes; + homestore::chunk_num_t chunk_id; + }; #pragma pack() struct HS_PG : public PG { @@ -59,23 +70,41 @@ class HSHomeObject : public HomeObjectImpl { static PGInfo pg_info_from_sb(homestore::superblk< pg_info_superblk > const& sb); }; + struct HS_Shard : public Shard { + homestore::superblk< shard_info_superblk > sb_; + HS_Shard(ShardInfo info, homestore::chunk_num_t chunk_id); + HS_Shard(homestore::superblk< shard_info_superblk > const& sb); + virtual ~HS_Shard() = default; + + void update_info(const ShardInfo& info); + void write_sb(); + static ShardInfo shard_info_from_sb(homestore::superblk< shard_info_superblk > const& sb); + }; + +private: + shared< HeapChunkSelector > chunk_selector_; + std::shared_mutex recovery_mutex_; + std::map< pg_id_t, std::list< homestore::superblk< shard_info_superblk > > > pending_recovery_shards_; + private: static homestore::ReplicationService& hs_repl_service() { return homestore::hs()->repl_service(); } void add_pg_to_map(shared< HS_PG > hs_pg); shard_id_t generate_new_shard_id(pg_id_t pg); uint64_t get_sequence_num_from_shard_id(uint64_t shard_id_t); - std::string serialize_shard(const Shard& shard) const; - Shard deserialize_shard(const char* shard_info_str, size_t size) const; - void do_commit_new_shard(const Shard& shard); - void do_commit_seal_shard(const Shard& shard); - void register_homestore_metablk_callback(); - void* get_shard_metablk(shard_id_t id) const; + + static ShardInfo deserialize_shard_info(const char* shard_info_str, size_t size); + static std::string serialize_shard_info(const ShardInfo& info); + void add_new_shard_to_map(ShardPtr shard); + void update_shard_in_map(const ShardInfo& shard_info); + void do_shard_message_commit(int64_t lsn, const ReplicationMessageHeader& header, + homestore::MultiBlkId const& blkids, sisl::blob value, + cintrusive< homestore::repl_req_ctx >& hs_ctx); // recover part - static const std::string s_shard_info_sub_type; + void register_homestore_metablk_callback(); void on_pg_meta_blk_found(sisl::byte_view const& buf, void* meta_cookie); - void on_shard_meta_blk_found(homestore::meta_blk* mblk, sisl::byte_view buf, size_t size); - void on_shard_meta_blk_recover_completed(bool success); + void on_shard_meta_blk_found(homestore::meta_blk* mblk, sisl::byte_view buf); + void on_shard_meta_blk_recover_completed(bool success); public: using HomeObjectImpl::HomeObjectImpl; @@ -83,11 +112,12 @@ class HSHomeObject : public HomeObjectImpl { void init_homestore(); - void on_shard_message_commit(int64_t lsn, sisl::blob const& header, sisl::blob const& key, - homestore::MultiBlkId const& blkids, - cintrusive< homestore::repl_req_ctx >& hs_ctx); + void on_shard_message_commit(int64_t lsn, sisl::blob const& header, homestore::MultiBlkId const& blkids, + homestore::ReplDev* repl_dev, cintrusive< homestore::repl_req_ctx >& hs_ctx); + + ShardManager::Result< homestore::chunk_num_t > get_shard_chunk(shard_id_t id) const; - ShardManager::Result< homestore::chunk_num_t > get_shard_chunk(shard_id_t id) const; + shared< HeapChunkSelector > chunk_selector() { return chunk_selector_; } }; } // namespace homeobject diff --git a/src/lib/homestore_backend/hs_pg_manager.cpp b/src/lib/homestore_backend/hs_pg_manager.cpp index 63a85a6b..ad7e3da3 100644 --- a/src/lib/homestore_backend/hs_pg_manager.cpp +++ b/src/lib/homestore_backend/hs_pg_manager.cpp @@ -121,6 +121,17 @@ void HSHomeObject::on_pg_meta_blk_found(sisl::byte_view const& buf, void* meta_c return; } add_pg_to_map(std::make_shared< HS_PG >(pg_sb, std::move(v.value()))); + + // check if any shard recovery is pending by this pg; + std::scoped_lock lock_guard(recovery_mutex_); + auto iter = pending_recovery_shards_.find(pg_sb->id); + if (iter != pending_recovery_shards_.end()) { + for (auto& sb : iter->second) { + auto hs_shard = std::make_shared< HS_Shard >(sb); + add_new_shard_to_map(hs_shard); + } + pending_recovery_shards_.erase(iter); + } }); } @@ -129,6 +140,7 @@ PGInfo HSHomeObject::HS_PG::pg_info_from_sb(homestore::superblk< pg_info_superbl for (uint32_t i{0}; i < sb->num_members; ++i) { pginfo.members.emplace(sb->members[i].id, std::string(sb->members[i].name), sb->members[i].priority); } + pginfo.replica_set_uuid = sb->replica_set_uuid; return pginfo; } @@ -154,4 +166,4 @@ HSHomeObject::HS_PG::HS_PG(homestore::superblk< HSHomeObject::pg_info_superblk > shared< homestore::ReplDev > rdev) : PG{pg_info_from_sb(sb)}, pg_sb_{sb}, repl_dev_{std::move(rdev)} {} -} // namespace homeobject \ No newline at end of file +} // namespace homeobject diff --git a/src/lib/homestore_backend/hs_shard_manager.cpp b/src/lib/homestore_backend/hs_shard_manager.cpp index 824c3ce1..3ae40508 100644 --- a/src/lib/homestore_backend/hs_shard_manager.cpp +++ b/src/lib/homestore_backend/hs_shard_manager.cpp @@ -28,24 +28,23 @@ uint64_t HSHomeObject::get_sequence_num_from_shard_id(uint64_t shard_id_t) { return shard_id_t & (max_shard_num_in_pg() - 1); } -std::string HSHomeObject::serialize_shard(const Shard& shard) const { +std::string HSHomeObject::serialize_shard_info(const ShardInfo& info) { nlohmann::json j; - j["shard_info"]["shard_id"] = shard.info.id; - j["shard_info"]["pg_id_t"] = shard.info.placement_group; - j["shard_info"]["state"] = shard.info.state; - j["shard_info"]["created_time"] = shard.info.created_time; - j["shard_info"]["modified_time"] = shard.info.last_modified_time; - j["shard_info"]["total_capacity"] = shard.info.total_capacity_bytes; - j["shard_info"]["available_capacity"] = shard.info.available_capacity_bytes; - j["shard_info"]["deleted_capacity"] = shard.info.deleted_capacity_bytes; - j["ext_info"]["chunk_id"] = shard.chunk_id; + j["shard_info"]["shard_id_t"] = info.id; + j["shard_info"]["pg_id_t"] = info.placement_group; + j["shard_info"]["state"] = info.state; + j["shard_info"]["created_time"] = info.created_time; + j["shard_info"]["modified_time"] = info.last_modified_time; + j["shard_info"]["total_capacity"] = info.total_capacity_bytes; + j["shard_info"]["available_capacity"] = info.available_capacity_bytes; + j["shard_info"]["deleted_capacity"] = info.deleted_capacity_bytes; return j.dump(); } -Shard HSHomeObject::deserialize_shard(const char* json_str, size_t str_size) const { - auto shard_json = nlohmann::json::parse(json_str, json_str + str_size); +ShardInfo HSHomeObject::deserialize_shard_info(const char* json_str, size_t str_size) { ShardInfo shard_info; - shard_info.id = shard_json["shard_info"]["shard_id"].get< shard_id_t >(); + auto shard_json = nlohmann::json::parse(json_str, json_str + str_size); + shard_info.id = shard_json["shard_info"]["shard_id_t"].get< shard_id_t >(); shard_info.placement_group = shard_json["shard_info"]["pg_id_t"].get< pg_id_t >(); shard_info.state = static_cast< ShardInfo::State >(shard_json["shard_info"]["state"].get< int >()); shard_info.created_time = shard_json["shard_info"]["created_time"].get< uint64_t >(); @@ -53,9 +52,7 @@ Shard HSHomeObject::deserialize_shard(const char* json_str, size_t str_size) con shard_info.available_capacity_bytes = shard_json["shard_info"]["available_capacity"].get< uint64_t >(); shard_info.total_capacity_bytes = shard_json["shard_info"]["total_capacity"].get< uint64_t >(); shard_info.deleted_capacity_bytes = shard_json["shard_info"]["deleted_capacity"].get< uint64_t >(); - auto shard = Shard(shard_info); - shard.chunk_id = shard_json["ext_info"]["chunk_id"].get< uint16_t >(); - return shard; + return shard_info; } ShardManager::Result< ShardInfo > HSHomeObject::_create_shard(pg_id_t pg_owner, uint64_t size_bytes) { @@ -77,16 +74,14 @@ ShardManager::Result< ShardInfo > HSHomeObject::_create_shard(pg_id_t pg_owner, auto new_shard_id = generate_new_shard_id(pg_owner); auto create_time = get_current_timestamp(); - auto shard_info = - ShardInfo(new_shard_id, pg_owner, ShardInfo::State::OPEN, create_time, create_time, size_bytes, size_bytes, 0); - - std::string const create_shard_message = serialize_shard(Shard(shard_info)); + std::string const create_shard_message = serialize_shard_info( + ShardInfo(new_shard_id, pg_owner, ShardInfo::State::OPEN, create_time, create_time, size_bytes, size_bytes, 0)); const auto msg_size = sisl::round_up(create_shard_message.size(), repl_dev->get_blk_size()); auto req = repl_result_ctx< ShardManager::Result< ShardInfo > >::make(msg_size, 512 /*alignment*/); auto buf_ptr = req->hdr_buf_.bytes; std::memset(buf_ptr, 0, msg_size); - std::memcpy(buf_ptr, create_shard_message.c_str(), create_shard_message.size()); - // preapre msg header; + std::memcpy(buf_ptr, create_shard_message.c_str(), create_shard_message.size()); + // preapre msg header; req->header_.msg_type = ReplicationMessageType::CREATE_SHARD_MSG; req->header_.pg_id = pg_owner; req->header_.shard_id = new_shard_id; @@ -94,7 +89,7 @@ ShardManager::Result< ShardInfo > HSHomeObject::_create_shard(pg_id_t pg_owner, req->header_.payload_crc = crc32_ieee(init_crc32, buf_ptr, msg_size); req->header_.seal(); sisl::blob header; - header.bytes = r_cast(&req->header_); + header.bytes = r_cast< uint8_t* >(&req->header_); header.size = sizeof(req->header_); sisl::sg_list value; value.size = msg_size; @@ -113,110 +108,146 @@ ShardManager::Result< ShardInfo > HSHomeObject::_seal_shard(shard_id_t id) { return folly::makeUnexpected(ShardError::UNKNOWN_SHARD); } -void HSHomeObject::on_shard_message_commit(int64_t lsn, sisl::blob const& header, sisl::blob const&, - homestore::MultiBlkId const& blkids, +void HSHomeObject::on_shard_message_commit(int64_t lsn, sisl::blob const& header, homestore::MultiBlkId const& blkids, + homestore::ReplDev* repl_dev, cintrusive< homestore::repl_req_ctx >& hs_ctx) { - /* - folly::Promise< ShardManager::Result< ShardInfo > >* promise = nullptr; - // user_ctx will be nullptr when: - // 1. on the follower side - // 2. on the leader side but homeobject restarts and replay all commited log entries from the last checkpoint; - if (user_ctx != nullptr) { promise = r_cast< folly::Promise< ShardManager::Result< ShardInfo > >* >(user_ctx); } - - const ReplicationMessageHeader* msg_header = r_cast< const ReplicationMessageHeader* >(header.bytes); - if (msg_header->header_crc != msg_header->calculate_crc()) { - LOGWARN("replication message header is corrupted with crc error, lsn:{}", lsn); - if (promise) { promise->setValue(folly::makeUnexpected(ShardError::INVALID_ARG)); } + + if (hs_ctx != nullptr) { + auto ctx = boost::static_pointer_cast< repl_result_ctx< ShardManager::Result< ShardInfo > > >(hs_ctx); + do_shard_message_commit(lsn, *r_cast< ReplicationMessageHeader* >(header.bytes), blkids, ctx->hdr_buf_, hs_ctx); return; } - // read value from PBA; - auto value_size = homestore::HomeStore::instance()->data_service().get_blk_size() * blkids.blk_count(); - auto value_buf = iomanager.iobuf_alloc(512, value_size); + // hs_ctx will be nullptr when HS is restarting and replay all commited log entries from the last checkpoint; + // and then we need re-read value from PBA; sisl::sg_list value; - value.size = value_size; - value.iovs.emplace_back(iovec{.iov_base = value_buf, .iov_len = value_size}); - - repl_dev.async_read(blkids, value, value_size) - .thenValue([this, header, msg_header, blkids, value_buf, value_size, promise](auto&& err) { - if (err || - crc32_ieee(init_crc32, r_cast< const uint8_t* >(value_buf), value_size) != msg_header->payload_crc) { - // read failure or read successfully but data is corrupted; - if (promise) { promise->setValue(folly::makeUnexpected(ShardError::INVALID_ARG)); } - iomanager.iobuf_free(value_buf); - return; - } - - auto shard = deserialize_shard(r_cast< const char* >(value_buf), value_size); - switch (msg_header->msg_type) { - case ReplicationMessageType::CREATE_SHARD_MSG: { - shard.chunk_id = blkids.chunk_num(); - // serialize the finalized shard msg and persist the serialize result to homestore MetaBlkService; - auto shard_msg = serialize_shard(shard); - homestore::hs()->meta_service().add_sub_sb(HSHomeObject::s_shard_info_sub_type, - r_cast< const uint8_t* >(shard_msg.c_str()), - shard_msg.size(), shard.metablk_cookie); - do_commit_new_shard(shard); - break; - } - - case ReplicationMessageType::SEAL_SHARD_MSG: { - void* metablk_cookie = get_shard_metablk(shard.info.id); - RELEASE_ASSERT(metablk_cookie != nullptr, "seal shard when metablk is nullptr"); - homestore::hs()->meta_service().update_sub_sb(r_cast< const uint8_t* >(value_buf), value_size, - metablk_cookie); - shard.metablk_cookie = metablk_cookie; - do_commit_seal_shard(shard); - break; - } - default: { - break; - } - } - - iomanager.iobuf_free(value_buf); - if (promise) { promise->setValue(ShardManager::Result< ShardInfo >(shard.info)); } - }); - - if (ctx) { ctx->promise_.setValue(ShardManager::Result< ShardInfo >(shard.info)); } - */ + value.size = blkids.blk_count() * repl_dev->get_blk_size(); + auto value_buf = iomanager.iobuf_alloc(512, value.size); + value.iovs.push_back(iovec{.iov_base = value_buf, .iov_len = value.size}); + // header will be released when this function returns, but we still need the header when async_read() finished. + const ReplicationMessageHeader msg_header = *r_cast< const ReplicationMessageHeader* >(header.bytes); + repl_dev->async_read(blkids, value, value.size).thenValue([this, lsn, msg_header, blkids, value](auto&& err) { + if (err) { + LOGWARNMOD(homeobject, "failed to read data from homestore pba, lsn:{}", lsn); + } else { + sisl::blob value_blob(r_cast< uint8_t* >(value.iovs[0].iov_base), value.size); + do_shard_message_commit(lsn, msg_header, blkids, value_blob, nullptr); + } + iomanager.iobuf_free(r_cast< uint8_t* >(value.iovs[0].iov_base)); + }); +} + +void HSHomeObject::do_shard_message_commit(int64_t lsn, const ReplicationMessageHeader& header, + homestore::MultiBlkId const& blkids, sisl::blob value, + cintrusive< homestore::repl_req_ctx >& hs_ctx) { + repl_result_ctx< ShardManager::Result< ShardInfo > >* ctx{nullptr}; + if (hs_ctx != nullptr) { + ctx = boost::static_pointer_cast< repl_result_ctx< ShardManager::Result< ShardInfo > > >(hs_ctx).get(); + } + + if (header.corrupted()) { + LOGWARNMOD(homeobject, "replication message header is corrupted with crc error, lsn:{}", lsn); + if (ctx) { ctx->promise_.setValue(folly::makeUnexpected(ShardError::INVALID_ARG)); } + return; + } + + if (crc32_ieee(init_crc32, value.bytes, value.size) != header.payload_crc) { + // header & value is inconsistent; + LOGWARNMOD(homeobject, "replication message header is inconsistent with value, lsn:{}", lsn); + if (ctx) { ctx->promise_.setValue(folly::makeUnexpected(ShardError::INVALID_ARG)); } + return; + } + + auto shard_info = deserialize_shard_info(r_cast< const char* >(value.bytes), value.size); + switch (header.msg_type) { + case ReplicationMessageType::CREATE_SHARD_MSG: { + auto hs_shard = std::make_shared< HS_Shard >(shard_info, blkids.chunk_num()); + add_new_shard_to_map(hs_shard); + break; + } + + case ReplicationMessageType::SEAL_SHARD_MSG: { + update_shard_in_map(shard_info); + break; + } + default: { + break; + } + } + + if (ctx) { ctx->promise_.setValue(ShardManager::Result< ShardInfo >(shard_info)); } } -void HSHomeObject::do_commit_new_shard(const Shard& shard) { +void HSHomeObject::add_new_shard_to_map(ShardPtr shard) { // TODO: We are taking a global lock for all pgs to create shard. Is it really needed?? // We need to have fine grained per PG lock and take only that. std::scoped_lock lock_guard(_pg_lock, _shard_lock); - auto pg_iter = _pg_map.find(shard.info.placement_group); + auto pg_iter = _pg_map.find(shard->info.placement_group); RELEASE_ASSERT(pg_iter != _pg_map.end(), "Missing PG info"); auto& shards = pg_iter->second->shards_; - auto iter = shards.emplace(shards.end(), shard); - auto [_, happened] = _shard_map.emplace(shard.info.id, iter); + shards.emplace_back(shard); + auto [_, happened] = _shard_map.emplace(shard->info.id, shard); RELEASE_ASSERT(happened, "duplicated shard info"); - // following part give follower members a chance to catch up shard sequence num; - auto sequence_num = get_sequence_num_from_shard_id(shard.info.id); + // following part gives follower members a chance to catch up shard sequence num; + auto sequence_num = get_sequence_num_from_shard_id(shard->info.id); if (sequence_num > pg_iter->second->shard_sequence_num_) { pg_iter->second->shard_sequence_num_ = sequence_num; } } -void HSHomeObject::do_commit_seal_shard(const Shard& shard) { +void HSHomeObject::update_shard_in_map(const ShardInfo& shard_info) { std::scoped_lock lock_guard(_shard_lock); - auto shard_iter = _shard_map.find(shard.info.id); + auto shard_iter = _shard_map.find(shard_info.id); RELEASE_ASSERT(shard_iter != _shard_map.end(), "Missing shard info"); - *(shard_iter->second) = shard; -} - -void* HSHomeObject::get_shard_metablk(shard_id_t id) const { - std::scoped_lock lock_guard(_shard_lock); - auto shard_iter = _shard_map.find(id); - if (shard_iter == _shard_map.end()) { return nullptr; } - return (*shard_iter->second).metablk_cookie; + auto hs_shard = dp_cast< HS_Shard >(shard_iter->second); + hs_shard->update_info(shard_info); } ShardManager::Result< homestore::chunk_num_t > HSHomeObject::get_shard_chunk(shard_id_t id) const { std::scoped_lock lock_guard(_shard_lock); auto shard_iter = _shard_map.find(id); if (shard_iter == _shard_map.end()) { return folly::makeUnexpected(ShardError::UNKNOWN_SHARD); } - return (*shard_iter->second).chunk_id; + auto hs_shard = dp_cast< HS_Shard >(shard_iter->second); + return hs_shard->sb_->chunk_id; +} + +HSHomeObject::HS_Shard::HS_Shard(ShardInfo shard_info, homestore::chunk_num_t chunk_id) : + Shard(std::move(shard_info)), sb_("ShardManager") { + sb_.create(sizeof(shard_info_superblk)); + sb_->chunk_id = chunk_id; + write_sb(); +} + +HSHomeObject::HS_Shard::HS_Shard(homestore::superblk< shard_info_superblk > const& sb) : + Shard(shard_info_from_sb(sb)), sb_(sb) {} + +void HSHomeObject::HS_Shard::update_info(const ShardInfo& shard_info) { + info = shard_info; + write_sb(); +} + +void HSHomeObject::HS_Shard::write_sb() { + sb_->id = info.id; + sb_->placement_group = info.placement_group; + sb_->state = info.state; + sb_->created_time = info.created_time; + sb_->last_modified_time = info.last_modified_time; + sb_->available_capacity_bytes = info.available_capacity_bytes; + sb_->total_capacity_bytes = info.total_capacity_bytes; + sb_->deleted_capacity_bytes = info.deleted_capacity_bytes; + sb_.write(); +} + +ShardInfo HSHomeObject::HS_Shard::shard_info_from_sb(homestore::superblk< shard_info_superblk > const& sb) { + ShardInfo info; + info.id = sb->id; + info.placement_group = sb->placement_group; + info.state = sb->state; + info.created_time = sb->created_time; + info.last_modified_time = sb->last_modified_time; + info.available_capacity_bytes = sb->available_capacity_bytes; + info.total_capacity_bytes = sb->total_capacity_bytes; + info.deleted_capacity_bytes = sb->deleted_capacity_bytes; + return info; } } // namespace homeobject diff --git a/src/lib/homestore_backend/replication_message.hpp b/src/lib/homestore_backend/replication_message.hpp index 41e2b15f..76b38011 100644 --- a/src/lib/homestore_backend/replication_message.hpp +++ b/src/lib/homestore_backend/replication_message.hpp @@ -7,7 +7,8 @@ namespace homeobject { -VENUM(ReplicationMessageType, uint16_t, CREATE_SHARD_MSG = 0, SEAL_SHARD_MSG = 1, PUT_BLOB_MSG = 2, DEL_BLOB_MSG = 3, UNKNOWN_MSG = 4); +VENUM(ReplicationMessageType, uint16_t, CREATE_SHARD_MSG = 0, SEAL_SHARD_MSG = 1, PUT_BLOB_MSG = 2, DEL_BLOB_MSG = 3, + UNKNOWN_MSG = 4); // magic num comes from the first 8 bytes of 'echo homeobject_replication | md5sum' static constexpr uint64_t HOMEOBJECT_REPLICATION_MAGIC = 0x11153ca24efc8d34; @@ -18,21 +19,23 @@ static constexpr uint32_t init_crc32 = 0; struct ReplicationMessageHeader { uint64_t magic_num{HOMEOBJECT_REPLICATION_MAGIC}; uint32_t protocol_version{HOMEOBJECT_REPLICATION_PROTOCOL_VERSION_V1}; - ReplicationMessageType msg_type; // message type + ReplicationMessageType msg_type; // message type pg_id_t pg_id; shard_id_t shard_id; uint32_t payload_size; uint32_t payload_crc; uint8_t reserved_pad[4]{}; uint32_t header_crc; - void seal() { - header_crc = calculate_crc(); + void seal() { header_crc = calculate_crc(); } + + bool corrupted() const { + return magic_num != HOMEOBJECT_REPLICATION_MAGIC || + protocol_version != HOMEOBJECT_REPLICATION_PROTOCOL_VERSION_V1 || header_crc != calculate_crc(); } uint32_t calculate_crc() const { return crc32_ieee(init_crc32, r_cast< const unsigned char* >(this), sizeof(*this) - sizeof(header_crc)); } - }; #pragma pack() diff --git a/src/lib/homestore_backend/replication_state_machine.cpp b/src/lib/homestore_backend/replication_state_machine.cpp index 23ff73f7..a2b0fd8b 100644 --- a/src/lib/homestore_backend/replication_state_machine.cpp +++ b/src/lib/homestore_backend/replication_state_machine.cpp @@ -9,7 +9,7 @@ void ReplicationStateMachine::on_commit(int64_t lsn, const sisl::blob& header, c switch (msg_header->msg_type) { case ReplicationMessageType::CREATE_SHARD_MSG: case ReplicationMessageType::SEAL_SHARD_MSG: { - home_object_->on_shard_message_commit(lsn, header, key, pbas, ctx); + home_object_->on_shard_message_commit(lsn, header, pbas, repl_dev(), ctx); break; } case ReplicationMessageType::PUT_BLOB_MSG: @@ -37,32 +37,33 @@ void ReplicationStateMachine::on_rollback(int64_t lsn, sisl::blob const&, sisl:: homestore::blk_alloc_hints ReplicationStateMachine::get_blk_alloc_hints(sisl::blob const& header, cintrusive< homestore::repl_req_ctx >& ctx) { const ReplicationMessageHeader* msg_header = r_cast< const ReplicationMessageHeader* >(header.bytes); - if (msg_header->header_crc != msg_header->calculate_crc()) { - LOGWARN("replication message header is corrupted with crc error and can not get blk alloc hints"); - return homestore::blk_alloc_hints(); - } - switch (msg_header->msg_type) { case ReplicationMessageType::CREATE_SHARD_MSG: { auto list_shard_result = home_object_->shard_manager()->list_shards(msg_header->pg_id).get(); if (!list_shard_result) { - LOGWARN("list shards failed with unknown pg {}", msg_header->pg_id); + LOGWARNMOD(homeobject, "list shards failed from pg {}", msg_header->pg_id); break; } if (list_shard_result.value().empty()) { // pg is empty without any shards, we leave the decision the HeapChunkSelector to select a pdev - // with most available space and select one chunk based on that pdev + // with most available space and then select one chunk based on that pdev } else { - auto chunk_id = home_object_->get_shard_chunk(list_shard_result.value().front().id); + auto chunk_id = home_object_->get_shard_chunk(list_shard_result.value().back().id); RELEASE_ASSERT(!!chunk_id, "unknown shard id to get binded chunk"); - // TODO:HS will add a new interface to get alloc hint based on a reference chunk; - // and we can will call that interface for return alloc hint; + return home_object_->chunk_selector()->chunk_to_hints(chunk_id.value()); } break; } - case ReplicationMessageType::SEAL_SHARD_MSG: + case ReplicationMessageType::SEAL_SHARD_MSG: { + auto chunk_id = home_object_->get_shard_chunk(msg_header->shard_id); + RELEASE_ASSERT(!!chunk_id, "unknown shard id to get binded chunk"); + homestore::blk_alloc_hints hints; + hints.chunk_id_hint = chunk_id.value(); + return hints; + } + case ReplicationMessageType::PUT_BLOB_MSG: case ReplicationMessageType::DEL_BLOB_MSG: default: { @@ -70,7 +71,7 @@ homestore::blk_alloc_hints ReplicationStateMachine::get_blk_alloc_hints(sisl::bl } } - return homestore::blk_alloc_hints(); + return homestore::blk_alloc_hints(); } void ReplicationStateMachine::on_replica_stop() {} diff --git a/src/lib/homestore_backend/replication_state_machine.hpp b/src/lib/homestore_backend/replication_state_machine.hpp index 8444ee80..83adff4e 100644 --- a/src/lib/homestore_backend/replication_state_machine.hpp +++ b/src/lib/homestore_backend/replication_state_machine.hpp @@ -10,7 +10,7 @@ namespace homeobject { class HSHomeObject; struct ho_repl_ctx : public homestore::repl_req_ctx { - ReplicationMessageHeader header_; + ReplicationMessageHeader header_; sisl::io_blob_safe hdr_buf_; ho_repl_ctx(uint32_t size, uint32_t alignment) : homestore::repl_req_ctx{}, hdr_buf_{size, alignment} {} @@ -35,7 +35,6 @@ struct repl_result_ctx : public ho_repl_ctx { class ReplicationStateMachine : public homestore::ReplDevListener { public: - explicit ReplicationStateMachine(HSHomeObject* home_object) : home_object_(home_object) {} virtual ~ReplicationStateMachine() = default; @@ -109,7 +108,7 @@ class ReplicationStateMachine : public homestore::ReplDevListener { void on_replica_stop() override; private: - HSHomeObject* home_object_; + HSHomeObject* home_object_{nullptr}; }; } // namespace homeobject diff --git a/src/lib/homestore_backend/tests/test_shard_manager.cpp b/src/lib/homestore_backend/tests/test_shard_manager.cpp index afa3dcee..8585ff32 100644 --- a/src/lib/homestore_backend/tests/test_shard_manager.cpp +++ b/src/lib/homestore_backend/tests/test_shard_manager.cpp @@ -5,6 +5,7 @@ #include #include +#include #include @@ -21,6 +22,7 @@ using homeobject::shard_id_t; using homeobject::ShardError; using homeobject::ShardInfo; +using homeobject::ShardManager; SISL_LOGGING_INIT(logging, HOMEOBJECT_LOG_MODS) SISL_OPTIONS_ENABLE(logging) @@ -125,8 +127,7 @@ TEST_F(ShardManagerTesting, CreateShardAndValidateMembers) { EXPECT_TRUE(pg->shard_sequence_num_ == 1); EXPECT_EQ(1, pg->shards_.size()); auto& shard = *pg->shards_.begin(); - EXPECT_TRUE(shard.info == shard_info); - EXPECT_TRUE(shard.metablk_cookie != nullptr); + EXPECT_TRUE(shard->info == shard_info); } TEST_F(ShardManagerTesting, GetKnownShard) { @@ -159,116 +160,46 @@ TEST_F(ShardManagerTesting, SealUnknownShard) { EXPECT_EQ(ShardError::UNKNOWN_SHARD, _home_object->shard_manager()->seal_shard(1000).get().error()); } -// Disable following cases temporary as PG Info recovery is needed too and -// depends on another PR to create/recover PG using HS ReplDev -/* -TEST_F(ShardManagerTesting, ShardManagerRecovery) { - - auto e = _home_object->shard_manager()->create_shard(_pg_id, Mi).get(); - ASSERT_TRUE(!!e); - ShardInfo shard_info = e.value(); - EXPECT_EQ(ShardInfo::State::OPEN, shard_info.state); - EXPECT_EQ(Mi, shard_info.total_capacity_bytes); - EXPECT_EQ(Mi, shard_info.available_capacity_bytes); - EXPECT_EQ(0ul, shard_info.deleted_capacity_bytes); - EXPECT_EQ(_pg_id, shard_info.placement_group); - - nlohmann::json shard_json; - shard_json["shard_info"]["shard_id_t"] = shard_info.id; - shard_json["shard_info"]["pg_id_t"] = shard_info.placement_group; - shard_json["shard_info"]["state"] = shard_info.state; - shard_json["shard_info"]["created_time"] = shard_info.created_time; - shard_json["shard_info"]["modified_time"] = shard_info.last_modified_time; - shard_json["shard_info"]["total_capacity"] = shard_info.total_capacity_bytes; - shard_json["shard_info"]["available_capacity"] = shard_info.available_capacity_bytes; - shard_json["shard_info"]["deleted_capacity"] = shard_info.deleted_capacity_bytes; - shard_json["ext_info"]["chunk_id"] = 100; - auto shard_msg = shard_json.dump(); - - // Manual remove shard info from home_object and relay on metablk service to replay it back; - homeobject::HSHomeObject* ho = dynamic_cast< homeobject::HSHomeObject* >(_home_object.get()); - auto pg_iter = ho->_pg_map.find(_pg_id); - EXPECT_TRUE(pg_iter != ho->_pg_map.end()); - auto& pg = pg_iter->second; - EXPECT_EQ(1, pg->shards_.size()); - auto& check_shard = *pg->shards_.begin(); - void* saved_metablk = check_shard.metablk_cookie; - pg_iter->second.shards_.clear(); - ho->_shard_map.clear(); - EXPECT_EQ(ShardError::UNKNOWN_SHARD, _home_object->shard_manager()->get_shard(_shard_id).get().error()); - - auto buf = sisl::make_byte_array(static_cast< uint32_t >(shard_msg.size()), 0, sisl::buftag::metablk); - std::memcpy(buf->bytes, shard_msg.c_str(), shard_msg.size()); - ho->on_shard_meta_blk_found(static_cast< homestore::meta_blk* >(saved_metablk), buf, shard_msg.size()); - // check the recover result; - auto future = _home_object->shard_manager()->get_shard(shard_info.id).get(); - EXPECT_TRUE(!!future); - future.then([this, shard_info](auto const& info) { - EXPECT_TRUE(info.id == shard_info.id); - EXPECT_TRUE(info.placement_group == _pg_id); - EXPECT_EQ(info.state, ShardInfo::State::OPEN); - }); -} - TEST_F(ShardManagerTesting, MockSealShard) { auto e = _home_object->shard_manager()->create_shard(_pg_id, Mi).get(); ASSERT_TRUE(!!e); ShardInfo shard_info = e.value(); - auto shard = homeobject::Shard(shard_info); - shard.info.state = ShardInfo::State::SEALED; + shard_info.state = ShardInfo::State::SEALED; + nlohmann::json j; - j["shard_info"]["shard_id"] = shard.info.id; - j["shard_info"]["pg_id"] = shard.info.placement_group; - j["shard_info"]["state"] = shard.info.state; - j["shard_info"]["created_time"] = shard.info.created_time; - j["shard_info"]["modified_time"] = shard.info.last_modified_time; - j["shard_info"]["total_capacity"] = shard.info.total_capacity_bytes; - j["shard_info"]["available_capacity"] = shard.info.available_capacity_bytes; - j["shard_info"]["deleted_capacity"] = shard.info.deleted_capacity_bytes; - j["ext_info"]["chunk_id"] = shard.chunk_id; + j["shard_info"]["shard_id_t"] = shard_info.id; + j["shard_info"]["pg_id_t"] = shard_info.placement_group; + j["shard_info"]["state"] = shard_info.state; + j["shard_info"]["created_time"] = shard_info.created_time; + j["shard_info"]["modified_time"] = shard_info.last_modified_time; + j["shard_info"]["total_capacity"] = shard_info.total_capacity_bytes; + j["shard_info"]["available_capacity"] = shard_info.available_capacity_bytes; + j["shard_info"]["deleted_capacity"] = shard_info.deleted_capacity_bytes; auto seal_shard_msg = j.dump(); - const auto datasvc_blk_size = homestore::HomeStore::instance()->data_service().get_blk_size(); - auto msg_size = seal_shard_msg.size(); - if (msg_size % datasvc_blk_size != 0) { - msg_size = (msg_size / datasvc_blk_size + 1) * datasvc_blk_size; - } - auto msg_buf = iomanager.iobuf_alloc(512, msg_size); - std::memset(r_cast(msg_buf), 0, msg_size); - std::memcpy(r_cast(msg_buf), seal_shard_msg.c_str(), seal_shard_msg.size()); - - homeobject::ReplicationMessageHeader header; - header.repl_group_id = _pg_id; - header.msg_type = homeobject::ReplicationMessageType::SEAL_SHARD_MSG; - header.payload_size = msg_size; - header.payload_crc = crc32_ieee(homeobject::init_crc32, msg_buf, msg_size); - header.header_crc = header.calculate_crc(); - sisl::sg_list value; - value.size = msg_size; - value.iovs.push_back(iovec(msg_buf, msg_size)); - // header is corrupted with crc; - ++header.header_crc; homeobject::HSHomeObject* ho = dynamic_cast< homeobject::HSHomeObject* >(_home_object.get()); - auto pg = ho->_get_pg(_pg_id); - ASSERT_TRUE(!!pg); - homestore::ReplicationService* replication_service = - (homestore::ReplicationService*)(&homestore::HomeStore::instance()->repl_service()); - auto repl_dev = replication_service->get_replica_dev(pg.value().repl_dev_uuid); - - { - auto [p, sf] = folly::makePromiseContract< homeobject::ShardManager::Result< ShardInfo > >(); - repl_dev.value()->async_alloc_write(sisl::blob(r_cast< uint8_t* >(&header), sizeof(header)), sisl::blob(), - value, static_cast< void* >(&p)); - auto info = std::move(sf).get(); - EXPECT_FALSE(info); - } - - // everything is fine; - header.header_crc = header.calculate_crc(); - auto [p, sf] = folly::makePromiseContract< homeobject::ShardManager::Result< ShardInfo > >(); - repl_dev.value()->async_alloc_write(sisl::blob(r_cast< uint8_t* >(&header), sizeof(header)), sisl::blob(), value, - static_cast< void* >(&p)); - auto info = std::move(sf).get(); + auto pg = dp_cast< homeobject::HSHomeObject::HS_PG >(ho->_pg_map[_pg_id]); + auto repl_dev = pg->repl_dev_; + const auto msg_size = sisl::round_up(seal_shard_msg.size(), repl_dev->get_blk_size()); + auto req = homeobject::repl_result_ctx< ShardManager::Result< ShardInfo > >::make(msg_size, 512 /*alignment*/); + auto buf_ptr = req->hdr_buf_.bytes; + std::memset(buf_ptr, 0, msg_size); + std::memcpy(buf_ptr, seal_shard_msg.c_str(), seal_shard_msg.size()); + + req->header_.msg_type = homeobject::ReplicationMessageType::SEAL_SHARD_MSG; + req->header_.pg_id = _pg_id; + req->header_.shard_id = shard_info.id; + req->header_.payload_size = msg_size; + req->header_.payload_crc = crc32_ieee(homeobject::init_crc32, buf_ptr, msg_size); + req->header_.seal(); + sisl::blob header; + header.bytes = r_cast< uint8_t* >(&req->header_); + header.size = sizeof(req->header_); + sisl::sg_list value; + value.size = msg_size; + value.iovs.push_back(iovec(buf_ptr, msg_size)); + repl_dev->async_alloc_write(header, sisl::blob{}, value, req); + auto info = req->result().get(); EXPECT_TRUE(info); EXPECT_TRUE(info.value().id == shard_info.id); EXPECT_TRUE(info.value().placement_group == _pg_id); @@ -276,25 +207,16 @@ TEST_F(ShardManagerTesting, MockSealShard) { auto pg_iter = ho->_pg_map.find(_pg_id); EXPECT_TRUE(pg_iter != ho->_pg_map.end()); - auto& pg_result = pg_iter->second; - EXPECT_EQ(1, pg_result.shards.size()); - auto& check_shard = *pg_result.shards.begin(); - EXPECT_EQ(ShardInfo::State::SEALED, check_shard.info.state); - EXPECT_TRUE(check_shard.metablk_cookie != nullptr); - iomanager.iobuf_free(msg_buf); + auto pg_result = pg_iter->second; + EXPECT_EQ(1, pg_result->shards_.size()); + auto& check_shard = pg_result->shards_.front(); + EXPECT_EQ(ShardInfo::State::SEALED, check_shard->info.state); } - - +/* class FixtureAppWithRecovery : public FixtureApp { public: std::list< std::filesystem::path > devices() const override { const std::string fpath{"/tmp/test_homestore.data"}; - if (!std::filesystem::exists(fpath)) { - LOGINFO("creating device files with size {} ", 1, homestore::in_bytes(2 * Gi)); - LOGINFO("creating {} device file", fpath); - std::ofstream ofs{fpath, std::ios::binary | std::ios::out | std::ios::trunc}; - std::filesystem::resize_file(fpath, 2 * Gi); - } auto device_info = std::list< std::filesystem::path >(); device_info.emplace_back(std::filesystem::canonical(fpath)); return device_info; @@ -304,19 +226,22 @@ class FixtureAppWithRecovery : public FixtureApp { class ShardManagerTestingRecovery : public ::testing::Test { public: void SetUp() override { app = std::make_shared< FixtureAppWithRecovery >(); } - protected: std::shared_ptr< FixtureApp > app; }; -TEST_F(ShardManagerTestingRecovery, ShardManagerRecoveryV2) { - // clear the env first; +TEST_F(ShardManagerTestingRecovery, ShardManagerRecovery) { + // prepare the env first; const std::string fpath{"/tmp/test_homestore.data"}; + if (std::filesystem::exists(fpath)) { std::filesystem::remove(fpath);} + LOGINFO("creating device files with size {} ", homestore::in_bytes(2 * Gi)); LOGINFO("creating {} device file", fpath); - if (std::filesystem::exists(fpath)) { std::filesystem::remove(fpath); } - homeobject::pg_id _pg_id{1u}; - homeobject::peer_id _peer1; - homeobject::peer_id _peer2; + std::ofstream ofs{fpath, std::ios::binary | std::ios::out | std::ios::trunc}; + std::filesystem::resize_file(fpath, 2 * Gi); + + homeobject::pg_id_t _pg_id{1u}; + homeobject::peer_id_t _peer1; + homeobject::peer_id_t _peer2; std::shared_ptr< homeobject::HomeObject > _home_object; _home_object = homeobject::init_homeobject(std::weak_ptr< homeobject::HomeObjectApplication >(app)); _peer1 = _home_object->our_uuid(); @@ -340,30 +265,26 @@ TEST_F(ShardManagerTestingRecovery, ShardManagerRecoveryV2) { auto pg_iter = ho->_pg_map.find(_pg_id); EXPECT_TRUE(pg_iter != ho->_pg_map.end()); auto& pg_result = pg_iter->second; - EXPECT_EQ(1, pg_result.shards.size()); - auto check_shard = *pg_result.shards.begin(); - EXPECT_EQ(ShardInfo::State::OPEN, check_shard.info.state); - EXPECT_TRUE(check_shard.metablk_cookie != nullptr); - + EXPECT_EQ(1, pg_result->shards_.size()); + auto check_shard = pg_result->shards_.front(); + EXPECT_EQ(ShardInfo::State::OPEN, check_shard->info.state); // release the homeobject and homestore will be shutdown automatically. _home_object.reset(); + LOGINFO("restart home_object"); // re-create the homeobject and pg infos and shard infos will be recover automatically. _home_object = homeobject::init_homeobject(std::weak_ptr< homeobject::HomeObjectApplication >(app)); ho = dynamic_cast< homeobject::HSHomeObject* >(_home_object.get()); EXPECT_TRUE(ho->_pg_map.size() == 1); - // check basic shard info first; - auto recovery_shard = _home_object->shard_manager()->get_shard(shard_info.id).get(); - EXPECT_TRUE(recovery_shard); - EXPECT_TRUE(recovery_shard.value() == shard_info); - // check shard internal state; pg_iter = ho->_pg_map.find(_pg_id); EXPECT_TRUE(pg_iter != ho->_pg_map.end()); - EXPECT_EQ(1, pg_iter->second.shards.size()); - EXPECT_EQ(check_shard.chunk_id, pg_iter->second.shards.front().chunk_id); - EXPECT_TRUE(pg_iter->second.shards.front().metablk_cookie != nullptr); + EXPECT_EQ(1, pg_iter->second->shards_.size()); + auto hs_shard = dp_cast(pg_iter->second->shards_.front()); + EXPECT_TRUE(hs_shard->info == shard_info); + EXPECT_TRUE(hs_shard->sb_->id == shard_info.id); + EXPECT_TRUE(hs_shard->sb_->id == shard_info.id); } */ diff --git a/src/lib/memory_backend/mem_shard_manager.cpp b/src/lib/memory_backend/mem_shard_manager.cpp index e05d7947..8fe3753b 100644 --- a/src/lib/memory_backend/mem_shard_manager.cpp +++ b/src/lib/memory_backend/mem_shard_manager.cpp @@ -16,9 +16,10 @@ ShardManager::Result< ShardInfo > MemoryHomeObject::_create_shard(pg_id_t pg_own auto& s_list = pg_it->second->shards_; info.id = make_new_shard_id(pg_owner, s_list.size()); - auto iter = s_list.emplace(s_list.end(), Shard(info)); + auto shard = std::make_shared< Shard >(info); + s_list.emplace(s_list.end(), shard); LOGDEBUG("Creating Shard [{}]: in Pg [{}] of Size [{}b]", info.id & shard_mask, pg_owner, size_bytes); - auto [_, s_happened] = _shard_map.emplace(info.id, iter); + auto [_, s_happened] = _shard_map.emplace(info.id, shard); RELEASE_ASSERT(s_happened, "Duplicate Shard insertion!"); } auto [it, happened] = index_.try_emplace(info.id, std::make_unique< ShardIndex >()); @@ -30,7 +31,7 @@ ShardManager::Result< ShardInfo > MemoryHomeObject::_seal_shard(shard_id_t id) { auto lg = std::scoped_lock(_shard_lock); auto shard_it = _shard_map.find(id); RELEASE_ASSERT(_shard_map.end() != shard_it, "Missing ShardIterator!"); - auto& shard_info = (*shard_it->second).info; + auto& shard_info = shard_it->second->info; shard_info.state = ShardInfo::State::SEALED; return shard_info; } diff --git a/src/lib/shard_manager.cpp b/src/lib/shard_manager.cpp index 2878457f..912c01ee 100644 --- a/src/lib/shard_manager.cpp +++ b/src/lib/shard_manager.cpp @@ -21,8 +21,8 @@ ShardManager::AsyncResult< InfoList > HomeObjectImpl::list_shards(pg_id_t pgid) auto info_l = std::list< ShardInfo >(); for (auto const& shard : pg->shards_) { - LOGDEBUG("Listing Shard {}", shard.info.id); - info_l.push_back(shard.info); + LOGDEBUG("Listing Shard {}", shard->info.id); + info_l.push_back(shard->info); } return info_l; });