diff --git a/src/include/homestore/replication/repl_dev.h b/src/include/homestore/replication/repl_dev.h index 32aad69a1..f81e3ef31 100644 --- a/src/include/homestore/replication/repl_dev.h +++ b/src/include/homestore/replication/repl_dev.h @@ -97,8 +97,6 @@ struct repl_req_ctx : public boost::intrusive_ref_counter< repl_req_ctx, boost:: raft_buf_ptr_t& raft_journal_buf(); uint8_t* raw_journal_buf(); - flatbuffers::FlatBufferBuilder& create_fb_builder() { return m_fb_builder; } - void release_fb_builder() { m_fb_builder.Release(); } /////////////////////// Non modifiers methods ////////////////// std::string to_string() const; @@ -106,20 +104,58 @@ struct repl_req_ctx : public boost::intrusive_ref_counter< repl_req_ctx, boost:: Clock::time_point created_time() const { return m_start_time; } /////////////////////// All Modifiers methods ////////////////// + + /// @brief Anytime a request needs to allocate blks for the data locally, this method needs to be called. This will + /// call the listener blk_alloc_hints and then allocate the blks from data service and update the state. + /// @param listener Listener associated with the repl_dev + /// @param data_size Size of the data for which blks are to be allocated + /// @return Any error in getting hints or allocating blkids ReplServiceError alloc_local_blks(cshared< ReplDevListener >& listener, uint32_t data_size); + + /// @brief This method creates the journal entry for the repl_req. It will allocate the buffer for the journal + /// entry and build the basic journal entry + /// @param is_raft_buf If the journal entry buffer has to be raft_buf or plain buf. For Raft repl service, it will + /// have to be true + /// @param server_id Server id which is originating this request void create_journal_entry(bool is_raft_buf, int32_t server_id); + + /// @brief Change the journal entry buffer to new_buf and adjust the header and key if adjust_hdr_key is true. 
It is + /// expected that the original buffer is already created as raft buffer type. + /// @param new_buf New raft buffer to be used + /// @param adjust_hdr_key If the header, key of this request has to be adjusted to the new buffer void change_raft_journal_buf(raft_buf_ptr_t new_buf, bool adjust_hdr_key); - void set_remote_blkid(RemoteBlkId const& rbid) { m_remote_blkid = rbid; } - void set_lsn(int64_t lsn); + + /// @brief Save the data that was pushed by the remote node for this request. When a push data rpc is called with + /// the data, this method is called to save them to the request and make it shareable. This method makes a copy of + /// the data in case the buffer is not aligned. + /// @param pushed_data Data that was received from the RPC. This is used to keep the data alive + /// @param data Data pointer + /// @param data_size Size of the data + /// @return true if the request didn't receive the data already, false otherwise bool save_pushed_data(intrusive< sisl::GenericRpcData > const& pushed_data, uint8_t const* data, uint32_t data_size); + + /// @brief Save the data that was fetched from the remote node for this request. When a fetch data rpc is called + /// with the data, this method is called to save them to the request and make it shareable. This method makes a copy + /// of the data in case the buffer is not aligned. + /// @param fetched_data Data from RPC which fetched the data. 
This is used to keep the data alive + /// @param data Data pointer + /// @param data_size Size of the data + /// @return true if the request didn't receive the data already, false otherwise bool save_fetched_data(sisl::GenericClientResponse const& fetched_data, uint8_t const* data, uint32_t data_size); + + void set_remote_blkid(RemoteBlkId const& rbid) { m_remote_blkid = rbid; } + void set_lsn(int64_t lsn); void add_state(repl_req_state_t s); bool add_state_if_not_already(repl_req_state_t s); void clear(); + flatbuffers::FlatBufferBuilder& create_fb_builder() { return m_fb_builder; } + void release_fb_builder() { m_fb_builder.Release(); } public: - // We keep this public since they are considered thread safe + // IMPORTANT: Avoid declaring variables public, since this structure carries various entries and tries to work in + // a lockless way. As a result, we keep only those which are considered thread safe and others are accessed with + // methods. folly::Promise< folly::Unit > m_data_received_promise; // Promise to be fulfilled when data is received folly::Promise< folly::Unit > m_data_written_promise; // Promise to be fulfilled when data is written sisl::io_blob_list_t m_pkts; // Pkts used for sending data diff --git a/src/include/homestore/replication_service.hpp b/src/include/homestore/replication_service.hpp index ee37d3686..8f535b855 100644 --- a/src/include/homestore/replication_service.hpp +++ b/src/include/homestore/replication_service.hpp @@ -35,8 +35,8 @@ class ReplicationService { virtual AsyncReplResult< shared< ReplDev > > create_repl_dev(group_id_t group_id, std::set< replica_id_t > const& members) = 0; - /// @brief Removes the entire Repl Device and completely the replica device. While the underlying replica group is - /// destroyed, all its resources are not released until garbage collection of repl devices kick in. + /// @brief Removes the entire Repl Device. 
The underlying replica group is marked as destroy_pending and all its + /// resources are not released until garbage collection of repl devices kicks in. /// @param group_id Group ID to be removed /// @return A Future which gets called after schedule to release (before garbage collection is kicked in) virtual folly::SemiFuture< ReplServiceError > remove_repl_dev(group_id_t group_id) = 0; diff --git a/src/lib/replication/log_store/repl_log_store.cpp b/src/lib/replication/log_store/repl_log_store.cpp index 19f79e79a..79a725bc4 100644 --- a/src/lib/replication/log_store/repl_log_store.cpp +++ b/src/lib/replication/log_store/repl_log_store.cpp @@ -40,8 +40,9 @@ void ReplLogStore::end_of_append_batch(ulong start_lsn, ulong count) { auto rreq = m_sm.lsn_to_req(lsn); // Skip this call in proposer, since this method will synchronously flush the data, which is not required for // leader. Proposer will call the flush as part of commit after receiving quorum, upon which time, there is a - // high possibility the log entry is already flushed. - if (rreq && rreq->is_proposer()) { continue; } + // high possibility the log entry is already flushed. Skip it for rreq == nullptr which is the case for raft + // config entries. 
+ if ((rreq == nullptr) || rreq->is_proposer()) { continue; } reqs->emplace_back(std::move(rreq)); } diff --git a/src/lib/replication/repl_dev/raft_repl_dev.cpp b/src/lib/replication/repl_dev/raft_repl_dev.cpp index 3b46b490b..b02917643 100644 --- a/src/lib/replication/repl_dev/raft_repl_dev.cpp +++ b/src/lib/replication/repl_dev/raft_repl_dev.cpp @@ -340,7 +340,7 @@ folly::Future< folly::Unit > RaftReplDev::notify_after_data_written(std::vector< // Walk through the list of requests and wait for the data to be received and written for (auto const& rreq : *rreqs) { - if ((rreq == nullptr) || (!rreq->has_linked_data())) { continue; } + if (!rreq->has_linked_data()) { continue; } auto const status = uint32_cast(rreq->state()); if (status & uint32_cast(repl_req_state_t::DATA_WRITTEN)) { RD_LOGD("Raft Channel: Data write completed and blkid mapped: rreq=[{}]", rreq->to_compact_string());