diff --git a/.jenkins/Jenkinsfile b/.jenkins/Jenkinsfile index 0a9accdc..a1acbf88 100644 --- a/.jenkins/Jenkinsfile +++ b/.jenkins/Jenkinsfile @@ -6,7 +6,7 @@ pipeline { CONAN_USER = 'oss' TARGET_BRANCH = 'main' STABLE_BRANCH = 'stable/v*' - DISABLE_DEP_TESTS = '-o sisl:testing=False -o iomgr:testing=off -o homestore:testing=off -o nuraft_mesg:testing=False' + DISABLE_DEP_TESTS = '-o sisl:testing=False -o iomgr:testing=off -o homestore:testing=off' } stages { diff --git a/conanfile.py b/conanfile.py index 629a1d46..0b043ea4 100644 --- a/conanfile.py +++ b/conanfile.py @@ -9,7 +9,7 @@ class HomeObjectConan(ConanFile): name = "homeobject" - version = "0.10.2" + version = "0.10.3" homepage = "https://github.com/eBay/HomeObject" description = "Blob Store built on HomeReplication" topics = ("ebay") @@ -42,8 +42,6 @@ def build_requirements(self): def requirements(self): self.requires("homestore/[~=4, include_prerelease=True]@oss/master") self.requires("sisl/[~=10, include_prerelease=True]@oss/master") - # Remove when HomeStore Replication Service is mature - self.requires("nuraft_mesg/[~=1, include_prerelease=True]@oss/main") self.requires("lz4/1.9.4", override=True) def validate(self): diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 243b151e..02db6580 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -3,20 +3,16 @@ cmake_minimum_required (VERSION 3.11) find_package(Threads QUIET REQUIRED) find_package(sisl QUIET REQUIRED) find_package(homestore QUIET REQUIRED) -find_package(nuraft_mesg QUIET REQUIRED) - find_package(GTest QUIET REQUIRED) link_directories(${spdk_LIB_DIRS} ${dpdk_LIB_DIRS}) set (COMMON_DEPS homestore::homestore - nuraft_mesg::nuraft_mesg sisl::sisl ) set(COMMON_TEST_DEPS - home_replication_mock ${COMMON_DEPS} GTest::gmock ${spdk_LIBRARY_LIST} @@ -30,4 +26,4 @@ include_directories(BEFORE ${CMAKE_CURRENT_SOURCE_DIR}/include) include_directories(BEFORE ${CMAKE_CURRENT_BINARY_DIR}/lib) add_subdirectory(lib) -add_subdirectory(mocks) +#add_subdirectory(mocks) diff --git a/src/include/homeobject/blob_manager.hpp b/src/include/homeobject/blob_manager.hpp index d4e3b58b..4297e955 100644 --- a/src/include/homeobject/blob_manager.hpp +++ b/src/include/homeobject/blob_manager.hpp @@ -20,14 +20,15 @@ struct Blob { sisl::io_blob_safe body; std::string user_key; uint64_t object_off; - std::optional< peer_id > current_leader{std::nullopt}; + std::optional< peer_id_t > current_leader{std::nullopt}; }; class BlobManager : public Manager< BlobError > { public: - virtual AsyncResult< blob_id > put(shard_id shard, Blob&&) = 0; - virtual AsyncResult< Blob > get(shard_id shard, blob_id const& blob, uint64_t off = 0, uint64_t len = 0) const = 0; - virtual NullAsyncResult del(shard_id shard, blob_id const& blob) = 0; + virtual AsyncResult< blob_id_t > put(shard_id_t shard, Blob&&) = 0; + virtual AsyncResult< Blob > get(shard_id_t shard, blob_id_t const& blob, uint64_t off = 0, + uint64_t len = 0) const = 0; + virtual NullAsyncResult del(shard_id_t shard, blob_id_t const& blob) = 0; }; } // namespace homeobject diff --git a/src/include/homeobject/common.hpp b/src/include/homeobject/common.hpp index 4fa31fc0..dda01f19 100644 --- a/src/include/homeobject/common.hpp +++ b/src/include/homeobject/common.hpp @@ -11,7 +11,7 @@ SISL_LOGGING_DECL(homeobject); -#define HOMEOBJECT_LOG_MODS grpc_server, HOMESTORE_LOG_MODS, nuraft_mesg, nuraft, home_replication, homeobject +#define HOMEOBJECT_LOG_MODS grpc_server, HOMESTORE_LOG_MODS, homeobject #ifndef Ki constexpr uint64_t Ki = 1024ul; @@ -25,11 +25,26 @@ constexpr uint64_t Gi = Ki * Mi; namespace homeobject { -using blob_id = uint64_t; +using blob_id_t = uint64_t; -using peer_id = boost::uuids::uuid; -using pg_id = uint16_t; -using shard_id = uint64_t; +using peer_id_t = boost::uuids::uuid; +using pg_id_t = uint16_t; +using shard_id_t = uint64_t; + +template < typename T > +using shared = std::shared_ptr< T >; + +template < typename T > +using cshared = const std::shared_ptr< T >; + +template < typename T > +using unique = std::unique_ptr< T >; + +template < typename T > +using intrusive = boost::intrusive_ptr< T >; + +template < typename T > +using cintrusive = const boost::intrusive_ptr< T >; template < class E > class Manager { diff --git a/src/include/homeobject/homeobject.hpp b/src/include/homeobject/homeobject.hpp index 54503ae1..7514d677 100644 --- a/src/include/homeobject/homeobject.hpp +++ b/src/include/homeobject/homeobject.hpp @@ -22,16 +22,16 @@ class HomeObjectApplication { virtual std::list< std::filesystem::path > devices() const = 0; // Callback made after determining if a SvcId exists or not during initialization, will consume response - virtual peer_id discover_svcid(std::optional< peer_id > const& found) const = 0; + virtual peer_id_t discover_svcid(std::optional< peer_id_t > const& found) const = 0; // When RAFT operations take place, we must map the SvcId to a gethostbyaddr() value (IP) - virtual std::string lookup_peer(peer_id const&) const = 0; + virtual std::string lookup_peer(peer_id_t const&) const = 0; }; class HomeObject { public: virtual ~HomeObject() = default; - virtual peer_id our_uuid() const = 0; + virtual peer_id_t our_uuid() const = 0; virtual std::shared_ptr< BlobManager > blob_manager() = 0; virtual std::shared_ptr< PGManager > pg_manager() = 0; virtual std::shared_ptr< ShardManager > shard_manager() = 0; diff --git a/src/include/homeobject/pg_manager.hpp b/src/include/homeobject/pg_manager.hpp index b58b24c0..df416ab1 100644 --- a/src/include/homeobject/pg_manager.hpp +++ b/src/include/homeobject/pg_manager.hpp @@ -9,13 +9,13 @@ namespace homeobject { -ENUM(PGError, uint16_t, UNKNOWN = 1, INVALID_ARG, TIMEOUT, UNKNOWN_PG, UNKNOWN_PEER); +ENUM(PGError, uint16_t, UNKNOWN = 1, INVALID_ARG, TIMEOUT, UNKNOWN_PG, UNKNOWN_PEER, UNSUPPORTED_OP); struct PGMember { - explicit PGMember(peer_id _id) : id(_id) {} - PGMember(peer_id _id, std::string const& _name) : id(_id), name(_name) {} - PGMember(peer_id _id, std::string const& _name, int32_t _priority) : id(_id), name(_name), priority(_priority) {} - peer_id id; + explicit PGMember(peer_id_t _id) : id(_id) {} + PGMember(peer_id_t _id, std::string const& _name) : id(_id), name(_name) {} + PGMember(peer_id_t _id, std::string const& _name, int32_t _priority) : id(_id), name(_name), priority(_priority) {} + peer_id_t id; std::string name; int32_t priority{0}; // <0 (Arbiter), ==0 (Follower), >0 (F|Leader) @@ -28,9 +28,10 @@ struct PGMember { using MemberSet = std::set< PGMember >; struct PGInfo { - explicit PGInfo(pg_id _id) : id(_id) {} - pg_id id; + explicit PGInfo(pg_id_t _id) : id(_id) {} + pg_id_t id; mutable MemberSet members; + peer_id_t replica_set_uuid; auto operator<=>(PGInfo const& rhs) const { return id <=> rhs.id; } auto operator==(PGInfo const& rhs) const { return id == rhs.id; } @@ -39,7 +40,7 @@ struct PGInfo { class PGManager : public Manager< PGError > { public: virtual NullAsyncResult create_pg(PGInfo&& pg_info) = 0; - virtual NullAsyncResult replace_member(pg_id id, peer_id const& old_member, PGMember const& new_member) = 0; + virtual NullAsyncResult replace_member(pg_id_t id, peer_id_t const& old_member, PGMember const& new_member) = 0; }; } // namespace homeobject diff --git a/src/include/homeobject/shard_manager.hpp b/src/include/homeobject/shard_manager.hpp index 271adc9c..36a1c9ed 100644 --- a/src/include/homeobject/shard_manager.hpp +++ b/src/include/homeobject/shard_manager.hpp @@ -9,27 +9,28 @@ namespace homeobject { -ENUM(ShardError, uint16_t, UNKNOWN = 1, TIMEOUT, INVALID_ARG, NOT_LEADER, UNKNOWN_PG, UNKNOWN_SHARD); +ENUM(ShardError, uint16_t, UNKNOWN = 1, TIMEOUT, INVALID_ARG, NOT_LEADER, UNKNOWN_PG, UNKNOWN_SHARD, PG_NOT_READY); struct ShardInfo { - enum class State { + enum class State : uint8_t { OPEN = 0, - SEALED, - DELETED, + SEALED = 1, + DELETED = 2, }; - shard_id id; - pg_id placement_group; + shard_id_t id; + pg_id_t placement_group; State state; uint64_t created_time; uint64_t last_modified_time; uint64_t available_capacity_bytes; uint64_t total_capacity_bytes; uint64_t deleted_capacity_bytes; - std::optional< peer_id > current_leader{std::nullopt}; + std::optional< peer_id_t > current_leader{std::nullopt}; auto operator<=>(ShardInfo const& rhs) const { return id <=> rhs.id; } auto operator==(ShardInfo const& rhs) const { return id == rhs.id; } + std::optional< peer_id_t > current_leader{std::nullopt}; }; using InfoList = std::list< ShardInfo >; @@ -39,10 +40,10 @@ class ShardManager : public Manager< ShardError > { static uint64_t max_shard_size(); // Static function forces runtime evaluation. static uint64_t max_shard_num_in_pg(); - virtual AsyncResult< ShardInfo > get_shard(shard_id id) const = 0; - virtual AsyncResult< InfoList > list_shards(pg_id id) const = 0; - virtual AsyncResult< ShardInfo > create_shard(pg_id pg_owner, uint64_t size_bytes) = 0; - virtual AsyncResult< ShardInfo > seal_shard(shard_id id) = 0; + virtual AsyncResult< ShardInfo > get_shard(shard_id_t id) const = 0; + virtual AsyncResult< InfoList > list_shards(pg_id_t id) const = 0; + virtual AsyncResult< ShardInfo > create_shard(pg_id_t pg_owner, uint64_t size_bytes) = 0; + virtual AsyncResult< ShardInfo > seal_shard(shard_id_t id) = 0; }; } // namespace homeobject diff --git a/src/lib/CMakeLists.txt b/src/lib/CMakeLists.txt index 3c0ed3e0..6c86145f 100644 --- a/src/lib/CMakeLists.txt +++ b/src/lib/CMakeLists.txt @@ -16,5 +16,5 @@ if(BUILD_TESTING) add_subdirectory(tests) endif() -add_subdirectory(homestore) -add_subdirectory(memory) +add_subdirectory(homestore_backend) +add_subdirectory(memory_backend) diff --git a/src/lib/blob_manager.cpp b/src/lib/blob_manager.cpp index 57d54da3..2a2415ad 100644 --- a/src/lib/blob_manager.cpp +++ b/src/lib/blob_manager.cpp @@ -2,12 +2,9 @@ namespace homeobject { -std::shared_ptr< BlobManager > HomeObjectImpl::blob_manager() { - init_repl_svc(); - return shared_from_this(); -} +std::shared_ptr< BlobManager > HomeObjectImpl::blob_manager() { return shared_from_this(); } -BlobManager::AsyncResult< Blob > HomeObjectImpl::get(shard_id shard, blob_id const& blob, uint64_t off, +BlobManager::AsyncResult< Blob > HomeObjectImpl::get(shard_id_t shard, blob_id_t const& blob, uint64_t off, uint64_t len) const { return _get_shard(shard).thenValue([this, blob](auto const e) -> BlobManager::Result< Blob > { if (!e) return folly::makeUnexpected(BlobError::UNKNOWN_SHARD); @@ -15,16 +12,16 @@ BlobManager::AsyncResult< Blob > HomeObjectImpl::get(shard_id shard, blob_id con }); } -BlobManager::AsyncResult< blob_id > HomeObjectImpl::put(shard_id shard, Blob&& blob) { +BlobManager::AsyncResult< blob_id_t > HomeObjectImpl::put(shard_id_t shard, Blob&& blob) { return _get_shard(shard).thenValue( - [this, blob = std::move(blob)](auto const e) mutable -> BlobManager::Result< blob_id > { + [this, blob = std::move(blob)](auto const e) mutable -> BlobManager::Result< blob_id_t > { if (!e) return folly::makeUnexpected(BlobError::UNKNOWN_SHARD); if (ShardInfo::State::SEALED == e.value().info.state) return folly::makeUnexpected(BlobError::INVALID_ARG); return _put_blob(e.value().info, std::move(blob)); }); } -BlobManager::NullAsyncResult HomeObjectImpl::del(shard_id shard, blob_id const& blob) { +BlobManager::NullAsyncResult HomeObjectImpl::del(shard_id_t shard, blob_id_t const& blob) { return _get_shard(shard).thenValue([this, blob](auto const e) mutable -> BlobManager::NullResult { if (!e) return folly::makeUnexpected(BlobError::UNKNOWN_SHARD); return _del_blob(e.value().info, blob); diff --git a/src/lib/blob_route.hpp b/src/lib/blob_route.hpp index 720e17a3..e8bd47db 100644 --- a/src/lib/blob_route.hpp +++ b/src/lib/blob_route.hpp @@ -11,8 +11,8 @@ namespace homeobject { // A Key used in the IndexService (BTree). The inclusion of Shard allows BlobRoutes // to appear in a different Index should the Blob (Shard) be moved between Pgs. struct BlobRoute { - shard_id shard; - blob_id blob; + shard_id_t shard; + blob_id_t blob; auto operator<=>(BlobRoute const&) const = default; }; @@ -37,6 +37,6 @@ struct formatter< homeobject::BlobRoute > { template <> struct std::hash< homeobject::BlobRoute > { std::size_t operator()(homeobject::BlobRoute const& r) const noexcept { - return boost::hash_value< homeobject::blob_id >(std::make_pair(r.shard, r.blob)); + return boost::hash_value< homeobject::blob_id_t >(std::make_pair(r.shard, r.blob)); } }; diff --git a/src/lib/homeobject_impl.hpp b/src/lib/homeobject_impl.hpp index ce72cfd7..d75e0068 100644 --- a/src/lib/homeobject_impl.hpp +++ b/src/lib/homeobject_impl.hpp @@ -7,17 +7,19 @@ #include -namespace home_replication { +namespace homestore { class ReplicationService; } namespace homeobject { -constexpr size_t pg_width = sizeof(pg_id) * 8; -constexpr size_t shard_width = (sizeof(shard_id) * 8) - pg_width; -constexpr size_t shard_mask = std::numeric_limits< homeobject::shard_id >::max() >> pg_width; +constexpr size_t pg_width = sizeof(pg_id_t) * 8; +constexpr size_t shard_width = (sizeof(shard_id_t) * 8) - pg_width; +constexpr size_t shard_mask = std::numeric_limits< homeobject::shard_id_t >::max() >> pg_width; -inline shard_id make_new_shard_id(pg_id pg, shard_id next_shard) { return ((uint64_t)pg << shard_width) | next_shard; } +inline shard_id_t make_new_shard_id(pg_id_t pg, shard_id_t next_shard) { + return ((uint64_t)pg << shard_width) | next_shard; +} struct Shard { explicit Shard(ShardInfo info) : info(std::move(info)) {} @@ -29,11 +31,19 @@ struct Shard { using ShardList = std::list< Shard >; using ShardIterator = ShardList::iterator; +struct PGReplDevBase {}; + struct PG { - explicit PG(PGInfo info) : pg_info(std::move(info)) {} - PGInfo pg_info; - uint64_t shard_sequence_num{0}; - ShardList shards; + explicit PG(PGInfo info) : pg_info_(std::move(info)) {} + PG(PG const& pg) = delete; + PG(PG&& pg) = default; + PG& operator=(PG const& pg) = delete; + virtual ~PG() = default; + + std::shared_mutex mtx_; + PGInfo pg_info_; + uint64_t shard_sequence_num_{0}; + ShardList shards_; }; class HomeObjectImpl : public HomeObject, @@ -43,33 +53,34 @@ class HomeObjectImpl : public HomeObject, public std::enable_shared_from_this< HomeObjectImpl > { /// Implementation defines these - virtual ShardManager::Result< ShardInfo > _create_shard(pg_id, uint64_t size_bytes) = 0; - virtual ShardManager::Result< ShardInfo > _seal_shard(shard_id) = 0; + virtual ShardManager::Result< ShardInfo > _create_shard(pg_id_t, uint64_t size_bytes) = 0; + virtual ShardManager::Result< ShardInfo > _seal_shard(shard_id_t) = 0; - virtual BlobManager::Result< blob_id > _put_blob(ShardInfo const&, Blob&&) = 0; - virtual BlobManager::Result< Blob > _get_blob(ShardInfo const&, blob_id) const = 0; - virtual BlobManager::NullResult _del_blob(ShardInfo const&, blob_id) = 0; + virtual BlobManager::Result< blob_id_t > _put_blob(ShardInfo const&, Blob&&) = 0; + virtual BlobManager::Result< Blob > _get_blob(ShardInfo const&, blob_id_t) const = 0; + virtual BlobManager::NullResult _del_blob(ShardInfo const&, blob_id_t) = 0; /// - folly::Future< ShardManager::Result< Shard > > _get_shard(shard_id id) const; + folly::Future< ShardManager::Result< Shard > > _get_shard(shard_id_t id) const; auto _defer() const { return folly::makeSemiFuture().via(folly::getGlobalCPUExecutor()); } + virtual PGManager::NullAsyncResult _create_pg(PGInfo&& pg_info, std::set< std::string, std::less<> > peers) = 0; + virtual PGManager::NullAsyncResult _replace_member(pg_id_t id, peer_id_t const& old_member, + PGMember const& new_member) = 0; + protected: std::mutex _repl_lock; - std::shared_ptr< home_replication::ReplicationService > _repl_svc; - // std::shared_ptr _repl_svc; - peer_id _our_id; + peer_id_t _our_id; /// Our SvcId retrieval and SvcId->IP mapping std::weak_ptr< HomeObjectApplication > _application; /// mutable std::shared_mutex _pg_lock; - std::map< pg_id, PG > _pg_map; + std::map< pg_id_t, std::shared_ptr< PG > > _pg_map; mutable std::shared_mutex _shard_lock; - std::map< shard_id, ShardIterator > _shard_map; + std::map< shard_id_t, ShardIterator > _shard_map; /// - PGManager::Result< PG > _get_pg(pg_id pg); public: explicit HomeObjectImpl(std::weak_ptr< HomeObjectApplication >&& application) : @@ -81,31 +92,30 @@ class HomeObjectImpl : public HomeObject, HomeObjectImpl& operator=(const HomeObjectImpl&) = delete; HomeObjectImpl& operator=(HomeObjectImpl&&) noexcept = delete; - // This is public but not exposed in the API above - void init_repl_svc(); - - std::shared_ptr< home_replication::ReplicationService > get_repl_svc() { return _repl_svc; } - std::shared_ptr< BlobManager > blob_manager() final; std::shared_ptr< PGManager > pg_manager() final; std::shared_ptr< ShardManager > shard_manager() final; - peer_id our_uuid() const final { return _our_id; } + peer_id_t our_uuid() const final { return _our_id; } /// PgManager + PGManager::Result< PG const* > get_pg(pg_id_t pgid) const; PGManager::NullAsyncResult create_pg(PGInfo&& pg_info) final; - PGManager::NullAsyncResult replace_member(pg_id id, peer_id const& old_member, PGMember const& new_member) final; + PGManager::NullAsyncResult replace_member(pg_id_t id, peer_id_t const& old_member, + PGMember const& new_member) final; /// ShardManager - ShardManager::AsyncResult< ShardInfo > get_shard(shard_id id) const final; - ShardManager::AsyncResult< ShardInfo > create_shard(pg_id pg_owner, uint64_t size_bytes) final; - ShardManager::AsyncResult< InfoList > list_shards(pg_id pg) const final; - ShardManager::AsyncResult< ShardInfo > seal_shard(shard_id id) final; + ShardManager::AsyncResult< ShardInfo > get_shard(shard_id_t id) const final; + ShardManager::AsyncResult< ShardInfo > create_shard(pg_id_t pg_owner, uint64_t size_bytes) final; + ShardManager::AsyncResult< InfoList > list_shards(pg_id_t pg) const final; + ShardManager::AsyncResult< ShardInfo > seal_shard(shard_id_t id) final; uint64_t get_current_timestamp(); + /// BlobManager - BlobManager::AsyncResult< blob_id > put(shard_id shard, Blob&&) final; - BlobManager::AsyncResult< Blob > get(shard_id shard, blob_id const& blob, uint64_t off, uint64_t len) const final; - BlobManager::NullAsyncResult del(shard_id shard, blob_id const& blob) final; + BlobManager::AsyncResult< blob_id_t > put(shard_id_t shard, Blob&&) final; + BlobManager::AsyncResult< Blob > get(shard_id_t shard, blob_id_t const& blob, uint64_t off, + uint64_t len) const final; + BlobManager::NullAsyncResult del(shard_id_t shard, blob_id_t const& blob) final; }; } // namespace homeobject diff --git a/src/lib/homestore/homeobject.hpp b/src/lib/homestore/homeobject.hpp deleted file mode 100644 index 24ab48d8..00000000 --- a/src/lib/homestore/homeobject.hpp +++ /dev/null @@ -1,54 +0,0 @@ -#pragma once - -#include -#include - -#include "mocks/repl_service.h" - -#include "lib/homeobject_impl.hpp" - -namespace homestore { -struct meta_blk; -} - -namespace homeobject { - -class HSHomeObject : public HomeObjectImpl { - /// Overridable Helpers - ShardManager::Result< ShardInfo > _create_shard(pg_id, uint64_t size_bytes) override; - ShardManager::Result< ShardInfo > _seal_shard(shard_id) override; - - BlobManager::Result< blob_id > _put_blob(ShardInfo const&, Blob&&) override; - BlobManager::Result< Blob > _get_blob(ShardInfo const&, blob_id) const override; - BlobManager::NullResult _del_blob(ShardInfo const&, blob_id) override; - /// - mutable std::shared_mutex _flying_shard_lock; - std::map< int64_t, Shard > _flying_shards; - -private: - shard_id generate_new_shard_id(pg_id pg); - uint64_t get_sequence_num_from_shard_id(uint64_t shard_id); - std::string serialize_shard(const Shard& shard) const; - Shard deserialize_shard(const std::string& shard_info_str) const; - void do_commit_new_shard(const Shard& shard); - void do_commit_seal_shard(const Shard& shard); - void register_homestore_metablk_callback(); - void* get_shard_metablk(shard_id id); - -public: - using HomeObjectImpl::HomeObjectImpl; - ~HSHomeObject(); - - void init_homestore(); - - static const std::string s_shard_info_sub_type; - void on_shard_meta_blk_found(homestore::meta_blk* mblk, sisl::byte_view buf, size_t size); - - bool precheck_and_decode_shard_msg(int64_t lsn, sisl::blob const& header, sisl::blob const& key, std::string* msg); - - void on_pre_commit_shard_msg(int64_t lsn, sisl::blob const& header, sisl::blob const& key, void* user_ctx); - void on_rollback_shard_msg(int64_t lsn, sisl::blob const& header, sisl::blob const& key, void* user_ctx); - void on_shard_message_commit(int64_t lsn, sisl::blob const& header, sisl::blob const& key, void* user_ctx); -}; - -} // namespace homeobject diff --git a/src/lib/homestore/CMakeLists.txt b/src/lib/homestore_backend/CMakeLists.txt similarity index 63% rename from src/lib/homestore/CMakeLists.txt rename to src/lib/homestore_backend/CMakeLists.txt index cc8b8463..1efe7db3 100644 --- a/src/lib/homestore/CMakeLists.txt +++ b/src/lib/homestore_backend/CMakeLists.txt @@ -1,13 +1,13 @@ cmake_minimum_required (VERSION 3.11) - add_library ("${PROJECT_NAME}_homestore") target_sources("${PROJECT_NAME}_homestore" PRIVATE - homeobject.cpp - blob_manager.cpp - shard_manager.cpp - heap_chunk_selector.cpp - replication_state_machine.cpp + hs_homeobject.cpp + hs_blob_manager.cpp + hs_shard_manager.cpp + hs_pg_manager.cpp + heap_chunk_selector.cpp + replication_state_machine.cpp $ ) target_link_libraries("${PROJECT_NAME}_homestore" @@ -15,5 +15,5 @@ target_link_libraries("${PROJECT_NAME}_homestore" ) if(BUILD_TESTING) - add_subdirectory(tests) + add_subdirectory(tests) endif() diff --git a/src/lib/homestore/heap_chunk_selector.cpp b/src/lib/homestore_backend/heap_chunk_selector.cpp similarity index 100% rename from src/lib/homestore/heap_chunk_selector.cpp rename to src/lib/homestore_backend/heap_chunk_selector.cpp diff --git a/src/lib/homestore/heap_chunk_selector.h b/src/lib/homestore_backend/heap_chunk_selector.h similarity index 100% rename from src/lib/homestore/heap_chunk_selector.h rename to src/lib/homestore_backend/heap_chunk_selector.h diff --git a/src/lib/homestore/blob_manager.cpp b/src/lib/homestore_backend/hs_blob_manager.cpp similarity index 69% rename from src/lib/homestore/blob_manager.cpp rename to src/lib/homestore_backend/hs_blob_manager.cpp index c5f7eacf..de8e0084 100644 --- a/src/lib/homestore/blob_manager.cpp +++ b/src/lib/homestore_backend/hs_blob_manager.cpp @@ -1,16 +1,16 @@ -#include "homeobject.hpp" +#include "hs_homeobject.hpp" namespace homeobject { -BlobManager::Result< blob_id > HSHomeObject::_put_blob(ShardInfo const&, Blob&&) { +BlobManager::Result< blob_id_t > HSHomeObject::_put_blob(ShardInfo const&, Blob&&) { return folly::makeUnexpected(BlobError::UNKNOWN); } -BlobManager::Result< Blob > HSHomeObject::_get_blob(ShardInfo const&, blob_id) const { +BlobManager::Result< Blob > HSHomeObject::_get_blob(ShardInfo const&, blob_id_t) const { return folly::makeUnexpected(BlobError::UNKNOWN); } -BlobManager::NullResult HSHomeObject::_del_blob(ShardInfo const&, blob_id) { +BlobManager::NullResult HSHomeObject::_del_blob(ShardInfo const&, blob_id_t) { return folly::makeUnexpected(BlobError::UNKNOWN); } diff --git a/src/lib/homestore/homeobject.cpp b/src/lib/homestore_backend/hs_homeobject.cpp similarity index 66% rename from src/lib/homestore/homeobject.cpp rename to src/lib/homestore_backend/hs_homeobject.cpp index 03f5e576..1656efde 100644 --- a/src/lib/homestore/homeobject.cpp +++ b/src/lib/homestore_backend/hs_homeobject.cpp @@ -1,10 +1,13 @@ -#include "homeobject.hpp" - #include #include #include +#include #include +#include +#include "hs_homeobject.hpp" +#include "heap_chunk_selector.h" + namespace homeobject { const std::string HSHomeObject::s_shard_info_sub_type = "shard_info"; @@ -13,7 +16,6 @@ extern std::shared_ptr< HomeObject > init_homeobject(std::weak_ptr< HomeObjectAp LOGINFOMOD(homeobject, "Initializing HomeObject"); auto instance = std::make_shared< HSHomeObject >(std::move(application)); instance->init_homestore(); - instance->init_repl_svc(); return instance; } @@ -38,21 +40,26 @@ void HSHomeObject::init_homestore() { device_info.emplace_back(std::filesystem::canonical(path).string(), homestore::HSDevType::Data); } - /// TODO need Repl service eventually and use HeapChunkSelector using namespace homestore; - bool need_format = HomeStore::instance()->with_data_service(nullptr).with_log_service().start( - hs_input_params{.devices = device_info, .app_mem_size = app_mem_size}, - [this]() { register_homestore_metablk_callback(); }); + bool need_format = HomeStore::instance() + ->with_index_service(nullptr) + .with_repl_data_service(repl_impl_type::solo, std::make_shared< HeapChunkSelector >()) + .start(hs_input_params{.devices = device_info, .app_mem_size = app_mem_size}, + [this]() { register_homestore_metablk_callback(); }); - /// TODO how should this work? - LOGWARN("Persistence Looks Vacant, Formatting!!"); + LOGWARN("Seems like we are booting/starting first time, Formatting!!"); if (need_format) { - HomeStore::instance()->format_and_start(std::map< uint32_t, hs_format_params >{ + HomeStore::instance()->format_and_start({ {HS_SERVICE::META, hs_format_params{.size_pct = 5.0}}, {HS_SERVICE::LOG_REPLICATED, hs_format_params{.size_pct = 10.0}}, - {HS_SERVICE::LOG_LOCAL, hs_format_params{.size_pct = 5.0}}, - {HS_SERVICE::DATA, hs_format_params{.size_pct = 50.0}}, - {HS_SERVICE::INDEX, hs_format_params{.size_pct = 30.0}}, + {HS_SERVICE::LOG_LOCAL, hs_format_params{.size_pct = 0.1}}, // TODO: Remove this after HS disables LOG_LOCAL + {HS_SERVICE::REPLICATION, + hs_format_params{.size_pct = 80.0, + .num_chunks = 65000, + .block_size = 1024, + .alloc_type = blk_allocator_type_t::append, + .chunk_sel_type = chunk_selector_type_t::CUSTOM}}, + {HS_SERVICE::INDEX, hs_format_params{.size_pct = 5.0}}, }); } } @@ -60,23 +67,19 @@ void HSHomeObject::init_homestore() { void HSHomeObject::register_homestore_metablk_callback() { // register some callbacks for metadata recovery; using namespace homestore; - HomeStore::instance()->meta_service().register_handler( + hs()->meta_service().register_handler( HSHomeObject::s_shard_info_sub_type, [this](homestore::meta_blk* mblk, sisl::byte_view buf, size_t size) { on_shard_meta_blk_found(mblk, buf, size); }, nullptr, true); -} -void HomeObjectImpl::init_repl_svc() { - auto lg = std::scoped_lock(_repl_lock); - if (!_repl_svc) { - // TODO this should come from persistence. - LOGINFOMOD(homeobject, "First time start-up...initiating request for SvcId."); - _our_id = _application.lock()->discover_svcid(std::nullopt); - LOGINFOMOD(homeobject, "SvcId received: {}", to_string(_our_id)); - _repl_svc = home_replication::create_repl_service([](auto) { return nullptr; }); - } + hs()->meta_service().register_handler( + "PGManager", + [this](homestore::meta_blk* mblk, sisl::byte_view buf, size_t size) { + on_pg_meta_blk_found(std::move(buf), voidptr_cast(mblk)); + }, + nullptr, true); } HSHomeObject::~HSHomeObject() { @@ -97,5 +100,4 @@ void HSHomeObject::on_shard_meta_blk_found(homestore::meta_blk* mblk, sisl::byte // deleted too. do_commit_new_shard(shard); } - } // namespace homeobject diff --git a/src/lib/homestore_backend/hs_homeobject.hpp b/src/lib/homestore_backend/hs_homeobject.hpp new file mode 100644 index 00000000..c9826480 --- /dev/null +++ b/src/lib/homestore_backend/hs_homeobject.hpp @@ -0,0 +1,94 @@ +#pragma once + +#include +#include + +#include +#include +#include +#include "lib/homeobject_impl.hpp" + +namespace homestore { +struct meta_blk; +} + +namespace homeobject { + +class HSHomeObject : public HomeObjectImpl { + /// Overridable Helpers + ShardManager::Result< ShardInfo > _create_shard(pg_id_t, uint64_t size_bytes) override; + ShardManager::Result< ShardInfo > _seal_shard(shard_id_t) override; + + BlobManager::Result< blob_id_t > _put_blob(ShardInfo const&, Blob&&) override; + BlobManager::Result< Blob > _get_blob(ShardInfo const&, blob_id_t) const override; + BlobManager::NullResult _del_blob(ShardInfo const&, blob_id_t) override; + + PGManager::NullAsyncResult _create_pg(PGInfo&& pg_info, std::set< std::string, std::less<> > peers) override; + PGManager::NullAsyncResult _replace_member(pg_id_t id, peer_id_t const& old_member, + PGMember const& new_member) override; + + mutable std::shared_mutex _flying_shard_lock; + std::map< int64_t, Shard > _flying_shards; + +public: +#pragma pack(1) + struct pg_members { + static constexpr uint64_t max_name_len = 32; + peer_id_t id; + char name[max_name_len]; + int32_t priority{0}; + }; + + struct pg_info_superblk { + pg_id_t id; + uint32_t num_members; + peer_id_t replica_set_uuid; + pg_members members[1]; // ISO C++ forbids zero-size array + }; +#pragma pack() + + struct HS_PG : public PG { + homestore::superblk< pg_info_superblk > pg_sb_; + std::shared_ptr< homestore::ReplDev > repl_dev_; + + HS_PG(PGInfo info, shared< homestore::ReplDev > rdev); + HS_PG(homestore::superblk< pg_info_superblk > const& sb, shared< homestore::ReplDev > rdev); + static PGInfo pg_info_from_sb(homestore::superblk< pg_info_superblk > const& sb); + }; + +private: + static homestore::ReplicationService& hs_repl_service() { return homestore::hs()->repl_service(); } + + shard_id_t generate_new_shard_id(pg_id_t pg); + shard_id_t make_new_shard_id(pg_id_t pg, uint64_t sequence_num) const; + uint64_t get_sequence_num_from_shard_id(uint64_t shard_id_t); + std::string serialize_shard(const Shard& shard) const; + Shard deserialize_shard(const std::string& shard_info_str) const; + + void do_commit_new_shard(const Shard& shard); + void do_commit_seal_shard(const Shard& shard); + void register_homestore_metablk_callback(); + void* get_shard_metablk(shard_id_t id); + void on_pg_meta_blk_found(sisl::byte_view const& buf, void* meta_cookie); + void add_pg_to_map(shared< HS_PG > hs_pg); + +public: + using HomeObjectImpl::HomeObjectImpl; + ~HSHomeObject(); + + void init_homestore(); + + static const std::string s_shard_info_sub_type; + void on_shard_meta_blk_found(homestore::meta_blk* mblk, sisl::byte_view buf, size_t size); + + bool precheck_and_decode_shard_msg(int64_t lsn, sisl::blob const& header, sisl::blob const& key, std::string* msg); + + bool on_pre_commit_shard_msg(int64_t lsn, sisl::blob const& header, sisl::blob const& key, + cintrusive< homestore::repl_req_ctx >&); + void on_rollback_shard_msg(int64_t lsn, sisl::blob const& header, sisl::blob const& key, + cintrusive< homestore::repl_req_ctx >&); + void on_shard_message_commit(int64_t lsn, sisl::blob const& header, sisl::blob const& key, + cintrusive< homestore::repl_req_ctx >& hs_ctx); +}; + +} // namespace homeobject diff --git a/src/lib/homestore_backend/hs_pg_manager.cpp b/src/lib/homestore_backend/hs_pg_manager.cpp new file mode 100644 index 00000000..369327d0 --- /dev/null +++ b/src/lib/homestore_backend/hs_pg_manager.cpp @@ -0,0 +1,155 @@ +#include +#include +#include "hs_homeobject.hpp" +#include "replication_state_machine.hpp" + +using namespace homestore; +namespace homeobject { + +PGError toPgError(ReplServiceError const& e) { + switch (e) { + case ReplServiceError::BAD_REQUEST: + [[fallthrough]]; + case ReplServiceError::CANCELLED: + [[fallthrough]]; + case ReplServiceError::CONFIG_CHANGING: + [[fallthrough]]; + case ReplServiceError::SERVER_ALREADY_EXISTS: + [[fallthrough]]; + case ReplServiceError::SERVER_IS_JOINING: + [[fallthrough]]; + case ReplServiceError::SERVER_IS_LEAVING: + [[fallthrough]]; + case ReplServiceError::RESULT_NOT_EXIST_YET: + [[fallthrough]]; + case ReplServiceError::NOT_LEADER: + [[fallthrough]]; + case ReplServiceError::TERM_MISMATCH: + case ReplServiceError::NOT_IMPLEMENTED: + return PGError::INVALID_ARG; + case ReplServiceError::CANNOT_REMOVE_LEADER: + return PGError::UNKNOWN_PEER; + case ReplServiceError::TIMEOUT: + return PGError::TIMEOUT; + case ReplServiceError::SERVER_NOT_FOUND: + return PGError::UNKNOWN_PG; + case ReplServiceError::OK: + DEBUG_ASSERT(false, "Should not process OK!"); + [[fallthrough]]; + case ReplServiceError::FAILED: + return PGError::UNKNOWN; + } + return PGError::UNKNOWN; +} + +[[maybe_unused]] static homestore::ReplDev& pg_repl_dev(PG const& pg) { + return *(static_cast< HSHomeObject::HS_PG const& >(pg).repl_dev_); +} + +PGManager::NullAsyncResult HSHomeObject::_create_pg(PGInfo&& pg_info, std::set< std::string, std::less<> > peers) { + pg_info.replica_set_uuid = boost::uuids::random_generator()(); + return hs_repl_service() + .create_repl_dev(pg_info.replica_set_uuid, std::move(peers), std::make_unique< ReplicationStateMachine >(this)) + .thenValue([this, pg_info = std::move(pg_info)](auto&& v) -> PGManager::NullResult { + if (v.hasError()) { return folly::makeUnexpected(PGError::INVALID_ARG); } + add_pg_to_map(std::make_shared< HS_PG >(std::move(pg_info), std::move(v.value()))); + return folly::Unit(); + }); +} + +PGManager::NullAsyncResult HSHomeObject::_replace_member(pg_id_t id, peer_id_t const& old_member, + PGMember const& new_member) { + return folly::makeSemiFuture< PGManager::NullResult >(folly::makeUnexpected(PGError::UNSUPPORTED_OP)); +} + +void HSHomeObject::add_pg_to_map(shared< HS_PG > hs_pg) { + RELEASE_ASSERT(hs_pg->pg_info_.replica_set_uuid == hs_pg->repl_dev_->group_id(), + "PGInfo replica set uuid mismatch with ReplDev instance for {}", + boost::uuids::to_string(hs_pg->pg_info_.replica_set_uuid)); + auto lg = std::scoped_lock(_pg_lock); + auto [it1, _] = _pg_map.try_emplace(hs_pg->pg_info_.id, std::move(hs_pg)); + RELEASE_ASSERT(_pg_map.end() != it1, "Unknown map insert error!"); +} + +#if 0 +std::string HSHomeObject::serialize_pg_info(PGInfo const& pginfo) { + nlohmann::json j; + j["pg_info"]["pg_id_t"] = pginfo.id; + j["pg_info"]["repl_uuid"] = boost::uuids::to_string(pginfo.replica_set_uuid); + + nlohmann::json members_j = {}; + for (auto const& member : pginfo.members) { + nlohmann::json member_j; + member_j["member_id"] = member.id; + member_j["name"] = member.name; + member_j["priority"] = member.priority; + members_j.push_back(member_j); + } + j["pg_info"]["members"] = members_j; + return j.dump(); +} + +PGInfo HSHomeObject::deserialize_pg_info(std::string const& json_str) { + auto pg_json = nlohmann::json::parse(json_str); + + PGInfo pg_info; + pg_info.id = pg_json["pg_info"]["pg_id_t"].get< pg_id_t >(); + pg_info.replica_set_uuid = boost::uuids::string_generator()(pg_json["pg_info"]["repl_uuid"].get< std::string >()); + + for (auto const& m : pg_info["pg_info"]["members"]) { + PGMember member; + member.id = m["member_id"].get< pg_id_t >(); + member.name = m["name"].get< std::string >(); + member.priority = m["priority"].get< int32_t >(); + pg_info.members.emplace(std::move(member)); + } + return pg_info; +} +#endif + +void HSHomeObject::on_pg_meta_blk_found(sisl::byte_view const& buf, void* meta_cookie) { + homestore::superblk< pg_info_superblk > pg_sb; + pg_sb.load(buf, meta_cookie); + + hs_repl_service() + .open_repl_dev(pg_sb->replica_set_uuid, std::make_unique< ReplicationStateMachine >(this)) + .thenValue([this, pg_sb = std::move(pg_sb)](auto&& v) { + if (v.hasError()) { + LOGERROR("open_repl_dev for group_id={} has failed", boost::uuids::to_string(pg_sb->replica_set_uuid)); + return; + } + add_pg_to_map(std::make_shared< HS_PG >(pg_sb, std::move(v.value()))); + }); +} + +PGInfo HSHomeObject::HS_PG::pg_info_from_sb(homestore::superblk< pg_info_superblk > const& sb) { + PGInfo pginfo{sb->id}; + for (uint32_t i{0}; i < sb->num_members; ++i) { + pginfo.members.emplace(sb->members[i].id, std::string(sb->members[i].name), sb->members[i].priority); + } + return pginfo; +} + +HSHomeObject::HS_PG::HS_PG(PGInfo info, shared< homestore::ReplDev > rdev) : + PG{std::move(info)}, pg_sb_{"PGManager"}, repl_dev_{std::move(rdev)} { + pg_sb_.create(sizeof(pg_info_superblk) + ((pg_info_.members.size() - 1) * sizeof(pg_members))); + pg_sb_->id = pg_info_.id; + pg_sb_->num_members = pg_info_.members.size(); + pg_sb_->replica_set_uuid = repl_dev_->group_id(); + + uint32_t i{0}; + for (auto const& m : pg_info_.members) { + pg_sb_->members[i].id = m.id; + std::strncpy(pg_sb_->members[i].name, m.name.c_str(), std::min(m.name.size(), pg_members::max_name_len)); + pg_sb_->members[i].priority = m.priority; + ++i; + } + + pg_sb_.write(); +} + +HSHomeObject::HS_PG::HS_PG(homestore::superblk< HSHomeObject::pg_info_superblk > const& sb, + shared< homestore::ReplDev > rdev) : + PG{pg_info_from_sb(sb)}, pg_sb_{sb}, repl_dev_{std::move(rdev)} {} + +} // namespace homeobject \ No newline at end of file diff --git a/src/lib/homestore/shard_manager.cpp b/src/lib/homestore_backend/hs_shard_manager.cpp similarity index 64% rename from src/lib/homestore/shard_manager.cpp rename to src/lib/homestore_backend/hs_shard_manager.cpp index c9d81f57..f2c271c7 100644 --- a/src/lib/homestore/shard_manager.cpp +++ b/src/lib/homestore_backend/hs_shard_manager.cpp @@ -1,34 +1,51 @@ -#include "homeobject.hpp" -#include "replication_message.hpp" - #include #include #include +#include "hs_homeobject.hpp" +#include "replication_message.hpp" +#include "replication_state_machine.hpp" +#include "lib/homeobject_impl.hpp" + namespace homeobject { +static constexpr uint32_t SEQUENCE_BIT_NUM_IN_SHARD{48}; + uint64_t ShardManager::max_shard_size() { return Gi; } -uint64_t ShardManager::max_shard_num_in_pg() { return ((uint64_t)0x01) << shard_width; } +uint64_t ShardManager::max_shard_num_in_pg() { return ((uint64_t)0x01) << SEQUENCE_BIT_NUM_IN_SHARD; } + +shard_id_t HSHomeObject::generate_new_shard_id(pg_id_t pgid) { + HS_PG* pg{nullptr}; + { + std::shared_lock lg{_pg_lock}; + if (auto it = _pg_map.find(pgid); it != _pg_map.end()) { + pg = s_cast< HS_PG* >(it->second.get()); + } else { + RELEASE_ASSERT(false, "commit on a shard with unavailable pg [{}]", pgid); + return 0; + } + } + + std::scoped_lock lg(pg->mtx_); + auto new_sequence_num = ++(pg->shard_sequence_num_); + RELEASE_ASSERT_LT(new_sequence_num, ShardManager::max_shard_num_in_pg(), + "new shard id must be less than ShardManager::max_shard_num_in_pg()"); + return make_new_shard_id(pgid, new_sequence_num); +} -shard_id HSHomeObject::generate_new_shard_id(pg_id pg) { - std::scoped_lock lock_guard(_pg_lock); - auto iter = _pg_map.find(pg); - RELEASE_ASSERT(iter != _pg_map.end(), "Missing pg info"); - auto new_sequence_num = ++(iter->second.shard_sequence_num); - RELEASE_ASSERT(new_sequence_num < ShardManager::max_shard_num_in_pg(), - "new shard id must be less than ShardManager::max_shard_num_in_pg()"); - return make_new_shard_id(pg, new_sequence_num); +shard_id_t HSHomeObject::make_new_shard_id(pg_id_t pg, uint64_t sequence_num) const { + return ((uint64_t)pg << SEQUENCE_BIT_NUM_IN_SHARD) | sequence_num; } -uint64_t HSHomeObject::get_sequence_num_from_shard_id(uint64_t shard_id) { - return shard_id & (max_shard_num_in_pg() - 1); +uint64_t HSHomeObject::get_sequence_num_from_shard_id(uint64_t shard_id_t) { + return shard_id_t & (max_shard_num_in_pg() - 1); } std::string HSHomeObject::serialize_shard(const Shard& shard) const { nlohmann::json j; j["shard_info"]["shard_id"] = shard.info.id; - j["shard_info"]["pg_id"] = shard.info.placement_group; + j["shard_info"]["pg_id_t"] = shard.info.placement_group; j["shard_info"]["state"] = shard.info.state; j["shard_info"]["created_time"] = shard.info.created_time; j["shard_info"]["modified_time"] = shard.info.last_modified_time; @@ -42,8 +59,8 @@ std::string HSHomeObject::serialize_shard(const Shard& shard) const { Shard HSHomeObject::deserialize_shard(const std::string& shard_json_str) const { auto shard_json = nlohmann::json::parse(shard_json_str); ShardInfo shard_info; - shard_info.id = shard_json["shard_info"]["shard_id"].get< shard_id >(); - shard_info.placement_group = shard_json["shard_info"]["pg_id"].get< pg_id >(); + shard_info.id = shard_json["shard_info"]["shard_id"].get< shard_id_t >(); + shard_info.placement_group = shard_json["shard_info"]["pg_id_t"].get< pg_id_t >(); shard_info.state = static_cast< ShardInfo::State >(shard_json["shard_info"]["state"].get< int >()); shard_info.created_time = shard_json["shard_info"]["created_time"].get< uint64_t >(); shard_info.last_modified_time = shard_json["shard_info"]["modified_time"].get< uint64_t >(); @@ -55,29 +72,34 @@ Shard HSHomeObject::deserialize_shard(const std::string& shard_json_str) const { return shard; } -ShardManager::Result< ShardInfo > HSHomeObject::_create_shard(pg_id pg_owner, uint64_t size_bytes) { - auto pg = _get_pg(pg_owner); - if (!bool(pg)) { - LOGWARN("failed to create shard with non-exist pg [{}]", pg_owner); - return folly::makeUnexpected(ShardError::UNKNOWN_PG); +ShardManager::Result< ShardInfo > HSHomeObject::_create_shard(pg_id_t pg_owner, uint64_t size_bytes) { + HS_PG* pg{nullptr}; + { + std::shared_lock lg{_pg_lock}; + if (auto it = _pg_map.find(pg_owner); it != _pg_map.end()) { + pg = s_cast< HS_PG* >(it->second.get()); + } else { + LOGWARN("failed to create shard with non-exist pg [{}]", pg_owner); + return folly::makeUnexpected(ShardError::UNKNOWN_PG); + } } - // TODO: will update to ReplDev when ReplDev on HomeStore is ready; - auto replica_set_var = _repl_svc->get_replica_set(fmt::format("{}", pg_owner)); - if (std::holds_alternative< home_replication::ReplServiceError >(replica_set_var)) { - LOGWARN("failed to get replica set instance for pg [{}]", pg_owner); - return folly::makeUnexpected(ShardError::UNKNOWN_PG); + + if (!pg->repl_dev_) { + LOGWARN("failed to get repl dev instance for pg [{}]", pg_owner); + return folly::makeUnexpected(ShardError::PG_NOT_READY); } - auto replica_set = std::get< home_replication::rs_ptr_t >(replica_set_var); auto new_shard_id = generate_new_shard_id(pg_owner); auto create_time = get_current_timestamp(); auto shard_info = ShardInfo(new_shard_id, pg_owner, ShardInfo::State::OPEN, create_time, create_time, size_bytes, size_bytes, 0); - std::string create_shard_message = serialize_shard(Shard(shard_info)); - // preapre msg header; + std::string const create_shard_message = serialize_shard(Shard(shard_info)); + + // prepare msg header; const uint32_t needed_size = sizeof(ReplicationMessageHeader) + create_shard_message.size(); - auto buf = nuraft::buffer::alloc(needed_size); - uint8_t* raw_ptr = buf->data_begin(); + auto req = repl_result_ctx< ShardManager::Result< ShardInfo > >::make(needed_size); + + uint8_t* raw_ptr = req->hdr_buf_.bytes; ReplicationMessageHeader* header = new (raw_ptr) ReplicationMessageHeader(); header->message_type = ReplicationMessageType::SHARD_MESSAGE; header->payload_size = create_shard_message.size(); @@ -87,20 +109,15 @@ ShardManager::Result< ShardInfo > HSHomeObject::_create_shard(pg_id pg_owner, ui raw_ptr += sizeof(ReplicationMessageHeader); std::memcpy(raw_ptr, create_shard_message.c_str(), create_shard_message.size()); - sisl::sg_list value; // replicate this create shard message to PG members; - auto [p, sf] = folly::makePromiseContract< ShardManager::Result< ShardInfo > >(); - replica_set->write(sisl::blob(buf->data_begin(), needed_size), sisl::blob(), value, static_cast< void* >(&p)); - auto info = std::move(sf).get(); - if (!bool(info)) { - LOGWARN("create new shard [{}] on pg [{}] is failed with error:{}", new_shard_id & shard_mask, pg_owner, - info.error()); - } + pg->repl_dev_->async_alloc_write(req->hdr_buf_, sisl::blob{}, sisl::sg_list{}, req); + auto info = req->result().get(); + header->~ReplicationMessageHeader(); return info; } -ShardManager::Result< ShardInfo > HSHomeObject::_seal_shard(shard_id id) { +ShardManager::Result< ShardInfo > HSHomeObject::_seal_shard(shard_id_t id) { return folly::makeUnexpected(ShardError::UNKNOWN_SHARD); } @@ -124,50 +141,29 @@ bool HSHomeObject::precheck_and_decode_shard_msg(int64_t lsn, sisl::blob const& return true; } -void HSHomeObject::on_pre_commit_shard_msg(int64_t lsn, sisl::blob const& header, sisl::blob const& key, - void* user_ctx) { +bool HSHomeObject::on_pre_commit_shard_msg(int64_t lsn, sisl::blob const& header, sisl::blob const& key, + cintrusive< homestore::repl_req_ctx >&) { std::string shard_msg; if (!precheck_and_decode_shard_msg(lsn, header, key, &shard_msg)) { // header is broken, nothing to do; - return; + return false; } auto shard = deserialize_shard(shard_msg); if (shard.info.state != ShardInfo::State::OPEN) { // it is not an create shard msg, nothing to do; - return; - } - - // write shard header to ReplDev which will bind the newly created shard to one underlay homestore physical chunk; - using namespace homestore; - auto datasvc_page_size = HomeStore::instance()->data_service().get_blk_size(); - if (shard_msg.size() % datasvc_page_size != 0) { - uint32_t need_size = (shard_msg.size() / datasvc_page_size + 1) * datasvc_page_size; - shard_msg.resize(need_size); + return false; } - auto write_buf = iomanager.iobuf_alloc(512, shard_msg.size()); - std::memcpy(r_cast< uint8_t* >(write_buf), r_cast< const uint8_t* >(shard_msg.c_str()), shard_msg.size()); - - sisl::sg_list sgs; - sgs.size = shard_msg.size(); - sgs.iovs.emplace_back(iovec(static_cast< void* >(write_buf), shard_msg.size())); - std::vector< homestore::BlkId > out_blkids; - homestore::MultiBlkId out_blkid; - auto future = HomeStore::instance()->data_service().async_alloc_write(sgs, homestore::blk_alloc_hints(), out_blkid); - iomanager.iobuf_free(write_buf); - // non-zero means write failure. - if ((std::move(future).get())) { - LOGWARN("write lsn {} msg to homestore data service is failed", lsn); - return; - } - shard.chunk_id = out_blkid.chunk_num(); std::scoped_lock lock_guard(_flying_shard_lock); auto [_, happened] = _flying_shards.emplace(lsn, std::move(shard)); RELEASE_ASSERT(happened, "duplicated flying create shard msg"); + + return true; } -void HSHomeObject::on_rollback_shard_msg(int64_t lsn, sisl::blob const& header, sisl::blob const& key, void* user_ctx) { +void HSHomeObject::on_rollback_shard_msg(int64_t lsn, sisl::blob const& header, sisl::blob const& key, + cintrusive< homestore::repl_req_ctx >&) { std::string shard_msg; if (!precheck_and_decode_shard_msg(lsn, header, key, &shard_msg)) { // header is broken, nothing to do; @@ -186,17 +182,18 @@ void HSHomeObject::on_rollback_shard_msg(int64_t lsn, sisl::blob const& header, } void HSHomeObject::on_shard_message_commit(int64_t lsn, sisl::blob const& header, sisl::blob const& key, - void* user_ctx) { - - folly::Promise< ShardManager::Result< ShardInfo > >* promise = nullptr; - // user_ctx will be nullptr when: + cintrusive< homestore::repl_req_ctx >& hs_ctx) { + // ctx will be nullptr when: // 1. on the follower side // 2. on the leader side but homeobject restarts and replay all commited log entries from the last commit id; - if (user_ctx != nullptr) { promise = r_cast< folly::Promise< ShardManager::Result< ShardInfo > >* >(user_ctx); } + repl_result_ctx< ShardManager::Result< ShardInfo > >* ctx{nullptr}; + if (hs_ctx != nullptr) { + ctx = boost::static_pointer_cast< repl_result_ctx< ShardManager::Result< ShardInfo > > >(hs_ctx).get(); + } std::string shard_msg; if (!precheck_and_decode_shard_msg(lsn, header, key, &shard_msg)) { - if (promise) { promise->setValue(folly::makeUnexpected(ShardError::INVALID_ARG)); } + if (ctx) { ctx->promise_.setValue(folly::makeUnexpected(ShardError::INVALID_ARG)); } return; } @@ -207,7 +204,7 @@ void HSHomeObject::on_shard_message_commit(int64_t lsn, sisl::blob const& header auto iter = _flying_shards.find(lsn); if (iter == _flying_shards.end()) { LOGWARN("can not find flying shards on lsn {}", lsn); - if (promise) { promise->setValue(folly::makeUnexpected(ShardError::UNKNOWN)); } + if (ctx) { ctx->promise_.setValue(folly::makeUnexpected(ShardError::UNKNOWN)); } return; } shard.chunk_id = iter->second.chunk_id; @@ -237,21 +234,30 @@ void HSHomeObject::on_shard_message_commit(int64_t lsn, sisl::blob const& header } } - if (promise) { promise->setValue(ShardManager::Result< ShardInfo >(shard.info)); } + if (ctx) { ctx->promise_.setValue(ShardManager::Result< ShardInfo >(shard.info)); } } void HSHomeObject::do_commit_new_shard(const Shard& shard) { - std::scoped_lock lock_guard(_pg_lock, _shard_lock); - auto pg_iter = _pg_map.find(shard.info.placement_group); - RELEASE_ASSERT(pg_iter != _pg_map.end(), "Missing PG info"); - auto& shards = pg_iter->second.shards; + HS_PG* pg{nullptr}; + { + std::shared_lock lg{_pg_lock}; + if (auto it = _pg_map.find(shard.info.placement_group); it != _pg_map.end()) { + pg = s_cast< HS_PG* >(it->second.get()); + } else { + RELEASE_ASSERT(false, "commit on a shard with unavailable pg [{}]", shard.info.placement_group); + return; + } + } + + std::scoped_lock lg{pg->mtx_}; + auto& shards = pg->shards_; auto iter = shards.emplace(shards.end(), shard); auto [_, happened] = _shard_map.emplace(shard.info.id, iter); RELEASE_ASSERT(happened, "duplicated shard info"); // following part is must for follower members or when member is restarted; auto sequence_num = get_sequence_num_from_shard_id(shard.info.id); - if (sequence_num > pg_iter->second.shard_sequence_num) { pg_iter->second.shard_sequence_num = sequence_num; } + if (sequence_num > pg->shard_sequence_num_) { pg->shard_sequence_num_ = sequence_num; } } void HSHomeObject::do_commit_seal_shard(const Shard& shard) { @@ -261,7 +267,7 @@ void HSHomeObject::do_commit_seal_shard(const Shard& shard) { *(shard_iter->second) = shard; } -void* HSHomeObject::get_shard_metablk(shard_id id) { +void* HSHomeObject::get_shard_metablk(shard_id_t id) { std::scoped_lock lock_guard(_shard_lock); auto shard_iter = _shard_map.find(id); if (shard_iter == _shard_map.end()) { return nullptr; } diff --git a/src/lib/homestore/replication_message.hpp b/src/lib/homestore_backend/replication_message.hpp similarity index 89% rename from src/lib/homestore/replication_message.hpp rename to src/lib/homestore_backend/replication_message.hpp index 299effca..426da8ce 100644 --- a/src/lib/homestore/replication_message.hpp +++ b/src/lib/homestore_backend/replication_message.hpp @@ -7,7 +7,7 @@ namespace homeobject { -ENUM(ReplicationMessageType, uint16_t, PG_MESSAGE = 0, SHARD_MESSAGE, BLOB_MESSAGE, UNKNOWN_MESSAGE); +VENUM(ReplicationMessageType, uint16_t, PG_MESSAGE = 0, SHARD_MESSAGE = 1, BLOB_MESSAGE = 2, UNKNOWN_MESSAGE = 3); // magic num comes from the first 8 bytes of 'echo homeobject_replication | md5sum' static constexpr uint64_t HOMEOBJECT_REPLICATION_MAGIC = 0x11153ca24efc8d34; diff --git a/src/lib/homestore/replication_state_machine.cpp b/src/lib/homestore_backend/replication_state_machine.cpp similarity index 65% rename from src/lib/homestore/replication_state_machine.cpp rename to src/lib/homestore_backend/replication_state_machine.cpp index e568f070..e9ebe6a8 100644 --- a/src/lib/homestore/replication_state_machine.cpp +++ b/src/lib/homestore_backend/replication_state_machine.cpp @@ -4,7 +4,7 @@ namespace homeobject { void ReplicationStateMachine::on_commit(int64_t lsn, const sisl::blob& header, const sisl::blob& key, - const home_replication::pba_list_t& pbas, void* ctx) { + const homestore::MultiBlkId& pbas, cintrusive< homestore::repl_req_ctx >& ctx) { LOGINFO("applying raft log commit with lsn:{}", lsn); const ReplicationMessageHeader* msg_header = r_cast< const ReplicationMessageHeader* >(header.bytes); @@ -21,13 +21,15 @@ void ReplicationStateMachine::on_commit(int64_t lsn, const sisl::blob& header, c } } -void ReplicationStateMachine::on_pre_commit(int64_t lsn, sisl::blob const& header, sisl::blob const& key, void* ctx) { +bool ReplicationStateMachine::on_pre_commit(int64_t lsn, sisl::blob const& header, sisl::blob const& key, + cintrusive< homestore::repl_req_ctx >& ctx) { + bool ret{false}; LOGINFO("on_pre_commit with lsn:{}", lsn); const ReplicationMessageHeader* msg_header = r_cast< const ReplicationMessageHeader* >(header.bytes); switch (msg_header->message_type) { case ReplicationMessageType::SHARD_MESSAGE: { - _home_object->on_pre_commit_shard_msg(lsn, header, key, ctx); + ret = _home_object->on_pre_commit_shard_msg(lsn, header, key, ctx); break; } case ReplicationMessageType::PG_MESSAGE: @@ -36,9 +38,11 @@ void ReplicationStateMachine::on_pre_commit(int64_t lsn, sisl::blob const& heade break; } } + return ret; } -void ReplicationStateMachine::on_rollback(int64_t lsn, sisl::blob const& header, sisl::blob const& key, void* ctx) { +void ReplicationStateMachine::on_rollback(int64_t lsn, sisl::blob const& header, sisl::blob const& key, + cintrusive< homestore::repl_req_ctx >& ctx) { LOGINFO("rollback with lsn:{}", lsn); const ReplicationMessageHeader* msg_header = r_cast< const ReplicationMessageHeader* >(header.bytes); @@ -55,6 +59,12 @@ void ReplicationStateMachine::on_rollback(int64_t lsn, sisl::blob const& header, } } +homestore::blk_alloc_hints ReplicationStateMachine::get_blk_alloc_hints(sisl::blob const& header, + cintrusive< homestore::repl_req_ctx >& ctx) { + // TODO: Return blk_alloc_hints specific to create shard or blob put + return homestore::blk_alloc_hints{}; +} + void ReplicationStateMachine::on_replica_stop() {} } // namespace homeobject diff --git a/src/lib/homestore/replication_state_machine.hpp b/src/lib/homestore_backend/replication_state_machine.hpp similarity index 55% rename from src/lib/homestore/replication_state_machine.hpp rename to src/lib/homestore_backend/replication_state_machine.hpp index c02d1b2f..dc8f9b6a 100644 --- a/src/lib/homestore/replication_state_machine.hpp +++ b/src/lib/homestore_backend/replication_state_machine.hpp @@ -1,17 +1,42 @@ #pragma once -#include "homeobject.hpp" -#include "mocks/repl_service.h" +#include +#include +#include "hs_homeobject.hpp" namespace homeobject { -class HomeObjectImpl; +class HSHomeObject; -class ReplicationStateMachine : public home_replication::ReplicaSetListener { +struct ho_repl_ctx : public homestore::repl_req_ctx { + sisl::io_blob_safe hdr_buf_; + + ho_repl_ctx(uint32_t size) : homestore::repl_req_ctx{}, hdr_buf_{size} {} + template < typename T > + T* to() { + return r_cast< T* >(this); + } +}; + +template < typename T > +struct repl_result_ctx : public ho_repl_ctx { + folly::Promise< T > promise_; + + template < typename... Args > + static intrusive< repl_result_ctx< T > > make(Args&&... args) { + return intrusive< repl_result_ctx< T > >{new repl_result_ctx< T >(std::forward< Args >(args)...)}; + } + + repl_result_ctx(uint32_t hdr_size) : ho_repl_ctx{hdr_size} {} + folly::SemiFuture< T > result() { return promise_.getSemiFuture(); } +}; + +class ReplicationStateMachine : public homestore::ReplDevListener { public: explicit ReplicationStateMachine(HSHomeObject* home_object) : _home_object(home_object) {} - ~ReplicationStateMachine() = default; + virtual ~ReplicationStateMachine() = default; + /// @brief Called when the log entry has been committed in the replica set. /// /// This function is called from a dedicated commit thread which is different from the original thread calling @@ -23,8 +48,8 @@ class ReplicationStateMachine : public home_replication::ReplicaSetListener { /// @param pbas - List of pbas where data is written to the storage engine. /// @param ctx - User contenxt passed as part of the replica_set::write() api /// - virtual void on_commit(int64_t lsn, const sisl::blob& header, const sisl::blob& key, - const home_replication::pba_list_t& pbas, void* ctx); + void on_commit(int64_t lsn, sisl::blob const& header, sisl::blob const& key, homestore::MultiBlkId const& blkids, + cintrusive< homestore::repl_req_ctx >& ctx) override; /// @brief Called when the log entry has been received by the replica set. /// @@ -44,7 +69,8 @@ class ReplicationStateMachine : public home_replication::ReplicaSetListener { /// @param header - Header originally passed with replica_set::write() api /// @param key - Key originally passed with replica_set::write() api /// @param ctx - User contenxt passed as part of the replica_set::write() api - virtual void on_pre_commit(int64_t lsn, const sisl::blob& header, const sisl::blob& key, void* ctx); + bool on_pre_commit(int64_t lsn, const sisl::blob& header, const sisl::blob& key, + cintrusive< homestore::repl_req_ctx >& ctx) override; /// @brief Called when the log entry has been rolled back by the replica set. /// @@ -59,10 +85,22 @@ class ReplicationStateMachine : public home_replication::ReplicaSetListener { /// @param header - Header originally passed with replica_set::write() api /// @param key - Key originally passed with replica_set::write() api /// @param ctx - User contenxt passed as part of the replica_set::write() api - virtual void on_rollback(int64_t lsn, const sisl::blob& header, const sisl::blob& key, void* ctx); + void on_rollback(int64_t lsn, const sisl::blob& header, const sisl::blob& key, + cintrusive< homestore::repl_req_ctx >& ctx) override; + + /// @brief Called when replication module is trying to allocate a block to write the value + /// + /// This function can be called both on leader and follower when it is trying to allocate a block to write the + /// value. Caller is expected to provide hints for allocation based on the header supplied as part of original + /// write. In cases where caller don't care about the hints can return default blk_alloc_hints. + /// + /// @param header Header originally passed with repl_dev::write() api on the leader + /// @return Expected to return blk_alloc_hints for this write + homestore::blk_alloc_hints get_blk_alloc_hints(sisl::blob const& header, + cintrusive< homestore::repl_req_ctx >& ctx) override; /// @brief Called when the replica set is being stopped - virtual void on_replica_stop(); + void on_replica_stop() override; private: HSHomeObject* _home_object; diff --git a/src/lib/homestore/tests/CMakeLists.txt b/src/lib/homestore_backend/tests/CMakeLists.txt similarity index 84% rename from src/lib/homestore/tests/CMakeLists.txt rename to src/lib/homestore_backend/tests/CMakeLists.txt index 114b8afc..edda8ba0 100644 --- a/src/lib/homestore/tests/CMakeLists.txt +++ b/src/lib/homestore_backend/tests/CMakeLists.txt @@ -14,13 +14,13 @@ set_property(TEST HomeObject PROPERTY RUN_SERIAL 1) add_executable (test_shard_manager) target_sources(test_shard_manager PRIVATE test_shard_manager.cpp) target_link_libraries(test_shard_manager - homeobject_homestore - ${COMMON_TEST_DEPS} - ) + homeobject_homestore + ${COMMON_TEST_DEPS} +) add_test(NAME ShardManagerTest COMMAND ${CMAKE_BINARY_DIR}/bin/test_shard_manager) add_executable (test_heap_chunk_selector) target_sources(test_heap_chunk_selector PRIVATE test_heap_chunk_selector.cpp ../heap_chunk_selector.cpp) -target_link_libraries(test_heap_chunk_selector ${COMMON_TEST_DEPS}) +target_link_libraries(test_heap_chunk_selector homeobject_homestore ${COMMON_TEST_DEPS}) add_test(NAME HeapChunkSelectorTest COMMAND ${CMAKE_BINARY_DIR}/bin/test_heap_chunk_selector) diff --git a/src/lib/homestore/tests/test_heap_chunk_selector.cpp b/src/lib/homestore_backend/tests/test_heap_chunk_selector.cpp similarity index 98% rename from src/lib/homestore/tests/test_heap_chunk_selector.cpp rename to src/lib/homestore_backend/tests/test_heap_chunk_selector.cpp index dc927290..4c0cb259 100644 --- a/src/lib/homestore/tests/test_heap_chunk_selector.cpp +++ b/src/lib/homestore_backend/tests/test_heap_chunk_selector.cpp @@ -1,4 +1,4 @@ -#include "lib/homestore/heap_chunk_selector.h" +#include "lib/homestore_backend/heap_chunk_selector.h" #include diff --git a/src/lib/homestore/tests/test_home_object.cpp b/src/lib/homestore_backend/tests/test_home_object.cpp similarity index 93% rename from src/lib/homestore/tests/test_home_object.cpp rename to src/lib/homestore_backend/tests/test_home_object.cpp index 964ffd85..5691e402 100644 --- a/src/lib/homestore/tests/test_home_object.cpp +++ b/src/lib/homestore_backend/tests/test_home_object.cpp @@ -10,7 +10,7 @@ #include -#include "lib/homestore/homeobject.hpp" +#include "lib/homestore_backend/hs_homeobject.hpp" using namespace std::chrono_literals; @@ -40,12 +40,12 @@ class FixtureApp : public homeobject::HomeObjectApplication { device_info.emplace_back(std::filesystem::canonical(fpath)); return device_info; } - homeobject::peer_id discover_svcid(std::optional< homeobject::peer_id > const&) const override { + homeobject::peer_id_t discover_svcid(std::optional< homeobject::peer_id_t > const&) const override { return boost::uuids::random_generator()(); } /// TODO /// This will have to work if we test replication in the future - std::string lookup_peer(homeobject::peer_id const&) const override { return "test_fixture.com"; } + std::string lookup_peer(homeobject::peer_id_t const&) const override { return "test_fixture.com"; } }; TEST(HomeObject, BasicEquivalence) { @@ -81,7 +81,7 @@ TEST_F(HomeObjectFixture, TestValidations) { homeobject::PGMember{boost::uuids::random_generator()(), "new_member", 1}) .get() .error(), - PGError::UNKNOWN_PG); + PGError::UNSUPPORTED_OP); EXPECT_EQ(ShardError::UNKNOWN_PG, _obj_inst->shard_manager()->create_shard(1, 1000).get().error()); EXPECT_EQ(ShardError::INVALID_ARG, _obj_inst->shard_manager()->create_shard(1, 0).get().error()); EXPECT_EQ(ShardError::INVALID_ARG, _obj_inst->shard_manager()->create_shard(1, 2 * Gi).get().error()); diff --git a/src/lib/homestore/tests/test_shard_manager.cpp b/src/lib/homestore_backend/tests/test_shard_manager.cpp similarity index 93% rename from src/lib/homestore/tests/test_shard_manager.cpp rename to src/lib/homestore_backend/tests/test_shard_manager.cpp index de41df11..b78fde16 100644 --- a/src/lib/homestore/tests/test_shard_manager.cpp +++ b/src/lib/homestore_backend/tests/test_shard_manager.cpp @@ -11,12 +11,11 @@ // will allow unit tests to access object private/protected for validation; #define protected public -#include "lib/homestore/homeobject.hpp" -#include "lib/homestore/replication_message.hpp" -#include "lib/homestore/replication_state_machine.hpp" -#include "mocks/mock_replica_set.hpp" +#include "lib/homestore_backend/hs_homeobject.hpp" +#include "lib/homestore_backend/replication_message.hpp" +#include "lib/homestore_backend/replication_state_machine.hpp" -using homeobject::shard_id; +using homeobject::shard_id_t; using homeobject::ShardError; using homeobject::ShardInfo; @@ -40,18 +39,18 @@ class FixtureApp : public homeobject::HomeObjectApplication { device_info.emplace_back(std::filesystem::canonical(fpath)); return device_info; } - homeobject::peer_id discover_svcid(std::optional< homeobject::peer_id > const&) const override { + homeobject::peer_id_t discover_svcid(std::optional< homeobject::peer_id_t > const&) const override { return boost::uuids::random_generator()(); } - std::string lookup_peer(homeobject::peer_id const&) const override { return "test_fixture.com"; } + std::string lookup_peer(homeobject::peer_id_t const&) const override { return "test_fixture.com"; } }; class ShardManagerTesting : public ::testing::Test { public: - homeobject::pg_id _pg_id{1u}; - homeobject::peer_id _peer1; - homeobject::peer_id _peer2; - homeobject::shard_id _shard_id{100u}; + homeobject::pg_id_t _pg_id{1u}; + homeobject::peer_id_t _peer1; + homeobject::peer_id_t _peer2; + homeobject::shard_id_t _shard_id{100u}; void SetUp() override { app = std::make_shared< FixtureApp >(); @@ -101,11 +100,12 @@ TEST_F(ShardManagerTesting, ListShardsOnEmptyPg) { } class ShardManagerWithShardsTesting : public ShardManagerTesting { - std::shared_ptr< home_replication::ReplicaSetListener > _listener; + // std::shared_ptr< home_replication::ReplicaSetListener > _listener; public: void SetUp() override { ShardManagerTesting::SetUp(); +#if 0 homeobject::HSHomeObject* ho = dynamic_cast< homeobject::HSHomeObject* >(_home_object.get()); EXPECT_TRUE(ho != nullptr); auto rs = ho->get_repl_svc()->get_replica_set(fmt::format("{}", _pg_id)); @@ -114,6 +114,7 @@ class ShardManagerWithShardsTesting : public ShardManagerTesting { dynamic_cast< home_replication::MockReplicaSet* >(std::get< home_replication::rs_ptr_t >(rs).get()); _listener = std::make_shared< homeobject::ReplicationStateMachine >(ho); replica_set->set_listener(_listener); +#endif } }; @@ -137,9 +138,9 @@ TEST_F(ShardManagerWithShardsTesting, CreateShardAndValidateMembers) { auto pg_iter = ho->_pg_map.find(_pg_id); EXPECT_TRUE(pg_iter != ho->_pg_map.end()); auto& pg = pg_iter->second; - EXPECT_TRUE(pg.shard_sequence_num == 1); - EXPECT_EQ(1, pg.shards.size()); - auto& shard = *pg.shards.begin(); + EXPECT_TRUE(pg->shard_sequence_num_ == 1); + EXPECT_EQ(1, pg->shards_.size()); + auto& shard = *pg->shards_.begin(); EXPECT_TRUE(shard.info == shard_info); EXPECT_TRUE(shard.metablk_cookie != nullptr); } @@ -170,14 +171,15 @@ TEST_F(ShardManagerWithShardsTesting, ListShards) { }); } +#if 0 TEST_F(ShardManagerWithShardsTesting, RollbackCreateShard) { homeobject::HSHomeObject* ho = dynamic_cast< homeobject::HSHomeObject* >(_home_object.get()); auto create_time = ho->get_current_timestamp(); auto shard_info = ShardInfo(100, _pg_id, ShardInfo::State::OPEN, create_time, create_time, Mi, Mi, 0); auto shard = homeobject::Shard(shard_info); nlohmann::json j; - j["shard_info"]["shard_id"] = shard.info.id; - j["shard_info"]["pg_id"] = shard.info.placement_group; + j["shard_info"]["shard_id_t"] = shard.info.id; + j["shard_info"]["pg_id_t"] = shard.info.placement_group; j["shard_info"]["state"] = shard.info.state; j["shard_info"]["created_time"] = shard.info.created_time; j["shard_info"]["modified_time"] = shard.info.last_modified_time; @@ -214,8 +216,8 @@ TEST_F(ShardManagerWithShardsTesting, RollbackCreateShardV2) { auto shard_info = ShardInfo(100, _pg_id, ShardInfo::State::OPEN, create_time, create_time, Mi, Mi, 0); auto shard = homeobject::Shard(shard_info); nlohmann::json j; - j["shard_info"]["shard_id"] = shard.info.id; - j["shard_info"]["pg_id"] = shard.info.placement_group; + j["shard_info"]["shard_id_t"] = shard.info.id; + j["shard_info"]["pg_id_t"] = shard.info.placement_group; j["shard_info"]["state"] = shard.info.state; j["shard_info"]["created_time"] = shard.info.created_time; j["shard_info"]["modified_time"] = shard.info.last_modified_time; @@ -257,8 +259,8 @@ TEST_F(ShardManagerWithShardsTesting, MockSealShard) { auto shard = homeobject::Shard(shard_info); shard.info.state = ShardInfo::State::SEALED; nlohmann::json j; - j["shard_info"]["shard_id"] = shard.info.id; - j["shard_info"]["pg_id"] = shard.info.placement_group; + j["shard_info"]["shard_id_t"] = shard.info.id; + j["shard_info"]["pg_id_t"] = shard.info.placement_group; j["shard_info"]["state"] = shard.info.state; j["shard_info"]["created_time"] = shard.info.created_time; j["shard_info"]["modified_time"] = shard.info.last_modified_time; @@ -350,8 +352,8 @@ TEST_F(ShardManagerWithShardsTesting, ShardManagerRecovery) { EXPECT_EQ(_pg_id, shard_info.placement_group); nlohmann::json shard_json; - shard_json["shard_info"]["shard_id"] = shard_info.id; - shard_json["shard_info"]["pg_id"] = shard_info.placement_group; + shard_json["shard_info"]["shard_id_t"] = shard_info.id; + shard_json["shard_info"]["pg_id_t"] = shard_info.placement_group; shard_json["shard_info"]["state"] = shard_info.state; shard_json["shard_info"]["created_time"] = shard_info.created_time; shard_json["shard_info"]["modified_time"] = shard_info.last_modified_time; @@ -385,6 +387,7 @@ TEST_F(ShardManagerWithShardsTesting, ShardManagerRecovery) { EXPECT_EQ(info.state, ShardInfo::State::OPEN); }); } +#endif int main(int argc, char* argv[]) { int parsed_argc = argc; diff --git a/src/lib/memory/CMakeLists.txt b/src/lib/memory_backend/CMakeLists.txt similarity index 87% rename from src/lib/memory/CMakeLists.txt rename to src/lib/memory_backend/CMakeLists.txt index ac7c7083..e5a5d7f4 100644 --- a/src/lib/memory/CMakeLists.txt +++ b/src/lib/memory_backend/CMakeLists.txt @@ -2,14 +2,15 @@ cmake_minimum_required (VERSION 3.11) add_library ("${PROJECT_NAME}_memory") target_sources("${PROJECT_NAME}_memory" PRIVATE - homeobject.cpp - blob_manager.cpp - shard_manager.cpp - $ - ) + mem_homeobject.cpp + mem_blob_manager.cpp + mem_shard_manager.cpp + mem_pg_manager.cpp + $ +) target_link_libraries("${PROJECT_NAME}_memory" ${COMMON_DEPS} - ) +) if(BUILD_TESTING) add_executable (pg_memory_test) diff --git a/src/lib/memory/blob_manager.cpp b/src/lib/memory_backend/mem_blob_manager.cpp similarity index 98% rename from src/lib/memory/blob_manager.cpp rename to src/lib/memory_backend/mem_blob_manager.cpp index 69d91dd0..59dea1a3 100644 --- a/src/lib/memory/blob_manager.cpp +++ b/src/lib/memory_backend/mem_blob_manager.cpp @@ -1,4 +1,4 @@ -#include "homeobject.hpp" +#include "mem_homeobject.hpp" namespace homeobject { diff --git a/src/lib/memory/homeobject.cpp b/src/lib/memory_backend/mem_homeobject.cpp similarity index 83% rename from src/lib/memory/homeobject.cpp rename to src/lib/memory_backend/mem_homeobject.cpp index ab2e7ced..ce72b51d 100644 --- a/src/lib/memory/homeobject.cpp +++ b/src/lib/memory_backend/mem_homeobject.cpp @@ -1,4 +1,4 @@ -#include "homeobject.hpp" +#include "mem_homeobject.hpp" #include @@ -6,11 +6,10 @@ namespace homeobject { /// NOTE: We give ourselves the option to provide a different HR instance here than libhomeobject.a extern std::shared_ptr< HomeObject > init_homeobject(std::weak_ptr< HomeObjectApplication >&& application) { - auto instance = std::make_shared< MemoryHomeObject >(std::move(application)); - instance->init_repl_svc(); - return instance; + return std::make_shared< MemoryHomeObject >(std::move(application)); } +#if 0 void HomeObjectImpl::init_repl_svc() { auto lg = std::scoped_lock(_repl_lock); if (!_repl_svc) { @@ -20,6 +19,7 @@ void HomeObjectImpl::init_repl_svc() { _repl_svc = home_replication::create_repl_service([](auto) { return nullptr; }); } } +#endif ShardIndex::~ShardIndex() { for (auto it = btree_.begin(); it != btree_.end(); ++it) { diff --git a/src/lib/memory/homeobject.hpp b/src/lib/memory_backend/mem_homeobject.hpp similarity index 54% rename from src/lib/memory/homeobject.hpp rename to src/lib/memory_backend/mem_homeobject.hpp index ab1cc883..364be6ed 100644 --- a/src/lib/memory/homeobject.hpp +++ b/src/lib/memory_backend/mem_homeobject.hpp @@ -4,8 +4,6 @@ #include #include -#include "mocks/repl_service.h" - #include "lib/homeobject_impl.hpp" #include "lib/blob_route.hpp" @@ -25,27 +23,34 @@ struct BlobExt { struct ShardIndex { folly::ConcurrentHashMap< BlobRoute, BlobExt > btree_; - std::atomic< blob_id > shard_seq_num_{0ull}; + std::atomic< blob_id_t > shard_seq_num_{0ull}; ~ShardIndex(); }; class MemoryHomeObject : public HomeObjectImpl { /// Simulates the Shard=>Chunk mapping in IndexSvc - using index_svc = folly::ConcurrentHashMap< shard_id, std::unique_ptr< ShardIndex > >; + using index_svc = folly::ConcurrentHashMap< shard_id_t, std::unique_ptr< ShardIndex > >; index_svc index_; /// /// Helpers // ShardManager - ShardManager::Result< ShardInfo > _create_shard(pg_id, uint64_t size_bytes) override; - ShardManager::Result< ShardInfo > _seal_shard(shard_id) override; + ShardManager::Result< ShardInfo > _create_shard(pg_id_t, uint64_t size_bytes) override; + ShardManager::Result< ShardInfo > _seal_shard(shard_id_t) override; // BlobManager - BlobManager::Result< blob_id > _put_blob(ShardInfo const&, Blob&&) override; - BlobManager::Result< Blob > _get_blob(ShardInfo const&, blob_id) const override; - BlobManager::NullResult _del_blob(ShardInfo const&, blob_id) override; + BlobManager::Result< blob_id_t > _put_blob(ShardInfo const&, Blob&&) override; + BlobManager::Result< Blob > _get_blob(ShardInfo const&, blob_id_t) const override; + BlobManager::NullResult _del_blob(ShardInfo const&, blob_id_t) override; /// + // PGManager + PGManager::NullAsyncResult _create_pg(PGInfo&& pg_info, std::set< std::string, std::less<> > peers) override; + PGManager::NullAsyncResult _replace_member(pg_id_t id, peer_id_t const& old_member, + PGMember const& new_member) override; + + ShardIndex& _find_index(shard_id_t) const; + public: using HomeObjectImpl::HomeObjectImpl; ~MemoryHomeObject() override = default; diff --git a/src/lib/memory_backend/mem_pg_manager.cpp b/src/lib/memory_backend/mem_pg_manager.cpp new file mode 100644 index 00000000..d57145ae --- /dev/null +++ b/src/lib/memory_backend/mem_pg_manager.cpp @@ -0,0 +1,15 @@ +#include "mem_homeobject.hpp" + +namespace homeobject { +PGManager::NullAsyncResult MemoryHomeObject::_create_pg(PGInfo&& pg_info, std::set< std::string, std::less<> >) { + auto lg = std::scoped_lock(_pg_lock); + auto [it1, _] = _pg_map.try_emplace(pg_info.id, std::make_shared< PG >(pg_info)); + RELEASE_ASSERT(_pg_map.end() != it1, "Unknown map insert error!"); + return folly::makeSemiFuture< PGManager::NullResult >(folly::Unit()); +} + +PGManager::NullAsyncResult MemoryHomeObject::_replace_member(pg_id_t id, peer_id_t const& old_member, + PGMember const& new_member) { + return folly::makeSemiFuture< PGManager::NullResult >(folly::makeUnexpected(PGError::UNSUPPORTED_OP)); +} +} // namespace homeobject \ No newline at end of file diff --git a/src/lib/memory/shard_manager.cpp b/src/lib/memory_backend/mem_shard_manager.cpp similarity index 91% rename from src/lib/memory/shard_manager.cpp rename to src/lib/memory_backend/mem_shard_manager.cpp index 47ff20ea..e05d7947 100644 --- a/src/lib/memory/shard_manager.cpp +++ b/src/lib/memory_backend/mem_shard_manager.cpp @@ -1,12 +1,12 @@ #include -#include "homeobject.hpp" +#include "mem_homeobject.hpp" namespace homeobject { uint64_t ShardManager::max_shard_size() { return Gi; } -ShardManager::Result< ShardInfo > MemoryHomeObject::_create_shard(pg_id pg_owner, uint64_t size_bytes) { +ShardManager::Result< ShardInfo > MemoryHomeObject::_create_shard(pg_id_t pg_owner, uint64_t size_bytes) { auto const now = get_current_timestamp(); auto info = ShardInfo(0ull, pg_owner, ShardInfo::State::OPEN, now, now, size_bytes, size_bytes, 0); { @@ -14,7 +14,7 @@ ShardManager::Result< ShardInfo > MemoryHomeObject::_create_shard(pg_id pg_owner auto pg_it = _pg_map.find(pg_owner); if (_pg_map.end() == pg_it) return folly::makeUnexpected(ShardError::UNKNOWN_PG); - auto& s_list = pg_it->second.shards; + auto& s_list = pg_it->second->shards_; info.id = make_new_shard_id(pg_owner, s_list.size()); auto iter = s_list.emplace(s_list.end(), Shard(info)); LOGDEBUG("Creating Shard [{}]: in Pg [{}] of Size [{}b]", info.id & shard_mask, pg_owner, size_bytes); @@ -26,7 +26,7 @@ ShardManager::Result< ShardInfo > MemoryHomeObject::_create_shard(pg_id pg_owner return info; } -ShardManager::Result< ShardInfo > MemoryHomeObject::_seal_shard(shard_id id) { +ShardManager::Result< ShardInfo > MemoryHomeObject::_seal_shard(shard_id_t id) { auto lg = std::scoped_lock(_shard_lock); auto shard_it = _shard_map.find(id); RELEASE_ASSERT(_shard_map.end() != shard_it, "Missing ShardIterator!"); diff --git a/src/lib/pg_manager.cpp b/src/lib/pg_manager.cpp index abae0633..d203da94 100644 --- a/src/lib/pg_manager.cpp +++ b/src/lib/pg_manager.cpp @@ -1,51 +1,10 @@ -#include "homeobject_impl.hpp" - -#include "mocks/repl_service.h" #include -using home_replication::ReplServiceError; +#include "homeobject_impl.hpp" namespace homeobject { -PGError toPgError(ReplServiceError const& e) { - switch (e) { - case ReplServiceError::BAD_REQUEST: - [[fallthrough]]; - case ReplServiceError::CANCELLED: - [[fallthrough]]; - case ReplServiceError::CONFIG_CHANGING: - [[fallthrough]]; - case ReplServiceError::SERVER_ALREADY_EXISTS: - [[fallthrough]]; - case ReplServiceError::SERVER_IS_JOINING: - [[fallthrough]]; - case ReplServiceError::SERVER_IS_LEAVING: - [[fallthrough]]; - case ReplServiceError::RESULT_NOT_EXIST_YET: - [[fallthrough]]; - case ReplServiceError::NOT_LEADER: - [[fallthrough]]; - case ReplServiceError::TERM_MISMATCH: - return PGError::INVALID_ARG; - case ReplServiceError::CANNOT_REMOVE_LEADER: - return PGError::UNKNOWN_PEER; - case ReplServiceError::TIMEOUT: - return PGError::TIMEOUT; - case ReplServiceError::SERVER_NOT_FOUND: - return PGError::UNKNOWN_PG; - case ReplServiceError::OK: - DEBUG_ASSERT(false, "Should not process OK!"); - [[fallthrough]]; - case ReplServiceError::FAILED: - return PGError::UNKNOWN; - } - return PGError::UNKNOWN; -} - -std::shared_ptr< PGManager > HomeObjectImpl::pg_manager() { - init_repl_svc(); - return shared_from_this(); -} +std::shared_ptr< PGManager > HomeObjectImpl::pg_manager() { return shared_from_this(); } PGManager::NullAsyncResult HomeObjectImpl::create_pg(PGInfo&& pg_info) { LOGINFO("Creating PG: [{}] of [{}] members", pg_info.id, pg_info.members.size()); @@ -59,21 +18,10 @@ PGManager::NullAsyncResult HomeObjectImpl::create_pg(PGInfo&& pg_info) { } if (!saw_ourself || !saw_leader) return folly::makeUnexpected(PGError::INVALID_ARG); - return _repl_svc->create_replica_set(fmt::format("{}", pg_info.id), std::move(peers)) - .via(folly::getGlobalCPUExecutor()) - .thenValue([this, pg_info = std::move(pg_info)]( - home_replication::ReplicationService::set_var const& v) -> PGManager::NullResult { - if (std::holds_alternative< home_replication::ReplServiceError >(v)) - return folly::makeUnexpected(PGError::INVALID_ARG); - auto pg = PG(std::move(pg_info)); - auto lg = std::scoped_lock(_pg_lock); - auto [it, _] = _pg_map.try_emplace(pg_info.id, std::move(pg)); - RELEASE_ASSERT(_pg_map.end() != it, "Unknown map insert error!"); - return folly::Unit(); - }); + return _create_pg(std::move(pg_info), std::move(peers)); } -PGManager::NullAsyncResult HomeObjectImpl::replace_member(pg_id id, peer_id const& old_member, +PGManager::NullAsyncResult HomeObjectImpl::replace_member(pg_id_t id, peer_id_t const& old_member, PGMember const& new_member) { LOGINFO("Replacing PG: [{}] member [{}] with [{}]", id, to_string(old_member), to_string(new_member.id)); if (old_member == new_member.id) { @@ -86,18 +34,13 @@ PGManager::NullAsyncResult HomeObjectImpl::replace_member(pg_id id, peer_id cons return folly::makeUnexpected(PGError::INVALID_ARG); } - return _repl_svc->replace_member(fmt::format("{}", id), to_string(old_member), to_string(new_member.id)) - .deferValue([](ReplServiceError const& e) -> PGManager::NullResult { - if (ReplServiceError::OK != e) return folly::makeUnexpected(toPgError(e)); - return folly::Unit(); - }); + return _replace_member(id, old_member, new_member); } -PGManager::Result< PG > HomeObjectImpl::_get_pg(pg_id pg) { - std::scoped_lock lock_guard(_pg_lock); +PGManager::Result< PG const* > HomeObjectImpl::get_pg(pg_id_t pg) const { + std::shared_lock lock_guard(_pg_lock); auto iter = _pg_map.find(pg); - if (iter == _pg_map.end()) { return folly::makeUnexpected(PGError::UNKNOWN_PG); } - return iter->second; + if (iter == _pg_map.cend()) { return folly::makeUnexpected(PGError::UNKNOWN_PG); } + return iter->second.get(); } - } // namespace homeobject diff --git a/src/lib/shard_manager.cpp b/src/lib/shard_manager.cpp index 19d8e2e1..183b0f57 100644 --- a/src/lib/shard_manager.cpp +++ b/src/lib/shard_manager.cpp @@ -2,12 +2,9 @@ namespace homeobject { -std::shared_ptr< ShardManager > HomeObjectImpl::shard_manager() { - init_repl_svc(); - return shared_from_this(); -} +std::shared_ptr< ShardManager > HomeObjectImpl::shard_manager() { return shared_from_this(); } -ShardManager::AsyncResult< ShardInfo > HomeObjectImpl::create_shard(pg_id pg_owner, uint64_t size_bytes) { +ShardManager::AsyncResult< ShardInfo > HomeObjectImpl::create_shard(pg_id_t pg_owner, uint64_t size_bytes) { if (0 == size_bytes || max_shard_size() < size_bytes) return folly::makeUnexpected(ShardError::INVALID_ARG); return _defer().thenValue([this, pg_owner, size_bytes](auto) mutable -> ShardManager::Result< ShardInfo > { @@ -15,14 +12,14 @@ ShardManager::AsyncResult< ShardInfo > HomeObjectImpl::create_shard(pg_id pg_own }); } -ShardManager::AsyncResult< InfoList > HomeObjectImpl::list_shards(pg_id pg) const { - return _defer().thenValue([this, pg](auto) mutable -> ShardManager::Result< InfoList > { - auto lg = std::shared_lock(_pg_lock); - auto pg_it = _pg_map.find(pg); - if (_pg_map.end() == pg_it) return folly::makeUnexpected(ShardError::UNKNOWN_PG); +ShardManager::AsyncResult< InfoList > HomeObjectImpl::list_shards(pg_id_t pgid) const { + return _defer().thenValue([this, pgid](auto) mutable -> ShardManager::Result< InfoList > { + auto v = get_pg(pgid); + if (v.hasError()) { return folly::makeUnexpected(ShardError::UNKNOWN_PG); } + PG const* pg = v.value(); auto info_l = std::list< ShardInfo >(); - for (auto const& shard : pg_it->second.shards) { + for (auto const& shard : pg->shards_) { LOGDEBUG("Listing Shard {}", shard.info.id); info_l.push_back(shard.info); } @@ -30,7 +27,7 @@ ShardManager::AsyncResult< InfoList > HomeObjectImpl::list_shards(pg_id pg) cons }); } -ShardManager::AsyncResult< ShardInfo > HomeObjectImpl::seal_shard(shard_id id) { +ShardManager::AsyncResult< ShardInfo > HomeObjectImpl::seal_shard(shard_id_t id) { return _get_shard(id).thenValue([this](auto const e) mutable -> ShardManager::Result< ShardInfo > { if (!e) return folly::makeUnexpected(ShardError::UNKNOWN_SHARD); if (ShardInfo::State::SEALED == e.value().info.state) return e.value().info; @@ -38,7 +35,7 @@ ShardManager::AsyncResult< ShardInfo > HomeObjectImpl::seal_shard(shard_id id) { }); } -ShardManager::AsyncResult< ShardInfo > HomeObjectImpl::get_shard(shard_id id) const { +ShardManager::AsyncResult< ShardInfo > HomeObjectImpl::get_shard(shard_id_t id) const { return _get_shard(id).thenValue([this](auto const e) mutable -> ShardManager::Result< ShardInfo > { if (!e) { return folly::makeUnexpected(e.error()); } return e.value().info; @@ -48,7 +45,7 @@ ShardManager::AsyncResult< ShardInfo > HomeObjectImpl::get_shard(shard_id id) co /// // This is used as a first call for many operations and initializes the Future. // -folly::Future< ShardManager::Result< Shard > > HomeObjectImpl::_get_shard(shard_id id) const { +folly::Future< ShardManager::Result< Shard > > HomeObjectImpl::_get_shard(shard_id_t id) const { return _defer().thenValue([this, id](auto) -> ShardManager::Result< Shard > { auto lg = std::shared_lock(_shard_lock); if (auto it = _shard_map.find(id); _shard_map.end() != it) return *(it->second); diff --git a/test_package/test_package.cpp b/test_package/test_package.cpp index 01f91d26..8526c932 100644 --- a/test_package/test_package.cpp +++ b/test_package/test_package.cpp @@ -11,10 +11,10 @@ class TestApp : public homeobject::HomeObjectApplication { bool spdk_mode() const override { return false; } uint32_t threads() const override { return 1; } std::list< std::filesystem::path > devices() const override { return std::list< std::filesystem::path >(); } - homeobject::peer_id discover_svcid(std::optional< homeobject::peer_id > const& p) const override { + homeobject::peer_id_t discover_svcid(std::optional< homeobject::peer_id_t > const& p) const override { return boost::uuids::random_generator()(); } - std::string lookup_peer(homeobject::peer_id const&) const override { return "test_package.com"; } + std::string lookup_peer(homeobject::peer_id_t const&) const override { return "test_package.com"; } }; int main(int argc, char** argv) { @@ -22,7 +22,7 @@ int main(int argc, char** argv) { sisl::logging::SetLogger(std::string(argv[0])); spdlog::set_pattern("[%D %T%z] [%^%l%$] [%n] [%t] %v"); - auto a = std::make_shared(); + auto a = std::make_shared< TestApp >(); homeobject::init_homeobject(a); return 0; }