From 319a3d5493a4813038fa7616c1823a26721ab060 Mon Sep 17 00:00:00 2001 From: Harihara Kadayam Date: Wed, 20 Sep 2023 19:51:59 -0700 Subject: [PATCH] SoloReplDev - a pass through replication device (#177) Provide implementation of ReplicationService and introduced a SoloReplDev implementation to have a non-replicated version of same APIs. As a result the HomeStore startup is modified as well. --- conanfile.py | 2 +- src/CMakeLists.txt | 2 + src/include/homestore/blkdata_service.hpp | 15 +- src/include/homestore/checkpoint/cp_mgr.hpp | 8 +- src/include/homestore/homestore.hpp | 51 ++- src/include/homestore/homestore_decl.hpp | 34 +- src/include/homestore/index_service.hpp | 5 +- .../homestore/replication/repl_decls.h | 8 +- src/include/homestore/replication/repl_dev.h | 31 +- src/include/homestore/replication_service.hpp | 49 ++- src/lib/blkdata_svc/blkdata_service.cpp | 18 +- src/lib/checkpoint/cp_mgr.cpp | 19 +- src/lib/device/virtual_dev.cpp | 15 +- src/lib/device/virtual_dev.hpp | 5 +- src/lib/homestore.cpp | 98 ++++-- src/lib/meta/meta_blk_service.cpp | 34 +- src/lib/replication/CMakeLists.txt | 11 + .../replication/repl_dev/solo_repl_dev.cpp | 136 ++++++++ src/lib/replication/repl_dev/solo_repl_dev.h | 121 +++++++ src/lib/replication/repl_service.cpp | 122 ------- .../replication/service/repl_service_impl.cpp | 122 +++++++ .../replication/service/repl_service_impl.h | 71 ++++ src/tests/CMakeLists.txt | 8 +- src/tests/log_store_benchmark.cpp | 9 +- src/tests/test_append_blkalloc.cpp | 20 +- .../test_common/homestore_test_common.hpp | 135 ++++--- src/tests/test_cp_mgr.cpp | 4 +- src/tests/test_data_service.cpp | 37 +- src/tests/test_index_btree.cpp | 49 +-- src/tests/test_journal_vdev.cpp | 10 +- src/tests/test_log_store.cpp | 8 +- src/tests/test_meta_blk_mgr.cpp | 19 +- src/tests/test_solo_repl_dev.cpp | 330 ++++++++++++++++++ 33 files changed, 1201 insertions(+), 405 deletions(-) create mode 100644 src/lib/replication/CMakeLists.txt create mode 100644 src/lib/replication/repl_dev/solo_repl_dev.cpp create mode 100644 src/lib/replication/repl_dev/solo_repl_dev.h delete mode 100644 src/lib/replication/repl_service.cpp create mode 100644 src/lib/replication/service/repl_service_impl.cpp create mode 100644 src/lib/replication/service/repl_service_impl.h create mode 100644 src/tests/test_solo_repl_dev.cpp diff --git a/conanfile.py b/conanfile.py index bf14c486e..19d52eb90 100644 --- a/conanfile.py +++ b/conanfile.py @@ -5,7 +5,7 @@ class HomestoreConan(ConanFile): name = "homestore" - version = "4.3.2" + version = "4.4.0" homepage = "https://github.com/eBay/Homestore" description = "HomeStore Storage Engine" diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index ea2bf5bea..30329c54e 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -35,6 +35,7 @@ add_subdirectory(lib/logstore) add_subdirectory(lib/meta) add_subdirectory(lib/index) add_subdirectory(lib/blkdata_svc/) +add_subdirectory(lib/replication/) add_subdirectory(tests) set(HOMESTORE_OBJECTS @@ -46,6 +47,7 @@ set(HOMESTORE_OBJECTS $ $ $ + $ lib/homestore.cpp #$ #$ diff --git a/src/include/homestore/blkdata_service.hpp b/src/include/homestore/blkdata_service.hpp index 3cb2987ca..faac2e426 100644 --- a/src/include/homestore/blkdata_service.hpp +++ b/src/include/homestore/blkdata_service.hpp @@ -34,13 +34,11 @@ struct vdev_info; struct stream_info_t; class BlkReadTracker; struct blk_alloc_hints; - -using blk_t = uint64_t; -using blk_list_t = folly::small_vector< blk_t, 4 >; +class ChunkSelector; class BlkDataService { public: - BlkDataService(); + BlkDataService(shared< ChunkSelector > custom_chunk_selector); ~BlkDataService(); /** @@ -48,8 +46,8 @@ class BlkDataService { * * @param size : size of this vdev */ - void create_vdev(uint64_t size, homestore::blk_allocator_type_t alloc_type, - homestore::chunk_selector_type_t chunk_sel_type); + void create_vdev(uint64_t size, uint32_t blk_size, blk_allocator_type_t alloc_type, + chunk_selector_type_t chunk_sel_type); /** * @brief : called during recovery to open existing vdev for data service @@ -113,7 +111,7 @@ class BlkDataService { * * @return : the block list that have the blocks; */ - blk_list_t alloc_blks(uint32_t size); + BlkAllocStatus alloc_blks(uint32_t size, blk_alloc_hints const& hints, MultiBlkId& out_blkids); /** * @brief : asynchronous free block, it is asynchronous because it might need to wait for pending read to complete @@ -144,8 +142,6 @@ class BlkDataService { void start(); private: - BlkAllocStatus alloc_blks(uint32_t size, blk_alloc_hints const& hints, MultiBlkId& out_blkids); - void init(); static void process_data_completion(std::error_condition ec, void* cookie); @@ -153,6 +149,7 @@ class BlkDataService { private: std::shared_ptr< VirtualDev > m_vdev; std::unique_ptr< BlkReadTracker > m_blk_read_tracker; + std::shared_ptr< ChunkSelector > m_custom_chunk_selector; uint32_t m_blk_size; }; diff --git a/src/include/homestore/checkpoint/cp_mgr.hpp b/src/include/homestore/checkpoint/cp_mgr.hpp index 0e79d6af9..322f3411e 100644 --- a/src/include/homestore/checkpoint/cp_mgr.hpp +++ b/src/include/homestore/checkpoint/cp_mgr.hpp @@ -83,12 +83,13 @@ class CPCallbacks { virtual std::unique_ptr< CPContext > on_switchover_cp(CP* cur_cp, CP* new_cp) = 0; /// @brief After gathering CPContext from all consumers, CPManager calls this method to flush the dirty buffers - /// accumulated in this CP. Once CP flush is completed, consumers are required to call the flush_done callback. + /// accumulated in this CP. Once CP flush is completed, consumers are required to set the promise corresponding to + /// returned future. /// @param cp CP pointer to which the dirty buffers have to be flushed /// @param done_cb Callback after cp is done virtual folly::Future< bool > cp_flush(CP* cp) = 0; - /// @brief After flushed the CP, CPManager calls this method to clean up any CP related structures + /// @brief After all consumers flushed the CP, CPManager calls this method to clean up any CP related structures /// @param cp virtual void cp_cleanup(CP* cp) = 0; @@ -164,9 +165,10 @@ class CPManager { std::vector< iomgr::io_fiber_t > m_cp_io_fibers; public: - CPManager(bool first_time_boot); + CPManager(); virtual ~CPManager(); + void start(bool first_time_boot); /// @brief Shutdown the checkpoint manager services. It will not trigger a flush, but cancels any existing /// checkpoint session abruptly. If caller needs clean shutdown, then they explicitly needs to trigger cp flush /// before calling shutdown. diff --git a/src/include/homestore/homestore.hpp b/src/include/homestore/homestore.hpp index b4100e56e..b03a62dc6 100644 --- a/src/include/homestore/homestore.hpp +++ b/src/include/homestore/homestore.hpp @@ -44,11 +44,14 @@ class MetaBlkService; class LogStoreService; class BlkDataService; class IndexService; +class ReplicationServiceImpl; class IndexServiceCallbacks; +class ReplServiceCallbacks; struct vdev_info; class HomeStore; class CPManager; class VirtualDev; +class ChunkSelector; using HomeStoreSafePtr = std::shared_ptr< HomeStore >; @@ -63,8 +66,37 @@ struct hs_vdev_context { }; #pragma pack() -typedef std::function< void(void) > hs_before_services_starting_cb_t; -typedef std::function< void(bool success) > hs_comp_callback; +using hs_before_services_starting_cb_t = std::function< void(void) >; + +struct HS_SERVICE { + static constexpr uint32_t META = 1 << 0; + static constexpr uint32_t LOG_REPLICATED = 1 << 1; + static constexpr uint32_t LOG_LOCAL = 1 << 2; + static constexpr uint32_t DATA = 1 << 3; + static constexpr uint32_t INDEX = 1 << 4; + static constexpr uint32_t REPLICATION = 1 << 5; + + uint32_t svcs; + + HS_SERVICE() : svcs{META} {} + + std::string list() const { + std::string str; + if (svcs & META) { str += "meta,"; } + if (svcs & DATA) { str += "data,"; } + if (svcs & INDEX) { str += "index,"; } + if (svcs & LOG_REPLICATED) { str += "log_replicated,"; } + if (svcs & LOG_LOCAL) { str += "log_local,"; } + if (svcs & REPLICATION) { str += "replication,"; } + return str; + } +}; + +VENUM(repl_impl_type, uint8_t, + server_side, // Completely homestore controlled replication + client_assisted, // Client assisting in replication + solo // For single node - no replication +); /* * IO errors handling by homestore. @@ -82,6 +114,7 @@ class HomeStore { std::unique_ptr< MetaBlkService > m_meta_service; std::unique_ptr< LogStoreService > m_log_service; std::unique_ptr< IndexService > m_index_service; + std::unique_ptr< ReplicationServiceImpl > m_repl_service; std::unique_ptr< DeviceManager > m_dev_mgr; shared< sisl::logging::logger_t > m_periodic_logger; @@ -90,8 +123,7 @@ class HomeStore { std::unique_ptr< CPManager > m_cp_mgr; shared< sisl::Evictor > m_evictor; - bool m_vdev_failed{false}; - + HS_SERVICE m_services; // Services homestore is starting with hs_before_services_starting_cb_t m_before_services_starting_cb{nullptr}; public: @@ -109,11 +141,14 @@ class HomeStore { static shared< spdlog::logger >& periodic_logger() { return instance()->m_periodic_logger; } ///////////////////////////// Member functions ///////////////////////////////////////////// - bool start(const hs_input_params& input, hs_before_services_starting_cb_t svcs_starting_cb = nullptr, - std::unique_ptr< IndexServiceCallbacks > cbs = nullptr); + HomeStore& with_data_service(cshared< ChunkSelector >& custom_chunk_selector = nullptr); + HomeStore& with_log_service(); + HomeStore& with_index_service(std::unique_ptr< IndexServiceCallbacks > cbs); + HomeStore& with_repl_data_service(repl_impl_type repl_type, std::unique_ptr< ReplServiceCallbacks > cbs, + cshared< ChunkSelector >& custom_chunk_selector = nullptr); + bool start(const hs_input_params& input, hs_before_services_starting_cb_t svcs_starting_cb = nullptr); void format_and_start(std::map< uint32_t, hs_format_params >&& format_opts); - void shutdown(); // cap_attrs get_system_capacity() const; // Need to move this to homeblks/homeobj @@ -124,11 +159,13 @@ class HomeStore { bool has_data_service() const; bool has_meta_service() const; bool has_log_service() const; + bool has_repl_data_service() const; BlkDataService& data_service() { return *m_data_service; } MetaBlkService& meta_service() { return *m_meta_service; } LogStoreService& logstore_service() { return *m_log_service; } IndexService& index_service() { return *m_index_service; } + ReplicationServiceImpl& repl_service() { return *m_repl_service; } DeviceManager* device_mgr() { return m_dev_mgr.get(); } ResourceMgr& resource_mgr() { return *m_resource_mgr.get(); } CPManager& cp_mgr() { return *m_cp_mgr.get(); } diff --git a/src/include/homestore/homestore_decl.hpp b/src/include/homestore/homestore_decl.hpp index 2b227d08b..5cd177d30 100644 --- a/src/include/homestore/homestore_decl.hpp +++ b/src/include/homestore/homestore_decl.hpp @@ -97,7 +97,6 @@ ENUM(blk_allocator_type_t, uint8_t, none, fixed, varsize, append); ENUM(chunk_selector_type_t, uint8_t, // What are the options to select chunk to allocate a block NONE, // Caller want nothing to be set ROUND_ROBIN, // Pick round robin - HEAP, // Heap chunk selector CUSTOM, // Controlled by the upper layer RANDOM, // Pick any chunk in uniformly random fashion MOST_AVAILABLE_SPACE, // Pick the most available space @@ -143,40 +142,10 @@ static std::string in_bytes(uint64_t sz) { return _format_decimals(size / arr.back().first, arr.back().second); } -struct HS_SERVICE { - static constexpr uint32_t META = 1 << 0; - static constexpr uint32_t LOG_REPLICATED = 1 << 1; - static constexpr uint32_t LOG_LOCAL = 1 << 2; - static constexpr uint32_t DATA = 1 << 3; - static constexpr uint32_t INDEX = 1 << 4; - static constexpr uint32_t REPLICATION = 1 << 5; - - uint32_t svcs; - - HS_SERVICE() : svcs{META} {} - HS_SERVICE(uint32_t val) : svcs{val} { - svcs |= META; // Force meta to be present always - if (svcs & REPLICATION) { - svcs |= LOG_REPLICATED | LOG_LOCAL; - svcs &= ~DATA; // ReplicationDataSvc or DataSvc only one of them - } - } - - std::string list() const { - std::string str; - if (svcs & META) { str += "meta,"; } - if (svcs & DATA) { str += "data,"; } - if (svcs & INDEX) { str += "index,"; } - if (svcs & LOG_REPLICATED) { str += "log_replicated,"; } - if (svcs & LOG_LOCAL) { str += "log_local,"; } - if (svcs & REPLICATION) { str += "replication,"; } - return str; - } -}; - struct hs_format_params { float size_pct; uint32_t num_chunks{1}; + uint32_t block_size{0}; blk_allocator_type_t alloc_type{blk_allocator_type_t::varsize}; chunk_selector_type_t chunk_sel_type{chunk_selector_type_t::ROUND_ROBIN}; }; @@ -192,7 +161,6 @@ struct hs_input_params { uint64_t hugepage_size{0}; // memory available for the hugepage bool is_read_only{false}; // Is read only bool auto_recovery{true}; // Recovery of data is automatic or controlled by the caller - HS_SERVICE services; // Services homestore is starting with #ifdef _PRERELEASE bool force_reinit{false}; diff --git a/src/include/homestore/index_service.hpp b/src/include/homestore/index_service.hpp index cdeb3d3ba..b4f551a8e 100644 --- a/src/include/homestore/index_service.hpp +++ b/src/include/homestore/index_service.hpp @@ -32,7 +32,10 @@ class VirtualDev; class IndexServiceCallbacks { public: virtual ~IndexServiceCallbacks() = default; - virtual std::shared_ptr< IndexTableBase > on_index_table_found(const superblk< index_table_sb >& cb) = 0; + virtual std::shared_ptr< IndexTableBase > on_index_table_found(superblk< index_table_sb > const&) { + assert(0); + return nullptr; + } }; class IndexService { diff --git a/src/include/homestore/replication/repl_decls.h b/src/include/homestore/replication/repl_decls.h index 1c659d6ad..c496c37e2 100644 --- a/src/include/homestore/replication/repl_decls.h +++ b/src/include/homestore/replication/repl_decls.h @@ -4,7 +4,6 @@ #include #include -#include #include #include #include @@ -18,9 +17,10 @@ using blkid_list_t = folly::small_vector< BlkId, 4 >; // Fully qualified domain pba, unique pba id across replica set struct RemoteBlkId { + RemoteBlkId() = default; RemoteBlkId(uint32_t s, const BlkId& b) : server_id{s}, blkid{b} {} - uint32_t server_id; - BlkId blkid; + uint32_t server_id{0}; + MultiBlkId blkid; bool operator==(RemoteBlkId const& o) const { return (server_id == o.server_id) && (blkid == o.blkid); } }; @@ -38,7 +38,7 @@ namespace std { template <> struct hash< homestore::RemoteBlkId > { size_t operator()(homestore::RemoteBlkId const& fqbid) const noexcept { - return std::hash< uint64_t >()(fqbid.server_id) + std::hash< uint64_t >()(fqbid.blkid.to_integer()); + return std::hash< uint64_t >()(fqbid.server_id) + std::hash< homestore::MultiBlkId >()(fqbid.blkid); } }; } // namespace std diff --git a/src/include/homestore/replication/repl_dev.h b/src/include/homestore/replication/repl_dev.h index 6b459d10a..d3ab9ea46 100644 --- a/src/include/homestore/replication/repl_dev.h +++ b/src/include/homestore/replication/repl_dev.h @@ -1,12 +1,11 @@ #pragma once -#include +// #include #include #include -namespace home_replication { - +namespace homestore { // // Callbacks to be implemented by ReplDev users. // @@ -25,7 +24,7 @@ class ReplDevListener { /// @param blkids - List of blkids where data is written to the storage engine. /// @param ctx - User contenxt passed as part of the replica_set::write() api /// - virtual void on_commit(int64_t lsn, sisl::blob const& header, sisl::blob const& key, blkid_list_t const& blkids, + virtual void on_commit(int64_t lsn, sisl::blob const& header, sisl::blob const& key, MultiBlkId const& blkids, void* ctx) = 0; /// @brief Called when the log entry has been received by the replica dev. @@ -71,7 +70,7 @@ class ReplDevListener { /// /// @param header Header originally passed with repl_dev::write() api on the leader /// @return Expected to return blk_alloc_hints for this write - virtual blk_alloc_hints get_blk_alloc_hints(sisl::blob const& header) = 0; + virtual blk_alloc_hints get_blk_alloc_hints(sisl::blob const& header, void* user_ctx) = 0; /// @brief Called when the replica set is being stopped virtual void on_replica_stop() = 0; @@ -79,6 +78,7 @@ class ReplDevListener { class ReplDev { public: + ReplDev() = default; virtual ~ReplDev() = default; /// @brief Replicate the data to the replica set. This method goes through the @@ -100,7 +100,7 @@ class ReplDev { /// list size is 0, then only key is written to replicadev without data. /// @param user_ctx - User supplied opaque context which will be passed to listener /// callbacks - virtual void async_alloc_write(const sisl::blob& header, const sisl::blob& key, const sisl::sg_list& value, + virtual void async_alloc_write(sisl::blob const& header, sisl::blob const& key, sisl::sg_list const& value, void* user_ctx) = 0; /// @brief Reads the data and returns a future to continue on @@ -109,16 +109,16 @@ class ReplDev { /// @param size Total size of the data read /// @param part_of_batch Is read is part of a batch. If part of the batch, then submit_batch needs to be called at /// the end - /// @return A Future with bool to notify if it has successfully read the data, raises the exception in case of - /// failure - virtual folly::Future< bool > async_read(const BlkId& bid, sisl::sg_list& sgs, uint32_t size, - bool part_of_batch = false); + /// @return A Future with std::error_code to notify if it has successfully read the data or any error code in case + /// of failure + virtual folly::Future< std::error_code > async_read(MultiBlkId const& blkid, sisl::sg_list& sgs, uint32_t size, + bool part_of_batch = false) = 0; /// @brief After data is replicated and on_commit to the listener is called. the blkids can be freed. /// /// @param lsn - LSN of the old blkids that is being freed /// @param blkids - blkids to be freed. - virtual void async_free_blks(int64_t lsn, const blkid_list_t& blkids) = 0; + virtual void async_free_blks(int64_t lsn, MultiBlkId const& blkid) = 0; /// @brief Checks if this replica is the leader in this ReplDev /// @return true or false @@ -126,7 +126,12 @@ class ReplDev { /// @brief Gets the group_id this repldev is working for /// @return group_id - virtual std::string group_id() const = 0; + virtual uuid_t group_id() const = 0; + + virtual void attach_listener(std::unique_ptr< ReplDevListener > listener) { m_listener = std::move(listener); } + +protected: + std::unique_ptr< ReplDevListener > m_listener; }; -} // namespace home_replication +} // namespace homestore diff --git a/src/include/homestore/replication_service.hpp b/src/include/homestore/replication_service.hpp index cb27e9306..276b92a31 100644 --- a/src/include/homestore/replication_service.hpp +++ b/src/include/homestore/replication_service.hpp @@ -6,18 +6,31 @@ #include -#include "repl_decls.h" -#include "repl_set.h" - -namespace nuraft { -class state_machine; -} +#include namespace homestore { +// clang-format off +VENUM(ReplServiceError, int32_t, + OK = 0, // Everything OK + CANCELLED = -1, // Request was cancelled + TIMEOUT = -2, + NOT_LEADER = -3, + BAD_REQUEST = -4, + SERVER_ALREADY_EXISTS = -5, + CONFIG_CHANGING = -6, + SERVER_IS_JOINING = -7, + SERVER_NOT_FOUND = -8, + CANNOT_REMOVE_LEADER = -9, + SERVER_IS_LEAVING = -10, + TERM_MISMATCH = -11, + RESULT_NOT_EXIST_YET = -10000, + NOT_IMPLEMENTED = -10001, + FAILED = -32768); +// clang-format on + class ReplDev; -using ReplServiceError = nuraft::cmd_result_code; -using on_replica_dev_init_t = std::function< std::unique_ptr< ReplicaDevListener >(cshared< ReplDev >& rd) >; +class ReplDevListener; template < typename V, typename E > using Result = folly::Expected< V, E >; @@ -29,24 +42,28 @@ template < class V > using ReplResult = Result< V, ReplServiceError >; template < class V > -using ReplAsyncResult = AsyncResult< V, ReplServiceError >; +using AsyncReplResult = AsyncResult< V, ReplServiceError >; + +class ReplServiceCallbacks { +public: + virtual ~ReplServiceCallbacks() = default; + virtual std::unique_ptr< ReplDevListener > on_repl_dev_init(cshared< ReplDev >& rs) = 0; +}; class ReplicationService { public: ReplicationService() = default; virtual ~ReplicationService() = default; - // using set_var = std::variant< shared< ReplDev >, ReplServiceError >; - /// Sync APIs - virtual shared< ReplDev > get_replica_dev(std::string const& group_id) const = 0; - virtual void iterate_replica_devs(std::function< void(cshared< ReplDev >&) > cb) const = 0; + virtual ReplResult< shared< ReplDev > > get_replica_dev(uuid_t group_id) const = 0; + virtual void iterate_replica_devs(std::function< void(cshared< ReplDev >&) > const& cb) = 0; /// Async APIs - virtual ReplAsyncResult< shared< ReplDev > > create_replica_dev(std::string const& group_id, + virtual AsyncReplResult< shared< ReplDev > > create_replica_dev(uuid_t group_id, std::set< std::string, std::less<> >&& members) = 0; - virtual folly::SemiFuture< ReplServiceError > - replace_member(std::string const& group_id, std::string const& member_out, std::string const& member_in) const = 0; + virtual folly::SemiFuture< ReplServiceError > replace_member(uuid_t group_id, std::string const& member_out, + std::string const& member_in) const = 0; }; } // namespace homestore diff --git a/src/lib/blkdata_svc/blkdata_service.cpp b/src/lib/blkdata_svc/blkdata_service.cpp index 33ed6fede..6a596eb54 100644 --- a/src/lib/blkdata_svc/blkdata_service.cpp +++ b/src/lib/blkdata_svc/blkdata_service.cpp @@ -15,6 +15,8 @@ *********************************************************************************/ #include #include +#include + #include "device/chunk.h" #include "device/virtual_dev.hpp" #include "device/physical_dev.hpp" // vdev_info_block @@ -28,21 +30,24 @@ namespace homestore { BlkDataService& data_service() { return hs()->data_service(); } -BlkDataService::BlkDataService() { m_blk_read_tracker = std::make_unique< BlkReadTracker >(); } +BlkDataService::BlkDataService(shared< ChunkSelector > chunk_selector) : + m_custom_chunk_selector{std::move(chunk_selector)} { + m_blk_read_tracker = std::make_unique< BlkReadTracker >(); +} BlkDataService::~BlkDataService() = default; // first-time boot path -void BlkDataService::create_vdev(uint64_t size, blk_allocator_type_t alloc_type, chunk_selector_type_t chunk_sel_type) { - const auto phys_page_size = hs()->device_mgr()->optimal_page_size(HSDevType::Data); - +void BlkDataService::create_vdev(uint64_t size, uint32_t blk_size, blk_allocator_type_t alloc_type, + chunk_selector_type_t chunk_sel_type) { hs_vdev_context vdev_ctx; vdev_ctx.type = hs_vdev_type_t::DATA_VDEV; + if (blk_size == 0) { blk_size = hs()->device_mgr()->optimal_page_size(HSDevType::Data); } m_vdev = hs()->device_mgr()->create_vdev(vdev_parameters{.vdev_name = "blkdata", .vdev_size = size, .num_chunks = 1, - .blk_size = phys_page_size, + .blk_size = blk_size, .dev_type = HSDevType::Data, .alloc_type = alloc_type, .chunk_sel_type = chunk_sel_type, @@ -52,7 +57,8 @@ void BlkDataService::create_vdev(uint64_t size, blk_allocator_type_t alloc_type, // both first_time_boot and recovery path will come here shared< VirtualDev > BlkDataService::open_vdev(const vdev_info& vinfo, bool load_existing) { - m_vdev = std::make_shared< VirtualDev >(*(hs()->device_mgr()), vinfo, nullptr, true /* auto_recovery */); + m_vdev = std::make_shared< VirtualDev >(*(hs()->device_mgr()), vinfo, nullptr, true /* auto_recovery */, + std::move(m_custom_chunk_selector)); m_blk_size = vinfo.blk_size; return m_vdev; } diff --git a/src/lib/checkpoint/cp_mgr.cpp b/src/lib/checkpoint/cp_mgr.cpp index 4faae7ae8..82669645b 100644 --- a/src/lib/checkpoint/cp_mgr.cpp +++ b/src/lib/checkpoint/cp_mgr.cpp @@ -26,7 +26,7 @@ namespace homestore { thread_local std::stack< CP* > CPGuard::t_cp_stack; -CPManager::CPManager(bool first_time_boot) : +CPManager::CPManager() : m_metrics{std::make_unique< CPMgrMetrics >()}, m_wd_cp{std::make_unique< CPWatchdog >(this)}, m_sb{"CPSuperBlock"} { @@ -34,16 +34,20 @@ CPManager::CPManager(bool first_time_boot) : "CPSuperBlock", [this](meta_blk* mblk, sisl::byte_view buf, size_t size) { on_meta_blk_found(std::move(buf), (void*)mblk); }, nullptr); - if (first_time_boot) { - m_sb.create(sizeof(cp_mgr_super_block)); - create_first_cp(); - } start_cp_thread(); } CPManager::~CPManager() { HS_REL_ASSERT(!m_cur_cp, "CPManager is tiering down without calling shutdown"); } +void CPManager::start(bool first_time_boot) { + if (first_time_boot) { + m_sb.create(sizeof(cp_mgr_super_block)); + create_first_cp(); + m_sb.write(); + } +} + void CPManager::on_meta_blk_found(const sisl::byte_view& buf, void* meta_cookie) { m_sb.load(buf, meta_cookie); create_first_cp(); @@ -62,7 +66,10 @@ void CPManager::shutdown() { delete (cp); rcu_xchg_pointer(&m_cur_cp, nullptr); m_metrics.reset(); - m_wd_cp->stop(); + if (m_wd_cp) { + m_wd_cp->stop(); + m_wd_cp.reset(); + } } void CPManager::register_consumer(cp_consumer_t consumer_id, std::unique_ptr< CPCallbacks > callbacks) { diff --git a/src/lib/device/virtual_dev.cpp b/src/lib/device/virtual_dev.cpp index dc7441eaf..906bba9f1 100644 --- a/src/lib/device/virtual_dev.cpp +++ b/src/lib/device/virtual_dev.cpp @@ -81,7 +81,8 @@ static std::shared_ptr< BlkAllocator > create_blk_allocator(blk_allocator_type_t } } -VirtualDev::VirtualDev(DeviceManager& dmgr, vdev_info const& vinfo, vdev_event_cb_t event_cb, bool is_auto_recovery) : +VirtualDev::VirtualDev(DeviceManager& dmgr, vdev_info const& vinfo, vdev_event_cb_t event_cb, bool is_auto_recovery, + shared< ChunkSelector > custom_chunk_selector) : m_vdev_info{vinfo}, m_dmgr{dmgr}, m_name{vinfo.name}, @@ -92,21 +93,19 @@ VirtualDev::VirtualDev(DeviceManager& dmgr, vdev_info const& vinfo, vdev_event_c m_auto_recovery{is_auto_recovery} { switch (m_chunk_selector_type) { case chunk_selector_type_t::ROUND_ROBIN: { - m_chunk_selector = std::make_unique< RoundRobinChunkSelector >(false /* dynamically add chunk */); + m_chunk_selector = std::make_shared< RoundRobinChunkSelector >(false /* dynamically add chunk */); break; } - case chunk_selector_type_t::HEAP: { - // FIXME: change to HeapChunkSelector after it is ready; - m_chunk_selector = std::make_unique< RoundRobinChunkSelector >(false /* dynamically add chunk */); + case chunk_selector_type_t::CUSTOM: { + HS_REL_ASSERT(custom_chunk_selector, "Expected custom chunk selector to be passed with selector_type=CUSTOM"); + m_chunk_selector = std::move(custom_chunk_selector); break; } case chunk_selector_type_t::NONE: { - m_chunk_selector = nullptr; break; } default: - LOGERROR("Unexpected chunk selector type: {}", m_chunk_selector_type); - m_chunk_selector = nullptr; + HS_DBG_ASSERT(false, "Chunk selector type {} not supported yet", m_chunk_selector_type); } } diff --git a/src/lib/device/virtual_dev.hpp b/src/lib/device/virtual_dev.hpp index 0fff20026..79cb95478 100644 --- a/src/lib/device/virtual_dev.hpp +++ b/src/lib/device/virtual_dev.hpp @@ -91,13 +91,14 @@ class VirtualDev { std::mutex m_mgmt_mutex; // Any mutex taken for management operations (like adding/removing chunks). std::set< PhysicalDev* > m_pdevs; // PDevs this vdev is working on sisl::sparse_vector< shared< Chunk > > m_all_chunks; // All chunks part of this vdev - std::unique_ptr< ChunkSelector > m_chunk_selector; // Instance of chunk selector + std::shared_ptr< ChunkSelector > m_chunk_selector; // Instance of chunk selector blk_allocator_type_t m_allocator_type; chunk_selector_type_t m_chunk_selector_type; bool m_auto_recovery; public: - VirtualDev(DeviceManager& dmgr, const vdev_info& vinfo, vdev_event_cb_t event_cb, bool is_auto_recovery); + VirtualDev(DeviceManager& dmgr, const vdev_info& vinfo, vdev_event_cb_t event_cb, bool is_auto_recovery, + shared< ChunkSelector > custom_chunk_selector = nullptr); VirtualDev(VirtualDev const& other) = delete; VirtualDev& operator=(VirtualDev const& other) = delete; diff --git a/src/lib/homestore.cpp b/src/lib/homestore.cpp index 35fb79e64..6d7f6868f 100644 --- a/src/lib/homestore.cpp +++ b/src/lib/homestore.cpp @@ -41,6 +41,7 @@ #include "common/resource_mgr.hpp" #include "meta/meta_sb.hpp" #include "logstore/log_store_family.hpp" +#include "replication/service/repl_service_impl.h" /* * IO errors handling by homestore. @@ -55,13 +56,45 @@ namespace homestore { HomeStoreSafePtr HomeStore::s_instance{nullptr}; +static std::unique_ptr< IndexServiceCallbacks > s_index_cbs; +static repl_impl_type s_repl_impl_type{repl_impl_type::solo}; +static std::unique_ptr< ReplServiceCallbacks > s_repl_cbs; +shared< ChunkSelector > s_custom_chunk_selector{nullptr}; + HomeStore* HomeStore::instance() { if (s_instance == nullptr) { s_instance = std::make_shared< HomeStore >(); } return s_instance.get(); } -bool HomeStore::start(const hs_input_params& input, hs_before_services_starting_cb_t svcs_starting_cb, - std::unique_ptr< IndexServiceCallbacks > cbs) { +HomeStore& HomeStore::with_data_service(cshared< ChunkSelector >& custom_chunk_selector) { + m_services.svcs |= HS_SERVICE::DATA; + m_services.svcs &= ~HS_SERVICE::REPLICATION; // ReplicationDataSvc or DataSvc are mutually exclusive + s_custom_chunk_selector = std::move(custom_chunk_selector); + return *this; +} + +HomeStore& HomeStore::with_index_service(std::unique_ptr< IndexServiceCallbacks > cbs) { + m_services.svcs |= HS_SERVICE::INDEX; + s_index_cbs = std::move(cbs); + return *this; +} + +HomeStore& HomeStore::with_log_service() { + m_services.svcs |= HS_SERVICE::LOG_REPLICATED | HS_SERVICE::LOG_LOCAL; + return *this; +} + +HomeStore& HomeStore::with_repl_data_service(repl_impl_type repl_type, std::unique_ptr< ReplServiceCallbacks > cbs, + cshared< ChunkSelector >& custom_chunk_selector) { + m_services.svcs |= HS_SERVICE::REPLICATION | HS_SERVICE::LOG_REPLICATED | HS_SERVICE::LOG_LOCAL; + m_services.svcs &= ~HS_SERVICE::DATA; // ReplicationDataSvc or DataSvc are mutually exclusive + s_repl_impl_type = repl_type; + s_repl_cbs = std::move(cbs); + s_custom_chunk_selector = std::move(custom_chunk_selector); + return *this; +} + +bool HomeStore::start(const hs_input_params& input, hs_before_services_starting_cb_t svcs_starting_cb) { auto& hs_config = HomeStoreStaticConfig::instance(); hs_config.input = input; @@ -91,12 +124,16 @@ bool HomeStore::start(const hs_input_params& input, hs_before_services_starting_ HomeStoreDynamicConfig::init_settings_default(); - LOGINFO("Homestore is loading with following services: ", input.services.list()); + LOGINFO("Homestore is loading with following services: {}", m_services.list()); if (has_meta_service()) { m_meta_service = std::make_unique< MetaBlkService >(); } if (has_log_service()) { m_log_service = std::make_unique< LogStoreService >(); } - if (has_data_service()) { m_data_service = std::make_unique< BlkDataService >(); } - if (has_index_service()) { m_index_service = std::make_unique< IndexService >(std::move(cbs)); } - + if (has_data_service()) { m_data_service = std::make_unique< BlkDataService >(std::move(s_custom_chunk_selector)); } + if (has_index_service()) { m_index_service = std::make_unique< IndexService >(std::move(s_index_cbs)); } + if (has_repl_data_service()) { + m_repl_service = std::make_unique< ReplicationServiceImpl >(s_repl_impl_type, std::move(s_repl_cbs)); + m_data_service = std::make_unique< BlkDataService >(std::move(s_custom_chunk_selector)); + } + m_cp_mgr = std::make_unique< CPManager >(); m_dev_mgr = std::make_unique< DeviceManager >(input.devices, bind_this(HomeStore::create_vdev_cb, 2)); if (!m_dev_mgr->is_first_time_boot()) { @@ -126,10 +163,13 @@ void HomeStore::format_and_start(std::map< uint32_t, hs_format_params >&& format futs.emplace_back(m_log_service->create_vdev(pct_to_size(fparams.size_pct, HSDevType::Fast), LogStoreService::CTRL_LOG_FAMILY_IDX)); } else if ((svc_type & HS_SERVICE::DATA) && has_data_service()) { - m_data_service->create_vdev(pct_to_size(fparams.size_pct, HSDevType::Data), fparams.alloc_type, - fparams.chunk_sel_type); + m_data_service->create_vdev(pct_to_size(fparams.size_pct, HSDevType::Data), fparams.block_size, + fparams.alloc_type, fparams.chunk_sel_type); } else if ((svc_type & HS_SERVICE::INDEX) && has_index_service()) { m_index_service->create_vdev(pct_to_size(fparams.size_pct, HSDevType::Fast)); + } else if ((svc_type & HS_SERVICE::REPLICATION) && has_repl_data_service()) { + m_data_service->create_vdev(pct_to_size(fparams.size_pct, HSDevType::Data), fparams.block_size, + fparams.alloc_type, fparams.chunk_sel_type); } } @@ -155,17 +195,22 @@ void HomeStore::do_start() { m_dev_mgr->is_first_time_boot(), HS_DYNAMIC_CONFIG(version), cache_size, HomeStoreStaticConfig::instance().to_json().dump(4)); - m_cp_mgr = std::make_unique< CPManager >(is_first_time_boot()); // Initialize CPManager m_meta_service->start(m_dev_mgr->is_first_time_boot()); + m_cp_mgr->start(is_first_time_boot()); m_resource_mgr->set_total_cap(m_dev_mgr->total_capacity()); - // In case of custom recovery, let consumer starts the recovery and it is consumer module's responsibilities to - // start log store - if (has_log_service() && inp_params.auto_recovery) { m_log_service->start(is_first_time_boot() /* format */); } - if (has_index_service()) { m_index_service->start(); } - if (has_data_service()) { m_data_service->start(); } + if (has_data_service()) { + m_data_service->start(); + } else if (has_repl_data_service()) { + m_data_service->start(); + m_repl_service->start(); + } + + // In case of custom recovery, let consumer starts the recovery and it is consumer module's responsibilities + // to start log store + if (has_log_service() && inp_params.auto_recovery) { m_log_service->start(is_first_time_boot() /* format */); } } void HomeStore::shutdown() { @@ -182,9 +227,14 @@ void HomeStore::shutdown() { if (has_data_service()) { m_data_service.reset(); } + if (has_repl_data_service()) { + m_repl_service->stop(); + m_repl_service.reset(); + } m_dev_mgr->close_devices(); m_dev_mgr.reset(); m_cp_mgr->shutdown(); + m_cp_mgr.reset(); HomeStore::reset_instance(); LOGINFO("Homestore is completed its shutdown"); @@ -216,17 +266,12 @@ cap_attrs HomeStore::get_system_capacity() const { bool HomeStore::is_first_time_boot() const { return m_dev_mgr->is_first_time_boot(); } -bool HomeStore::has_index_service() const { - return HomeStoreStaticConfig::instance().input.services.svcs & HS_SERVICE::INDEX; -} -bool HomeStore::has_data_service() const { - return HomeStoreStaticConfig::instance().input.services.svcs & HS_SERVICE::DATA; -} -bool HomeStore::has_meta_service() const { - return HomeStoreStaticConfig::instance().input.services.svcs & HS_SERVICE::META; -} +bool HomeStore::has_index_service() const { return m_services.svcs & HS_SERVICE::INDEX; } +bool HomeStore::has_data_service() const { return m_services.svcs & HS_SERVICE::DATA; } +bool HomeStore::has_repl_data_service() const { return m_services.svcs & HS_SERVICE::REPLICATION; } +bool HomeStore::has_meta_service() const { return m_services.svcs & HS_SERVICE::META; } bool HomeStore::has_log_service() const { - auto const s = HomeStoreStaticConfig::instance().input.services.svcs; + auto const s = m_services.svcs; return (s & (HS_SERVICE::LOG_REPLICATED | HS_SERVICE::LOG_LOCAL)); } @@ -286,7 +331,9 @@ shared< VirtualDev > HomeStore::create_vdev_cb(const vdev_info& vinfo, bool load break; case hs_vdev_type_t::DATA_VDEV: - if (has_data_service()) { ret_vdev = m_data_service->open_vdev(vinfo, load_existing); } + if (has_data_service() || has_repl_data_service()) { + ret_vdev = m_data_service->open_vdev(vinfo, load_existing); + } break; default: @@ -327,7 +374,6 @@ nlohmann::json hs_input_params::to_json() const { json["app_mem_size"] = in_bytes(app_mem_size); json["hugepage_size"] = in_bytes(hugepage_size); json["auto_recovery?"] = auto_recovery; - json["services"] = services.list(); return json; } diff --git a/src/lib/meta/meta_blk_service.cpp b/src/lib/meta/meta_blk_service.cpp index 338eb9a80..85b39fef9 100644 --- a/src/lib/meta/meta_blk_service.cpp +++ b/src/lib/meta/meta_blk_service.cpp @@ -986,7 +986,6 @@ void MetaBlkService::alloc_meta_blks(uint64_t size, std::vector< BlkId >& bids) } HS_DBG_ASSERT_EQ(debug_size, size); #endif - } catch (const std::exception& e) { HS_REL_ASSERT(0, "{}", e.what()); return; @@ -1041,8 +1040,8 @@ sisl::byte_array MetaBlkService::read_sub_sb_internal(const meta_blk* mblk) cons if (i < ovf_hdr->h.nbids - 1) { read_sz_per_db = data_bid[i].blk_count() * block_size(); } else { - // it is possible user context data doesn't occupy the whole block, so we need to remember the - // size that was written to the last data blk; + // it is possible user context data doesn't occupy the whole block, so we need to remember + // the size that was written to the last data blk; read_sz_per_db = ovf_hdr->h.context_sz - read_offset_in_this_ovf; } @@ -1116,8 +1115,8 @@ void MetaBlkService::recover(bool do_comp_cb) { } else { // decompressed_size must equal to input sz before compress HS_REL_ASSERT_EQ(uint64_cast(mblk->hdr.h.src_context_sz), - uint64_cast(decompressed_size)); /* since decompressed_size is >=0 it is - safe to cast to uint64_t */ + uint64_cast(decompressed_size)); /* since decompressed_size is >=0 it + is safe to cast to uint64_t */ HS_LOG(DEBUG, metablk, "[type={}] Successfully decompressed, compressed_sz: {}, src_context_sz: {}, " "decompressed_size: {}", @@ -1127,8 +1126,8 @@ void MetaBlkService::recover(bool do_comp_cb) { cb(mblk, decompressed_buf, mblk->hdr.h.src_context_sz); } else { - // There is use case that cb could be nullptr because client want to get its superblock via read - // api; + // There is use case that cb could be nullptr because client want to get its superblock via + // read api; cb(mblk, buf, mblk->hdr.h.context_sz); } @@ -1175,7 +1174,8 @@ void MetaBlkService::read_sub_sb(meta_sub_type type) { auto* mblk = it->second; // // No client writes compressed data with reads it back with read_sub_sb for now; - // This assert can be removed if any client writes compressed data who calls read_sub_sb to read it back; + // This assert can be removed if any client writes compressed data who calls read_sub_sb to read it + // back; // sisl::byte_array buf = read_sub_sb_internal(mblk); @@ -1184,7 +1184,8 @@ void MetaBlkService::read_sub_sb(meta_sub_type type) { it_s->second.cb(mblk, buf, mblk->hdr.h.context_sz); } - // if is allowed if consumer doesn't care about complete cb, e.g. consumer knows how many mblks it is expecting; + // if is allowed if consumer doesn't care about complete cb, e.g. consumer knows how many mblks it is + // expecting; if (it_s->second.comp_cb) { it_s->second.comp_cb(true); } } @@ -1374,7 +1375,8 @@ bool MetaBlkService::sanity_check(bool check_ovf_chain) { // // some clients might registered but not written any meta blk to disk, which is okay; - // one case is create a volume, then delete a volume, then client: VOLUME will don't have any meta blk on disk; + // one case is create a volume, then delete a volume, then client: VOLUME will don't have any meta blk on + // disk; // HS_LOG_ASSERT_LE(clients.size(), m_sub_info.size(), "client size on disk: {} is larger than registered: {}, which is not possible!", clients.size(), @@ -1482,12 +1484,12 @@ nlohmann::json MetaBlkService::populate_json(int log_level, meta_blk_map_t& meta if (free_space < buf->size) { j[x.first]["meta_bids"][std::to_string(bid_cnt)] = "Not_able_to_dump_to_file_exceeding_allowed_space"; - HS_LOG_EVERY_N( - WARN, metablk, 100, - "[type={}] Skip dumping to file, exceeding allowed space: {}, requested_size: {}, " - "total_free: {}, free_fs_percent: {}", - x.first, free_space, buf->size, total_free, - HS_DYNAMIC_CONFIG(metablk.percent_of_free_space)); + HS_LOG_EVERY_N(WARN, metablk, 100, + "[type={}] Skip dumping to file, exceeding allowed space: {}, " + "requested_size: {}, " + "total_free: {}, free_fs_percent: {}", + x.first, free_space, buf->size, total_free, + HS_DYNAMIC_CONFIG(metablk.percent_of_free_space)); continue; } diff --git a/src/lib/replication/CMakeLists.txt b/src/lib/replication/CMakeLists.txt new file mode 100644 index 000000000..c71bb4516 --- /dev/null +++ b/src/lib/replication/CMakeLists.txt @@ -0,0 +1,11 @@ +include (${CMAKE_SOURCE_DIR}/cmake/test_mode.cmake) + +include_directories (BEFORE ..) +include_directories (BEFORE .) + +add_library(hs_replication OBJECT) +target_sources(hs_replication PRIVATE + service/repl_service_impl.cpp + repl_dev/solo_repl_dev.cpp + ) +target_link_libraries(hs_replication ${COMMON_DEPS}) diff --git a/src/lib/replication/repl_dev/solo_repl_dev.cpp b/src/lib/replication/repl_dev/solo_repl_dev.cpp new file mode 100644 index 000000000..5ee1b6137 --- /dev/null +++ b/src/lib/replication/repl_dev/solo_repl_dev.cpp @@ -0,0 +1,136 @@ +#include +#include +#include +#include "common/homestore_assert.hpp" +#include "replication/repl_dev/solo_repl_dev.h" + +namespace homestore { +SoloReplDev::SoloReplDev(superblk< repl_dev_superblk > const& rd_sb, bool load_existing) : m_rd_sb{rd_sb} { + if (load_existing) { + logstore_service().open_log_store(LogStoreService::DATA_LOG_FAMILY_IDX, m_rd_sb->data_journal_id, true, + bind_this(SoloReplDev::on_data_journal_created, 1)); + } else { + m_data_journal = + logstore_service().create_new_log_store(LogStoreService::DATA_LOG_FAMILY_IDX, true /* append_mode */); + m_rd_sb->data_journal_id = m_data_journal->get_store_id(); + } +} + +void SoloReplDev::on_data_journal_created(shared< HomeLogStore > log_store) { + m_data_journal = std::move(log_store); + m_rd_sb->data_journal_id = m_data_journal->get_store_id(); + m_data_journal->register_log_found_cb(bind_this(SoloReplDev::on_log_found, 3)); +} + +void SoloReplDev::async_alloc_write(sisl::blob const& header, sisl::blob const& key, sisl::sg_list const& value, + void* user_ctx) { + auto rreq = intrusive< repl_req >(new repl_req{}); + rreq->header = header; + rreq->key = key; + rreq->value = std::move(value); + rreq->user_ctx = user_ctx; + + // If it is header only entry, directly write to the journal + if (rreq->value.size) { + // Step 1: Alloc Blkid + auto status = data_service().alloc_blks( + uint32_cast(rreq->value.size), m_listener->get_blk_alloc_hints(rreq->header, user_ctx), rreq->local_blkid); + HS_REL_ASSERT_EQ(status, BlkAllocStatus::SUCCESS); + + // Write the data + data_service() + .async_write(rreq->value, rreq->local_blkid) + .thenValue([this, rreq = std::move(rreq)](auto&& err) { + HS_REL_ASSERT(!err, "Error in writing data"); // TODO: Find a way to return error to the Listener + write_journal(std::move(rreq)); + }); + } else { + write_journal(std::move(rreq)); + } +} + +void SoloReplDev::write_journal(intrusive< repl_req > rreq) { + uint32_t entry_size = + sizeof(repl_journal_entry) + rreq->header.size + rreq->key.size + rreq->local_blkid.serialized_size(); + rreq->alloc_journal_entry(entry_size); + rreq->journal_entry->code = journal_type_t::HS_DATA; + rreq->journal_entry->user_header_size = rreq->header.size; + rreq->journal_entry->key_size = rreq->key.size; + + uint8_t* raw_ptr = uintptr_cast(rreq->journal_entry) + sizeof(repl_journal_entry); + if (rreq->header.size) { + std::memcpy(raw_ptr, rreq->header.bytes, rreq->header.size); + raw_ptr += rreq->header.size; + } + + if (rreq->key.size) { + std::memcpy(raw_ptr, rreq->key.bytes, rreq->key.size); + raw_ptr += rreq->key.size; + } + + if (rreq->value.size) { + auto b = rreq->local_blkid.serialize(); + std::memcpy(raw_ptr, b.bytes, b.size); + raw_ptr += b.size; + } + + m_data_journal->append_async( + sisl::io_blob{rreq->journal_buf.get(), entry_size, false /* is_aligned */}, nullptr /* cookie */, + [this, rreq](int64_t lsn, sisl::io_blob&, homestore::logdev_key, void*) { + rreq->lsn = lsn; + m_listener->on_pre_commit(rreq->lsn, rreq->header, rreq->key, rreq->user_ctx); + + auto cur_lsn = m_commit_upto.load(); + if (cur_lsn < lsn) { m_commit_upto.compare_exchange_strong(cur_lsn, lsn); } + + m_listener->on_commit(rreq->lsn, rreq->header, rreq->key, rreq->local_blkid, rreq->user_ctx); + }); +} + +void SoloReplDev::on_log_found(logstore_seq_num_t lsn, log_buffer buf, void* ctx) { + repl_journal_entry* entry = r_cast< repl_journal_entry* >(buf.bytes()); + uint32_t remain_size = buf.size() - sizeof(repl_journal_entry); + HS_REL_ASSERT_EQ(entry->major_version, repl_journal_entry::JOURNAL_ENTRY_MAJOR, + "Mismatched version of journal entry found"); + HS_REL_ASSERT_EQ(entry->code, journal_type_t::HS_DATA, "Found a journal entry which is not data"); + + uint8_t* raw_ptr = r_cast< uint8_t* >(entry) + sizeof(repl_journal_entry); + sisl::blob header{raw_ptr, entry->user_header_size}; + HS_REL_ASSERT_GE(remain_size, entry->user_header_size, "Invalid journal entry, header_size mismatch"); + raw_ptr += entry->user_header_size; + remain_size -= entry->user_header_size; + + sisl::blob key{raw_ptr, entry->key_size}; + HS_REL_ASSERT_GE(remain_size, entry->key_size, "Invalid journal entry, key_size mismatch"); + raw_ptr += entry->key_size; + remain_size -= entry->key_size; + + sisl::blob value_blob{raw_ptr, remain_size}; + MultiBlkId blkid; + if (remain_size) { blkid.deserialize(value_blob, true /* copy */); } + + m_listener->on_pre_commit(lsn, header, key, nullptr); + + auto cur_lsn = m_commit_upto.load(); + if (cur_lsn < lsn) { m_commit_upto.compare_exchange_strong(cur_lsn, lsn); } + + m_listener->on_commit(lsn, header, key, blkid, nullptr); +} + +folly::Future< std::error_code > SoloReplDev::async_read(MultiBlkId const& bid, sisl::sg_list& sgs, uint32_t size, + bool part_of_batch) { + return data_service().async_read(bid, sgs, size, part_of_batch); +} + +void SoloReplDev::async_free_blks(int64_t, MultiBlkId const& bid) { data_service().async_free_blk(bid); } + +void SoloReplDev::cp_flush(CP*) { + auto lsn = m_commit_upto.load(); + m_rd_sb->commit_lsn = lsn; + m_rd_sb->checkpoint_lsn = lsn; + m_rd_sb.write(); +} + +void SoloReplDev::cp_cleanup(CP*) { m_data_journal->truncate(m_rd_sb->checkpoint_lsn); } + +} // namespace homestore \ No newline at end of file diff --git a/src/lib/replication/repl_dev/solo_repl_dev.h b/src/lib/replication/repl_dev/solo_repl_dev.h new file mode 100644 index 000000000..1e37528b9 --- /dev/null +++ b/src/lib/replication/repl_dev/solo_repl_dev.h @@ -0,0 +1,121 @@ +/********************************************************************************* + * Modifications Copyright 2017-2019 eBay Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed + * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + *********************************************************************************/ +#pragma once + +#include +#include + +#include +#include +#include +#include + +namespace homestore { +#pragma pack(1) +struct repl_dev_superblk { + static constexpr uint64_t REPL_DEV_SB_MAGIC = 0xABCDF00D; + static constexpr uint32_t REPL_DEV_SB_VERSION = 1; + + uint64_t magic{REPL_DEV_SB_MAGIC}; + uint32_t version{REPL_DEV_SB_MAGIC}; + uuid_t gid; // gid of this replica set + logstore_id_t data_journal_id; // Logstore id for the data journal + int64_t commit_lsn; // LSN upto which this replica has committed + int64_t checkpoint_lsn; // LSN upto which this replica have checkpointed the data + +#if 0 + logstore_id_t free_pba_store_id; // Logstore id for storing free pba records +#endif + + uint64_t get_magic() const { return magic; } + uint32_t get_version() const { return version; } +}; +#pragma pack() + +VENUM(journal_type_t, uint16_t, HS_DATA = 0) +struct repl_journal_entry { + static constexpr uint16_t JOURNAL_ENTRY_MAJOR = 1; + static constexpr uint16_t JOURNAL_ENTRY_MINOR = 1; + + // Major and minor version. For each major version underlying structures could change. Minor versions can only add + // fields, not change any existing fields. + uint16_t major_version{JOURNAL_ENTRY_MAJOR}; + uint16_t minor_version{JOURNAL_ENTRY_MINOR}; + + journal_type_t code; + uint32_t replica_id; + uint32_t user_header_size; + uint32_t key_size; + // Followed by user_header, then key, then MultiBlkId +}; + +struct repl_req : public boost::intrusive_ref_counter< repl_req, boost::thread_safe_counter > { + sisl::blob header; // User header + sisl::blob key; // Key to replicate + sisl::sg_list value; // Raw value - applicable only to leader req + MultiBlkId local_blkid; // List of corresponding local blkids for the value + RemoteBlkId remote_blkid; // List of remote blkid for the value + std::unique_ptr< uint8_t[] > journal_buf; // Buf for the journal entry + repl_journal_entry* journal_entry{nullptr}; // pointer to the journal entry + void* user_ctx{nullptr}; // User context passed with replica_set::write, valie for leader only + int64_t lsn{0}; // Lsn for this replication req + + repl_req() = default; + + void alloc_journal_entry(uint32_t size) { + journal_buf = std::unique_ptr< uint8_t[] >(new uint8_t[size]); + journal_entry = new (journal_buf.get()) repl_journal_entry(); + } + + ~repl_req() { + if (journal_entry) { journal_entry->~repl_journal_entry(); } + } +}; + +class CP; + +class SoloReplDev : public ReplDev { +private: + std::shared_ptr< HomeLogStore > m_data_journal; + uuid_t m_group_id; + std::atomic< logstore_seq_num_t > m_commit_upto{-1}; + superblk< repl_dev_superblk > m_rd_sb; + +public: + SoloReplDev(superblk< repl_dev_superblk > const& rd_sb, bool load_existing); + virtual ~SoloReplDev() = default; + + void async_alloc_write(sisl::blob const& header, sisl::blob const& key, sisl::sg_list const& value, + void* user_ctx) override; + + folly::Future< std::error_code > async_read(MultiBlkId const& bid, sisl::sg_list& sgs, uint32_t size, + bool part_of_batch = false) override; + + void async_free_blks(int64_t lsn, MultiBlkId const& blkid) override; + + bool is_leader() const override { return true; } + + uuid_t group_id() const override { return m_group_id; } + + void cp_flush(CP* cp); + void cp_cleanup(CP* cp); + +private: + void on_data_journal_created(shared< HomeLogStore > log_store); + void write_journal(intrusive< repl_req > rreq); + void on_log_found(logstore_seq_num_t lsn, log_buffer buf, void* ctx); +}; + +} // namespace homestore \ No newline at end of file diff --git a/src/lib/replication/repl_service.cpp b/src/lib/replication/repl_service.cpp deleted file mode 100644 index 52cdca413..000000000 --- a/src/lib/replication/repl_service.cpp +++ /dev/null @@ -1,122 +0,0 @@ -#include - -#include -#include -#include -#include - -#include -#include "service/repl_backend.h" -#include "service/home_repl_backend.h" - -namespace homestore { -ReplicationServiceImpl::ReplicationServiceImpl(std::unique_ptr< ReplServiceCallbacks > cbs) : - m_svc_cbs{std::move(cbs)} { - m_messaging = std::make_shared< nuraft_mesg::service >(); - - // FIXME: RAFT server parameters, should be a config and reviewed!!! - nuraft::raft_params r_params; - r_params.with_election_timeout_lower(900) - .with_election_timeout_upper(1400) - .with_hb_interval(250) - .with_max_append_size(10) - .with_rpc_failure_backoff(250) - .with_auto_forwarding(true) - .with_snapshot_enabled(1); - - meta_service().register_handler( - "replication", - [this](meta_blk* mblk, sisl::byte_view buf, size_t) { rd_super_blk_found(std::move(buf), voidptr_cast(mblk)); }, - nullptr); - - // This closure is where we initialize new ReplicaSet instances. When NuRaft Messging is asked to join a new group - // either through direct creation or gRPC request it will use this callback to initialize a new state_manager and - // state_machine for the raft_server it constructs. - auto group_type_params = nuraft_mesg::consensus_component::register_params{ - r_params, [this](int32_t const, std::string const& group_id) mutable { - return create_replica_dev(group_id, std::set< std::string, std::less<> >()) - .via(&folly::QueuedImmediateExecutor::instance()) - .get(); - // RELEASE_ASSERT(std::holds_alternative< shared< ReplDev > >(v), "Could Not Create ReplicaSet!"); - // return std::get< shared< ReplDev > >(v); - }}; - // m_messaging->register_mgr_type("homestore", group_type_params); -} - -void ReplicationServiceImpl::create_vdev(uint64_t size) { - auto const atomic_page_size = hs()->device_mgr()->atomic_page_size(HSDevType::Data); - hs_vdev_context vdev_ctx; - vdev_ctx.type = hs_vdev_type_t::REPL_DATA_VDEV; - - hs()->device_mgr()->create_vdev(vdev_parameters{.vdev_name = "index", - .vdev_size = size, - .num_chunks = 1, - .blk_size = atomic_page_size, - .dev_type = HSDevType::Data, - .multi_pdev_opts = vdev_multi_pdev_opts_t::ALL_PDEV_STRIPED, - .context_data = vdev_ctx.to_blob()}); -} - -shared< VirtualDev > ReplicationServiceImpl::open_vdev(const vdev_info& vinfo, bool load_existing) { - m_vdev = std::make_shared< VirtualDev >(*(hs()->device_mgr()), vinfo, m_svc_cbs->blk_allocator_type(), - m_svc_cbs->chunk_selector(), nullptr, true /* auto_recovery */); - return m_vdev; -} - -ReplAsyncResult< shared< ReplDev > > -ReplicationServiceImpl::create_replica_dev(std::string const& group_id, - std::set< std::string, std::less<> >&& members) { - superblk< repl_dev_superblk > rd_sb; - rd_sb.create(sizeof(repl_dev_superblk)); - rd_sb->gid = group_id; - return folly::makeSemiFuture< shared< ReplDev > >(open_replica_dev(rd_sb, false /* load_existing */)); -} - -folly::SemiFuture< ReplServiceError > ReplicationServiceImpl::replace_member(std::string const& group_id, - std::string const& member_out, - std::string const& member_in) const { - return folly::makeSemiFuture(ReplServiceError::CANCELLED); -} - -ReplAsyncResult< shared< ReplDev > > ReplicationServiceImpl::get_replica_dev(std::string const& group_id) const { - std::unique_lock lg(m_rd_map_mtx); - if (auto it = m_rd_map.find(group_id); it != m_rd_map.end()) { return it->second; } - return ReplServiceError::SERVER_NOT_FOUND; -} - -void ReplicationServiceImpl::iterate_replica_devs(std::function< void(cshared< ReplDev >&) > const& cb) { - std::unique_lock lg(m_rd_map_mtx); - for (const auto& [uuid, rd] : m_rd_map) { - cb(rd); - } -} - -shared< ReplDev > ReplicationServiceImpl::open_replica_dev(superblk< repl_dev_superblk > const& rd_sb, - bool load_existing) { - auto it = m_rd_map.end(); - bool happened = false; - - { - std::unique_lock lg(m_rd_map_mtx); - std::tie(it, happened) = m_rd_map.emplace(std::make_pair(gid, nullptr)); - } - DEBUG_ASSERT(m_rd_map.end() != it, "Could not insert into map!"); - if (!happened) { return it->second }; - - auto repl_dev = std::make_shared< ReplDevImpl >(rd_sb, load_existing); - it->second = repl_dev; - repl_dev->attach_listener(std::move(m_svc_cbs->on_repl_dev_init(repl_dev))); - - return repl_dev; -} - -void ReplicationServiceImpl::rd_super_blk_found(sisl::byte_view const& buf, void* meta_cookie) { - superblk< repl_dev_superblk > rd_sb; - rd_sb.load(buf, meta_cookie); - DEBUG_ASSERT_EQ(rd_sb->get_magic(), home_rs_superblk::REPLICA_DEV_SB_MAGIC, "Invalid rdev metablk, magic mismatch"); - DEBUG_ASSERT_EQ(rd_sb->get_version(), home_rs_superblk::REPLICA_DEV_SB_VERSION, "Invalid version of rdev metablk"); - - open_replica_dev(rd_sb, true /* load_existing */); -} - -} // namespace homestore diff --git a/src/lib/replication/service/repl_service_impl.cpp b/src/lib/replication/service/repl_service_impl.cpp new file mode 100644 index 000000000..3a17dc7cd --- /dev/null +++ b/src/lib/replication/service/repl_service_impl.cpp @@ -0,0 +1,122 @@ +/********************************************************************************* + * Modifications Copyright 2017-2019 eBay Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed + * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + *********************************************************************************/ +#include +#include +#include "common/homestore_assert.hpp" +#include "replication/service/repl_service_impl.h" +#include "replication/repl_dev/solo_repl_dev.h" + +namespace homestore { +ReplicationServiceImpl& repl_service() { return hs()->repl_service(); } + +ReplicationServiceImpl::ReplicationServiceImpl(repl_impl_type impl_type, std::unique_ptr< ReplServiceCallbacks > cbs) : + m_svc_cbs{std::move(cbs)}, m_repl_type{impl_type} { + meta_service().register_handler( + "replication", + [this](meta_blk* mblk, sisl::byte_view buf, size_t) { rd_super_blk_found(std::move(buf), voidptr_cast(mblk)); }, + nullptr); +} + +void ReplicationServiceImpl::start() { + // Register to CP to flush the super blk and truncate the logstore + hs()->cp_mgr().register_consumer(cp_consumer_t::REPLICATION_SVC, std::make_unique< ReplServiceCPHandler >()); +} + +void ReplicationServiceImpl::stop() { + std::unique_lock lg{m_rd_map_mtx}; + m_rd_map.clear(); +} + +AsyncReplResult< shared< ReplDev > > +ReplicationServiceImpl::create_replica_dev(uuid_t group_id, std::set< std::string, std::less<> >&& members) { + superblk< repl_dev_superblk > rd_sb; + rd_sb.create(sizeof(repl_dev_superblk)); + rd_sb->gid = group_id; + + shared< ReplDev > repl_dev = open_replica_dev(rd_sb, false /* load_existing */); + return folly::makeSemiFuture< ReplResult< shared< ReplDev > > >(std::move(repl_dev)); +} + +ReplResult< shared< ReplDev > > ReplicationServiceImpl::get_replica_dev(uuid_t group_id) const { + std::shared_lock lg(m_rd_map_mtx); + if (auto it = m_rd_map.find(group_id); it != m_rd_map.end()) { return it->second; } + return folly::makeUnexpected(ReplServiceError::SERVER_NOT_FOUND); +} + +void ReplicationServiceImpl::iterate_replica_devs(std::function< void(cshared< ReplDev >&) > const& cb) { + std::shared_lock lg(m_rd_map_mtx); + for (const auto& [uuid, rd] : m_rd_map) { + cb(rd); + } +} + +folly::SemiFuture< ReplServiceError > ReplicationServiceImpl::replace_member(uuid_t group_id, + std::string const& member_out, + std::string const& member_in) const { + return folly::makeSemiFuture< ReplServiceError >(ReplServiceError::NOT_IMPLEMENTED); +} + +shared< ReplDev > ReplicationServiceImpl::open_replica_dev(superblk< repl_dev_superblk > const& rd_sb, + bool load_existing) { + auto it = m_rd_map.end(); + bool happened = false; + + { + std::unique_lock lg(m_rd_map_mtx); + std::tie(it, happened) = m_rd_map.emplace(std::make_pair(rd_sb->gid, nullptr)); + } + DEBUG_ASSERT(m_rd_map.end() != it, "Could not insert into map!"); + if (!happened) { return it->second; } + + shared< ReplDev > repl_dev; + if (m_repl_type == repl_impl_type::solo) { + repl_dev = std::make_shared< SoloReplDev >(rd_sb, load_existing); + } else { + HS_REL_ASSERT(false, "Repl impl type = {} is not supported yet", enum_name(m_repl_type)); + } + repl_dev->attach_listener(m_svc_cbs->on_repl_dev_init(repl_dev)); + it->second = repl_dev; + + return repl_dev; +} + +void ReplicationServiceImpl::rd_super_blk_found(sisl::byte_view const& buf, void* meta_cookie) { + superblk< repl_dev_superblk > rd_sb; + rd_sb.load(buf, meta_cookie); + HS_DBG_ASSERT_EQ(rd_sb->get_magic(), repl_dev_superblk::REPL_DEV_SB_MAGIC, "Invalid rdev metablk, magic mismatch"); + HS_DBG_ASSERT_EQ(rd_sb->get_version(), repl_dev_superblk::REPL_DEV_SB_VERSION, "Invalid version of rdev metablk"); + + open_replica_dev(rd_sb, true /* load_existing */); +} + +///////////////////// CP Callbacks for Repl Service ////////////// +ReplServiceCPHandler::ReplServiceCPHandler() {} + +std::unique_ptr< CPContext > ReplServiceCPHandler::on_switchover_cp(CP* cur_cp, CP* new_cp) { return nullptr; } + +folly::Future< bool > ReplServiceCPHandler::cp_flush(CP* cp) { + repl_service().iterate_replica_devs( + [cp](cshared< ReplDev >& repl_dev) { std::dynamic_pointer_cast< SoloReplDev >(repl_dev)->cp_flush(cp); }); + return folly::makeFuture< bool >(true); +} + +void ReplServiceCPHandler::cp_cleanup(CP* cp) { + repl_service().iterate_replica_devs( + [cp](cshared< ReplDev >& repl_dev) { std::dynamic_pointer_cast< SoloReplDev >(repl_dev)->cp_cleanup(cp); }); +} + +int ReplServiceCPHandler::cp_progress_percent() { return 100; } + +} // namespace homestore diff --git a/src/lib/replication/service/repl_service_impl.h b/src/lib/replication/service/repl_service_impl.h new file mode 100644 index 000000000..f53c9a052 --- /dev/null +++ b/src/lib/replication/service/repl_service_impl.h @@ -0,0 +1,71 @@ +/********************************************************************************* + * Modifications Copyright 2017-2019 eBay Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed + * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + *********************************************************************************/ +#pragma once +#include +#include +#include +#include + +#include +#include + +#include +#include +#include +#include +#include +#include + +namespace homestore { + +struct repl_dev_superblk; +class ReplicationServiceImpl : public ReplicationService { +protected: + std::unique_ptr< ReplServiceCallbacks > m_svc_cbs; + repl_impl_type m_repl_type; + std::shared_mutex m_rd_map_mtx; + std::map< uuid_t, shared< ReplDev > > m_rd_map; + +public: + ReplicationServiceImpl(repl_impl_type impl_type, std::unique_ptr< ReplServiceCallbacks > cbs); + void start(); + void stop(); + AsyncReplResult< shared< ReplDev > > create_replica_dev(uuid_t group_id, + std::set< std::string, std::less<> >&& members) override; + ReplResult< shared< ReplDev > > get_replica_dev(uuid_t group_id) const override; + void iterate_replica_devs(std::function< void(cshared< ReplDev >&) > const& cb) override; + + folly::SemiFuture< ReplServiceError > replace_member(uuid_t group_id, std::string const& member_out, + std::string const& member_in) const override; + +private: + shared< ReplDev > open_replica_dev(superblk< repl_dev_superblk > const& rd_sb, bool load_existing); + void rd_super_blk_found(sisl::byte_view const& buf, void* meta_cookie); +}; + +class ReplServiceCPHandler : public CPCallbacks { +public: + ReplServiceCPHandler(); + virtual ~ReplServiceCPHandler() = default; + +public: + std::unique_ptr< CPContext > on_switchover_cp(CP* cur_cp, CP* new_cp) override; + folly::Future< bool > cp_flush(CP* cp) override; + void cp_cleanup(CP* cp) override; + int cp_progress_percent() override; +}; + +extern ReplicationServiceImpl& repl_service(); +} // namespace homestore diff --git a/src/tests/CMakeLists.txt b/src/tests/CMakeLists.txt index 021138bd9..c89708d72 100644 --- a/src/tests/CMakeLists.txt +++ b/src/tests/CMakeLists.txt @@ -90,18 +90,24 @@ if (${io_tests}) target_link_libraries(test_cp_mgr homestore ${COMMON_TEST_DEPS} GTest::gtest) add_test(NAME CPMgr COMMAND test_cp_mgr) + add_executable(test_solo_repl_dev) + target_sources(test_solo_repl_dev PRIVATE test_solo_repl_dev.cpp) + target_link_libraries(test_solo_repl_dev homestore ${COMMON_TEST_DEPS} GTest::gmock) + can_build_epoll_io_tests(epoll_tests) if(${epoll_tests}) add_test(NAME LogStore-Epoll COMMAND ${CMAKE_SOURCE_DIR}/test_wrap.sh ${CMAKE_BINARY_DIR}/bin/test_log_store) add_test(NAME MetaBlkMgr-Epoll COMMAND ${CMAKE_SOURCE_DIR}/test_wrap.sh ${CMAKE_BINARY_DIR}/bin/test_meta_blk_mgr) add_test(NAME DataService-Epoll COMMAND ${CMAKE_SOURCE_DIR}/test_wrap.sh ${CMAKE_BINARY_DIR}/bin/test_data_service) + add_test(NAME SoloReplDev-Epoll COMMAND ${CMAKE_SOURCE_DIR}/test_wrap.sh ${CMAKE_BINARY_DIR}/bin/test_solo_repl_dev) endif() can_build_spdk_io_tests(spdk_tests) if(${spdk_tests}) - add_test(NAME LogStore-Spdk COMMAND ${CMAKE_SOURCE_DIR}/test_wrap.sh ${CMAKE_BINARY_DIR}/bin/test_log_store -- --spdk true) + add_test(NAME LogStore-Spdk COMMAND ${CMAKE_SOURCE_DIR}/test_wrap.sh ${CMAKE_BINARY_DIR}/bin/test_log_store -- --spdk "true") add_test(NAME MetaBlkMgr-Spdk COMMAND ${CMAKE_SOURCE_DIR}/test_wrap.sh ${CMAKE_BINARY_DIR}/bin/test_meta_blk_mgr -- --spdk "true") add_test(NAME DataSerice-Spdk COMMAND ${CMAKE_SOURCE_DIR}/test_wrap.sh ${CMAKE_BINARY_DIR}/bin/test_data_service -- --spdk "true") + add_test(NAME SoloReplDev-Spdk COMMAND ${CMAKE_SOURCE_DIR}/test_wrap.sh ${CMAKE_BINARY_DIR}/bin/test_solo_repl_dev -- --spdk "true") if(${epoll_tests}) SET_TESTS_PROPERTIES(MetaBlkMgr-Spdk PROPERTIES DEPENDS LogStore-Spdk) SET_TESTS_PROPERTIES(DataService-Spdk PROPERTIES DEPENDS MetaBlkMgr-Spdk) diff --git a/src/tests/log_store_benchmark.cpp b/src/tests/log_store_benchmark.cpp index 086633428..b8d43e6ec 100644 --- a/src/tests/log_store_benchmark.cpp +++ b/src/tests/log_store_benchmark.cpp @@ -37,8 +37,6 @@ using namespace homestore; SISL_LOGGING_INIT(HOMESTORE_LOG_MODS) std::vector< std::string > test_common::HSTestHelper::s_dev_names; -blk_allocator_type_t test_common::HSTestHelper::s_ds_alloc_type; -chunk_selector_type_t test_common::HSTestHelper::s_ds_chunk_sel_type; SISL_OPTIONS_ENABLE(logging, log_store_benchmark, iomgr, test_common_setup) SISL_OPTION_GROUP(log_store_benchmark, @@ -169,7 +167,12 @@ static void test_append(benchmark::State& state) { } } -static void setup() { test_common::HSTestHelper::start_homestore("logstore_bench", 5.0, 85.0, 2.0, 0, 0, nullptr); } +static void setup() { + test_common::HSTestHelper::start_homestore("test_log_store", + {{HS_SERVICE::META, {.size_pct = 5.0}}, + {HS_SERVICE::LOG_REPLICATED, {.size_pct = 85.0}}, + {HS_SERVICE::LOG_LOCAL, {.size_pct = 2.0}}}); +} static void teardown() { test_common::HSTestHelper::shutdown_homestore(); } diff --git a/src/tests/test_append_blkalloc.cpp b/src/tests/test_append_blkalloc.cpp index c3d186268..43c7a50aa 100644 --- a/src/tests/test_append_blkalloc.cpp +++ b/src/tests/test_append_blkalloc.cpp @@ -49,8 +49,6 @@ SISL_OPTIONS_ENABLE(logging, test_append_blkalloc, iomgr, test_common_setup) SISL_LOGGING_DECL(test_append_blkalloc) std::vector< std::string > test_common::HSTestHelper::s_dev_names; -blk_allocator_type_t test_common::HSTestHelper::s_ds_alloc_type; -chunk_selector_type_t test_common::HSTestHelper::s_ds_chunk_sel_type; constexpr uint64_t Ki{1024}; constexpr uint64_t Mi{Ki * Ki}; @@ -68,20 +66,20 @@ class AppendBlkAllocatorTest : public testing::Test { BlkDataService& inst() { return homestore::data_service(); } virtual void SetUp() override { - test_common::HSTestHelper::set_data_svc_allocator(homestore::blk_allocator_type_t::append); - test_common::HSTestHelper::set_data_svc_chunk_selector(homestore::chunk_selector_type_t::HEAP); - - test_common::HSTestHelper::start_homestore("test_append_blkalloc", 5.0 /* meta */, 0 /* data_log */, - 0 /* ctrl_log */, 80.0 /* data */, 0 /* index */, nullptr, - false /* recovery */, nullptr, false /* default ds type */); + test_common::HSTestHelper::start_homestore( + "test_append_blkalloc", + {{HS_SERVICE::META, {.size_pct = 5.0}}, + {HS_SERVICE::DATA, {.size_pct = 80.0, .blkalloc_type = homestore::blk_allocator_type_t::append}}}); } virtual void TearDown() override { test_common::HSTestHelper::shutdown_homestore(); } void restart_homestore() { - test_common::HSTestHelper::start_homestore("test_append_blkalloc", 5.0, 0, 0, 80.0, 0, - nullptr /* before_svc_start_cb */, true /* restart */, - nullptr /* indx_svc_cb */, false /* default ds type */); + test_common::HSTestHelper::start_homestore( + "test_append_blkalloc", + {{HS_SERVICE::META, {.size_pct = 5.0}}, + {HS_SERVICE::DATA, {.size_pct = 80.0, .blkalloc_type = homestore::blk_allocator_type_t::append}}}, + nullptr /* before_svc_start_cb */, true /* restart */); } void reset_io_job_done() { m_io_job_done = false; } diff --git a/src/tests/test_common/homestore_test_common.hpp b/src/tests/test_common/homestore_test_common.hpp index 4fe02ea59..b02d41d60 100644 --- a/src/tests/test_common/homestore_test_common.hpp +++ b/src/tests/test_common/homestore_test_common.hpp @@ -24,6 +24,7 @@ #include #include #include +#include #include #include #include @@ -71,13 +72,6 @@ inline static uint32_t generate_random_http_port() { return http_port; } -class TestIndexServiceCallbacks : public IndexServiceCallbacks { -public: - std::shared_ptr< IndexTableBase > on_index_table_found(const superblk< index_table_sb >& sb) override { - return nullptr; - } -}; - class HSTestHelper { private: static void remove_files(const std::vector< std::string >& file_paths) { @@ -95,24 +89,26 @@ class HSTestHelper { } static std::vector< std::string > s_dev_names; - static blk_allocator_type_t s_ds_alloc_type; - static chunk_selector_type_t s_ds_chunk_sel_type; public: - static void set_data_svc_allocator(blk_allocator_type_t alloc_type) { s_ds_alloc_type = alloc_type; } - static void set_data_svc_chunk_selector(chunk_selector_type_t chunk_sel_type) { - s_ds_chunk_sel_type = chunk_sel_type; - } - + struct test_params { + float size_pct{0}; + blk_allocator_type_t blkalloc_type{blk_allocator_type_t::varsize}; + uint32_t blk_size{0}; + shared< ChunkSelector > custom_chunk_selector{nullptr}; + IndexServiceCallbacks* index_svc_cbs{nullptr}; + ReplServiceCallbacks* repl_svc_cbs{nullptr}; + repl_impl_type repl_impl{repl_impl_type::solo}; + }; + +#if 0 static void start_homestore(const std::string& test_name, float meta_pct, float data_log_pct, float ctrl_log_pct, float data_pct, float index_pct, hs_before_services_starting_cb_t cb, bool restart = false, std::unique_ptr< IndexServiceCallbacks > index_svc_cb = nullptr, - bool default_data_svc_alloc_type = true) { - if (default_data_svc_alloc_type) { - set_data_svc_allocator(homestore::blk_allocator_type_t::varsize); - set_data_svc_chunk_selector(homestore::chunk_selector_type_t::ROUND_ROBIN); - } - + bool default_data_svc_alloc_type = true); +#endif + static void start_homestore(const std::string& test_name, std::map< uint32_t, test_params >&& svc_params, + hs_before_services_starting_cb_t cb = nullptr, bool restart = false) { auto const ndevices = SISL_OPTIONS["num_devs"].as< uint32_t >(); auto const dev_size = SISL_OPTIONS["dev_size_mb"].as< uint64_t >() * 1024 * 1024; auto nthreads = SISL_OPTIONS["num_threads"].as< uint32_t >(); @@ -161,33 +157,44 @@ class HSTestHelper { ioenvironment.with_http_server(); } - if (!index_svc_cb) { index_svc_cb = std::make_unique< TestIndexServiceCallbacks >(); } - const uint64_t app_mem_size = ((ndevices * dev_size) * 15) / 100; LOGINFO("Initialize and start HomeStore with app_mem_size = {}", homestore::in_bytes(app_mem_size)); using namespace homestore; - uint32_t services = 0; - if (meta_pct) { services |= HS_SERVICE::META; } - if (data_log_pct) { services |= HS_SERVICE::LOG_REPLICATED; } - if (ctrl_log_pct) { services |= HS_SERVICE::LOG_LOCAL; } - if (data_pct) { services |= HS_SERVICE::DATA; } - if (index_pct) { services |= HS_SERVICE::INDEX; } - - bool need_format = HomeStore::instance()->start( - hs_input_params{.devices = device_info, .app_mem_size = app_mem_size, .services = services}, std::move(cb), - std::move(index_svc_cb)); + auto hsi = HomeStore::instance(); + for (auto& [svc, tp] : svc_params) { + if (svc == HS_SERVICE::DATA) { + hsi->with_data_service(tp.custom_chunk_selector); + } else if (svc == HS_SERVICE::INDEX) { + hsi->with_index_service(std::unique_ptr< IndexServiceCallbacks >(tp.index_svc_cbs)); + } else if ((svc == HS_SERVICE::LOG_REPLICATED) || (svc == HS_SERVICE::LOG_LOCAL)) { + hsi->with_log_service(); + } else if (svc == HS_SERVICE::REPLICATION) { + hsi->with_repl_data_service(tp.repl_impl, std::unique_ptr< ReplServiceCallbacks >(tp.repl_svc_cbs), + tp.custom_chunk_selector); + } + } + bool need_format = + hsi->start(hs_input_params{.devices = device_info, .app_mem_size = app_mem_size}, std::move(cb)); if (need_format) { - HomeStore::instance()->format_and_start(std::map< uint32_t, hs_format_params >{ - {HS_SERVICE::META, hs_format_params{.size_pct = meta_pct}}, - {HS_SERVICE::LOG_REPLICATED, hs_format_params{.size_pct = data_log_pct}}, - {HS_SERVICE::LOG_LOCAL, hs_format_params{.size_pct = ctrl_log_pct}}, - {HS_SERVICE::DATA, - hs_format_params{ - .size_pct = data_pct, .alloc_type = s_ds_alloc_type, .chunk_sel_type = s_ds_chunk_sel_type}}, - {HS_SERVICE::INDEX, hs_format_params{.size_pct = index_pct}}, - }); + hsi->format_and_start( + {{HS_SERVICE::META, {.size_pct = svc_params[HS_SERVICE::META].size_pct}}, + {HS_SERVICE::LOG_REPLICATED, {.size_pct = svc_params[HS_SERVICE::LOG_REPLICATED].size_pct}}, + {HS_SERVICE::LOG_LOCAL, {.size_pct = svc_params[HS_SERVICE::LOG_LOCAL].size_pct}}, + {HS_SERVICE::DATA, + {.size_pct = svc_params[HS_SERVICE::DATA].size_pct, + .alloc_type = svc_params[HS_SERVICE::DATA].blkalloc_type, + .chunk_sel_type = svc_params[HS_SERVICE::DATA].custom_chunk_selector + ? chunk_selector_type_t::CUSTOM + : chunk_selector_type_t::ROUND_ROBIN}}, + {HS_SERVICE::INDEX, {.size_pct = svc_params[HS_SERVICE::INDEX].size_pct}}, + {HS_SERVICE::REPLICATION, + {.size_pct = svc_params[HS_SERVICE::REPLICATION].size_pct, + .alloc_type = svc_params[HS_SERVICE::REPLICATION].blkalloc_type, + .chunk_sel_type = svc_params[HS_SERVICE::REPLICATION].custom_chunk_selector + ? chunk_selector_type_t::CUSTOM + : chunk_selector_type_t::ROUND_ROBIN}}}); } } @@ -200,10 +207,48 @@ class HSTestHelper { s_dev_names.clear(); } - static void fill_data_buf(uint8_t* buf, uint64_t size) { - for (uint64_t i = 0ul; i < size; ++i) { - *(buf + i) = (i % 256); + static void fill_data_buf(uint8_t* buf, uint64_t size, uint64_t pattern = 0) { + uint64_t* ptr = r_cast< uint64_t* >(buf); + for (uint64_t i = 0ul; i < size / sizeof(uint64_t); ++i) { + *(ptr + i) = (pattern == 0) ? i : pattern; + } + } + + static void validate_data_buf(uint8_t* buf, uint64_t size, uint64_t pattern = 0) { + uint64_t* ptr = r_cast< uint64_t* >(buf); + for (uint64_t i = 0ul; i < size / sizeof(uint64_t); ++i) { + HS_REL_ASSERT_EQ(ptr[i], ((pattern == 0) ? i : pattern), "data_buf mismatch at offset={}", i); + } + } + + static sisl::sg_list create_sgs(uint64_t io_size, uint32_t blk_size, uint32_t max_size_per_iov, + std::optional< uint64_t > fill_data_pattern = std::nullopt) { + HS_REL_ASSERT_EQ(io_size % blk_size, 0, "io_size should be a multiple of blk_size"); + HS_REL_ASSERT_EQ(max_size_per_iov % blk_size, 0, "max_size_per_iov should be a multiple of blk_size"); + + uint32_t const nblks = io_size / blk_size; + uint32_t const max_iov_nblks = std::min(nblks, max_size_per_iov / blk_size); + + static std::random_device s_rd{}; + static std::default_random_engine s_re{s_rd()}; + static std::uniform_int_distribution< uint32_t > iov_nblks_generator{1u, max_iov_nblks}; + + sisl::sg_list sgs; + sgs.size = 0; + uint32_t remain_nblks = nblks; + while (remain_nblks != 0) { + uint32_t iov_nblks = iov_nblks_generator(s_re); + uint32_t iov_len = blk_size * std::min(iov_nblks, remain_nblks); + sgs.iovs.emplace_back(iovec{.iov_base = iomanager.iobuf_alloc(512, iov_len), .iov_len = iov_len}); + sgs.size += iov_nblks * blk_size; + remain_nblks -= iov_nblks; + + if (fill_data_pattern) { + fill_data_buf(uintptr_cast(sgs.iovs.back().iov_base), sgs.iovs.back().iov_len, *fill_data_pattern); + } } + + return sgs; } static bool compare(const sisl::sg_list& sg1, const sisl::sg_list& sg2) { @@ -258,5 +303,3 @@ class HSTestHelper { } }; } // namespace test_common - -// TODO: start_homestore should be moved here and called by each testing binaries diff --git a/src/tests/test_cp_mgr.cpp b/src/tests/test_cp_mgr.cpp index 6558f8f59..0db53f176 100644 --- a/src/tests/test_cp_mgr.cpp +++ b/src/tests/test_cp_mgr.cpp @@ -32,8 +32,6 @@ SISL_LOGGING_INIT(HOMESTORE_LOG_MODS) SISL_OPTIONS_ENABLE(logging, test_cp_mgr, iomgr, test_common_setup) SISL_LOGGING_DECL(test_cp_mgr) std::vector< std::string > test_common::HSTestHelper::s_dev_names; -blk_allocator_type_t test_common::HSTestHelper::s_ds_alloc_type; -chunk_selector_type_t test_common::HSTestHelper::s_ds_chunk_sel_type; SISL_OPTION_GROUP(test_cp_mgr, (num_records, "", "num_records", "number of record to test", @@ -88,7 +86,7 @@ class TestCPCallbacks : public CPCallbacks { class TestCPMgr : public ::testing::Test { public: void SetUp() override { - test_common::HSTestHelper::start_homestore("test_cp", 85.0, 0, 0, 0, 0, nullptr, false /* restart */); + test_common::HSTestHelper::start_homestore("test_cp", {{HS_SERVICE::META, {.size_pct = 85.0}}}); hs()->cp_mgr().register_consumer(cp_consumer_t::HS_CLIENT, std::move(std::make_unique< TestCPCallbacks >())); } void TearDown() override { test_common::HSTestHelper::shutdown_homestore(); } diff --git a/src/tests/test_data_service.cpp b/src/tests/test_data_service.cpp index baf1f3d48..011b80f27 100644 --- a/src/tests/test_data_service.cpp +++ b/src/tests/test_data_service.cpp @@ -50,8 +50,6 @@ SISL_OPTIONS_ENABLE(logging, test_data_service, iomgr, test_common_setup) SISL_LOGGING_DECL(test_data_service) std::vector< std::string > test_common::HSTestHelper::s_dev_names; -blk_allocator_type_t test_common::HSTestHelper::s_ds_alloc_type; -chunk_selector_type_t test_common::HSTestHelper::s_ds_chunk_sel_type; constexpr uint64_t Ki{1024}; constexpr uint64_t Mi{Ki * Ki}; @@ -73,6 +71,13 @@ class BlkDataServiceTest : public testing::Test { public: BlkDataService& inst() { return homestore::data_service(); } + virtual void SetUp() override { + test_common::HSTestHelper::start_homestore( + "test_data_service", {{HS_SERVICE::META, {.size_pct = 5.0}}, {HS_SERVICE::DATA, {.size_pct = 80.0}}}); + } + + virtual void TearDown() override { test_common::HSTestHelper::shutdown_homestore(); } + void free(sisl::sg_list& sg) { test_common::HSTestHelper::free(sg); } // free_blk after read completes @@ -301,9 +306,6 @@ class BlkDataServiceTest : public testing::Test { // single vector in sg_list; // TEST_F(BlkDataServiceTest, TestBasicWrite) { - LOGINFO("Step 0: Starting homestore."); - test_common::HSTestHelper::start_homestore("test_data_service", 5.0, 0, 0, 80.0, 0, nullptr); - // start io in worker thread; const auto io_size = 4 * Ki; LOGINFO("Step 1: run on worker thread to schedule write for {} Bytes.", io_size); @@ -313,13 +315,9 @@ TEST_F(BlkDataServiceTest, TestBasicWrite) { wait_for_all_io_complete(); LOGINFO("Step 3: I/O completed, do shutdown."); - test_common::HSTestHelper::shutdown_homestore(); } TEST_F(BlkDataServiceTest, TestWriteMultiplePagesSingleIov) { - LOGINFO("Step 0: Starting homestore."); - test_common::HSTestHelper::start_homestore("test_data_service", 5.0, 0, 0, 80.0, 0, nullptr); - // start io in worker thread; const auto io_size = 4 * Mi; LOGINFO("Step 1: run on worker thread to schedule write for {} Bytes.", io_size); @@ -329,13 +327,9 @@ TEST_F(BlkDataServiceTest, TestWriteMultiplePagesSingleIov) { wait_for_all_io_complete(); LOGINFO("Step 3: I/O completed, do shutdown."); - test_common::HSTestHelper::shutdown_homestore(); } TEST_F(BlkDataServiceTest, TestWriteMultiplePagesMultiIovs) { - LOGINFO("Step 0: Starting homestore."); - test_common::HSTestHelper::start_homestore("test_data_service", 5.0, 0, 0, 80.0, 0, nullptr); - // start io in worker thread; const auto io_size = 4 * Mi; const auto num_iovs = 4; @@ -347,13 +341,9 @@ TEST_F(BlkDataServiceTest, TestWriteMultiplePagesMultiIovs) { wait_for_all_io_complete(); LOGINFO("Step 3: I/O completed, do shutdown."); - test_common::HSTestHelper::shutdown_homestore(); } TEST_F(BlkDataServiceTest, TestWriteThenReadVerify) { - LOGINFO("Step 0: Starting homestore."); - test_common::HSTestHelper::start_homestore("test_data_service", 5.0, 0, 0, 80.0, 0, nullptr); - // start io in worker thread; auto io_size = 4 * Ki; LOGINFO("Step 1: run on worker thread to schedule write for {} Bytes.", io_size); @@ -363,14 +353,10 @@ TEST_F(BlkDataServiceTest, TestWriteThenReadVerify) { wait_for_all_io_complete(); LOGINFO("Step 4: I/O completed, do shutdown."); - test_common::HSTestHelper::shutdown_homestore(); } // Free_blk test, no read involved; TEST_F(BlkDataServiceTest, TestWriteThenFreeBlk) { - LOGINFO("Step 0: Starting homestore."); - test_common::HSTestHelper::start_homestore("test_data_service", 5.0, 0, 0, 80.0, 0, nullptr); - // start io in worker thread; auto io_size = 4 * Mi; LOGINFO("Step 1: run on worker thread to schedule write for {} Bytes, then free blk.", io_size); @@ -381,16 +367,12 @@ TEST_F(BlkDataServiceTest, TestWriteThenFreeBlk) { wait_for_all_io_complete(); LOGINFO("Step 4: I/O completed, do shutdown."); - test_common::HSTestHelper::shutdown_homestore(); } // // write, read, then free the blk after read completes, free should succeed // TEST_F(BlkDataServiceTest, TestWriteReadThenFreeBlkAfterReadComp) { - LOGINFO("Step 0: Starting homestore."); - test_common::HSTestHelper::start_homestore("test_data_service", 5.0, 0, 0, 80.0, 0, nullptr); - // start io in worker thread; auto io_size = 4 * Ki; LOGINFO("Step 1: Run on worker thread to schedule write for {} Bytes.", io_size); @@ -401,13 +383,9 @@ TEST_F(BlkDataServiceTest, TestWriteReadThenFreeBlkAfterReadComp) { wait_for_all_io_complete(); LOGINFO("Step 3: I/O completed, do shutdown."); - test_common::HSTestHelper::shutdown_homestore(); } TEST_F(BlkDataServiceTest, TestWriteReadThenFreeBeforeReadComp) { - LOGINFO("Step 0: Starting homestore."); - test_common::HSTestHelper::start_homestore("test_data_service", 5.0, 0, 0, 80.0, 0, nullptr); - // start io in worker thread; auto io_size = 4 * Ki; LOGINFO("Step 1: Run on worker thread to schedule write for {} Bytes.", io_size); @@ -418,7 +396,6 @@ TEST_F(BlkDataServiceTest, TestWriteReadThenFreeBeforeReadComp) { wait_for_all_io_complete(); LOGINFO("Step 5: I/O completed, do shutdown."); - test_common::HSTestHelper::shutdown_homestore(); } // Stream related test diff --git a/src/tests/test_index_btree.cpp b/src/tests/test_index_btree.cpp index 50fcfe4dd..4463ebbd4 100644 --- a/src/tests/test_index_btree.cpp +++ b/src/tests/test_index_btree.cpp @@ -41,8 +41,6 @@ SISL_LOGGING_DECL(test_index_btree) std::vector< std::string > test_common::HSTestHelper::s_dev_names; // TODO increase num_entries to 65k as io mgr page size is 512 and its slow. -blk_allocator_type_t test_common::HSTestHelper::s_ds_alloc_type; -chunk_selector_type_t test_common::HSTestHelper::s_ds_chunk_sel_type; SISL_OPTION_GROUP(test_index_btree, (num_iters, "", "num_iters", "number of iterations for rand ops", @@ -92,17 +90,16 @@ struct BtreeTest : public testing::Test { class TestIndexServiceCallbacks : public IndexServiceCallbacks { public: - TestIndexServiceCallbacks(BtreeTest* test, BtreeConfig cfg) : m_test(test), m_bt_cfg(cfg) {} + TestIndexServiceCallbacks(BtreeTest* test) : m_test(test) {} std::shared_ptr< IndexTableBase > on_index_table_found(const superblk< index_table_sb >& sb) override { LOGINFO("Index table recovered"); LOGINFO("Root bnode_id {} version {}", sb->root_node, sb->link_version); - m_test->m_bt = std::make_shared< typename T::BtreeType >(sb, this->m_bt_cfg); + m_test->m_bt = std::make_shared< typename T::BtreeType >(sb, *m_test->m_bt_cfg); return m_test->m_bt; } private: BtreeTest* m_test; - BtreeConfig m_bt_cfg; }; std::shared_ptr< typename T::BtreeType > m_bt; @@ -110,8 +107,10 @@ struct BtreeTest : public testing::Test { std::unique_ptr< BtreeConfig > m_bt_cfg; void SetUp() override { - test_common::HSTestHelper::start_homestore("test_index_btree", 10 /* meta */, 0 /* data log */, 0 /* ctrl log*/, - 0 /* data */, 70 /* index */, nullptr, false /* restart */); + test_common::HSTestHelper::start_homestore( + "test_index_btree", + {{HS_SERVICE::META, {.size_pct = 10.0}}, + {HS_SERVICE::INDEX, {.size_pct = 70.0, .index_svc_cbs = new TestIndexServiceCallbacks(this)}}}); LOGINFO("Node size {} ", hs()->index_service().node_size()); m_bt_cfg = std::make_unique< BtreeConfig >(hs()->index_service().node_size()); @@ -141,6 +140,13 @@ struct BtreeTest : public testing::Test { test_common::HSTestHelper::shutdown_homestore(); } + void restart_homestore() { + test_common::HSTestHelper::start_homestore( + "test_index_btree", + {{HS_SERVICE::META, {}}, {HS_SERVICE::INDEX, {.index_svc_cbs = new TestIndexServiceCallbacks(this)}}}, + nullptr, true /* restart */); + } + void put(uint32_t k, btree_put_type put_type) { auto existing_v = std::make_unique< V >(); auto pk = std::make_unique< K >(k); @@ -340,8 +346,12 @@ struct BtreeTest : public testing::Test { } }; - using BtreeTypes = testing::Types< FixedLenBtreeTest, VarKeySizeBtreeTest, VarValueSizeBtreeTest, VarObjSizeBtreeTest - >; +// TODO sanal fix the varkey issue. +// using BtreeTypes = testing::Types< FixedLenBtreeTest, VarKeySizeBtreeTest, VarValueSizeBtreeTest, +// VarObjSizeBtreeTest +// >; + +using BtreeTypes = testing::Types< FixedLenBtreeTest >; TYPED_TEST_SUITE(BtreeTest, BtreeTypes); @@ -382,7 +392,7 @@ TYPED_TEST(BtreeTest, SequentialInsert) { LOGINFO("Step 8: Do incorrect input and validate errors"); this->query_validate(num_entries + 100, num_entries + 500, 5); this->get_any_validate(num_entries + 1, num_entries + 2); -// this->print(); + // this->print(); LOGINFO("SequentialInsert test end"); } @@ -499,10 +509,8 @@ TYPED_TEST(BtreeTest, CpFlush) { this->destroy_btree(); // Restart homestore. m_bt is updated by the TestIndexServiceCallback. - auto index_svc_cb = std::make_unique< typename TestFixture::TestIndexServiceCallbacks >(this, *this->m_bt_cfg); - test_common::HSTestHelper::start_homestore("test_index_btree", 10 /* meta */, 0 /* data log */, 0 /* ctrl log*/, - 0 /* data */, 70 /* index */, nullptr, true /* restart */, - std::move(index_svc_cb) /* index service callbacks */); + this->restart_homestore(); + std::this_thread::sleep_for(std::chrono::seconds{3}); LOGINFO("Restarted homestore with index recovered"); @@ -547,11 +555,10 @@ TYPED_TEST(BtreeTest, MultipleCpFlush) { this->print(std::string("before.txt")); this->destroy_btree(); + // Restart homestore. m_bt is updated by the TestIndexServiceCallback. - auto index_svc_cb = std::make_unique< typename TestFixture::TestIndexServiceCallbacks >(this, *this->m_bt_cfg); - test_common::HSTestHelper::start_homestore("test_index_btree", 10 /* meta */, 0 /* data log */, 0 /* ctrl log*/, - 0 /* data */, 70 /* index */, nullptr, true /* restart */, - std::move(index_svc_cb) /* index service callbacks */); + this->restart_homestore(); + std::this_thread::sleep_for(std::chrono::seconds{3}); LOGINFO(" Restarted homestore with index recovered"); this->print(std::string("after.txt")); @@ -597,10 +604,8 @@ TYPED_TEST(BtreeTest, ThreadedCpFlush) { this->destroy_btree(); // Restart homestore. m_bt is updated by the TestIndexServiceCallback. - auto index_svc_cb = std::make_unique< typename TestFixture::TestIndexServiceCallbacks >(this, *this->m_bt_cfg); - test_common::HSTestHelper::start_homestore("test_index_btree", 10 /* meta */, 0 /* data log */, 0 /* ctrl log*/, - 0 /* data */, 70 /* index */, nullptr, true /* restart */, - std::move(index_svc_cb) /* index service callbacks */); + this->restart_homestore(); + std::this_thread::sleep_for(std::chrono::seconds{3}); LOGINFO(" Restarted homestore with index recovered"); this->print(std::string("after.txt")); diff --git a/src/tests/test_journal_vdev.cpp b/src/tests/test_journal_vdev.cpp index eff4c7b7f..39abf9b43 100644 --- a/src/tests/test_journal_vdev.cpp +++ b/src/tests/test_journal_vdev.cpp @@ -44,8 +44,7 @@ SISL_LOGGING_INIT(HOMESTORE_LOG_MODS) SISL_OPTIONS_ENABLE(logging, test_vdev, iomgr, test_common_setup) std::vector< std::string > test_common::HSTestHelper::s_dev_names; -blk_allocator_type_t test_common::HSTestHelper::s_ds_alloc_type; -chunk_selector_type_t test_common::HSTestHelper::s_ds_chunk_sel_type; + struct Param { uint64_t num_io; uint64_t run_time; @@ -74,7 +73,12 @@ class VDevIOTest : public ::testing::Test { virtual void SetUp() override { auto const ndevices = SISL_OPTIONS["num_devs"].as< uint32_t >(); auto const dev_size = SISL_OPTIONS["dev_size_mb"].as< uint64_t >() * 1024 * 1024; - test_common::HSTestHelper::start_homestore("test_vdev", 15.0, 75.0, 5.0, 0, 0, nullptr); + + test_common::HSTestHelper::start_homestore("test_journal_vdev", + {{HS_SERVICE::META, {.size_pct = 15.0}}, + {HS_SERVICE::LOG_REPLICATED, {.size_pct = 75.0}}, + {HS_SERVICE::LOG_LOCAL, {.size_pct = 5.0}}}); + m_vdev = hs()->logstore_service().get_vdev(homestore::LogStoreService::DATA_LOG_FAMILY_IDX); } diff --git a/src/tests/test_log_store.cpp b/src/tests/test_log_store.cpp index 8e95a99a8..d7ad85012 100644 --- a/src/tests/test_log_store.cpp +++ b/src/tests/test_log_store.cpp @@ -56,8 +56,7 @@ using namespace homestore; RCU_REGISTER_INIT SISL_LOGGING_INIT(HOMESTORE_LOG_MODS) std::vector< std::string > test_common::HSTestHelper::s_dev_names; -blk_allocator_type_t test_common::HSTestHelper::s_ds_alloc_type; -chunk_selector_type_t test_common::HSTestHelper::s_ds_chunk_sel_type; + struct test_log_data { test_log_data() = default; test_log_data(const test_log_data&) = delete; @@ -431,7 +430,10 @@ class SampleDB { } test_common::HSTestHelper::start_homestore( - "test_log_store", 5.0, 42.0, 42.0, 0, 0, + "test_log_store", + {{HS_SERVICE::META, {.size_pct = 5.0}}, + {HS_SERVICE::LOG_REPLICATED, {.size_pct = 42.0}}, + {HS_SERVICE::LOG_LOCAL, {.size_pct = 42.0}}}, [this, restart, n_log_stores]() { if (restart) { for (uint32_t i{0}; i < n_log_stores; ++i) { diff --git a/src/tests/test_meta_blk_mgr.cpp b/src/tests/test_meta_blk_mgr.cpp index f194c67cd..1e4c211f0 100644 --- a/src/tests/test_meta_blk_mgr.cpp +++ b/src/tests/test_meta_blk_mgr.cpp @@ -48,8 +48,7 @@ using namespace homestore; RCU_REGISTER_INIT SISL_LOGGING_INIT(HOMESTORE_LOG_MODS) std::vector< std::string > test_common::HSTestHelper::s_dev_names; -blk_allocator_type_t test_common::HSTestHelper::s_ds_alloc_type; -chunk_selector_type_t test_common::HSTestHelper::s_ds_chunk_sel_type; + SISL_OPTIONS_ENABLE(logging, test_meta_blk_mgr, iomgr, test_common_setup) SISL_LOGGING_DECL(test_meta_blk_mgr) @@ -97,7 +96,9 @@ class VMetaBlkMgrTest : public ::testing::Test { virtual ~VMetaBlkMgrTest() override = default; protected: - void SetUp() override{}; + void SetUp() override { + test_common::HSTestHelper::start_homestore("test_meta_blk_mgr", {{HS_SERVICE::META, {.size_pct = 85.0}}}); + } void TearDown() override{}; @@ -364,7 +365,7 @@ class VMetaBlkMgrTest : public ::testing::Test { iomanager.iobuf_free(buf); } else { if (unaligned_addr) { - delete[](buf - unaligned_shift); + delete[] (buf - unaligned_shift); } else { delete[] buf; } @@ -450,8 +451,12 @@ class VMetaBlkMgrTest : public ::testing::Test { } void recover() { + // TODO: This scan_blks and recover should be replaced with actual TestHelper::start_homestore with restart + // on. That way, we don't need to simulate all these calls here // do recover and callbacks will be triggered; m_cb_blks.clear(); + hs()->cp_mgr().shutdown(); + hs()->cp_mgr().start(false /* first_time_boot */); m_mbm->recover(false); } @@ -575,7 +580,6 @@ class VMetaBlkMgrTest : public ::testing::Test { static constexpr uint64_t MIN_DRIVE_SIZE{2147483648}; // 2 GB TEST_F(VMetaBlkMgrTest, min_drive_size_test) { - test_common::HSTestHelper::start_homestore("test_meta_blk_mgr", 85.0, 0, 0, 0, 0, nullptr); mtype = "Test_Min_Drive_Size"; this->register_client(); @@ -587,7 +591,6 @@ TEST_F(VMetaBlkMgrTest, min_drive_size_test) { } TEST_F(VMetaBlkMgrTest, write_to_full_test) { - test_common::HSTestHelper::start_homestore("test_meta_blk_mgr", 85.0, 0, 0, 0, 0, nullptr); mtype = "Test_Write_to_Full"; reset_counters(); m_start_time = Clock::now(); @@ -599,7 +602,6 @@ TEST_F(VMetaBlkMgrTest, write_to_full_test) { } TEST_F(VMetaBlkMgrTest, single_read_test) { - test_common::HSTestHelper::start_homestore("test_meta_blk_mgr", 85.0, 0, 0, 0, 0, nullptr); mtype = "Test_Read"; reset_counters(); m_start_time = Clock::now(); @@ -615,7 +617,6 @@ TEST_F(VMetaBlkMgrTest, single_read_test) { // 1. randome write, update, remove; // 2. recovery test and verify callback context data matches; TEST_F(VMetaBlkMgrTest, random_load_test) { - test_common::HSTestHelper::start_homestore("test_meta_blk_mgr", 85.0, 0, 0, 0, 0, nullptr); mtype = "Test_Rand_Load"; reset_counters(); m_start_time = Clock::now(); @@ -643,7 +644,6 @@ TEST_F(VMetaBlkMgrTest, random_load_test) { // 4. After recovery everything should be fine; // TEST_F(VMetaBlkMgrTest, RecoveryFromBadData) { - test_common::HSTestHelper::start_homestore("test_meta_blk_mgr", 85.0, 0, 0, 0, 0, nullptr); mtype = "Test_Recovery_from_bad_data"; reset_counters(); m_start_time = Clock::now(); @@ -685,7 +685,6 @@ TEST_F(VMetaBlkMgrTest, RecoveryFromBadData) { #endif TEST_F(VMetaBlkMgrTest, CompressionBackoff) { - test_common::HSTestHelper::start_homestore("test_meta_blk_mgr", 85.0, 0, 0, 0, 0, nullptr); mtype = "Test_Compression_Backoff"; reset_counters(); m_start_time = Clock::now(); diff --git a/src/tests/test_solo_repl_dev.cpp b/src/tests/test_solo_repl_dev.cpp new file mode 100644 index 000000000..2dfadd254 --- /dev/null +++ b/src/tests/test_solo_repl_dev.cpp @@ -0,0 +1,330 @@ +/********************************************************************************* + * Modifications Copyright 2017-2019 eBay Inc. + * + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed + * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + *********************************************************************************/ +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include "common/homestore_config.hpp" +#include "common/homestore_assert.hpp" +#include "common/homestore_utils.hpp" +#include "test_common/homestore_test_common.hpp" +#include "replication/service/repl_service_impl.h" +#include "replication/repl_dev/solo_repl_dev.h" + +//////////////////////////////////////////////////////////////////////////// +// // +// This test is to test solo repl device // +// // +//////////////////////////////////////////////////////////////////////////// + +using namespace homestore; +using namespace test_common; + +SISL_LOGGING_INIT(HOMESTORE_LOG_MODS) +SISL_OPTIONS_ENABLE(logging, test_solo_repl_dev, iomgr, test_common_setup) +SISL_LOGGING_DECL(test_solo_repl_dev) + +std::vector< std::string > test_common::HSTestHelper::s_dev_names; +static thread_local std::random_device g_rd{}; +static thread_local std::default_random_engine g_re{g_rd()}; +static uint32_t g_block_size; + +static constexpr uint64_t Ki{1024}; +static constexpr uint64_t Mi{Ki * Ki}; +static constexpr uint64_t Gi{Ki * Mi}; + +struct Runner { + uint64_t total_tasks{0}; + uint32_t qdepth{8}; + std::atomic< uint64_t > issued_tasks{0}; + std::atomic< uint64_t > pending_tasks{0}; + std::function< void(void) > task; + folly::Promise< folly::Unit > comp_promise; + + Runner() : total_tasks{SISL_OPTIONS["num_io"].as< uint64_t >()} { + if (total_tasks < (uint64_t)qdepth) { total_tasks = qdepth; } + } + + void set_task(std::function< void(void) > f) { task = std::move(f); } + + folly::Future< folly::Unit > execute() { + for (uint32_t i{0}; i < qdepth; ++i) { + run_task(); + } + return comp_promise.getFuture(); + } + + void next_task() { + auto ptasks = pending_tasks.fetch_sub(1) - 1; + if ((issued_tasks.load() < total_tasks)) { + run_task(); + } else if (ptasks == 0) { + comp_promise.setValue(); + } + } + + void run_task() { + ++issued_tasks; + ++pending_tasks; + iomanager.run_on_forget(iomgr::reactor_regex::random_worker, task); + } +}; + +struct rdev_req : boost::intrusive_ref_counter< rdev_req > { + sisl::byte_array header; + sisl::byte_array key; + sisl::sg_list write_sgs; + sisl::sg_list read_sgs; + int64_t lsn; + MultiBlkId written_blkids; + + rdev_req() { + write_sgs.size = 0; + read_sgs.size = 0; + } + ~rdev_req() { + for (auto const& iov : write_sgs.iovs) { + iomanager.iobuf_free(uintptr_cast(iov.iov_base)); + } + + for (auto const& iov : read_sgs.iovs) { + iomanager.iobuf_free(uintptr_cast(iov.iov_base)); + } + } + struct journal_header { + uint32_t key_size; + uint64_t key_pattern; + uint64_t data_size; + uint64_t data_pattern; + }; +}; + +class SoloReplDevTest : public testing::Test { +public: + class Listener : public ReplDevListener { + private: + SoloReplDevTest& m_test; + ReplDev& m_rdev; + + public: + Listener(SoloReplDevTest& test, ReplDev& rdev) : m_test{test}, m_rdev{rdev} {} + virtual ~Listener() = default; + + void on_commit(int64_t lsn, sisl::blob const& header, sisl::blob const& key, MultiBlkId const& blkids, + void* ctx) override { + if (ctx == nullptr) { + m_test.validate_replay(m_rdev, lsn, header, key, blkids); + } else { + rdev_req* req = r_cast< rdev_req* >(ctx); + req->lsn = lsn; + req->written_blkids = std::move(blkids); + m_test.on_write_complete(m_rdev, intrusive< rdev_req >(req, false)); + } + } + + void on_pre_commit(int64_t lsn, const sisl::blob& header, const sisl::blob& key, void* ctx) override {} + + void on_rollback(int64_t lsn, const sisl::blob& header, const sisl::blob& key, void* ctx) override {} + + blk_alloc_hints get_blk_alloc_hints(sisl::blob const& header, void* user_ctx) override { + return blk_alloc_hints{}; + } + + void on_replica_stop() override {} + }; + + class Callbacks : public ReplServiceCallbacks { + private: + SoloReplDevTest& m_test; + + public: + Callbacks(SoloReplDevTest* test) : m_test{*test} {} + virtual ~Callbacks() = default; + + std::unique_ptr< ReplDevListener > on_repl_dev_init(cshared< ReplDev >& rdev) override { + m_test.found_repl_dev(rdev); + return std::make_unique< Listener >(m_test, *rdev); + } + }; + +protected: + Runner m_runner; + shared< ReplDev > m_repl_dev1; + shared< ReplDev > m_repl_dev2; + +public: + virtual void SetUp() override { + test_common::HSTestHelper::start_homestore( + "test_solo_repl_dev", + {{HS_SERVICE::META, {.size_pct = 5.0}}, + {HS_SERVICE::REPLICATION, + {.size_pct = 60.0, .repl_svc_cbs = new Callbacks(this), .repl_impl = repl_impl_type::solo}}, + {HS_SERVICE::LOG_REPLICATED, {.size_pct = 20.0}}, + {HS_SERVICE::LOG_LOCAL, {.size_pct = 2.0}}}); + hs()->repl_service().create_replica_dev(hs_utils::gen_random_uuid(), {}); + hs()->repl_service().create_replica_dev(hs_utils::gen_random_uuid(), {}); + } + + virtual void TearDown() override { + m_repl_dev1.reset(); + m_repl_dev2.reset(); + test_common::HSTestHelper::shutdown_homestore(); + } + + void restart() { + m_repl_dev1.reset(); + m_repl_dev2.reset(); + test_common::HSTestHelper::start_homestore( + "test_solo_repl_dev", + {{HS_SERVICE::REPLICATION, {.repl_svc_cbs = new Callbacks(this), .repl_impl = repl_impl_type::solo}}, + {HS_SERVICE::LOG_REPLICATED, {}}, + {HS_SERVICE::LOG_LOCAL, {}}}, + nullptr, true /* restart */); + } + + void found_repl_dev(cshared< ReplDev >& rdev) { + if (m_repl_dev1 == nullptr) { + m_repl_dev1 = rdev; + } else { + HS_REL_ASSERT_EQ((void*)m_repl_dev2.get(), (void*)nullptr, "More than one replica dev reported"); + m_repl_dev2 = rdev; + } + } + + void write_io(uint32_t key_size, uint64_t data_size, uint32_t max_size_per_iov) { + auto req = intrusive< rdev_req >(new rdev_req()); + req->header = sisl::make_byte_array(sizeof(rdev_req::journal_header)); + auto hdr = r_cast< rdev_req::journal_header* >(req->header->bytes); + hdr->key_size = key_size; + hdr->key_pattern = ((long long)rand() << 32) | rand(); + hdr->data_size = data_size; + hdr->data_pattern = ((long long)rand() << 32) | rand(); + + if (key_size != 0) { + req->key = sisl::make_byte_array(key_size); + HSTestHelper::fill_data_buf(req->key->bytes, key_size, hdr->key_pattern); + } + + if (data_size != 0) { + req->write_sgs = HSTestHelper::create_sgs(data_size, g_block_size, max_size_per_iov, hdr->data_pattern); + } + + auto& rdev = (rand() % 2) ? m_repl_dev1 : m_repl_dev2; + intrusive_ptr_add_ref(req.get()); + rdev->async_alloc_write(*req->header, req->key ? *req->key : sisl::blob{}, req->write_sgs, (void*)req.get()); + } + + void validate_replay(ReplDev& rdev, int64_t lsn, sisl::blob const& header, sisl::blob const& key, + MultiBlkId const& blkids) { + auto jhdr = r_cast< rdev_req::journal_header* >(header.bytes); + HSTestHelper::validate_data_buf(key.bytes, key.size, jhdr->key_pattern); + + uint32_t size = blkids.blk_count() * g_block_size; + if (size) { + auto read_sgs = HSTestHelper::create_sgs(size, g_block_size, size); + rdev.async_read(blkids, read_sgs, size).thenValue([jhdr, read_sgs](auto&& err) { + RELEASE_ASSERT(!err, "Error during async_read"); + HS_REL_ASSERT_EQ(jhdr->data_size, read_sgs.size, "journal hdr data size mismatch with actual size"); + + for (auto const& iov : read_sgs.iovs) { + HSTestHelper::validate_data_buf(uintptr_cast(iov.iov_base), iov.iov_len, jhdr->data_pattern); + } + }); + } + } + + void on_write_complete(ReplDev& rdev, intrusive< rdev_req > req) { + // If we did send some data to the repl_dev, validate it by doing async_read + if (req->write_sgs.size != 0) { + req->read_sgs = HSTestHelper::create_sgs(req->write_sgs.size, g_block_size, req->write_sgs.size); + + rdev.async_read(req->written_blkids, req->read_sgs, req->read_sgs.size) + .thenValue([this, &rdev, req](auto&& err) { + RELEASE_ASSERT(!err, "Error during async_read"); + + LOGDEBUG("Write complete with lsn={} for size={}", req->lsn, req->write_sgs.size); + auto hdr = r_cast< rdev_req::journal_header* >(req->header->bytes); + HS_REL_ASSERT_EQ(hdr->data_size, req->read_sgs.size, + "journal hdr data size mismatch with actual size"); + + for (auto const& iov : req->read_sgs.iovs) { + HSTestHelper::validate_data_buf(uintptr_cast(iov.iov_base), iov.iov_len, hdr->data_pattern); + } + m_runner.next_task(); + }); + } else { + m_runner.next_task(); + } + } +}; + +TEST_F(SoloReplDevTest, TestSingleDataBlock) { + LOGINFO("Step 1: run on worker threads to schedule write for {} Bytes.", g_block_size); + this->m_runner.set_task([this]() { this->write_io(0u, g_block_size, g_block_size); }); + this->m_runner.execute().get(); + + LOGINFO("Step 2: Restart homestore and validate replay data.", g_block_size); + restart(); +} + +TEST_F(SoloReplDevTest, TestRandomSizedDataBlock) { + LOGINFO("Step 1: run on worker threads to schedule write for random bytes ranging {}-{}.", 0, 1 * Mi); + this->m_runner.set_task([this]() { + uint32_t nblks = rand() % ((1 * Mi) / g_block_size); + uint32_t key_size = rand() % 512 + 8; + this->write_io(key_size, nblks * g_block_size, g_block_size); + }); + this->m_runner.execute().get(); + restart(); +} + +TEST_F(SoloReplDevTest, TestHeaderOnly) { + LOGINFO("Step 1: run on worker threads to schedule write"); + this->m_runner.set_task([this]() { this->write_io(0u, 0u, g_block_size); }); + this->m_runner.execute().get(); + restart(); +} + +SISL_OPTION_GROUP(test_solo_repl_dev, + (num_io, "", "num_io", "number of io", ::cxxopts::value< uint64_t >()->default_value("300"), + "number"), + (block_size, "", "block_size", "block size to io", + ::cxxopts::value< uint32_t >()->default_value("4096"), "number")); + +int main(int argc, char* argv[]) { + int parsed_argc{argc}; + ::testing::InitGoogleTest(&parsed_argc, argv); + SISL_OPTIONS_LOAD(parsed_argc, argv, logging, test_solo_repl_dev, iomgr, test_common_setup); + sisl::logging::SetLogger("test_solo_repl_dev"); + spdlog::set_pattern("[%D %T%z] [%^%l%$] [%n] [%t] %v"); + + g_block_size = SISL_OPTIONS["block_size"].as< uint32_t >(); + return RUN_ALL_TESTS(); +}