From 0af7f2e8c6eb4c9b78248516b0de4b5272224c86 Mon Sep 17 00:00:00 2001 From: zilai Date: Tue, 5 Sep 2023 00:33:08 -0700 Subject: [PATCH] add metablk recovery support for create_shard --- src/include/homeobject/pg_manager.hpp | 8 - src/include/homeobject/shard_manager.hpp | 4 +- src/lib/homeobject_impl.hpp | 29 +++- src/lib/homestore/CMakeLists.txt | 1 + src/lib/homestore/homeobject.cpp | 34 ++-- src/lib/homestore/homeobject.hpp | 13 +- src/lib/homestore/replication_message.hpp | 13 +- .../homestore/replication_state_machine.cpp | 52 ++++-- .../homestore/replication_state_machine.hpp | 30 ++-- src/lib/homestore/shard_manager.cpp | 148 ++++++++++++------ src/lib/memory/shard_manager.cpp | 24 +-- src/lib/shard_manager.cpp | 16 +- 12 files changed, 222 insertions(+), 150 deletions(-) diff --git a/src/include/homeobject/pg_manager.hpp b/src/include/homeobject/pg_manager.hpp index 52a16bd6..f7757c5b 100644 --- a/src/include/homeobject/pg_manager.hpp +++ b/src/include/homeobject/pg_manager.hpp @@ -5,7 +5,6 @@ #include #include "common.hpp" -#include "shard_manager.hpp" namespace homeobject { @@ -28,13 +27,6 @@ struct PGInfo { mutable MemberSet members; }; -struct PG { - explicit PG(PGInfo info) : pg_info(std::move(info)) {} - PGInfo pg_info; - uint64_t next_sequence_num_for_new_shard{0}; - CShardInfoList shards; -}; - class PGManager : public Manager< PGError > { public: virtual NullAsyncResult create_pg(PGInfo&& pg_info) = 0; diff --git a/src/include/homeobject/shard_manager.hpp b/src/include/homeobject/shard_manager.hpp index 254958aa..b4705862 100644 --- a/src/include/homeobject/shard_manager.hpp +++ b/src/include/homeobject/shard_manager.hpp @@ -28,9 +28,7 @@ struct ShardInfo { std::optional< peer_id > current_leader{std::nullopt}; }; -using InfoList = std::list; -using CShardInfo = std::shared_ptr; -using CShardInfoList = std::list< CShardInfo >; +using InfoList = std::list< ShardInfo >; class ShardManager : public Manager< ShardError > { public: diff --git 
a/src/lib/homeobject_impl.hpp b/src/lib/homeobject_impl.hpp index 4545f9e9..6695883d 100644 --- a/src/lib/homeobject_impl.hpp +++ b/src/lib/homeobject_impl.hpp @@ -7,12 +7,6 @@ #include - -template <> -struct std::hash< homeobject::ShardInfo > { - std::size_t operator()(homeobject::ShardInfo const& i) const noexcept { return std::hash< uint64_t >()(i.id); } -}; - namespace home_replication { class ReplicationService; } @@ -22,6 +16,26 @@ namespace homeobject { inline bool operator<(ShardInfo const& lhs, ShardInfo const& rhs) { return lhs.id < rhs.id; } inline bool operator==(ShardInfo const& lhs, ShardInfo const& rhs) { return lhs.id == rhs.id; } +struct ShardInfoExt { + uint16_t chunk_id; +}; + +struct Shard { + explicit Shard(ShardInfo info) : info(std::move(info)) {} + ShardInfo info; + ShardInfoExt ext_info; +}; + +using ShardPtr = std::shared_ptr< Shard >; +using ShardPtrList = std::list< ShardPtr >; + +struct PG { + explicit PG(PGInfo info) : pg_info(std::move(info)) {} + PGInfo pg_info; + uint64_t shard_sequence_num{0}; + ShardPtrList shards; +}; + class HomeObjectImpl : public HomeObject, public BlobManager, public PGManager, @@ -54,7 +68,7 @@ class HomeObjectImpl : public HomeObject, std::map< pg_id, PG > _pg_map; mutable std::shared_mutex _shard_lock; - std::map< shard_id, CShardInfo > _shard_map; + std::map < shard_id, ShardPtr > _shard_map; /// public: explicit HomeObjectImpl(std::weak_ptr< HomeObjectApplication >&& application) : @@ -84,6 +98,7 @@ class HomeObjectImpl : public HomeObject, ShardManager::AsyncResult< ShardInfo > create_shard(pg_id pg_owner, uint64_t size_bytes) final; ShardManager::AsyncResult< InfoList > list_shards(pg_id pg) const final; ShardManager::AsyncResult< ShardInfo > seal_shard(shard_id id) final; + uint64_t get_current_timestamp(); static const std::string s_shard_info_sub_type; /// BlobManager diff --git a/src/lib/homestore/CMakeLists.txt b/src/lib/homestore/CMakeLists.txt index ab4c1685..83158a50 100644 --- 
a/src/lib/homestore/CMakeLists.txt +++ b/src/lib/homestore/CMakeLists.txt @@ -6,6 +6,7 @@ target_sources("${PROJECT_NAME}_homestore" PRIVATE homeobject.cpp blob_manager.cpp shard_manager.cpp + replication_state_machine.cpp $ ) target_link_libraries("${PROJECT_NAME}_homestore" diff --git a/src/lib/homestore/homeobject.cpp b/src/lib/homestore/homeobject.cpp index 2bf7aade..d919d36b 100644 --- a/src/lib/homestore/homeobject.cpp +++ b/src/lib/homestore/homeobject.cpp @@ -7,7 +7,7 @@ namespace homeobject { -const std::string HomeObjectImpl::s_shard_info_sub_type = "shard_info"; +const std::string HSHomeObject::s_shard_info_sub_type = "shard_info"; extern std::shared_ptr< HomeObject > init_homeobject(std::weak_ptr< HomeObjectApplication >&& application) { LOGINFOMOD(homeobject, "Initializing HomeObject"); @@ -30,7 +30,8 @@ void HSHomeObject::init_homestore() { ioenvironment.with_iomgr(iomgr::iomgr_params{.num_threads = app->threads(), .is_spdk = app->spdk_mode()}); //register some callbacks for metadata recovery; - HomeStore::instance()->meta_service().register_handler(HomeObjectImpl::s_shard_info_sub_type, + using namespace homestore; + HomeStore::instance()->meta_service().register_handler(HSHomeObject::s_shard_info_sub_type, [this](homestore::meta_blk* mblk, sisl::byte_view buf, size_t size) { on_shard_meta_blk_found(mblk, buf, size); }, @@ -47,7 +48,6 @@ void HSHomeObject::init_homestore() { } /// TODO need Repl service eventually yeah? 
- using namespace homestore; uint32_t services = HS_SERVICE::META | HS_SERVICE::LOG_REPLICATED | HS_SERVICE::LOG_LOCAL | HS_SERVICE::DATA; bool need_format = HomeStore::instance()->start( @@ -77,7 +77,6 @@ void HomeObjectImpl::init_repl_svc() { } } - HSHomeObject::~HSHomeObject() { homestore::HomeStore::instance()->shutdown(); homestore::HomeStore::reset_instance(); @@ -85,30 +84,15 @@ HSHomeObject::~HSHomeObject() { } void HSHomeObject::on_shard_meta_blk_found(homestore::meta_blk* mblk, sisl::byte_view buf, size_t size) { - const char* shard_info_json_str = r_cast(buf.bytes()); - auto shard_info_json = nlohmann::json::parse(shard_info_json_str, shard_info_json_str + size); - - CShardInfo shard_info = std::make_shared(); - shard_info->id = shard_info_json["shard_id"].get(); - shard_info->placement_group = shard_info_json["pg_id"].get(); - shard_info->state = static_cast(shard_info_json["state"].get()); - shard_info->total_capacity_bytes = shard_info_json["capacity"].get(); + std::string shard_info_str; + shard_info_str.append(r_cast(buf.bytes()), size); - std::scoped_lock lock_guard(_pg_lock, _shard_lock); - if (shard_info->state == ShardInfo::State::OPEN) { + auto shard = deserialize_shard_info(shard_info_str); + if (shard->info.state == ShardInfo::State::OPEN) { // create shard; - auto pg_iter = _pg_map.find(shard_info->placement_group); - RELEASE_ASSERT(pg_iter != _pg_map.end(), "Missing PG info"); - pg_iter->second.shards.push_back(shard_info); - auto sequence_num = get_sequence_num_from_shard_id(shard_info->id); - if (sequence_num > pg_iter->second.next_sequence_num_for_new_shard) { - pg_iter->second.next_sequence_num_for_new_shard = sequence_num; - } - _shard_map[shard_info->id] = shard_info; + do_commit_new_shard(shard); } else { - auto shard_iter = _shard_map.find(shard_info->id); - RELEASE_ASSERT(shard_iter != _shard_map.end(), "Missing shard info"); - shard_iter->second = shard_info; + do_commit_seal_shard(shard); } } diff --git 
a/src/lib/homestore/homeobject.hpp b/src/lib/homestore/homeobject.hpp index 4214a2d3..a4071f3f 100644 --- a/src/lib/homestore/homeobject.hpp +++ b/src/lib/homestore/homeobject.hpp @@ -27,16 +27,25 @@ class HSHomeObject : public HomeObjectImpl { shard_id generate_new_shard_id(pg_id pg); shard_id make_new_shard_id(pg_id pg, uint64_t sequence_num) const; uint64_t get_sequence_num_from_shard_id(uint64_t shard_id); - std::string prepare_create_shard_message(pg_id pg, shard_id new_shard_id, uint64_t shard_size) const; + std::string serialize_shard_info(ShardPtr shard) const; + ShardPtr deserialize_shard_info(const std::string& shard_info_str) const; + void do_commit_new_shard(ShardPtr shard_info); + void do_commit_seal_shard(ShardPtr shard_info); public: using HomeObjectImpl::HomeObjectImpl; ~HSHomeObject(); void init_homestore(); + static const std::string s_shard_info_sub_type; void on_shard_meta_blk_found(homestore::meta_blk* mblk, sisl::byte_view buf, size_t size); + + void on_pre_commit_shard_msg(int64_t lsn, sisl::blob const& header, sisl::blob const& key, + void* user_ctx); + void on_rollback_shard_msg(int64_t lsn, sisl::blob const& header, sisl::blob const& key, + void* user_ctx); void on_shard_message_commit(int64_t lsn, sisl::blob const& header, sisl::blob const& key, - sisl::sg_list const& value, void* user_ctx); + void* user_ctx); }; } // namespace homeobject diff --git a/src/lib/homestore/replication_message.hpp b/src/lib/homestore/replication_message.hpp index d937cbe4..d2043622 100644 --- a/src/lib/homestore/replication_message.hpp +++ b/src/lib/homestore/replication_message.hpp @@ -1,5 +1,7 @@ #pragma once +#include "homeobject/common.hpp" + #include #include @@ -9,16 +11,19 @@ ENUM(ReplicationMessageType, uint16_t, PG_MESSAGE = 0, SHARD_MESSAGE, BLOB_MESSA // magic num comes from the first 8 bytes of 'echo homeobject_replication | md5sum' static constexpr uint64_t HOMEOBJECT_REPLICATION_MAGIC = 0x11153ca24efc8d34; +static constexpr uint32_t 
HOMEOBJECT_REPLICATION_PROTOCOL_VERSION_V1 = 0x01; +static constexpr uint32_t init_crc32 = 0; struct ReplicationMessageHeader { uint64_t magic_num{HOMEOBJECT_REPLICATION_MAGIC}; + uint32_t protocol_version{HOMEOBJECT_REPLICATION_PROTOCOL_VERSION_V1}; ReplicationMessageType message_type; - uint32_t message_size; - uint32_t message_crc; - uint8_t reserved_pad[2]{}; + uint32_t payload_size; + uint32_t payload_crc; + uint8_t reserved_pad[6]{}; uint32_t header_crc; uint32_t calculate_crc() const { - return crc32_ieee(0, r_cast(this), sizeof(*this) - sizeof(header_crc)); + return crc32_ieee(init_crc32, r_cast(this), sizeof(*this) - sizeof(header_crc)); } }; diff --git a/src/lib/homestore/replication_state_machine.cpp b/src/lib/homestore/replication_state_machine.cpp index 623af65a..33721291 100644 --- a/src/lib/homestore/replication_state_machine.cpp +++ b/src/lib/homestore/replication_state_machine.cpp @@ -1,32 +1,58 @@ +#include "replication_message.hpp" #include "replication_state_machine.hpp" -#include "homeobject_impl.hpp" namespace homeobject { -void ReplicationStateMachine::on_commit(int64_t lsn, sisl::blob const& header, sisl::blob const& key, - blkid_list_t const& blkids,void* ctx) { +void ReplicationStateMachine::on_commit(int64_t lsn, const sisl::blob& header, const sisl::blob& key, + const home_replication::pba_list_t& pbas, void* ctx) { + LOGINFO("applying raft log commit with lsn:{}", lsn); const ReplicationMessageHeader* msg_header = r_cast(header.bytes); - //TODO: read data from pba or get data from replication context(prefered) - sisl::sg_list value; switch (msg_header->message_type) { - case SHARD_MESSAGE: { - home_object->on_shard_message_commit(lsn, header, key, value, ctx); + case ReplicationMessageType::SHARD_MESSAGE: { + _home_object->on_shard_message_commit(lsn, header, key,ctx); + break; } - case PG_MESSAGE: - case BLOB_MESSAGE: + case ReplicationMessageType::PG_MESSAGE: + case ReplicationMessageType::BLOB_MESSAGE: default: { break; } } } -void 
ReplicationStateMachine::on_pre_commit(int64_t lsn, sisl::blob const& header, sisl::blob const& key, void* ctx) {} +void ReplicationStateMachine::on_pre_commit(int64_t lsn, sisl::blob const& header, sisl::blob const& key, void* ctx) { + LOGINFO("on_pre_commit with lsn:{}", lsn); + const ReplicationMessageHeader* msg_header = r_cast(header.bytes); -void ReplicationStateMachine::on_rollback(int64_t lsn, sisl::blob const& header, sisl::blob const& key, void* ctx) {} + switch (msg_header->message_type) { + case ReplicationMessageType::SHARD_MESSAGE: { + _home_object->on_pre_commit_shard_msg(lsn, header, key,ctx); + break; + } + case ReplicationMessageType::PG_MESSAGE: + case ReplicationMessageType::BLOB_MESSAGE: + default: { + break; + } + } +} -blk_alloc_hints ReplicationStateMachine::get_blk_alloc_hints(sisl::blob const& header) { - return blk_alloc_hints(); +void ReplicationStateMachine::on_rollback(int64_t lsn, sisl::blob const& header, sisl::blob const& key, void* ctx) { + LOGINFO("rollback with lsn:{}", lsn); + const ReplicationMessageHeader* msg_header = r_cast(header.bytes); + + switch (msg_header->message_type) { + case ReplicationMessageType::SHARD_MESSAGE: { + _home_object->on_rollback_shard_msg(lsn, header, key,ctx); + break; + } + case ReplicationMessageType::PG_MESSAGE: + case ReplicationMessageType::BLOB_MESSAGE: + default: { + break; + } + } } void ReplicationStateMachine::on_replica_stop() {} diff --git a/src/lib/homestore/replication_state_machine.hpp b/src/lib/homestore/replication_state_machine.hpp index 45bcb60b..a959c5de 100644 --- a/src/lib/homestore/replication_state_machine.hpp +++ b/src/lib/homestore/replication_state_machine.hpp @@ -1,13 +1,13 @@ #pragma once -#include "homestore/replication/repl_dev.h" - +#include "homeobject.hpp" +#include "mocks/repl_service.h" namespace homeobject { class HomeObjectImpl; -class ReplicationStateMachine : public homestore::ReplicaDevListener { +class ReplicationStateMachine : public 
home_replication::ReplicaSetListener { public: /// @brief Called when the log entry has been committed in the replica set. /// @@ -20,10 +20,10 @@ class ReplicationStateMachine : public homestore::ReplicaDevListener { /// @param pbas - List of pbas where data is written to the storage engine. /// @param ctx - User contenxt passed as part of the replica_set::write() api /// - virtual void on_commit(int64_t lsn, sisl::blob const& header, sisl::blob const& key, blkid_list_t const& blkids, - void* ctx); + virtual void on_commit(int64_t lsn, const sisl::blob& header, const sisl::blob& key, + const home_replication::pba_list_t& pbas, void* ctx); - /// @brief Called when the log entry has been received by the replica dev. + /// @brief Called when the log entry has been received by the replica set. /// /// On recovery, this is called from a random worker thread before the raft server is started. It is /// guaranteed to be serialized in log index order. @@ -41,7 +41,7 @@ class ReplicationStateMachine : public homestore::ReplicaDevListener { /// @param header - Header originally passed with replica_set::write() api /// @param key - Key originally passed with replica_set::write() api /// @param ctx - User contenxt passed as part of the replica_set::write() api - virtual void on_pre_commit(int64_t lsn, sisl::blob const& header, sisl::blob const& key, void* ctx); + virtual void on_pre_commit(int64_t lsn, const sisl::blob& header, const sisl::blob& key, void* ctx); /// @brief Called when the log entry has been rolled back by the replica set. 
/// @@ -56,23 +56,13 @@ class ReplicationStateMachine : public homestore::ReplicaDevListener { /// @param header - Header originally passed with replica_set::write() api /// @param key - Key originally passed with replica_set::write() api /// @param ctx - User contenxt passed as part of the replica_set::write() api - virtual void on_rollback(int64_t lsn, sisl::blob const& header, sisl::blob const& key, void* ctx); - - /// @brief Called when replication module is trying to allocate a block to write the value - /// - /// This function can be called both on leader and follower when it is trying to allocate a block to write the - /// value. Callee is expected to provide hints for allocation based on the header supplied as part of original - /// write. In cases where callee don't care about the hints can return default blk_alloc_hints. - /// - /// @param header Header originally passed with repl_dev::write() api on the leader - /// @return Expected to return blk_alloc_hints for this write - virtual blk_alloc_hints get_blk_alloc_hints(sisl::blob const& header); + virtual void on_rollback(int64_t lsn, const sisl::blob& header, const sisl::blob& key, void* ctx); /// @brief Called when the replica set is being stopped - virtual void on_replica_stop(); + virtual void on_replica_stop() = 0; private: - HomeObjectImpl* home_object; + HSHomeObject* _home_object; }; } diff --git a/src/lib/homestore/shard_manager.cpp b/src/lib/homestore/shard_manager.cpp index 51d07693..a47e041d 100644 --- a/src/lib/homestore/shard_manager.cpp +++ b/src/lib/homestore/shard_manager.cpp @@ -18,11 +18,8 @@ bool HSHomeObject::check_if_pg_exist(pg_id pg) const { shard_id HSHomeObject::generate_new_shard_id(pg_id pg) { std::scoped_lock lock_guard(_pg_lock); auto iter = _pg_map.find(pg); - if (iter == _pg_map.end()) { - auto created_pg = PG(std::move(PGInfo(pg))); - iter = _pg_map.insert(std::make_pair(pg, std::move(created_pg))).first; - } - auto new_sequence_num = 
++iter->second.next_sequence_num_for_new_shard; + RELEASE_ASSERT(iter != _pg_map.end(), "Missing pg info"); + auto new_sequence_num = ++(iter->second.shard_sequence_num); RELEASE_ASSERT(new_sequence_num < ShardManager::max_shard_num_in_pg(), "new shard id must be less than ShardManager::max_shard_num_in_pg()"); return make_new_shard_id(pg, new_sequence_num); } @@ -34,15 +31,37 @@ shard_id HSHomeObject::make_new_shard_id(pg_id pg, uint64_t sequence_num) const uint64_t HSHomeObject::get_sequence_num_from_shard_id(uint64_t shard_id) { return shard_id & (0x0000FFFFFFFFFFFF); } -std::string HSHomeObject::prepare_create_shard_message(pg_id pg, shard_id new_shard_id, uint64_t shard_size) const { - nlohmann::json j; - j["pg_id"] = pg; - j["shard_id"] = new_shard_id; - j["state"] = ShardInfo::State::OPEN; - j["capacity"] = shard_size; + +std::string HSHomeObject::serialize_shard_info(ShardPtr shard) const { + nlohmann::json j; + j["shard_info"]["shard_id"] = shard->info.id; + j["shard_info"]["pg_id"] = shard->info.placement_group; + j["shard_info"]["state"] = shard->info.state; + j["shard_info"]["created_time"] = shard->info.created_time; + j["shard_info"]["modified_time"] = shard->info.last_modified_time; + j["shard_info"]["total_capacity"] = shard->info.total_capacity_bytes; + j["shard_info"]["available_capacity"] = shard->info.available_capacity_bytes; + j["shard_info"]["deleted_capacity"] = shard->info.deleted_capacity_bytes; + j["ext_info"]["chunk_id"] = shard->ext_info.chunk_id; return j.dump(); } +ShardPtr HSHomeObject::deserialize_shard_info(const std::string& shard_json_str) const { + auto shard_json = nlohmann::json::parse(shard_json_str); + ShardInfo shard_info; + shard_info.id = shard_json["shard_info"]["shard_id"].get(); + shard_info.placement_group = shard_json["shard_info"]["pg_id"].get(); + shard_info.state = static_cast(shard_json["shard_info"]["state"].get()); + shard_info.created_time = shard_json["shard_info"]["created_time"].get(); + 
shard_info.last_modified_time = shard_json["shard_info"]["modified_time"].get(); + shard_info.available_capacity_bytes = shard_json["shard_info"]["available_capacity"].get(); + shard_info.total_capacity_bytes = shard_json["shard_info"]["total_capacity"].get(); + shard_info.deleted_capacity_bytes = shard_json["shard_info"]["deleted_capacity"].get(); + auto shard = std::make_shared< Shard >(shard_info); + shard->ext_info.chunk_id = shard_json["ext_info"]["chunk_id"].get(); + return shard; +} + ShardManager::Result< ShardInfo > HSHomeObject::_create_shard(pg_id pg_owner, uint64_t size_bytes) { bool pg_exist = check_if_pg_exist(pg_owner); if (!pg_exist) { @@ -50,38 +69,46 @@ ShardManager::Result< ShardInfo > HSHomeObject::_create_shard(pg_id pg_owner, ui return folly::makeUnexpected(ShardError::UNKNOWN_PG); } - auto repl_dev = _repl_svc->get_replica_dev(fmt::format("{}", pg_owner)); - if (repl_dev == nullptr) { + //TODO: will update to ReplDev when ReplDev on HomeStore is ready; + auto replica_set_var = _repl_svc->get_replica_set(fmt::format("{}", pg_owner)); + if (std::holds_alternative< home_replication:: ReplServiceError>(replica_set_var)) { LOGWARN("failed to get replica set instance for pg [{}]", pg_owner); - return folly::makeUnexpected(ShardError::UNKNOWN_PG); + return folly::makeUnexpected(ShardError::UNKNOWN_PG); } - if (!repl_dev->is_leader()) { + auto replica_set = std::get< home_replication::rs_ptr_t >(replica_set_var); + if (!replica_set->is_leader()) { LOGWARN("pg [{}] replica set is not a leader, please retry other pg members", pg_owner); return folly::makeUnexpected(ShardError::NOT_LEADER); } auto new_shard_id = generate_new_shard_id(pg_owner); - std::string create_shard_message = prepare_create_shard_message(pg_owner, new_shard_id, size_bytes); - ReplicationMessageHeader header; - header.message_type = ReplicationMessageType::SHARD_MESSAGE; - header.message_size = create_shard_message.size(); - header.message_crc = crc32_ieee(0, r_cast< const uint8_t* 
>(create_shard_message.c_str()), create_shard_message.size()); - header.header_crc = header.calculate_crc(); + auto create_time = get_current_timestamp(); + auto shard = std::make_shared(ShardInfo(new_shard_id, pg_owner, ShardInfo::State::OPEN, + create_time, create_time, size_bytes, size_bytes, 0)); + std::string create_shard_message = serialize_shard_info(shard); + //preapre msg header; + const uint32_t needed_size = sizeof(ReplicationMessageHeader) + create_shard_message.size(); + auto buf = nuraft::buffer::alloc(needed_size); + uint8_t* raw_ptr = static_cast(buf->data_begin()); + ReplicationMessageHeader *header = new(raw_ptr) ReplicationMessageHeader(); + header->message_type = ReplicationMessageType::SHARD_MESSAGE; + header->payload_size = create_shard_message.size(); + header->payload_crc = crc32_ieee(init_crc32, r_cast< const uint8_t* >(create_shard_message.c_str()), create_shard_message.size()); + header->header_crc = header->calculate_crc(); + raw_ptr += sizeof(ReplicationMessageHeader); + std::memcpy(raw_ptr, create_shard_message.c_str(), create_shard_message.size()); + sisl::sg_list value; - value.size = create_shard_message.size(); - value.iovs.emplace_back(iovec{static_cast(const_cast(create_shard_message.c_str())), - create_shard_message.size()}); //replicate this create shard message to PG members; auto [p, sf] = folly::makePromiseContract< ShardManager::Result >(); - repl_dev->async_alloc_write(sisl::blob(uintptr_cast(&header), sizeof(header)), - sisl::blob(), - value, - static_cast(&p)); + replica_set->write(sisl::blob(buf->data_begin(), needed_size), sisl::blob(), value, + static_cast(&p)); auto info = std::move(sf).via(folly::getGlobalCPUExecutor()).get(); if (!bool(info)) { LOGWARN("create new shard [{}] on pg [{}] is failed with error:{}", new_shard_id, pg_owner, info.error()); } + header->~ReplicationMessageHeader(); return info; } @@ -89,8 +116,17 @@ ShardManager::Result< ShardInfo > HSHomeObject::_seal_shard(shard_id id) { return 
folly::makeUnexpected(ShardError::UNKNOWN_SHARD); } +void HSHomeObject::on_pre_commit_shard_msg(int64_t lsn, sisl::blob const& header, sisl::blob const& key, + void* user_ctx) { + +} + +void HSHomeObject::on_rollback_shard_msg(int64_t lsn, sisl::blob const& header, sisl::blob const& key, + void* user_ctx) { +} + void HSHomeObject::on_shard_message_commit(int64_t lsn, sisl::blob const& header, sisl::blob const& key, - sisl::sg_list const& value, void* user_ctx) { + void* user_ctx) { const ReplicationMessageHeader* msg_header = r_cast(header.bytes); folly::Promise > *promise = nullptr; // user_ctx will be nullptr when: @@ -109,12 +145,10 @@ void HSHomeObject::on_shard_message_commit(int64_t lsn, sisl::blob const& header } std::string string_value; - for (auto & iovec: value.iovs) { - string_value.append(static_cast(iovec.iov_base), iovec.iov_len); - } + string_value.append(r_cast(header.bytes + sizeof(ReplicationMessageHeader)), msg_header->payload_size); - auto crc = crc32_ieee(0, r_cast< const uint8_t* >(string_value.c_str()), string_value.size()); - if (msg_header->message_crc != crc) { + auto crc = crc32_ieee(init_crc32, r_cast< const uint8_t* >(string_value.c_str()), string_value.size()); + if (msg_header->payload_crc != crc) { LOGWARN("replication message body is corrupted with crc error, lsn:{}", lsn); if (promise) { promise->setValue(folly::makeUnexpected(ShardError::INVALID_ARG)); @@ -122,26 +156,44 @@ void HSHomeObject::on_shard_message_commit(int64_t lsn, sisl::blob const& header return; } - //preapre ShardInfoExt msg and persisted this ShardInfoExt to homestore MetaBlkService; + //TODO: prepare the ShardInfoExt msg with chunk_id and persist the ShardInfoExt to homestore MetaBlkService; void* cookie = nullptr; - homestore::hs()->meta_service().add_sub_sb(HomeObjectImpl::s_shard_info_sub_type, r_cast(string_value.c_str()), string_value.size(), cookie); + homestore::hs()->meta_service().add_sub_sb(HSHomeObject::s_shard_info_sub_type, 
r_cast(string_value.c_str()), string_value.size(), cookie); //update in-memory shard map; - CShardInfo shard_info = std::make_shared(); - auto shard_info_json = nlohmann::json::parse(string_value); - shard_info->id = shard_info_json["shard_id"].get(); - shard_info->placement_group = shard_info_json["pg_id"].get(); - shard_info->state = static_cast(shard_info_json["state"].get()); - shard_info->total_capacity_bytes = shard_info_json["capacity"].get(); - std::scoped_lock lock_guard(_pg_lock, _shard_lock); - auto pg_iter = _pg_map.find(shard_info->placement_group); - RELEASE_ASSERT(pg_iter != _pg_map.end(), "Missing PG info"); - pg_iter->second.shards.push_back(shard_info); - _shard_map[shard_info->id] = shard_info; + auto shard = deserialize_shard_info(string_value); + if (shard->info.state == ShardInfo::State::OPEN) { + //create shard; + do_commit_new_shard(shard); + } else { + do_commit_seal_shard(shard); + } if (promise) { - promise->setValue(ShardManager::Result(*shard_info)); + promise->setValue(ShardManager::Result(shard->info)); } } +void HSHomeObject::do_commit_new_shard(ShardPtr shard) { + std::scoped_lock lock_guard(_pg_lock, _shard_lock); + auto pg_iter = _pg_map.find(shard->info.placement_group); + RELEASE_ASSERT(pg_iter != _pg_map.end(), "Missing PG info"); + pg_iter->second.shards.push_back(shard); + auto [_, happened] = _shard_map.emplace(shard->info.id, shard); + RELEASE_ASSERT(happened, "duplicated shard info"); + + //the following part is required on follower members or when a member is restarted; + auto sequence_num = get_sequence_num_from_shard_id(shard->info.id); + if (sequence_num > pg_iter->second.shard_sequence_num) { + pg_iter->second.shard_sequence_num = sequence_num; + } +} + +void HSHomeObject::do_commit_seal_shard(ShardPtr shard) { + std::scoped_lock lock_guard(_shard_lock); + auto shard_iter = _shard_map.find(shard->info.id); + RELEASE_ASSERT(shard_iter != _shard_map.end(), "Missing shard info"); + *(shard_iter->second) = *shard; +} + } // 
namespace homeobject diff --git a/src/lib/memory/shard_manager.cpp b/src/lib/memory/shard_manager.cpp index c6c330a1..ddc340d6 100644 --- a/src/lib/memory/shard_manager.cpp +++ b/src/lib/memory/shard_manager.cpp @@ -6,28 +6,20 @@ namespace homeobject { uint64_t ShardManager::max_shard_size() { return Gi; } -static uint64_t get_current_timestamp() { - auto now = std::chrono::system_clock::now(); - auto duration = std::chrono::duration_cast< std::chrono::milliseconds >(now.time_since_epoch()); - auto timestamp = static_cast< uint64_t >(duration.count()); - return timestamp; -} - ShardManager::Result< ShardInfo > MemoryHomeObject::_create_shard(pg_id pg_owner, uint64_t size_bytes) { auto const now = get_current_timestamp(); - auto info = ShardInfo{0ull, pg_owner, ShardInfo::State::OPEN, now, now, size_bytes, size_bytes, 0}; + auto info = ShardInfo(0ull, pg_owner, ShardInfo::State::OPEN, now, now, size_bytes, size_bytes, 0); { auto lg = std::scoped_lock(_pg_lock, _shard_lock); auto pg_it = _pg_map.find(pg_owner); if (_pg_map.end() == pg_it) return folly::makeUnexpected(ShardError::UNKNOWN_PG); - auto& s_set = pg_it->second.shards; - info.id = (((uint64_t)pg_owner) << 48) + s_set.size(); + auto& s_list = pg_it->second.shards; + info.id = (((uint64_t)pg_owner) << 48) + s_list.size(); + auto shard = std::make_shared(info); LOGDEBUG("Creating Shard [{}]: in Pg [{}] of Size [{}b]", info.id, pg_owner, size_bytes); - - auto [s_it, happened] = s_set.emplace(info); - RELEASE_ASSERT(happened, "Duplicate Shard insertion!"); - auto [_, s_happened] = _shard_map.emplace(info.id, s_it); + s_list.push_back(shard); + auto [_, s_happened] = _shard_map.emplace(info.id, shard); RELEASE_ASSERT(s_happened, "Duplicate Shard insertion!"); } auto lg = std::scoped_lock(_index_lock); @@ -40,8 +32,8 @@ ShardManager::Result< ShardInfo > MemoryHomeObject::_seal_shard(shard_id id) { auto lg = std::scoped_lock(_shard_lock); auto shard_it = _shard_map.find(id); if (_shard_map.end() == shard_it) return 
folly::makeUnexpected(ShardError::UNKNOWN_SHARD); - shard_it->second->state = ShardInfo::State::SEALED; - return *(shard_it->second); + shard_it->second->info.state = ShardInfo::State::SEALED; + return shard_it->second->info; } } // namespace homeobject diff --git a/src/lib/shard_manager.cpp b/src/lib/shard_manager.cpp index 8c609cfd..458236f6 100644 --- a/src/lib/shard_manager.cpp +++ b/src/lib/shard_manager.cpp @@ -22,9 +22,9 @@ ShardManager::AsyncResult< InfoList > HomeObjectImpl::list_shards(pg_id pg) cons if (_pg_map.end() == pg_it) return folly::makeUnexpected(ShardError::UNKNOWN_PG); auto info_l = std::list< ShardInfo >(); - for (auto const& info : pg_it->second.second) { - LOGDEBUG("Listing Shard {}", info.id); - info_l.push_back(info); + for (auto const& shard : pg_it->second.shards) { + LOGDEBUG("Listing Shard {}", shard->info.id); + info_l.push_back(shard->info); } return info_l; }); @@ -35,6 +35,7 @@ ShardManager::AsyncResult< ShardInfo > HomeObjectImpl::seal_shard(shard_id id) { [this, id](auto) mutable -> ShardManager::Result< ShardInfo > { return _seal_shard(id); }); } + ShardManager::AsyncResult< ShardInfo > HomeObjectImpl::get_shard(shard_id id) const { return _get_shard(id); } /// @@ -43,9 +44,16 @@ ShardManager::AsyncResult< ShardInfo > HomeObjectImpl::get_shard(shard_id id) co folly::Future< ShardManager::Result< ShardInfo > > HomeObjectImpl::_get_shard(shard_id id) const { return _defer().thenValue([this, id](auto) -> ShardManager::Result< ShardInfo > { auto lg = std::shared_lock(_shard_lock); - if (auto it = _shard_map.find(id); _shard_map.end() != it) return *it->second; + if (auto it = _shard_map.find(id); _shard_map.end() != it) return it->second->info; return folly::makeUnexpected(ShardError::UNKNOWN_SHARD); }); } +uint64_t HomeObjectImpl::get_current_timestamp() { + auto now = std::chrono::system_clock::now(); + auto duration = std::chrono::duration_cast< std::chrono::milliseconds >(now.time_since_epoch()); + auto timestamp = static_cast< 
uint64_t >(duration.count()); + return timestamp; +} + } // namespace homeobject