diff --git a/conanfile.py b/conanfile.py index 83e03ce9d..c0bf4d834 100644 --- a/conanfile.py +++ b/conanfile.py @@ -5,7 +5,7 @@ class HomestoreConan(ConanFile): name = "homestore" - version = "4.4.1" + version = "4.5.1" homepage = "https://github.com/eBay/Homestore" description = "HomeStore Storage Engine" diff --git a/src/include/homestore/btree/btree.hpp b/src/include/homestore/btree/btree.hpp index 22c460006..63056c413 100644 --- a/src/include/homestore/btree/btree.hpp +++ b/src/include/homestore/btree/btree.hpp @@ -66,7 +66,7 @@ class Btree { // to overcome the gcc bug, pointer here: https://gcc.gnu.org/bugzilla/show_bug.cgi?id=66944 static BtreeThreadVariables* bt_thread_vars() { auto this_id(boost::this_fiber::get_id()); - static thread_local std::map< fibers::fiber::id, std::unique_ptr< BtreeThreadVariables > > fiber_map; + static thread_local std::map< boost::fibers::fiber::id, std::unique_ptr< BtreeThreadVariables > > fiber_map; if (fiber_map.count(this_id)) { return fiber_map[this_id].get(); } fiber_map[this_id] = std::make_unique< BtreeThreadVariables >(); return fiber_map[this_id].get(); diff --git a/src/include/homestore/btree/detail/simple_node.hpp b/src/include/homestore/btree/detail/simple_node.hpp index e44fae02f..8dbbc62a6 100644 --- a/src/include/homestore/btree/detail/simple_node.hpp +++ b/src/include/homestore/btree/detail/simple_node.hpp @@ -262,6 +262,7 @@ class SimpleNode : public BtreeNode { } std::string to_string_keys(bool print_friendly = false) const override { +#if 0 std::string delimiter = print_friendly ? "\n" : "\t"; auto str = fmt::format("{}{} nEntries={} {} ", print_friendly ? 
"------------------------------------------------------------\n" : "", @@ -314,6 +315,8 @@ class SimpleNode : public BtreeNode { fmt::format_to(std::back_inserter(str), "-{}]", cur_key); } return str; +#endif + return {}; } uint8_t* get_node_context() override { return uintptr_cast(this) + sizeof(SimpleNode< K, V >); } diff --git a/src/include/homestore/btree/detail/varlen_node.hpp b/src/include/homestore/btree/detail/varlen_node.hpp index 4ac896335..9c89a89c0 100644 --- a/src/include/homestore/btree/detail/varlen_node.hpp +++ b/src/include/homestore/btree/detail/varlen_node.hpp @@ -518,6 +518,7 @@ class VariableNode : public BtreeNode { } std::string to_string_keys(bool print_friendly = false) const override { +#if 0 std::string delimiter = print_friendly ? "\n" : "\t"; auto str = fmt::format("{}{} nEntries={} {} ", print_friendly ? "------------------------------------------------------------\n" : "", @@ -570,6 +571,8 @@ class VariableNode : public BtreeNode { fmt::format_to(std::back_inserter(str), "-{}]", cur_key); } return str; +#endif + return {}; } uint8_t* get_node_context() override { return uintptr_cast(this) + sizeof(VariableNode< K, V >); } diff --git a/src/include/homestore/homestore.hpp b/src/include/homestore/homestore.hpp index b03a62dc6..cd4b46f8e 100644 --- a/src/include/homestore/homestore.hpp +++ b/src/include/homestore/homestore.hpp @@ -44,9 +44,8 @@ class MetaBlkService; class LogStoreService; class BlkDataService; class IndexService; -class ReplicationServiceImpl; +class ReplicationService; class IndexServiceCallbacks; -class ReplServiceCallbacks; struct vdev_info; class HomeStore; class CPManager; @@ -114,7 +113,7 @@ class HomeStore { std::unique_ptr< MetaBlkService > m_meta_service; std::unique_ptr< LogStoreService > m_log_service; std::unique_ptr< IndexService > m_index_service; - std::unique_ptr< ReplicationServiceImpl > m_repl_service; + std::unique_ptr< ReplicationService > m_repl_service; std::unique_ptr< DeviceManager > m_dev_mgr; 
shared< sisl::logging::logger_t > m_periodic_logger; @@ -144,7 +143,7 @@ class HomeStore { HomeStore& with_data_service(cshared< ChunkSelector >& custom_chunk_selector = nullptr); HomeStore& with_log_service(); HomeStore& with_index_service(std::unique_ptr< IndexServiceCallbacks > cbs); - HomeStore& with_repl_data_service(repl_impl_type repl_type, std::unique_ptr< ReplServiceCallbacks > cbs, + HomeStore& with_repl_data_service(repl_impl_type repl_type, cshared< ChunkSelector >& custom_chunk_selector = nullptr); bool start(const hs_input_params& input, hs_before_services_starting_cb_t svcs_starting_cb = nullptr); @@ -165,7 +164,7 @@ class HomeStore { MetaBlkService& meta_service() { return *m_meta_service; } LogStoreService& logstore_service() { return *m_log_service; } IndexService& index_service() { return *m_index_service; } - ReplicationServiceImpl& repl_service() { return *m_repl_service; } + ReplicationService& repl_service() { return *m_repl_service; } DeviceManager* device_mgr() { return m_dev_mgr.get(); } ResourceMgr& resource_mgr() { return *m_resource_mgr.get(); } CPManager& cp_mgr() { return *m_cp_mgr.get(); } diff --git a/src/include/homestore/homestore_decl.hpp b/src/include/homestore/homestore_decl.hpp index 5cd177d30..99c6f234e 100644 --- a/src/include/homestore/homestore_decl.hpp +++ b/src/include/homestore/homestore_decl.hpp @@ -55,6 +55,9 @@ using unique = std::unique_ptr< T >; template < typename T > using intrusive = boost::intrusive_ptr< T >; +template < typename T > +using cintrusive = const boost::intrusive_ptr< T >; + ////////////// All Size Limits /////////////////// constexpr uint32_t BLK_NUM_BITS{32}; constexpr uint32_t NBLKS_BITS{8}; diff --git a/src/include/homestore/index/wb_cache_base.hpp b/src/include/homestore/index/wb_cache_base.hpp index 6079b5277..f59d4b1ce 100644 --- a/src/include/homestore/index/wb_cache_base.hpp +++ b/src/include/homestore/index/wb_cache_base.hpp @@ -31,6 +31,8 @@ struct CPContext; class IndexWBCacheBase { 
public: + virtual ~IndexWBCacheBase() = default; + /// @brief Allocate the buffer and initialize the btree node. It adds the node to the wb cache. /// @tparam K Key type of the Index /// @param node_initializer Callback to be called upon which buffer is turned into btree node diff --git a/src/include/homestore/replication/repl_dev.h b/src/include/homestore/replication/repl_dev.h index d3ab9ea46..5bd781928 100644 --- a/src/include/homestore/replication/repl_dev.h +++ b/src/include/homestore/replication/repl_dev.h @@ -1,11 +1,35 @@ #pragma once -// #include +#include +#include #include #include namespace homestore { +class ReplDev; + +struct repl_journal_entry; +struct repl_req_ctx : public boost::intrusive_ref_counter< repl_req_ctx, boost::thread_safe_counter > { + friend class SoloReplDev; + +public: + virtual ~repl_req_ctx(); + int64_t get_lsn() const { return lsn; } + +private: + sisl::blob header; // User header + sisl::blob key; // Key to replicate + sisl::sg_list value; // Raw value - applicable only to leader req + MultiBlkId local_blkid; // List of corresponding local blkids for the value + RemoteBlkId remote_blkid; // List of remote blkid for the value + std::unique_ptr< uint8_t[] > journal_buf; // Buf for the journal entry + repl_journal_entry* journal_entry{nullptr}; // pointer to the journal entry + int64_t lsn{0}; // Lsn for this replication req + + void alloc_journal_entry(uint32_t size); +}; + // // Callbacks to be implemented by ReplDev users. // @@ -13,6 +37,9 @@ class ReplDevListener { public: virtual ~ReplDevListener() = default; + void set_repl_dev(ReplDev* rdev) { m_repl_dev = std::move(rdev); } + virtual ReplDev* repl_dev() { return m_repl_dev; } + /// @brief Called when the log entry has been committed in the replica set. 
/// /// This function is called from a dedicated commit thread which is different from the original thread calling @@ -22,10 +49,10 @@ class ReplDevListener { /// @param header - Header originally passed with replica_set::write() api /// @param key - Key originally passed with replica_set::write() api /// @param blkids - List of blkids where data is written to the storage engine. - /// @param ctx - User contenxt passed as part of the replica_set::write() api + /// @param ctx - Context passed as part of the replica_set::write() api /// virtual void on_commit(int64_t lsn, sisl::blob const& header, sisl::blob const& key, MultiBlkId const& blkids, - void* ctx) = 0; + cintrusive< repl_req_ctx >& ctx) = 0; /// @brief Called when the log entry has been received by the replica dev. /// @@ -44,8 +71,9 @@ class ReplDevListener { /// @param lsn - The log sequence number /// @param header - Header originally passed with repl_dev::write() api /// @param key - Key originally passed with repl_dev::write() api - /// @param ctx - User contenxt passed as part of the repl_dev::write() api - virtual void on_pre_commit(int64_t lsn, const sisl::blob& header, const sisl::blob& key, void* ctx) = 0; + /// @param ctx - Context passed as part of the replica_set::write() api + virtual bool on_pre_commit(int64_t lsn, const sisl::blob& header, const sisl::blob& key, + cintrusive< repl_req_ctx >& ctx) = 0; /// @brief Called when the log entry has been rolled back by the replica set. 
/// @@ -59,8 +87,9 @@ class ReplDevListener { /// @param lsn - The log sequence number getting rolled back /// @param header - Header originally passed with repl_dev::write() api /// @param key - Key originally passed with repl_dev::write() api - /// @param ctx - User contenxt passed as part of the repl_dev::write() api - virtual void on_rollback(int64_t lsn, const sisl::blob& header, const sisl::blob& key, void* ctx) = 0; + /// @param ctx - Context passed as part of the replica_set::write() api + virtual void on_rollback(int64_t lsn, const sisl::blob& header, const sisl::blob& key, + cintrusive< repl_req_ctx >& ctx) = 0; /// @brief Called when replication module is trying to allocate a block to write the value /// @@ -68,12 +97,16 @@ class ReplDevListener { /// value. Caller is expected to provide hints for allocation based on the header supplied as part of original /// write. In cases where caller don't care about the hints can return default blk_alloc_hints. /// - /// @param header Header originally passed with repl_dev::write() api on the leader + /// @param header Header originally passed with repl_dev::async_alloc_write() api on the leader + /// @param ctx Original context passed as part of repl_dev::async_alloc_write /// @return Expected to return blk_alloc_hints for this write - virtual blk_alloc_hints get_blk_alloc_hints(sisl::blob const& header, void* user_ctx) = 0; + virtual blk_alloc_hints get_blk_alloc_hints(sisl::blob const& header, cintrusive< repl_req_ctx >& ctx) = 0; /// @brief Called when the replica set is being stopped virtual void on_replica_stop() = 0; + +private: + ReplDev* m_repl_dev{nullptr}; /* null until set_repl_dev() is called */ }; class ReplDev { @@ -98,10 +131,10 @@ class ReplDev { /// cases /// @param value - vector of io buffers that contain value for the key. It is an optional field and if the value /// list size is 0, then only key is written to replicadev without data. 
- /// @param user_ctx - User supplied opaque context which will be passed to listener + /// @param ctx - User supplied context which will be passed to listener /// callbacks virtual void async_alloc_write(sisl::blob const& header, sisl::blob const& key, sisl::sg_list const& value, - void* user_ctx) = 0; + intrusive< repl_req_ctx > ctx) = 0; /// @brief Reads the data and returns a future to continue on /// @param bid Block id to read @@ -130,6 +163,8 @@ class ReplDev { virtual void attach_listener(std::unique_ptr< ReplDevListener > listener) { m_listener = std::move(listener); } + virtual uint32_t get_blk_size() const = 0; + protected: std::unique_ptr< ReplDevListener > m_listener; }; diff --git a/src/include/homestore/replication_service.hpp b/src/include/homestore/replication_service.hpp index 276b92a31..eb88cccfe 100644 --- a/src/include/homestore/replication_service.hpp +++ b/src/include/homestore/replication_service.hpp @@ -36,7 +36,7 @@ template < typename V, typename E > using Result = folly::Expected< V, E >; template < class V, class E > -using AsyncResult = folly::SemiFuture< Result< V, E > >; +using AsyncResult = folly::Future< Result< V, E > >; template < class V > using ReplResult = Result< V, ReplServiceError >; @@ -44,26 +44,45 @@ using ReplResult = Result< V, ReplServiceError >; template < class V > using AsyncReplResult = AsyncResult< V, ReplServiceError >; -class ReplServiceCallbacks { -public: - virtual ~ReplServiceCallbacks() = default; - virtual std::unique_ptr< ReplDevListener > on_repl_dev_init(cshared< ReplDev >& rs) = 0; -}; - class ReplicationService { public: ReplicationService() = default; virtual ~ReplicationService() = default; - /// Sync APIs - virtual ReplResult< shared< ReplDev > > get_replica_dev(uuid_t group_id) const = 0; - virtual void iterate_replica_devs(std::function< void(cshared< ReplDev >&) > const& cb) = 0; + /// @brief Creates the Repl Device to which eventually user can read locally and write to the quorum of the members 
+ /// @param group_id Unique ID indicating the group. This is the key for several lookup structures + /// @param members List of members to form this group + /// @param listener state machine listener of all the events happening on the repl_dev (commit, precommit etc) + /// @return A Future ReplDev on success or Future ReplServiceError upon error + virtual AsyncReplResult< shared< ReplDev > > create_repl_dev(uuid_t group_id, + std::set< std::string, std::less<> >&& members, + std::unique_ptr< ReplDevListener > listener) = 0; + + /// @brief Opens the Repl Device for a given group id. It is expected that the repl dev is already created and used + /// this method for recovering. It is possible that repl_dev is not ready and in that case it will provide Repl + /// Device after it is ready and thus returns a Future. + /// + /// NOTE 1: If callers does an open for a repl device which was not created before, then at the end of + /// initialization an error is returned saying ReplServiceError::SERVER_NOT_FOUND + /// + /// NOTE 2: If the open repl device is called after Replication service is started, then it returns an error + /// ReplServiceError::BAD_REQUEST + /// @param group_id Group id to open the repl device with + /// @param listener state machine listener of all the events happening on the repl_dev (commit, precommit etc) + /// @return A Future ReplDev on successful open of ReplDev or Future ReplServiceError upon error + virtual AsyncReplResult< shared< ReplDev > > open_repl_dev(uuid_t group_id, + std::unique_ptr< ReplDevListener > listener) = 0; + + virtual folly::Future< ReplServiceError > replace_member(uuid_t group_id, std::string const& member_out, + std::string const& member_in) const = 0; - /// Async APIs - virtual AsyncReplResult< shared< ReplDev > > create_replica_dev(uuid_t group_id, - std::set< std::string, std::less<> >&& members) = 0; + /// @brief Get the repl dev for a given group id if it is already created or opened + /// @param group_id Group id 
interested in + /// @return ReplDev is opened or ReplServiceError::SERVER_NOT_FOUND if it doesn't exist + virtual ReplResult< shared< ReplDev > > get_repl_dev(uuid_t group_id) const = 0; - virtual folly::SemiFuture< ReplServiceError > replace_member(uuid_t group_id, std::string const& member_out, - std::string const& member_in) const = 0; + /// @brief Iterate over all repl devs and then call the callback provided + /// @param cb Callback with repl dev + virtual void iterate_repl_devs(std::function< void(cshared< ReplDev >&) > const& cb) = 0; }; } // namespace homestore diff --git a/src/lib/blkalloc/blk.cpp b/src/lib/blkalloc/blk.cpp index 29507dcf3..8f5276062 100644 --- a/src/lib/blkalloc/blk.cpp +++ b/src/lib/blkalloc/blk.cpp @@ -110,6 +110,14 @@ void MultiBlkId::deserialize(sisl::blob const& b, bool copy) { } } +#if 0 +static uint32_t MultiBlkId::expected_serialized_size(uint16_t num_pieces) { + uint32_t sz = BlkId::expected_serialized_size(); + if (num_pieces > 1) { sz += sizeof(uint16_t) + ((num_pieces - 1) * sizeof(chain_blkid)); } + return sz; +} +#endif + uint16_t MultiBlkId::num_pieces() const { return BlkId::is_valid() ? n_addln_piece + 1 : 0; } bool MultiBlkId::has_room() const { return (n_addln_piece < max_addln_pieces); } diff --git a/src/lib/blkdata_svc/blk_read_tracker.cpp b/src/lib/blkdata_svc/blk_read_tracker.cpp index 1d90618a9..6111689c1 100644 --- a/src/lib/blkdata_svc/blk_read_tracker.cpp +++ b/src/lib/blkdata_svc/blk_read_tracker.cpp @@ -28,59 +28,38 @@ void BlkReadTracker::merge(const BlkId& blkid, int64_t new_ref_count, const std::shared_ptr< blk_track_waiter >& waiter) { HS_DBG_ASSERT(new_ref_count ? 
waiter == nullptr : waiter != nullptr, "Invalid waiter"); - // - // Don't move alignment handling outside of this function, because the nblks between (first and last blk num after - // alignment) could be larger than 255 which exceeds a BlkId can hold; - // - auto cur_blk_num_aligned = s_cast< blk_num_t >(sisl::round_down(blkid.blk_num(), entries_per_record())); - auto last_blk_num_aligned_up = - s_cast< blk_num_t >(sisl::round_up(blkid.blk_num() + blkid.blk_count() + 1, entries_per_record()) - 1); + auto cur_base_blk_num = s_cast< blk_num_t >(sisl::round_down(blkid.blk_num(), entries_per_record())); + auto last_base_blk_num = + s_cast< blk_num_t >(sisl::round_down(blkid.blk_num() + blkid.blk_count() - 1, entries_per_record())); [[maybe_unused]] bool waiter_rescheduled{false}; // everything is aligned after this point, so we don't need to handle sub_range in a base blkid; - while (cur_blk_num_aligned <= last_blk_num_aligned_up) { - BlkId base_blkid{cur_blk_num_aligned, entries_per_record(), blkid.chunk_num()}; - - BlkTrackRecord rec; - const auto rec_found = m_pending_reads_map.get(base_blkid, rec); - - if (new_ref_count != 0) { - // this is insert/remove operations - if (rec_found) { - // if some read is already happening on this record, just update the ref_cnt; + while (cur_base_blk_num <= last_base_blk_num) { + BlkId base_blkid{cur_base_blk_num, entries_per_record(), blkid.chunk_num()}; + + if (new_ref_count > 0) { + // This is an insert operation + m_pending_reads_map.upsert_or_delete(base_blkid, + [&base_blkid, new_ref_count](BlkTrackRecord& rec, bool existing) { + if (!existing) { rec.m_key = base_blkid; } + rec.m_ref_cnt += new_ref_count; + return false; + }); + } else if (new_ref_count < 0) { + // This is a remove operation + m_pending_reads_map.upsert_or_delete(base_blkid, [new_ref_count](BlkTrackRecord& rec, bool existing) { + HS_DBG_ASSERT_EQ(existing, true, "Decrement a ref count which does not exist in map"); rec.m_ref_cnt += new_ref_count; - } else { 
- // if no record found, no read is happening on this record; - rec.m_key = base_blkid; - rec.m_ref_cnt = new_ref_count; - } - - // in either case, ref_cnt can not drop below zero; - HS_DBG_ASSERT_GE(rec.m_ref_cnt, 0); - - if (rec.m_ref_cnt > 0) { - m_pending_reads_map.upsert(base_blkid, rec); - } else { - // ref_cnt drops to zero, clear all the references held by this record; - HS_DBG_ASSERT_EQ(rec_found, true); - rec.m_waiters.clear(); - BlkTrackRecord dummy_rec; - m_pending_reads_map.erase(base_blkid, dummy_rec); - } + return (rec.m_ref_cnt == 0); + }); } else { // this is wait_on operation - if (rec_found) { - // apply waiter to this record; + m_pending_reads_map.update(base_blkid, [&waiter_rescheduled, &waiter](BlkTrackRecord& rec) { rec.m_waiters.push_back(waiter); - // overwirte existing record; - m_pending_reads_map.upsert(base_blkid, rec); waiter_rescheduled = true; - } - - // not found, nothing needs to be done; fall through and visit remaining records; + }); } - - cur_blk_num_aligned += entries_per_record(); + cur_base_blk_num += entries_per_record(); } #ifdef _PRERELEASE diff --git a/src/lib/homestore.cpp b/src/lib/homestore.cpp index 6d7f6868f..92efbe652 100644 --- a/src/lib/homestore.cpp +++ b/src/lib/homestore.cpp @@ -58,7 +58,6 @@ HomeStoreSafePtr HomeStore::s_instance{nullptr}; static std::unique_ptr< IndexServiceCallbacks > s_index_cbs; static repl_impl_type s_repl_impl_type{repl_impl_type::solo}; -static std::unique_ptr< ReplServiceCallbacks > s_repl_cbs; shared< ChunkSelector > s_custom_chunk_selector{nullptr}; HomeStore* HomeStore::instance() { @@ -84,12 +83,11 @@ HomeStore& HomeStore::with_log_service() { return *this; } -HomeStore& HomeStore::with_repl_data_service(repl_impl_type repl_type, std::unique_ptr< ReplServiceCallbacks > cbs, +HomeStore& HomeStore::with_repl_data_service(repl_impl_type repl_type, cshared< ChunkSelector >& custom_chunk_selector) { m_services.svcs |= HS_SERVICE::REPLICATION | HS_SERVICE::LOG_REPLICATED | 
HS_SERVICE::LOG_LOCAL; m_services.svcs &= ~HS_SERVICE::DATA; // ReplicationDataSvc or DataSvc are mutually exclusive s_repl_impl_type = repl_type; - s_repl_cbs = std::move(cbs); s_custom_chunk_selector = std::move(custom_chunk_selector); return *this; } @@ -130,7 +128,7 @@ bool HomeStore::start(const hs_input_params& input, hs_before_services_starting_ if (has_data_service()) { m_data_service = std::make_unique< BlkDataService >(std::move(s_custom_chunk_selector)); } if (has_index_service()) { m_index_service = std::make_unique< IndexService >(std::move(s_index_cbs)); } if (has_repl_data_service()) { - m_repl_service = std::make_unique< ReplicationServiceImpl >(s_repl_impl_type, std::move(s_repl_cbs)); + m_repl_service = std::make_unique< ReplicationServiceImpl >(s_repl_impl_type); m_data_service = std::make_unique< BlkDataService >(std::move(s_custom_chunk_selector)); } m_cp_mgr = std::make_unique< CPManager >(); @@ -205,7 +203,7 @@ void HomeStore::do_start() { m_data_service->start(); } else if (has_repl_data_service()) { m_data_service->start(); - m_repl_service->start(); + s_cast< ReplicationServiceImpl* >(m_repl_service.get())->start(); } // In case of custom recovery, let consumer starts the recovery and it is consumer module's responsibilities @@ -228,7 +226,7 @@ void HomeStore::shutdown() { if (has_data_service()) { m_data_service.reset(); } if (has_repl_data_service()) { - m_repl_service->stop(); + s_cast< ReplicationServiceImpl* >(m_repl_service.get())->stop(); m_repl_service.reset(); } m_dev_mgr->close_devices(); diff --git a/src/lib/replication/repl_dev/solo_repl_dev.cpp b/src/lib/replication/repl_dev/solo_repl_dev.cpp index 5ee1b6137..653ef5302 100644 --- a/src/lib/replication/repl_dev/solo_repl_dev.cpp +++ b/src/lib/replication/repl_dev/solo_repl_dev.cpp @@ -5,7 +5,8 @@ #include "replication/repl_dev/solo_repl_dev.h" namespace homestore { -SoloReplDev::SoloReplDev(superblk< repl_dev_superblk > const& rd_sb, bool load_existing) : m_rd_sb{rd_sb} { 
+SoloReplDev::SoloReplDev(superblk< repl_dev_superblk > const& rd_sb, bool load_existing) : + m_rd_sb{rd_sb}, m_group_id{m_rd_sb->gid} { if (load_existing) { logstore_service().open_log_store(LogStoreService::DATA_LOG_FAMILY_IDX, m_rd_sb->data_journal_id, true, bind_this(SoloReplDev::on_data_journal_created, 1)); @@ -23,18 +24,17 @@ void SoloReplDev::on_data_journal_created(shared< HomeLogStore > log_store) { } void SoloReplDev::async_alloc_write(sisl::blob const& header, sisl::blob const& key, sisl::sg_list const& value, - void* user_ctx) { - auto rreq = intrusive< repl_req >(new repl_req{}); + intrusive< repl_req_ctx > rreq) { + if (!rreq) { rreq = intrusive< repl_req_ctx >(new repl_req_ctx{}); } /* assign the parameter; a new local here would leave rreq null */ rreq->header = header; rreq->key = key; rreq->value = std::move(value); - rreq->user_ctx = user_ctx; // If it is header only entry, directly write to the journal if (rreq->value.size) { // Step 1: Alloc Blkid - auto status = data_service().alloc_blks( - uint32_cast(rreq->value.size), m_listener->get_blk_alloc_hints(rreq->header, user_ctx), rreq->local_blkid); + auto status = data_service().alloc_blks(uint32_cast(rreq->value.size), + m_listener->get_blk_alloc_hints(rreq->header, rreq), rreq->local_blkid); HS_REL_ASSERT_EQ(status, BlkAllocStatus::SUCCESS); // Write the data @@ -49,9 +49,9 @@ void SoloReplDev::async_alloc_write(sisl::blob const& header, sisl::blob const& } } -void SoloReplDev::write_journal(intrusive< repl_req > rreq) { - uint32_t entry_size = - sizeof(repl_journal_entry) + rreq->header.size + rreq->key.size + rreq->local_blkid.serialized_size(); +void SoloReplDev::write_journal(intrusive< repl_req_ctx > rreq) { + uint32_t entry_size = sizeof(repl_journal_entry) + rreq->header.size + rreq->key.size + + (rreq->value.size ? 
rreq->local_blkid.serialized_size() : 0); rreq->alloc_journal_entry(entry_size); rreq->journal_entry->code = journal_type_t::HS_DATA; rreq->journal_entry->user_header_size = rreq->header.size; @@ -74,17 +74,17 @@ void SoloReplDev::write_journal(intrusive< repl_req > rreq) { raw_ptr += b.size; } - m_data_journal->append_async( - sisl::io_blob{rreq->journal_buf.get(), entry_size, false /* is_aligned */}, nullptr /* cookie */, - [this, rreq](int64_t lsn, sisl::io_blob&, homestore::logdev_key, void*) { - rreq->lsn = lsn; - m_listener->on_pre_commit(rreq->lsn, rreq->header, rreq->key, rreq->user_ctx); + m_data_journal->append_async(sisl::io_blob{rreq->journal_buf.get(), entry_size, false /* is_aligned */}, + nullptr /* cookie */, + [this, rreq](int64_t lsn, sisl::io_blob&, homestore::logdev_key, void*) mutable { + rreq->lsn = lsn; + m_listener->on_pre_commit(rreq->lsn, rreq->header, rreq->key, rreq); - auto cur_lsn = m_commit_upto.load(); - if (cur_lsn < lsn) { m_commit_upto.compare_exchange_strong(cur_lsn, lsn); } + auto cur_lsn = m_commit_upto.load(); + if (cur_lsn < lsn) { m_commit_upto.compare_exchange_strong(cur_lsn, lsn); } - m_listener->on_commit(rreq->lsn, rreq->header, rreq->key, rreq->local_blkid, rreq->user_ctx); - }); + m_listener->on_commit(rreq->lsn, rreq->header, rreq->key, rreq->local_blkid, rreq); + }); } void SoloReplDev::on_log_found(logstore_seq_num_t lsn, log_buffer buf, void* ctx) { @@ -124,6 +124,8 @@ folly::Future< std::error_code > SoloReplDev::async_read(MultiBlkId const& bid, void SoloReplDev::async_free_blks(int64_t, MultiBlkId const& bid) { data_service().async_free_blk(bid); } +uint32_t SoloReplDev::get_blk_size() const { return data_service().get_blk_size(); } + void SoloReplDev::cp_flush(CP*) { auto lsn = m_commit_upto.load(); m_rd_sb->commit_lsn = lsn; @@ -133,4 +135,12 @@ void SoloReplDev::cp_flush(CP*) { void SoloReplDev::cp_cleanup(CP*) { m_data_journal->truncate(m_rd_sb->checkpoint_lsn); } +void 
repl_req_ctx::alloc_journal_entry(uint32_t size) { + journal_buf = std::unique_ptr< uint8_t[] >(new uint8_t[size]); + journal_entry = new (journal_buf.get()) repl_journal_entry(); +} + +repl_req_ctx::~repl_req_ctx() { + if (journal_entry) { journal_entry->~repl_journal_entry(); } +} } // namespace homestore \ No newline at end of file diff --git a/src/lib/replication/repl_dev/solo_repl_dev.h b/src/lib/replication/repl_dev/solo_repl_dev.h index 1e37528b9..684378a13 100644 --- a/src/lib/replication/repl_dev/solo_repl_dev.h +++ b/src/lib/replication/repl_dev/solo_repl_dev.h @@ -15,7 +15,6 @@ #pragma once #include -#include #include #include @@ -29,7 +28,7 @@ struct repl_dev_superblk { static constexpr uint32_t REPL_DEV_SB_VERSION = 1; uint64_t magic{REPL_DEV_SB_MAGIC}; - uint32_t version{REPL_DEV_SB_MAGIC}; + uint32_t version{REPL_DEV_SB_VERSION}; uuid_t gid; // gid of this replica set logstore_id_t data_journal_id; // Logstore id for the data journal int64_t commit_lsn; // LSN upto which this replica has committed @@ -61,44 +60,21 @@ struct repl_journal_entry { // Followed by user_header, then key, then MultiBlkId }; -struct repl_req : public boost::intrusive_ref_counter< repl_req, boost::thread_safe_counter > { - sisl::blob header; // User header - sisl::blob key; // Key to replicate - sisl::sg_list value; // Raw value - applicable only to leader req - MultiBlkId local_blkid; // List of corresponding local blkids for the value - RemoteBlkId remote_blkid; // List of remote blkid for the value - std::unique_ptr< uint8_t[] > journal_buf; // Buf for the journal entry - repl_journal_entry* journal_entry{nullptr}; // pointer to the journal entry - void* user_ctx{nullptr}; // User context passed with replica_set::write, valie for leader only - int64_t lsn{0}; // Lsn for this replication req - - repl_req() = default; - - void alloc_journal_entry(uint32_t size) { - journal_buf = std::unique_ptr< uint8_t[] >(new uint8_t[size]); - journal_entry = new (journal_buf.get()) 
repl_journal_entry(); - } - - ~repl_req() { - if (journal_entry) { journal_entry->~repl_journal_entry(); } - } -}; - class CP; class SoloReplDev : public ReplDev { private: std::shared_ptr< HomeLogStore > m_data_journal; + superblk< repl_dev_superblk > m_rd_sb; uuid_t m_group_id; std::atomic< logstore_seq_num_t > m_commit_upto{-1}; - superblk< repl_dev_superblk > m_rd_sb; public: SoloReplDev(superblk< repl_dev_superblk > const& rd_sb, bool load_existing); virtual ~SoloReplDev() = default; void async_alloc_write(sisl::blob const& header, sisl::blob const& key, sisl::sg_list const& value, - void* user_ctx) override; + intrusive< repl_req_ctx > ctx) override; folly::Future< std::error_code > async_read(MultiBlkId const& bid, sisl::sg_list& sgs, uint32_t size, bool part_of_batch = false) override; @@ -109,12 +85,14 @@ class SoloReplDev : public ReplDev { uuid_t group_id() const override { return m_group_id; } + uint32_t get_blk_size() const override; + void cp_flush(CP* cp); void cp_cleanup(CP* cp); private: void on_data_journal_created(shared< HomeLogStore > log_store); - void write_journal(intrusive< repl_req > rreq); + void write_journal(intrusive< repl_req_ctx > rreq); void on_log_found(logstore_seq_num_t lsn, log_buffer buf, void* ctx); }; diff --git a/src/lib/replication/service/repl_service_impl.cpp b/src/lib/replication/service/repl_service_impl.cpp index 3a17dc7cd..c4316ee58 100644 --- a/src/lib/replication/service/repl_service_impl.cpp +++ b/src/lib/replication/service/repl_service_impl.cpp @@ -19,12 +19,11 @@ #include "replication/repl_dev/solo_repl_dev.h" namespace homestore { -ReplicationServiceImpl& repl_service() { return hs()->repl_service(); } +ReplicationService& repl_service() { return hs()->repl_service(); } -ReplicationServiceImpl::ReplicationServiceImpl(repl_impl_type impl_type, std::unique_ptr< ReplServiceCallbacks > cbs) : - m_svc_cbs{std::move(cbs)}, m_repl_type{impl_type} { +ReplicationServiceImpl::ReplicationServiceImpl(repl_impl_type 
impl_type) : m_repl_type{impl_type} { meta_service().register_handler( - "replication", + "repl_dev", [this](meta_blk* mblk, sisl::byte_view buf, size_t) { rd_super_blk_found(std::move(buf), voidptr_cast(mblk)); }, nullptr); } @@ -32,6 +31,14 @@ ReplicationServiceImpl::ReplicationServiceImpl(repl_impl_type impl_type, std::un void ReplicationServiceImpl::start() { // Register to CP to flush the super blk and truncate the logstore hs()->cp_mgr().register_consumer(cp_consumer_t::REPLICATION_SVC, std::make_unique< ReplServiceCPHandler >()); + + { + std::shared_lock lg{m_rd_map_mtx}; + for (auto const& [gid, info] : m_pending_open) { + // info.dev_promise.setValue(folly::makeUnexpected(ReplServiceError::SERVER_NOT_FOUND)); + } + } + m_rd_map_loaded = true; } void ReplicationServiceImpl::stop() { @@ -40,36 +47,66 @@ void ReplicationServiceImpl::stop() { } AsyncReplResult< shared< ReplDev > > -ReplicationServiceImpl::create_replica_dev(uuid_t group_id, std::set< std::string, std::less<> >&& members) { - superblk< repl_dev_superblk > rd_sb; +ReplicationServiceImpl::create_repl_dev(uuid_t group_id, std::set< std::string, std::less<> >&& members, + std::unique_ptr< ReplDevListener > listener) { + superblk< repl_dev_superblk > rd_sb{"repl_dev"}; rd_sb.create(sizeof(repl_dev_superblk)); rd_sb->gid = group_id; - shared< ReplDev > repl_dev = open_replica_dev(rd_sb, false /* load_existing */); - return folly::makeSemiFuture< ReplResult< shared< ReplDev > > >(std::move(repl_dev)); + shared< ReplDev > repl_dev = create_repl_dev_instance(rd_sb, false /* load_existing */); + listener->set_repl_dev(repl_dev.get()); + repl_dev->attach_listener(std::move(listener)); + rd_sb.write(); + return make_async_success(std::move(repl_dev)); +} + +AsyncReplResult< shared< ReplDev > > +ReplicationServiceImpl::open_repl_dev(uuid_t group_id, std::unique_ptr< ReplDevListener > listener) { + if (m_rd_map_loaded) { + // We have already loaded all repl_dev and open_repl_dev is called after that, we 
don't support dynamically + // opening the repl_dev. Return an error + LOGERROR("Opening group_id={} after services are started, which is not supported", + boost::uuids::to_string(group_id)); + return make_async_error< shared< ReplDev > >(ReplServiceError::BAD_REQUEST); + } + + std::unique_lock lg(m_rd_map_mtx); + auto it = m_rd_map.find(group_id); + if (it != m_rd_map.end()) { + // We already loaded the ReplDev, just call the group_id and attach the listener + auto& repl_dev = it->second; + listener->set_repl_dev(repl_dev.get()); + repl_dev->attach_listener(std::move(listener)); + return make_async_success< shared< ReplDev > >(std::move(repl_dev)); + } else { + auto [pending_it, inserted] = + m_pending_open.insert_or_assign(group_id, listener_info{.listener = std::move(listener)}); + DEBUG_ASSERT(inserted, "Duplicate open_replica_dev called for group_id = {}", + boost::uuids::to_string(group_id)); + return pending_it->second.dev_promise.getFuture(); + } } -ReplResult< shared< ReplDev > > ReplicationServiceImpl::get_replica_dev(uuid_t group_id) const { +ReplResult< shared< ReplDev > > ReplicationServiceImpl::get_repl_dev(uuid_t group_id) const { std::shared_lock lg(m_rd_map_mtx); if (auto it = m_rd_map.find(group_id); it != m_rd_map.end()) { return it->second; } return folly::makeUnexpected(ReplServiceError::SERVER_NOT_FOUND); } -void ReplicationServiceImpl::iterate_replica_devs(std::function< void(cshared< ReplDev >&) > const& cb) { +void ReplicationServiceImpl::iterate_repl_devs(std::function< void(cshared< ReplDev >&) > const& cb) { std::shared_lock lg(m_rd_map_mtx); for (const auto& [uuid, rd] : m_rd_map) { cb(rd); } } -folly::SemiFuture< ReplServiceError > ReplicationServiceImpl::replace_member(uuid_t group_id, - std::string const& member_out, - std::string const& member_in) const { - return folly::makeSemiFuture< ReplServiceError >(ReplServiceError::NOT_IMPLEMENTED); +folly::Future< ReplServiceError > ReplicationServiceImpl::replace_member(uuid_t group_id, 
std::string const& member_out, + std::string const& member_in) const { + return folly::makeFuture< ReplServiceError >(ReplServiceError::NOT_IMPLEMENTED); } -shared< ReplDev > ReplicationServiceImpl::open_replica_dev(superblk< repl_dev_superblk > const& rd_sb, - bool load_existing) { +shared< ReplDev > ReplicationServiceImpl::create_repl_dev_instance(superblk< repl_dev_superblk > const& rd_sb, + bool load_existing) { auto it = m_rd_map.end(); bool happened = false; @@ -86,7 +123,6 @@ shared< ReplDev > ReplicationServiceImpl::open_replica_dev(superblk< repl_dev_su } else { HS_REL_ASSERT(false, "Repl impl type = {} is not supported yet", enum_name(m_repl_type)); } - repl_dev->attach_listener(m_svc_cbs->on_repl_dev_init(repl_dev)); it->second = repl_dev; return repl_dev; @@ -98,7 +134,19 @@ void ReplicationServiceImpl::rd_super_blk_found(sisl::byte_view const& buf, void HS_DBG_ASSERT_EQ(rd_sb->get_magic(), repl_dev_superblk::REPL_DEV_SB_MAGIC, "Invalid rdev metablk, magic mismatch"); HS_DBG_ASSERT_EQ(rd_sb->get_version(), repl_dev_superblk::REPL_DEV_SB_VERSION, "Invalid version of rdev metablk"); - open_replica_dev(rd_sb, true /* load_existing */); + shared< ReplDev > repl_dev = create_repl_dev_instance(rd_sb, true /* load_existing */); + { + std::unique_lock lg(m_rd_map_mtx); + auto it = m_pending_open.find(rd_sb->gid); + if (it != m_pending_open.end()) { + auto& li_info = it->second; + // Someone waiting for this repl dev to open, call them to attach the listener and provide the value + li_info.listener->set_repl_dev(repl_dev.get()); + repl_dev->attach_listener(std::move(li_info.listener)); + li_info.dev_promise.setValue(repl_dev); + m_pending_open.erase(it); + } + } } ///////////////////// CP Callbacks for Repl Service ////////////// @@ -107,13 +155,13 @@ ReplServiceCPHandler::ReplServiceCPHandler() {} std::unique_ptr< CPContext > ReplServiceCPHandler::on_switchover_cp(CP* cur_cp, CP* new_cp) { return nullptr; } folly::Future< bool > 
ReplServiceCPHandler::cp_flush(CP* cp) { - repl_service().iterate_replica_devs( + repl_service().iterate_repl_devs( [cp](cshared< ReplDev >& repl_dev) { std::dynamic_pointer_cast< SoloReplDev >(repl_dev)->cp_flush(cp); }); return folly::makeFuture< bool >(true); } void ReplServiceCPHandler::cp_cleanup(CP* cp) { - repl_service().iterate_replica_devs( + repl_service().iterate_repl_devs( [cp](cshared< ReplDev >& repl_dev) { std::dynamic_pointer_cast< SoloReplDev >(repl_dev)->cp_cleanup(cp); }); } diff --git a/src/lib/replication/service/repl_service_impl.h b/src/lib/replication/service/repl_service_impl.h index f53c9a052..6e0a4c822 100644 --- a/src/lib/replication/service/repl_service_impl.h +++ b/src/lib/replication/service/repl_service_impl.h @@ -21,6 +21,7 @@ #include #include +#include #include #include #include @@ -32,26 +33,45 @@ namespace homestore { struct repl_dev_superblk; class ReplicationServiceImpl : public ReplicationService { + struct listener_info { + folly::Promise< folly::Expected< shared< ReplDev >, ReplServiceError > > dev_promise{}; + std::unique_ptr< ReplDevListener > listener; + }; + + template < class V > + auto make_async_error(ReplServiceError err) { + return folly::makeFuture< ReplResult< V > >(folly::makeUnexpected(err)); + } + + template < class V > + auto make_async_success(V&& v) { + return folly::makeFuture< ReplResult< V > >(std::move(v)); + } + protected: - std::unique_ptr< ReplServiceCallbacks > m_svc_cbs; repl_impl_type m_repl_type; std::shared_mutex m_rd_map_mtx; std::map< uuid_t, shared< ReplDev > > m_rd_map; + std::map< uuid_t, listener_info > m_pending_open; + std::atomic< bool > m_rd_map_loaded{false}; public: - ReplicationServiceImpl(repl_impl_type impl_type, std::unique_ptr< ReplServiceCallbacks > cbs); + ReplicationServiceImpl(repl_impl_type impl_type); void start(); void stop(); - AsyncReplResult< shared< ReplDev > > create_replica_dev(uuid_t group_id, - std::set< std::string, std::less<> >&& members) override; - 
ReplResult< shared< ReplDev > > get_replica_dev(uuid_t group_id) const override; - void iterate_replica_devs(std::function< void(cshared< ReplDev >&) > const& cb) override; + AsyncReplResult< shared< ReplDev > > create_repl_dev(uuid_t group_id, + std::set< std::string, std::less<> >&& members, + std::unique_ptr< ReplDevListener > listener) override; + AsyncReplResult< shared< ReplDev > > open_repl_dev(uuid_t group_id, + std::unique_ptr< ReplDevListener > listener) override; + ReplResult< shared< ReplDev > > get_repl_dev(uuid_t group_id) const override; + void iterate_repl_devs(std::function< void(cshared< ReplDev >&) > const& cb) override; - folly::SemiFuture< ReplServiceError > replace_member(uuid_t group_id, std::string const& member_out, - std::string const& member_in) const override; + folly::Future< ReplServiceError > replace_member(uuid_t group_id, std::string const& member_out, + std::string const& member_in) const override; private: - shared< ReplDev > open_replica_dev(superblk< repl_dev_superblk > const& rd_sb, bool load_existing); + shared< ReplDev > create_repl_dev_instance(superblk< repl_dev_superblk > const& rd_sb, bool load_existing); void rd_super_blk_found(sisl::byte_view const& buf, void* meta_cookie); }; @@ -67,5 +87,5 @@ class ReplServiceCPHandler : public CPCallbacks { int cp_progress_percent() override; }; -extern ReplicationServiceImpl& repl_service(); +extern ReplicationService& repl_service(); } // namespace homestore diff --git a/src/tests/test_common/homestore_test_common.hpp b/src/tests/test_common/homestore_test_common.hpp index b02d41d60..cb93db64b 100644 --- a/src/tests/test_common/homestore_test_common.hpp +++ b/src/tests/test_common/homestore_test_common.hpp @@ -97,7 +97,6 @@ class HSTestHelper { uint32_t blk_size{0}; shared< ChunkSelector > custom_chunk_selector{nullptr}; IndexServiceCallbacks* index_svc_cbs{nullptr}; - ReplServiceCallbacks* repl_svc_cbs{nullptr}; repl_impl_type repl_impl{repl_impl_type::solo}; }; @@ -170,8 +169,7 
@@ class HSTestHelper { } else if ((svc == HS_SERVICE::LOG_REPLICATED) || (svc == HS_SERVICE::LOG_LOCAL)) { hsi->with_log_service(); } else if (svc == HS_SERVICE::REPLICATION) { - hsi->with_repl_data_service(tp.repl_impl, std::unique_ptr< ReplServiceCallbacks >(tp.repl_svc_cbs), - tp.custom_chunk_selector); + hsi->with_repl_data_service(tp.repl_impl, tp.custom_chunk_selector); } } bool need_format = diff --git a/src/tests/test_solo_repl_dev.cpp b/src/tests/test_solo_repl_dev.cpp index 2dfadd254..1aa451692 100644 --- a/src/tests/test_solo_repl_dev.cpp +++ b/src/tests/test_solo_repl_dev.cpp @@ -68,10 +68,12 @@ struct Runner { std::function< void(void) > task; folly::Promise< folly::Unit > comp_promise; - Runner() : total_tasks{SISL_OPTIONS["num_io"].as< uint64_t >()} { + Runner(uint64_t num_tasks, uint32_t qd = 8) : total_tasks{num_tasks}, qdepth{qd} { if (total_tasks < (uint64_t)qdepth) { total_tasks = qdepth; } } + Runner() : Runner{SISL_OPTIONS["num_io"].as< uint64_t >()} {} + void set_task(std::function< void(void) > f) { task = std::move(f); } folly::Future< folly::Unit > execute() { @@ -97,19 +99,36 @@ struct Runner { } }; -struct rdev_req : boost::intrusive_ref_counter< rdev_req > { +struct Waiter { + std::atomic< uint64_t > expected_comp{0}; + std::atomic< uint64_t > actual_comp{0}; + folly::Promise< folly::Unit > comp_promise; + + Waiter(uint64_t num_op) : expected_comp{num_op} {} + Waiter() : Waiter{SISL_OPTIONS["num_io"].as< uint64_t >()} {} + + folly::Future< folly::Unit > start(std::function< void(void) > f) { + f(); + return comp_promise.getFuture(); + } + + void one_complete() { + if ((actual_comp.fetch_add(1) + 1) >= expected_comp.load()) { comp_promise.setValue(); } + } +}; + +struct test_repl_req : public repl_req_ctx { sisl::byte_array header; sisl::byte_array key; sisl::sg_list write_sgs; sisl::sg_list read_sgs; - int64_t lsn; MultiBlkId written_blkids; - rdev_req() { + test_repl_req() { write_sgs.size = 0; read_sgs.size = 0; } - ~rdev_req() { + 
~test_repl_req() { for (auto const& iov : write_sgs.iovs) { iomanager.iobuf_free(uintptr_cast(iov.iov_base)); } @@ -131,65 +150,59 @@ class SoloReplDevTest : public testing::Test { class Listener : public ReplDevListener { private: SoloReplDevTest& m_test; - ReplDev& m_rdev; public: - Listener(SoloReplDevTest& test, ReplDev& rdev) : m_test{test}, m_rdev{rdev} {} + Listener(SoloReplDevTest& test) : m_test{test} {} virtual ~Listener() = default; void on_commit(int64_t lsn, sisl::blob const& header, sisl::blob const& key, MultiBlkId const& blkids, - void* ctx) override { + cintrusive< repl_req_ctx >& ctx) override { if (ctx == nullptr) { - m_test.validate_replay(m_rdev, lsn, header, key, blkids); + m_test.validate_replay(*repl_dev(), lsn, header, key, blkids); } else { - rdev_req* req = r_cast< rdev_req* >(ctx); - req->lsn = lsn; + auto req = boost::static_pointer_cast< test_repl_req >(ctx); req->written_blkids = std::move(blkids); - m_test.on_write_complete(m_rdev, intrusive< rdev_req >(req, false)); + m_test.on_write_complete(*repl_dev(), req); } } - void on_pre_commit(int64_t lsn, const sisl::blob& header, const sisl::blob& key, void* ctx) override {} + bool on_pre_commit(int64_t lsn, const sisl::blob& header, const sisl::blob& key, + cintrusive< repl_req_ctx >& ctx) override { + return true; + } - void on_rollback(int64_t lsn, const sisl::blob& header, const sisl::blob& key, void* ctx) override {} + void on_rollback(int64_t lsn, const sisl::blob& header, const sisl::blob& key, + cintrusive< repl_req_ctx >& ctx) override {} - blk_alloc_hints get_blk_alloc_hints(sisl::blob const& header, void* user_ctx) override { + blk_alloc_hints get_blk_alloc_hints(sisl::blob const& header, cintrusive< repl_req_ctx >& ctx) override { return blk_alloc_hints{}; } void on_replica_stop() override {} }; - class Callbacks : public ReplServiceCallbacks { - private: - SoloReplDevTest& m_test; - - public: - Callbacks(SoloReplDevTest* test) : m_test{*test} {} - virtual ~Callbacks() = 
default; - - std::unique_ptr< ReplDevListener > on_repl_dev_init(cshared< ReplDev >& rdev) override { - m_test.found_repl_dev(rdev); - return std::make_unique< Listener >(m_test, *rdev); - } - }; - protected: - Runner m_runner; + Runner m_io_runner; + Waiter m_task_waiter; shared< ReplDev > m_repl_dev1; shared< ReplDev > m_repl_dev2; + uuid_t m_uuid1; + uuid_t m_uuid2; public: virtual void SetUp() override { test_common::HSTestHelper::start_homestore( "test_solo_repl_dev", {{HS_SERVICE::META, {.size_pct = 5.0}}, - {HS_SERVICE::REPLICATION, - {.size_pct = 60.0, .repl_svc_cbs = new Callbacks(this), .repl_impl = repl_impl_type::solo}}, + {HS_SERVICE::REPLICATION, {.size_pct = 60.0, .repl_impl = repl_impl_type::solo}}, {HS_SERVICE::LOG_REPLICATED, {.size_pct = 20.0}}, {HS_SERVICE::LOG_LOCAL, {.size_pct = 2.0}}}); - hs()->repl_service().create_replica_dev(hs_utils::gen_random_uuid(), {}); - hs()->repl_service().create_replica_dev(hs_utils::gen_random_uuid(), {}); + m_uuid1 = hs_utils::gen_random_uuid(); + m_uuid2 = hs_utils::gen_random_uuid(); + m_repl_dev1 = + hs()->repl_service().create_repl_dev(m_uuid1, {}, std::make_unique< Listener >(*this)).get().value(); + m_repl_dev2 = + hs()->repl_service().create_repl_dev(m_uuid2, {}, std::make_unique< Listener >(*this)).get().value(); } virtual void TearDown() override { @@ -201,27 +214,26 @@ class SoloReplDevTest : public testing::Test { void restart() { m_repl_dev1.reset(); m_repl_dev2.reset(); + test_common::HSTestHelper::start_homestore( "test_solo_repl_dev", - {{HS_SERVICE::REPLICATION, {.repl_svc_cbs = new Callbacks(this), .repl_impl = repl_impl_type::solo}}, + {{HS_SERVICE::REPLICATION, {.repl_impl = repl_impl_type::solo}}, {HS_SERVICE::LOG_REPLICATED, {}}, {HS_SERVICE::LOG_LOCAL, {}}}, - nullptr, true /* restart */); - } - - void found_repl_dev(cshared< ReplDev >& rdev) { - if (m_repl_dev1 == nullptr) { - m_repl_dev1 = rdev; - } else { - HS_REL_ASSERT_EQ((void*)m_repl_dev2.get(), (void*)nullptr, "More than one replica 
dev reported"); - m_repl_dev2 = rdev; - } + [this]() { + hs()->repl_service().open_repl_dev(m_uuid1, std::make_unique< Listener >(*this)); + hs()->repl_service().open_repl_dev(m_uuid2, std::make_unique< Listener >(*this)); + }, + true /* restart */); + + m_repl_dev1 = hs()->repl_service().get_repl_dev(m_uuid1).value(); + m_repl_dev2 = hs()->repl_service().get_repl_dev(m_uuid2).value(); } void write_io(uint32_t key_size, uint64_t data_size, uint32_t max_size_per_iov) { - auto req = intrusive< rdev_req >(new rdev_req()); - req->header = sisl::make_byte_array(sizeof(rdev_req::journal_header)); - auto hdr = r_cast< rdev_req::journal_header* >(req->header->bytes); + auto req = intrusive< test_repl_req >(new test_repl_req()); + req->header = sisl::make_byte_array(sizeof(test_repl_req::journal_header)); + auto hdr = r_cast< test_repl_req::journal_header* >(req->header->bytes); hdr->key_size = key_size; hdr->key_pattern = ((long long)rand() << 32) | rand(); hdr->data_size = data_size; @@ -237,30 +249,38 @@ class SoloReplDevTest : public testing::Test { } auto& rdev = (rand() % 2) ? m_repl_dev1 : m_repl_dev2; - intrusive_ptr_add_ref(req.get()); - rdev->async_alloc_write(*req->header, req->key ? *req->key : sisl::blob{}, req->write_sgs, (void*)req.get()); + rdev->async_alloc_write(*req->header, req->key ? 
*req->key : sisl::blob{}, req->write_sgs, req); } void validate_replay(ReplDev& rdev, int64_t lsn, sisl::blob const& header, sisl::blob const& key, MultiBlkId const& blkids) { - auto jhdr = r_cast< rdev_req::journal_header* >(header.bytes); + auto jhdr = r_cast< test_repl_req::journal_header* >(header.bytes); HSTestHelper::validate_data_buf(key.bytes, key.size, jhdr->key_pattern); uint32_t size = blkids.blk_count() * g_block_size; if (size) { auto read_sgs = HSTestHelper::create_sgs(size, g_block_size, size); - rdev.async_read(blkids, read_sgs, size).thenValue([jhdr, read_sgs](auto&& err) { - RELEASE_ASSERT(!err, "Error during async_read"); - HS_REL_ASSERT_EQ(jhdr->data_size, read_sgs.size, "journal hdr data size mismatch with actual size"); - - for (auto const& iov : read_sgs.iovs) { - HSTestHelper::validate_data_buf(uintptr_cast(iov.iov_base), iov.iov_len, jhdr->data_pattern); - } - }); + LOGDEBUG("[{}] Validating replay of lsn={} blkid = {}", boost::uuids::to_string(rdev.group_id()), lsn, + blkids.to_string()); + rdev.async_read(blkids, read_sgs, size) + .thenValue([this, hdr = *jhdr, read_sgs, lsn, blkids, &rdev](auto&& err) { + RELEASE_ASSERT(!err, "Error during async_read"); + HS_REL_ASSERT_EQ(hdr.data_size, read_sgs.size, "journal hdr data size mismatch with actual size"); + + for (auto const& iov : read_sgs.iovs) { + HSTestHelper::validate_data_buf(uintptr_cast(iov.iov_base), iov.iov_len, hdr.data_pattern); + iomanager.iobuf_free(uintptr_cast(iov.iov_base)); + } + LOGDEBUG("[{}] Replay of lsn={} blkid={} validated successfully", + boost::uuids::to_string(rdev.group_id()), lsn, blkids.to_string()); + m_task_waiter.one_complete(); + }); + } else { + m_task_waiter.one_complete(); } } - void on_write_complete(ReplDev& rdev, intrusive< rdev_req > req) { + void on_write_complete(ReplDev& rdev, intrusive< test_repl_req > req) { // If we did send some data to the repl_dev, validate it by doing async_read if (req->write_sgs.size != 0) { req->read_sgs = 
HSTestHelper::create_sgs(req->write_sgs.size, g_block_size, req->write_sgs.size); @@ -269,47 +289,49 @@ class SoloReplDevTest : public testing::Test { .thenValue([this, &rdev, req](auto&& err) { RELEASE_ASSERT(!err, "Error during async_read"); - LOGDEBUG("Write complete with lsn={} for size={}", req->lsn, req->write_sgs.size); - auto hdr = r_cast< rdev_req::journal_header* >(req->header->bytes); + LOGDEBUG("[{}] Write complete with lsn={} for size={} blkids={}", + boost::uuids::to_string(rdev.group_id()), req->get_lsn(), req->write_sgs.size, + req->written_blkids.to_string()); + auto hdr = r_cast< test_repl_req::journal_header* >(req->header->bytes); HS_REL_ASSERT_EQ(hdr->data_size, req->read_sgs.size, "journal hdr data size mismatch with actual size"); for (auto const& iov : req->read_sgs.iovs) { HSTestHelper::validate_data_buf(uintptr_cast(iov.iov_base), iov.iov_len, hdr->data_pattern); } - m_runner.next_task(); + m_io_runner.next_task(); }); } else { - m_runner.next_task(); + m_io_runner.next_task(); } } }; TEST_F(SoloReplDevTest, TestSingleDataBlock) { LOGINFO("Step 1: run on worker threads to schedule write for {} Bytes.", g_block_size); - this->m_runner.set_task([this]() { this->write_io(0u, g_block_size, g_block_size); }); - this->m_runner.execute().get(); + this->m_io_runner.set_task([this]() { this->write_io(0u, g_block_size, g_block_size); }); + this->m_io_runner.execute().get(); LOGINFO("Step 2: Restart homestore and validate replay data.", g_block_size); - restart(); + this->m_task_waiter.start([this]() { this->restart(); }).get(); } TEST_F(SoloReplDevTest, TestRandomSizedDataBlock) { LOGINFO("Step 1: run on worker threads to schedule write for random bytes ranging {}-{}.", 0, 1 * Mi); - this->m_runner.set_task([this]() { + this->m_io_runner.set_task([this]() { uint32_t nblks = rand() % ((1 * Mi) / g_block_size); uint32_t key_size = rand() % 512 + 8; this->write_io(key_size, nblks * g_block_size, g_block_size); }); - this->m_runner.execute().get(); - 
restart(); + this->m_io_runner.execute().get(); + this->m_task_waiter.start([this]() { this->restart(); }).get(); } TEST_F(SoloReplDevTest, TestHeaderOnly) { LOGINFO("Step 1: run on worker threads to schedule write"); - this->m_runner.set_task([this]() { this->write_io(0u, 0u, g_block_size); }); - this->m_runner.execute().get(); - restart(); + this->m_io_runner.set_task([this]() { this->write_io(0u, 0u, g_block_size); }); + this->m_io_runner.execute().get(); + this->m_task_waiter.start([this]() { this->restart(); }).get(); } SISL_OPTION_GROUP(test_solo_repl_dev,