diff --git a/src/include/homeobject/pg_manager.hpp b/src/include/homeobject/pg_manager.hpp index f7757c5b..52a16bd6 100644 --- a/src/include/homeobject/pg_manager.hpp +++ b/src/include/homeobject/pg_manager.hpp @@ -5,6 +5,7 @@ #include #include "common.hpp" +#include "shard_manager.hpp" namespace homeobject { @@ -27,6 +28,13 @@ struct PGInfo { mutable MemberSet members; }; +struct PG { + explicit PG(PGInfo info) : pg_info(std::move(info)) {} + PGInfo pg_info; + uint64_t next_sequence_num_for_new_shard{0}; + CShardInfoList shards; +}; + class PGManager : public Manager< PGError > { public: virtual NullAsyncResult create_pg(PGInfo&& pg_info) = 0; diff --git a/src/include/homeobject/shard_manager.hpp b/src/include/homeobject/shard_manager.hpp index 4f22f6bd..254958aa 100644 --- a/src/include/homeobject/shard_manager.hpp +++ b/src/include/homeobject/shard_manager.hpp @@ -27,11 +27,15 @@ struct ShardInfo { uint64_t deleted_capacity_bytes; std::optional< peer_id > current_leader{std::nullopt}; }; -using InfoList = std::list< ShardInfo >; + +using InfoList = std::list; +using CShardInfo = std::shared_ptr; +using CShardInfoList = std::list< CShardInfo >; class ShardManager : public Manager< ShardError > { public: static uint64_t max_shard_size(); // Static function forces runtime evaluation. 
+ static uint64_t max_shard_num_in_pg(); virtual AsyncResult< ShardInfo > get_shard(shard_id id) const = 0; virtual AsyncResult< InfoList > list_shards(pg_id id) const = 0; diff --git a/src/lib/homeobject_impl.hpp b/src/lib/homeobject_impl.hpp index 89537e28..9c4e362f 100644 --- a/src/lib/homeobject_impl.hpp +++ b/src/lib/homeobject_impl.hpp @@ -8,6 +8,7 @@ #include #include + template <> struct std::hash< homeobject::ShardInfo > { std::size_t operator()(homeobject::ShardInfo const& i) const noexcept { return std::hash< uint64_t >()(i.id); } @@ -24,9 +25,6 @@ class HomeObjectImpl : public HomeObject, public ShardManager, public std::enable_shared_from_this< HomeObjectImpl > { - std::mutex _repl_lock; - std::shared_ptr< home_replication::ReplicationService > _repl_svc; - /// Implementation defines these virtual folly::Future< ShardManager::Result< ShardInfo > > _get_shard(shard_id) const = 0; virtual ShardManager::Result< ShardInfo > _create_shard(pg_id, uint64_t size_bytes) = 0; @@ -39,6 +37,9 @@ class HomeObjectImpl : public HomeObject, /// protected: + std::mutex _repl_lock; + std::shared_ptr< home_replication::ReplicationService > _repl_svc; + //std::shared_ptr _repl_svc; peer_id _our_id; /// Our SvcId retrieval and SvcId->IP mapping @@ -46,14 +47,11 @@ class HomeObjectImpl : public HomeObject, /// mutable std::shared_mutex _pg_lock; - using shard_set = std::unordered_set< ShardInfo >; - using pg_pair = std::pair< PGInfo, shard_set >; - std::map< pg_id, pg_pair > _pg_map; + std::map< pg_id, PG > _pg_map; mutable std::shared_mutex _shard_lock; - std::map< shard_id, shard_set::const_iterator > _shard_map; + std::map< shard_id, CShardInfo > _shard_map; /// - public: explicit HomeObjectImpl(std::weak_ptr< HomeObjectApplication >&& application) : _application(std::move(application)) {} @@ -79,6 +77,7 @@ class HomeObjectImpl : public HomeObject, ShardManager::AsyncResult< InfoList > list_shards(pg_id pg) const final; ShardManager::AsyncResult< ShardInfo > 
seal_shard(shard_id id) final; + static const std::string s_shard_info_sub_type; /// BlobManager BlobManager::AsyncResult< blob_id > put(shard_id shard, Blob&&) final; BlobManager::AsyncResult< Blob > get(shard_id shard, blob_id const& blob, uint64_t off, uint64_t len) const final; diff --git a/src/lib/homestore/homeobject.cpp b/src/lib/homestore/homeobject.cpp index 2229f1f7..4ea51d05 100644 --- a/src/lib/homestore/homeobject.cpp +++ b/src/lib/homestore/homeobject.cpp @@ -1,9 +1,19 @@ #include "homeobject.hpp" +#include +#include + namespace homeobject { +const std::string HomeObjectImpl::s_shard_info_sub_type = "shard_info"; + extern std::shared_ptr< HomeObject > init_homeobject(std::weak_ptr< HomeObjectApplication >&& application) { LOGINFOMOD(homeobject, "Initializing HomeObject"); + //register some callbacks for metadata recovery; + homestore::hs()->meta_service().register_handler(HomeObjectImpl::s_shard_info_sub_type, + HSHomeObject::on_shard_meta_blk_found, + nullptr, + true); auto instance = std::make_shared< HSHomeObject >(std::move(application)); instance->init_repl_svc(); return instance; @@ -20,4 +30,6 @@ void HomeObjectImpl::init_repl_svc() { } } +void HSHomeObject::on_shard_meta_blk_found(homestore::meta_blk* mblk, sisl::byte_view buf, size_t size) {} + } // namespace homeobject diff --git a/src/lib/homestore/homeobject.hpp b/src/lib/homestore/homeobject.hpp index 751fed94..8b370ff6 100644 --- a/src/lib/homestore/homeobject.hpp +++ b/src/lib/homestore/homeobject.hpp @@ -7,6 +7,10 @@ #include "lib/homeobject_impl.hpp" +namespace homestore { +struct meta_blk; +} + namespace homeobject { class HSHomeObject : public HomeObjectImpl { @@ -20,10 +24,17 @@ class HSHomeObject : public HomeObjectImpl { BlobManager::Result< Blob > _get_blob(ShardInfo const&, blob_id) const override; BlobManager::NullResult _del_blob(ShardInfo const&, blob_id) override; /// - +private: + bool check_if_pg_exist(pg_id pg); + shard_id generate_new_shard_id(pg_id pg); + shard_id 
make_new_shard_id(pg_id pg, uint64_t sequence_num); + std::string prepare_create_shard_message(pg_id pg, shard_id new_shard_id, uint64_t shard_size); public: using HomeObjectImpl::HomeObjectImpl; ~HSHomeObject() override = default; + static void on_shard_meta_blk_found(homestore::meta_blk* mblk, sisl::byte_view buf, size_t size); + void on_shard_message_commit(int64_t lsn, sisl::blob const& header, sisl::blob const& key, + sisl::sg_list const& value, void* user_ctx); }; } // namespace homeobject diff --git a/src/lib/homestore/replication_message.hpp b/src/lib/homestore/replication_message.hpp new file mode 100644 index 00000000..d937cbe4 --- /dev/null +++ b/src/lib/homestore/replication_message.hpp @@ -0,0 +1,31 @@ +#pragma once + +#include +#include + +namespace homeobject { + +ENUM(ReplicationMessageType, uint16_t, PG_MESSAGE = 0, SHARD_MESSAGE, BLOB_MESSAGE, UNKNOWN_MESSAGE); + +// magic num comes from the first 8 bytes of 'echo homeobject_replication | md5sum' +static constexpr uint64_t HOMEOBJECT_REPLICATION_MAGIC = 0x11153ca24efc8d34; + +// Header sent in front of every replicated message. It is packed so that there are no padding bytes and +// header_crc occupies exactly the last sizeof(header_crc) bytes, which calculate_crc() relies on. +#pragma pack(1) +struct ReplicationMessageHeader { + uint64_t magic_num{HOMEOBJECT_REPLICATION_MAGIC}; + ReplicationMessageType message_type; + uint32_t message_size; + uint32_t message_crc; + uint8_t reserved_pad[2]{}; + uint32_t header_crc; + // CRC32 over every header byte that precedes header_crc (header_crc itself is excluded). + uint32_t calculate_crc() const { + return crc32_ieee(0, r_cast< const uint8_t* >(this), sizeof(*this) - sizeof(header_crc)); + } +}; +#pragma pack() + + +} diff --git a/src/lib/homestore/replication_state_machine.cpp b/src/lib/homestore/replication_state_machine.cpp new file mode 100644 index 00000000..623af65a --- /dev/null +++ b/src/lib/homestore/replication_state_machine.cpp @@ -0,0 +1,36 @@ +#include "replication_state_machine.hpp" +#include "homeobject_impl.hpp" + +namespace homeobject { + +// Dispatches a committed log entry by the message type in its header; only SHARD_MESSAGE is handled so far. +void ReplicationStateMachine::on_commit(int64_t lsn, sisl::blob const& header, sisl::blob const& key, + blkid_list_t const& blkids,void* ctx) { + const ReplicationMessageHeader* msg_header = r_cast< const ReplicationMessageHeader* >(header.bytes); + //TODO: read data from pba or 
get data from replication context(preferred) + sisl::sg_list value; + + switch (msg_header->message_type) { + case ReplicationMessageType::SHARD_MESSAGE: { + home_object->on_shard_message_commit(lsn, header, key, value, ctx); + break; + } + case ReplicationMessageType::PG_MESSAGE: + case ReplicationMessageType::BLOB_MESSAGE: + default: { + break; + } + } +} + +void ReplicationStateMachine::on_pre_commit(int64_t lsn, sisl::blob const& header, sisl::blob const& key, void* ctx) {} + +void ReplicationStateMachine::on_rollback(int64_t lsn, sisl::blob const& header, sisl::blob const& key, void* ctx) {} + +blk_alloc_hints ReplicationStateMachine::get_blk_alloc_hints(sisl::blob const& header) { + return blk_alloc_hints(); +} + +void ReplicationStateMachine::on_replica_stop() {} + +} diff --git a/src/lib/homestore/replication_state_machine.hpp b/src/lib/homestore/replication_state_machine.hpp new file mode 100644 index 00000000..45bcb60b --- /dev/null +++ b/src/lib/homestore/replication_state_machine.hpp @@ -0,0 +1,78 @@ +#pragma once + +#include "homestore/replication/repl_dev.h" + + +namespace homeobject { + +class HomeObjectImpl; + +class ReplicationStateMachine : public homestore::ReplicaDevListener { +public: + /// @brief Called when the log entry has been committed in the replica set. + /// + /// This function is called from a dedicated commit thread which is different from the original thread calling + /// replica_set::write(). There is only one commit thread, and lsn is guaranteed to be monotonically increasing. + /// + /// @param lsn - The log sequence number + /// @param header - Header originally passed with replica_set::write() api + /// @param key - Key originally passed with replica_set::write() api + /// @param blkids - List of block ids where data is written to the storage engine. 
+ /// @param ctx - User context passed as part of the replica_set::write() api + /// + virtual void on_commit(int64_t lsn, sisl::blob const& header, sisl::blob const& key, blkid_list_t const& blkids, + void* ctx); + + /// @brief Called when the log entry has been received by the replica dev. + /// + /// On recovery, this is called from a random worker thread before the raft server is started. It is + /// guaranteed to be serialized in log index order. + /// + /// On the leader, this is called from the same thread that replica_set::write() was called. + /// + /// On the follower, this is called when the follower has received the log entry. It is guaranteed to be serialized + /// in log sequence order. + /// + /// NOTE: Listener can choose to ignore this pre commit, however, typical use case of maintaining this is in-case + /// replica set needs to support strong consistent reads and follower needs to ignore any keys which are not being + /// currently in pre-commit, but yet to be committed. + /// + /// @param lsn - The log sequence number + /// @param header - Header originally passed with replica_set::write() api + /// @param key - Key originally passed with replica_set::write() api + /// @param ctx - User context passed as part of the replica_set::write() api + virtual void on_pre_commit(int64_t lsn, sisl::blob const& header, sisl::blob const& key, void* ctx); + + /// @brief Called when the log entry has been rolled back by the replica set. + /// + /// This function is called on followers only when the log entry is going to be overwritten. This function is called + /// from a random worker thread, but is guaranteed to be serialized. + /// + /// For each log index, it is guaranteed that either on_commit() or on_rollback() is called but not both. + /// + /// NOTE: Listener should free any resources created as part of pre-commit. 
+ /// + /// @param lsn - The log sequence number getting rolled back + /// @param header - Header originally passed with replica_set::write() api + /// @param key - Key originally passed with replica_set::write() api + /// @param ctx - User context passed as part of the replica_set::write() api + virtual void on_rollback(int64_t lsn, sisl::blob const& header, sisl::blob const& key, void* ctx); + + /// @brief Called when replication module is trying to allocate a block to write the value + /// + /// This function can be called both on leader and follower when it is trying to allocate a block to write the + /// value. Callee is expected to provide hints for allocation based on the header supplied as part of original + /// write. If the callee does not care about the hints, it can return a default blk_alloc_hints. + /// + /// @param header Header originally passed with repl_dev::write() api on the leader + /// @return Expected to return blk_alloc_hints for this write + virtual blk_alloc_hints get_blk_alloc_hints(sisl::blob const& header); + + /// @brief Called when the replica set is being stopped + virtual void on_replica_stop(); + +private: + HomeObjectImpl* home_object; +}; + +} diff --git a/src/lib/homestore/shard_manager.cpp b/src/lib/homestore/shard_manager.cpp index 93dedae0..e0d7d04d 100644 --- a/src/lib/homestore/shard_manager.cpp +++ b/src/lib/homestore/shard_manager.cpp @@ -1,11 +1,85 @@ #include "homeobject.hpp" +#include "replication_message.hpp" + +#include +#include namespace homeobject { uint64_t ShardManager::max_shard_size() { return Gi; } +uint64_t ShardManager::max_shard_num_in_pg() {return ((uint64_t)0x01) << 48;} + +bool HSHomeObject::check_if_pg_exist(pg_id pg) { + std::scoped_lock lock_guard(_pg_lock); + auto iter = _pg_map.find(pg); + return (iter == _pg_map.end()) ? 
false : true; +} + +shard_id HSHomeObject::generate_new_shard_id(pg_id pg) { + std::scoped_lock lock_guard(_pg_lock); + auto iter = _pg_map.find(pg); + if (iter == _pg_map.end()) { + auto created_pg = PG(std::move(PGInfo(pg))); + iter = _pg_map.insert(std::make_pair(pg, std::move(created_pg))).first; + } + auto new_sequence_num = iter->second.next_sequence_num_for_new_shard++; + RELEASE_ASSERT(new_sequence_num < ShardManager::max_shard_num_in_pg(), "new shard id must be less than ShardManager::max_shard_num_in_pg()"); + return make_new_shard_id(pg, new_sequence_num); +} + +shard_id HSHomeObject::make_new_shard_id(pg_id pg, uint64_t sequence_num) { + return ((uint64_t)pg << 48) | sequence_num; +} + +std::string HSHomeObject::prepare_create_shard_message(pg_id pg, shard_id new_shard_id, uint64_t shard_size) { + nlohmann::json j; + j["pg_id"] = pg; + j["shard_id"] = new_shard_id; + j["state"] = ShardInfo::State::OPEN; + j["capacity"] = shard_size; + return j.dump(); +} ShardManager::Result< ShardInfo > HSHomeObject::_create_shard(pg_id pg_owner, uint64_t size_bytes) { - return folly::makeUnexpected(ShardError::UNKNOWN_PG); + bool pg_exist = check_if_pg_exist(pg_owner); + if (!pg_exist) { + LOGWARN("failed to create shard with non-exist pg [{}]", pg_owner); + return folly::makeUnexpected(ShardError::UNKNOWN_PG); + } + + auto repl_dev = _repl_svc->get_replica_dev(fmt::format("{}", pg_owner)); + if (repl_dev == nullptr) { + LOGWARN("failed to get replica set instance for pg [{}]", pg_owner); + return folly::makeUnexpected(ShardError::UNKNOWN_PG); + } + + if (!repl_dev->is_leader()) { + LOGWARN("pg [{}] replica set is not a leader, please retry other pg members", pg_owner); + return folly::makeUnexpected(ShardError::NOT_LEADER); + } + + auto new_shard_id = generate_new_shard_id(pg_owner); + std::string create_shard_message = prepare_create_shard_message(pg_owner, new_shard_id, size_bytes); + ReplicationMessageHeader header; + header.message_type = 
ReplicationMessageType::SHARD_MESSAGE; + header.message_size = create_shard_message.size(); + header.message_crc = crc32_ieee(0, r_cast< const uint8_t* >(create_shard_message.c_str()), create_shard_message.size()); + header.header_crc = header.calculate_crc(); + sisl::sg_list value; + value.size = create_shard_message.size(); + value.iovs.emplace_back(iovec{static_cast(const_cast(create_shard_message.c_str())), + create_shard_message.size()}); + //replicate this create shard message to PG members; + auto [p, sf] = folly::makePromiseContract< ShardManager::Result >(); + repl_dev->async_alloc_write(sisl::blob(uintptr_cast(&header), sizeof(header)), + sisl::blob(), + value, + static_cast(&p)); + auto info = std::move(sf).via(folly::getGlobalCPUExecutor()).get(); + if (!bool(info)) { + LOGWARN("create new shard [{}] on pg [{}] is failed with error:{}", new_shard_id, pg_owner, info.error()); + } + return info; } ShardManager::Result< ShardInfo > HSHomeObject::_seal_shard(shard_id id) { @@ -20,4 +94,59 @@ ShardManager::Result< InfoList > HSHomeObject::_list_shards(pg_id pg) const { return folly::makeUnexpected(ShardError::UNKNOWN_PG); } +void HSHomeObject::on_shard_message_commit(int64_t lsn, sisl::blob const& header, sisl::blob const& key, + sisl::sg_list const& value, void* user_ctx) { + const ReplicationMessageHeader* msg_header = r_cast(header.bytes); + folly::Promise > *promise = nullptr; + // user_ctx will be nullptr when: + // 1. on the follower side + // 2. 
on the leader side but homeobject restarts and replays all committed log entries from the last commit id; + if (user_ctx != nullptr) { + promise = r_cast >*> (user_ctx); + } + + if (msg_header->header_crc != msg_header->calculate_crc()) { + LOGWARN("replication message header is corrupted with crc error, lsn:{}", lsn); + if (promise) { + promise->setValue(folly::makeUnexpected(ShardError::INVALID_ARG)); + } + return; + } + + std::string string_value; + for (auto & iovec: value.iovs) { + string_value.append(static_cast(iovec.iov_base), iovec.iov_len); + } + + auto crc = crc32_ieee(0, r_cast< const uint8_t* >(string_value.c_str()), string_value.size()); + if (msg_header->message_crc != crc) { + LOGWARN("replication message body is corrupted with crc error, lsn:{}", lsn); + if (promise) { + promise->setValue(folly::makeUnexpected(ShardError::INVALID_ARG)); + } + return; + } + + //prepare ShardInfoExt msg and persist this ShardInfoExt to homestore MetaBlkService; + void* cookie = nullptr; + homestore::hs()->meta_service().add_sub_sb(HomeObjectImpl::s_shard_info_sub_type, r_cast(string_value.c_str()), string_value.size(), cookie); + + //update in-memory shard map; + CShardInfo shard_info = std::make_shared(); + auto shard_info_json = nlohmann::json::parse(string_value); + shard_info->id = shard_info_json["shard_id"].get(); + shard_info->placement_group = shard_info_json["pg_id"].get(); + shard_info->state = static_cast(shard_info_json["state"].get()); + shard_info->total_capacity_bytes = shard_info_json["capacity"].get(); + std::scoped_lock lock_guard(_pg_lock, _shard_lock); + auto pg_iter = _pg_map.find(shard_info->placement_group); + RELEASE_ASSERT(pg_iter != _pg_map.end(), "Missing PG info"); + pg_iter->second.shards.push_back(shard_info); + _shard_map[shard_info->id] = shard_info; + + if (promise) { + promise->setValue(ShardManager::Result(*shard_info)); + } +} + } // namespace homeobject diff --git a/src/lib/memory/shard_manager.cpp 
b/src/lib/memory/shard_manager.cpp index 59e46626..b6403480 100644 --- a/src/lib/memory/shard_manager.cpp +++ b/src/lib/memory/shard_manager.cpp @@ -19,17 +19,16 @@ ShardManager::Result< ShardInfo > MemoryHomeObject::_create_shard(pg_id pg_owner if (_pg_map.end() == pg_it) return folly::makeUnexpected(ShardError::UNKNOWN_PG); auto const now = get_current_timestamp(); - auto& s_set = pg_it->second.second; - auto s_id = (((uint64_t)pg_owner) << 48) + s_set.size(); + auto& shard_list = pg_it->second.shards; + auto s_id = (((uint64_t)pg_owner) << 48) + shard_list.size(); auto info = ShardInfo{s_id, pg_owner, ShardInfo::State::OPEN, now, now, size_bytes, size_bytes, 0}; + auto cinfo = std::make_shared(info); LOGDEBUG("Creating Shard [{}]: in Pg [{}] of Size [{}b]", info.id, pg_owner, size_bytes); - - auto [s_it, happened] = s_set.emplace(info); - RELEASE_ASSERT(happened, "Duplicate Shard insertion!"); - auto [_, s_happened] = _shard_map.emplace(info.id, s_it); + shard_list.push_back(cinfo); + auto [_, s_happened] = _shard_map.emplace(info.id, cinfo); RELEASE_ASSERT(s_happened, "Duplicate Shard insertion!"); - return info; + return *cinfo; } folly::Future< ShardManager::Result< ShardInfo > > MemoryHomeObject::_get_shard(shard_id id) const { @@ -37,7 +36,7 @@ folly::Future< ShardManager::Result< ShardInfo > > MemoryHomeObject::_get_shard( .via(folly::getGlobalCPUExecutor()) .thenValue([this, id](auto) -> ShardManager::Result< ShardInfo > { auto lg = std::shared_lock(_shard_lock); - if (auto it = _shard_map.find(id); _shard_map.end() != it) return *it->second; + if (auto it = _shard_map.find(id); _shard_map.end() != it) return *(it->second); return folly::makeUnexpected(ShardError::UNKNOWN_SHARD); }); } @@ -48,31 +47,19 @@ ShardManager::Result< InfoList > MemoryHomeObject::_list_shards(pg_id id) const if (_pg_map.end() == pg_it) return folly::makeUnexpected(ShardError::UNKNOWN_PG); auto info_l = std::list< ShardInfo >(); - for (auto const& info : pg_it->second.second) { - 
LOGDEBUG("Listing Shard {}", info.id); - info_l.push_back(info); + for (auto const& info : pg_it->second.shards) { + LOGDEBUG("Listing Shard {}", info->id); + info_l.push_back(*info); } return info_l; } ShardManager::Result< ShardInfo > MemoryHomeObject::_seal_shard(shard_id id) { - auto lg = std::scoped_lock(_pg_lock, _shard_lock); + auto lg = std::scoped_lock(_shard_lock); auto shard_it = _shard_map.find(id); if (_shard_map.end() == shard_it) return folly::makeUnexpected(ShardError::UNKNOWN_SHARD); - - auto const_it = shard_it->second; - auto pg_it = _pg_map.find(const_it->placement_group); - RELEASE_ASSERT(_pg_map.end() != pg_it, "Missing ShardInfo!"); - - auto new_info = *const_it; - new_info.state = ShardInfo::State::SEALED; - pg_it->second.second.erase(const_it); - - auto [s_it, happened] = pg_it->second.second.emplace(new_info); - RELEASE_ASSERT(happened, "Duplicate Shard insertion!"); - shard_it->second = s_it; - - return new_info; + shard_it->second->state = ShardInfo::State::SEALED; + return *(shard_it->second); } } // namespace homeobject diff --git a/src/lib/pg_manager.cpp b/src/lib/pg_manager.cpp index fbd9ced3..3b67817b 100644 --- a/src/lib/pg_manager.cpp +++ b/src/lib/pg_manager.cpp @@ -65,10 +65,9 @@ PGManager::NullAsyncResult HomeObjectImpl::create_pg(PGInfo&& pg_info) { home_replication::ReplicationService::set_var const& v) -> PGManager::NullResult { if (std::holds_alternative< home_replication::ReplServiceError >(v)) return folly::makeUnexpected(PGError::INVALID_ARG); - - auto p = pg_pair(std::move(pg_info), shard_set()); + auto pg = PG(std::move(pg_info)); auto lg = std::scoped_lock(_pg_lock); - auto [it, _] = _pg_map.try_emplace(pg_info.id, std::move(p)); + auto [it, _] = _pg_map.try_emplace(pg_info.id, std::move(pg)); RELEASE_ASSERT(_pg_map.end() != it, "Unknown map insert error!"); return folly::Unit(); });