diff --git a/src/include/homeobject/shard_manager.hpp b/src/include/homeobject/shard_manager.hpp index 4f22f6bd..f80e7ef7 100644 --- a/src/include/homeobject/shard_manager.hpp +++ b/src/include/homeobject/shard_manager.hpp @@ -29,9 +29,15 @@ struct ShardInfo { }; using InfoList = std::list< ShardInfo >; +/* Per-PG shard bookkeeping: a sequence counter and the set of shards. */ struct ShardContextsInPG { + uint64_t sequence_num{0}; + std::unordered_set shards; +}; + class ShardManager : public Manager< ShardError > { public: static uint64_t max_shard_size(); // Static function forces runtime evaluation. + static uint64_t max_shard_num_in_pg(); virtual AsyncResult< ShardInfo > get_shard(shard_id id) const = 0; virtual AsyncResult< InfoList > list_shards(pg_id id) const = 0; diff --git a/src/lib/homeobject_impl.cpp b/src/lib/homeobject_impl.cpp index 2b109bb6..cbb29291 100644 --- a/src/lib/homeobject_impl.cpp +++ b/src/lib/homeobject_impl.cpp @@ -2,10 +2,19 @@ #include +#include + namespace homeobject { +const std::string HomeObjectImpl::s_shard_info_sub_type = "shard_info"; + extern std::shared_ptr< HomeObject > init_homeobject(std::weak_ptr< HomeObjectApplication >&& application) { LOGINFOMOD(homeobject, "Initializing HomeObject"); + // Register the shard-info meta block handler used for metadata recovery. + homestore::hs()->meta_service().register_handler(HomeObjectImpl::s_shard_info_sub_type, + HomeObjectImpl::on_shard_meta_blk_found, + nullptr, + true); auto instance = std::make_shared< HomeObjectImpl >(std::move(application)); instance->init_repl_svc(); return instance; @@ -22,4 +31,6 @@ void HomeObjectImpl::init_repl_svc() { } } +void HomeObjectImpl::on_shard_meta_blk_found(homestore::meta_blk* mblk, sisl::byte_view buf, size_t size) {} /* NOTE(review): empty body — meta blocks passed to this handler are currently ignored. */ + } // namespace homeobject diff --git a/src/lib/homeobject_impl.hpp b/src/lib/homeobject_impl.hpp index d9c83627..b5b98ec6 100644 --- a/src/lib/homeobject_impl.hpp +++ b/src/lib/homeobject_impl.hpp @@ -5,9 +5,15 @@ #include "homeobject/pg_manager.hpp" #include "homeobject/shard_manager.hpp" +#include + #include #include 
+namespace homestore { +struct meta_blk; +} + namespace homeobject { class HomeObjectImpl : public HomeObject, @@ -17,8 +23,7 @@ class HomeObjectImpl : public HomeObject, public std::enable_shared_from_this< HomeObjectImpl > { std::mutex _repl_lock; - std::shared_ptr< home_replication::ReplicationService > _repl_svc; - + std::shared_ptr _repl_svc; protected: peer_id _our_id; @@ -27,9 +32,15 @@ class HomeObjectImpl : public HomeObject, /// This simulates the MetaBlkSvc thats used within real HomeObject mutable std::mutex _pg_lock; - std::map< pg_id, std::unordered_set< shard_id > > _pg_map; - /// - + std::map< pg_id, ShardContextsInPG > _pg_map; + + mutable std::mutex _shard_mutex; + std::map _shard_map; +private: /* Shard-creation helpers; their definitions are added to src/lib/shard_manager.cpp by this patch. */ + bool check_if_pg_exist(pg_id pg); + shard_id generate_new_shard_id(pg_id pg); + shard_id make_new_shard_id(pg_id pg, uint64_t sequence_num); + std::string prepare_create_shard_message(pg_id pg, shard_id new_shard_id, uint64_t shard_size); public: explicit HomeObjectImpl(std::weak_ptr< HomeObjectApplication >&& application) : _application(std::move(application)) {} @@ -54,7 +65,11 @@ class HomeObjectImpl : public HomeObject, ShardManager::AsyncResult< InfoList > list_shards(pg_id pg) const override; ShardManager::AsyncResult< ShardInfo > create_shard(pg_id pg_owner, uint64_t size_bytes) override; ShardManager::AsyncResult< ShardInfo > seal_shard(shard_id id) override; - + void on_shard_message_commit(int64_t lsn, sisl::blob const& header, sisl::blob const& key, + sisl::sg_list const& value, void* user_ctx); + static void on_shard_meta_blk_found(homestore::meta_blk* mblk, sisl::byte_view buf, size_t size); + static const std::string s_shard_info_sub_type; + /// BlobManager BlobManager::AsyncResult< blob_id > put(shard_id shard, Blob&&) override; BlobManager::AsyncResult< Blob > get(shard_id shard, blob_id const& blob, uint64_t off, diff --git a/src/lib/pg_manager.cpp b/src/lib/pg_manager.cpp index 1057e14d..704fe0b6 100644 --- a/src/lib/pg_manager.cpp +++ 
b/src/lib/pg_manager.cpp @@ -67,7 +67,7 @@ PGManager::NullAsyncResult HomeObjectImpl::create_pg(PGInfo&& pg_info) { return folly::makeUnexpected(PGError::INVALID_ARG); auto lg = std::scoped_lock(_pg_lock); - auto [it, _] = _pg_map.try_emplace(pg_info.id, std::unordered_set< shard_id >()); + auto [it, _] = _pg_map.try_emplace(pg_info.id, ShardContextsInPG()); RELEASE_ASSERT(_pg_map.end() != it, "Unknown map insert error!"); return folly::Unit(); }); diff --git a/src/lib/replication_message.hpp b/src/lib/replication_message.hpp new file mode 100644 index 00000000..b7b45ca9 --- /dev/null +++ b/src/lib/replication_message.hpp @@ -0,0 +1,26 @@ +#pragma once + +#include +#include + +namespace homeobject { + +ENUM(ReplicationMessageType, uint16_t, PG_MESSAGE = 0, SHARD_MESSAGE, BLOB_MESSAGE, UNKNOWN_MESSAGE); + +// Magic number taken from the first 8 bytes of 'echo homeobject_replication | md5sum' +static constexpr uint64_t HOMEOBJECT_REPLICATION_MAGIC = 0x11153ca24efc8d34; + +struct ReplicationMessageHeader { + uint64_t magic_num; + ReplicationMessageType message_type; + uint32_t message_size; + uint32_t message_crc; + uint8_t reserved_pad[2]{}; + uint32_t header_crc; + uint32_t calculate_crc() const { /* NOTE(review): no packing is visible on this struct; if the compiler pads it, sizeof(*this) - sizeof(header_crc) is not the offset of header_crc, so the CRC may cover header_crc itself and padding bytes — confirm the layout. */ + return crc32_ieee(0, r_cast(this), sizeof(*this) - sizeof(header_crc)); + } +}; + + +} diff --git a/src/lib/replication_state_machine.cpp b/src/lib/replication_state_machine.cpp new file mode 100644 index 00000000..a5a8b3dc --- /dev/null +++ b/src/lib/replication_state_machine.cpp @@ -0,0 +1,32 @@ +#include "replication_state_machine.hpp" +#include "homeobject_impl.hpp" + +namespace homeobject { + +void ReplicationStateMachine::on_commit(int64_t lsn, sisl::blob const& header, sisl::blob const& key, + blkid_list_t const& blkids,void* ctx) { + const ReplicationMessageHeader* msg_header = r_cast(header.bytes); + //TODO: read data from pba or get data from replication context(preferred); until then `value` below is passed on default-constructed. + sisl::sg_list value; + + switch (msg_header->message_type) { + case SHARD_MESSAGE: { + 
home_object->on_shard_message_commit(lsn, header, key, value, ctx); + } /* NOTE(review): no break — execution falls through into the labels below, which only break. */ + case PG_MESSAGE: + case BLOB_MESSAGE: + default: { + break; + } + } +} + +void ReplicationStateMachine::on_pre_commit(int64_t lsn, sisl::blob const& header, sisl::blob const& key, void* ctx) {} + +void ReplicationStateMachine::on_rollback(int64_t lsn, sisl::blob const& header, sisl::blob const& key, void* ctx) {} + +blk_alloc_hints ReplicationStateMachine::get_blk_alloc_hints(sisl::blob const& header) {} /* NOTE(review): non-void function with an empty body — no blk_alloc_hints value is returned. */ + +void ReplicationStateMachine::on_replica_stop() {} + +} diff --git a/src/lib/replication_state_machine.hpp b/src/lib/replication_state_machine.hpp new file mode 100644 index 00000000..45bcb60b --- /dev/null +++ b/src/lib/replication_state_machine.hpp @@ -0,0 +1,78 @@ +#pragma once + +#include "homestore/replication/repl_dev.h" + + +namespace homeobject { + +class HomeObjectImpl; + +class ReplicationStateMachine : public homestore::ReplicaDevListener { +public: + /// @brief Called when the log entry has been committed in the replica set. + /// + /// This function is called from a dedicated commit thread which is different from the original thread calling + /// replica_set::write(). There is only one commit thread, and lsn is guaranteed to be monotonically increasing. + /// + /// @param lsn - The log sequence number + /// @param header - Header originally passed with replica_set::write() api + /// @param key - Key originally passed with replica_set::write() api + /// @param blkids - List of blkids (pbas) where data is written to the storage engine. + /// @param ctx - User context passed as part of the replica_set::write() api + /// + virtual void on_commit(int64_t lsn, sisl::blob const& header, sisl::blob const& key, blkid_list_t const& blkids, + void* ctx); + + /// @brief Called when the log entry has been received by the replica dev. + /// + /// On recovery, this is called from a random worker thread before the raft server is started. It is + /// guaranteed to be serialized in log index order. 
+ /// + /// On the leader, this is called from the same thread that replica_set::write() was called. + /// + /// On the follower, this is called when the follower has received the log entry. It is guaranteed to be serialized + /// in log sequence order. + /// + /// NOTE: Listener can choose to ignore this pre commit, however, typical use case of maintaining this is in-case + /// replica set needs to support strong consistent reads and follower needs to ignore any keys which are not being + /// currently in pre-commit, but yet to be committed. + /// + /// @param lsn - The log sequence number + /// @param header - Header originally passed with replica_set::write() api + /// @param key - Key originally passed with replica_set::write() api + /// @param ctx - User context passed as part of the replica_set::write() api + virtual void on_pre_commit(int64_t lsn, sisl::blob const& header, sisl::blob const& key, void* ctx); + + /// @brief Called when the log entry has been rolled back by the replica set. + /// + /// This function is called on followers only when the log entry is going to be overwritten. This function is called + /// from a random worker thread, but is guaranteed to be serialized. + /// + /// For each log index, it is guaranteed that either on_commit() or on_rollback() is called but not both. + /// + /// NOTE: Listener should free any resources created as part of pre-commit. 
+ /// + /// @param lsn - The log sequence number getting rolled back + /// @param header - Header originally passed with replica_set::write() api + /// @param key - Key originally passed with replica_set::write() api + /// @param ctx - User context passed as part of the replica_set::write() api + virtual void on_rollback(int64_t lsn, sisl::blob const& header, sisl::blob const& key, void* ctx); + + /// @brief Called when replication module is trying to allocate a block to write the value + /// + /// This function can be called both on leader and follower when it is trying to allocate a block to write the + /// value. Callee is expected to provide hints for allocation based on the header supplied as part of original + /// write. If the callee doesn't care about the hints, it can return a default blk_alloc_hints. + /// + /// @param header Header originally passed with repl_dev::write() api on the leader + /// @return Expected to return blk_alloc_hints for this write + virtual blk_alloc_hints get_blk_alloc_hints(sisl::blob const& header); + + /// @brief Called when the replica set is being stopped + virtual void on_replica_stop(); + +private: + HomeObjectImpl* home_object; /* NOTE(review): nothing in this patch assigns this pointer — confirm it is initialized before on_commit dereferences it. */ +}; + +} diff --git a/src/lib/shard_manager.cpp b/src/lib/shard_manager.cpp index f47fa88b..3c951988 100644 --- a/src/lib/shard_manager.cpp +++ b/src/lib/shard_manager.cpp @@ -1,17 +1,100 @@ #include "homeobject_impl.hpp" +#include "replication_message.hpp" + +#include namespace homeobject { uint64_t ShardManager::max_shard_size() { return Gi; } - +uint64_t ShardManager::max_shard_num_in_pg() {return ((uint64_t)0x01) << 48;} /* 2^48: the same 48-bit shift make_new_shard_id uses to place the pg above the sequence number. */ + std::shared_ptr< ShardManager > HomeObjectImpl::shard_manager() { init_repl_svc(); return shared_from_this(); } +/* Returns true when pg has an entry in _pg_map; the lookup runs under _pg_lock. */ bool HomeObjectImpl::check_if_pg_exist(pg_id pg) { + std::scoped_lock lock_guard(_pg_lock); + auto iter = _pg_map.find(pg); + return (iter == _pg_map.end()) ? 
false : true; +} + +/* Takes the next sequence number for pg under _pg_lock and packs it into a shard id. NOTE(review): an unknown pg gets a fresh map entry here instead of an error, and the counter is incremented before the bound is asserted — confirm both are intended. */ shard_id HomeObjectImpl::generate_new_shard_id(pg_id pg) { + std::scoped_lock lock_guard(_pg_lock); + auto iter = _pg_map.find(pg); + if (iter == _pg_map.end()) { + iter = _pg_map.insert(std::make_pair(pg, ShardContextsInPG())).first; + } + auto new_sequence_num = iter->second.sequence_num++; + RELEASE_ASSERT(new_sequence_num < ShardManager::max_shard_num_in_pg(), "new shard id must be less than ShardManager::max_shard_num_in_pg()"); + return make_new_shard_id(pg, new_sequence_num); +} + +/* Shifts pg left by 48 bits and ORs in sequence_num (not masked here). */ shard_id HomeObjectImpl::make_new_shard_id(pg_id pg, uint64_t sequence_num) { + return ((uint64_t)pg << 48) | sequence_num; +} + +/* Serializes the create-shard request (pg_id, shard_id, state = OPEN, capacity) into a JSON string. */ std::string HomeObjectImpl::prepare_create_shard_message(pg_id pg, shard_id new_shard_id, uint64_t shard_size) { + nlohmann::json j; + j["pg_id"] = pg; + j["shard_id"] = new_shard_id; + j["state"] = ShardInfo::State::OPEN; + j["capacity"] = shard_size; + return j.dump(); +} + ShardManager::AsyncResult< ShardInfo > HomeObjectImpl::create_shard(pg_id pg_owner, uint64_t size_bytes) { - if (0 == size_bytes || max_shard_size() < size_bytes) return folly::makeUnexpected(ShardError::INVALID_ARG); - return folly::makeUnexpected(ShardError::UNKNOWN_PG); + if (0 == size_bytes || max_shard_size() < size_bytes) + return folly::makeUnexpected(ShardError::INVALID_ARG); + + bool pg_exist = check_if_pg_exist(pg_owner); + if (!pg_exist) { + LOGWARN("failed to create shard with non-exist pg [{}]", pg_owner); + return folly::makeUnexpected(ShardError::UNKNOWN_PG); + } + + auto repl_dev = _repl_svc->get_replica_dev(fmt::format("{}", pg_owner)); + if (repl_dev == nullptr) { + LOGWARN("failed to get replica set instance for pg [{}]", pg_owner); + return folly::makeUnexpected(ShardError::UNKNOWN_PG); + } + + if (!repl_dev->is_leader()) { + LOGWARN("pg [{}] replica set is not a leader, please retry other pg members", pg_owner); + return folly::makeUnexpected(ShardError::NOT_LEADER); + } + + auto new_shard_id = generate_new_shard_id(pg_owner); + std::string 
create_shard_message = prepare_create_shard_message(pg_owner, new_shard_id, size_bytes); + ReplicationMessageHeader header; /* NOTE(review): header and create_shard_message are locals; the blob and iovec built below point into them — confirm async_alloc_write does not keep those pointers after create_shard returns. */ + header.magic_num = HOMEOBJECT_REPLICATION_MAGIC; + header.message_type = ReplicationMessageType::SHARD_MESSAGE; + header.message_size = create_shard_message.size(); + header.message_crc = crc32_ieee(0, r_cast< const uint8_t* >(create_shard_message.c_str()), create_shard_message.size()); + header.header_crc = header.calculate_crc(); + sisl::sg_list value; + value.size = create_shard_message.size(); + value.iovs.emplace_back(iovec{static_cast(const_cast(create_shard_message.c_str())), + create_shard_message.size()}); + + // Replicate the create-shard message to the PG members; the heap-allocated promise is released by the continuation below. + folly::Promise > *promise = new folly::Promise >(); + repl_dev->async_alloc_write(sisl::blob(uintptr_cast(&header), sizeof(header)), + sisl::blob(), + value, + promise); + + return promise->getSemiFuture().via(folly::getGlobalCPUExecutor()) + .thenValue([promise, this](ShardManager::Result const& v) -> ShardManager::Result { + std::unique_ptr> > promise_guard(promise); + if (bool(v)) { + const auto& shard_info = *v; + std::scoped_lock lock_guard(_pg_lock, _shard_mutex); + _pg_map[shard_info.placement_group].shards.insert(shard_info.id); + _shard_map[shard_info.id] = shard_info; + } + return v; + }); } ShardManager::AsyncResult< ShardInfo > HomeObjectImpl::seal_shard(shard_id id) { @@ -26,4 +109,40 @@ ShardManager::AsyncResult< ShardInfo > HomeObjectImpl::get_shard(shard_id id) co return folly::makeUnexpected(ShardError::UNKNOWN_SHARD); } +void HomeObjectImpl::on_shard_message_commit(int64_t lsn, sisl::blob const& header, sisl::blob const& key, + sisl::sg_list const& value, void* user_ctx) { + const ReplicationMessageHeader* msg_header = r_cast(header.bytes); + folly::Promise > *promise = r_cast >*> (user_ctx); /* NOTE(review): dereferenced below without a null check — confirm every commit path passes a promise as user_ctx. */ + if (msg_header->header_crc != msg_header->calculate_crc()) { + LOGWARN("replication message header is corrupted with crc error, lsn:{}", lsn); + 
promise->setValue(folly::makeUnexpected(ShardError::INVALID_ARG)); + return; + } + + std::string string_value; /* Concatenates the value's iovecs into one buffer for the CRC check and the JSON parse. */ + for (auto & iovec: value.iovs) { + string_value.append(static_cast(iovec.iov_base), iovec.iov_len); + } + + auto crc = crc32_ieee(0, r_cast< const uint8_t* >(string_value.c_str()), string_value.size()); + if (msg_header->message_crc != crc) { + LOGWARN("replication message body is corrupted with crc error, lsn:{}", lsn); + promise->setValue(folly::makeUnexpected(ShardError::INVALID_ARG)); + return; + } + + // Both CRC checks passed: submit the shard info to the homestore meta blk service. + void* cookie = nullptr; + homestore::hs()->meta_service().add_sub_sb(HomeObjectImpl::s_shard_info_sub_type, r_cast(string_value.c_str()), string_value.size(), cookie); + + // Rebuild the ShardInfo from the JSON body and hand it to the promise. + auto shard_info_json = nlohmann::json::parse(string_value); + ShardInfo shard_info; + shard_info.id = shard_info_json["shard_id"].get(); + shard_info.placement_group = shard_info_json["pg_id"].get(); + shard_info.state = static_cast(shard_info_json["state"].get()); + shard_info.total_capacity_bytes = shard_info_json["capacity"].get(); + promise->setValue(ShardManager::Result(shard_info)); +} + } // namespace homeobject diff --git a/src/mocks/mock_shard_manager.cpp b/src/mocks/mock_shard_manager.cpp index 27a3f34e..ffc0e34d 100644 --- a/src/mocks/mock_shard_manager.cpp +++ b/src/mocks/mock_shard_manager.cpp @@ -24,7 +24,7 @@ ShardManager::AsyncResult< ShardInfo > MockHomeObject::create_shard(pg_id pg_own LOGDEBUG("Creating Shard [{}]: in Pg [{}] of Size [{}b] shard_cnt:[{}]", info.id, pg_owner, size_bytes, _shards.size()); - pg_it->second.emplace(info.id); + pg_it->second.shards.emplace(info.id); auto [_, happened] = _shards.try_emplace(info.id, info); RELEASE_ASSERT(happened, "Duplicate Shard insertion!"); return info; @@ -52,7 +52,7 @@ ShardManager::AsyncResult< InfoList > MockHomeObject::list_shards(pg_id id) cons auto lg = std::scoped_lock(_pg_lock, _shard_lock); if 
(auto pg_it = _pg_map.find(id); _pg_map.end() != pg_it) { auto info = std::list< ShardInfo >(); - for (auto const& shard_id : pg_it->second) { + for (auto const& shard_id : pg_it->second.shards) { LOGDEBUG("Listing Shard {}", shard_id); auto shard_it = _shards.find(shard_id); RELEASE_ASSERT(_shards.end() != shard_it, "Missing Shard [{}]!", shard_id);