Skip to content

Commit

Permalink
Addressed review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
hkadayam committed May 1, 2024
1 parent 19a4e41 commit 1da162c
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 10 deletions.
46 changes: 41 additions & 5 deletions src/include/homestore/replication/repl_dev.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,29 +97,65 @@ struct repl_req_ctx : public boost::intrusive_ref_counter< repl_req_ctx, boost::

raft_buf_ptr_t& raft_journal_buf();
uint8_t* raw_journal_buf();
flatbuffers::FlatBufferBuilder& create_fb_builder() { return m_fb_builder; }
void release_fb_builder() { m_fb_builder.Release(); }

/////////////////////// Non modifiers methods //////////////////
std::string to_string() const;
std::string to_compact_string() const;
Clock::time_point created_time() const { return m_start_time; }

/////////////////////// All Modifiers methods //////////////////

/// @brief Anytime a request needs to allocate blks for the data locally, this method needs to be called. This will
/// call the listener blk_alloc_hints and then allocate the blks from data service and update the state.
/// @param listener Listener associated with the repl_dev
/// @param data_size Size of the data for which blks are to be allocated
/// @return Any error in getting hints or allocating blkids
ReplServiceError alloc_local_blks(cshared< ReplDevListener >& listener, uint32_t data_size);

/// @brief This method creates the journal entry for the repl_req. It will allocate the buffer for the journal
/// entry and build the basic journal entry
/// @param is_raft_buf Is the journal entry buffer has to be raft_buf or plain buf. For Raft repl service, it will
/// have to be true
/// @param server_id Server id which is originating this request
void create_journal_entry(bool is_raft_buf, int32_t server_id);

/// @brief Change the journal entry buffer to new_buf and adjust the header and key if adjust_hdr_key is true. It is
/// expected that the original buffer is already created as raft buffer type.
/// @param new_buf New raft buffer to be used
/// @param adjust_hdr_key If the header, key of this request has to be adjusted to the new buffer
void change_raft_journal_buf(raft_buf_ptr_t new_buf, bool adjust_hdr_key);
void set_remote_blkid(RemoteBlkId const& rbid) { m_remote_blkid = rbid; }
void set_lsn(int64_t lsn);

/// @brief Save the data that was pushed by the remote node for this request. When a push data rpc is called with
/// the data, this method is called to save them to the request and make it shareable. This method makes a copy of
/// the data in case the buffer is not aligned.
/// @param pushed_data Data that was received from the RPC. This is used to keep the data alive
/// @param data Data pointer
/// @param data_size Size of the data
/// @return true if the request didn't receive the data already, false otherwise
bool save_pushed_data(intrusive< sisl::GenericRpcData > const& pushed_data, uint8_t const* data,
uint32_t data_size);

/// @brief Save the data that was fetched from the remote node for this request. When a fetch data rpc is called
/// with the data, this method is called to save them to the request and make it shareable. This method makes a copy
/// of the data in case the buffer is not aligned.
/// @param fetched_data Data from RPC which fetched the data. This is used to keep the data alive
/// @param data Data pointer
/// @param data_size Size of the data
/// @return true if the request didn't receive the data already, false otherwise
bool save_fetched_data(sisl::GenericClientResponse const& fetched_data, uint8_t const* data, uint32_t data_size);

void set_remote_blkid(RemoteBlkId const& rbid) { m_remote_blkid = rbid; }
void set_lsn(int64_t lsn);
void add_state(repl_req_state_t s);
bool add_state_if_not_already(repl_req_state_t s);
void clear();
flatbuffers::FlatBufferBuilder& create_fb_builder() { return m_fb_builder; }
void release_fb_builder() { m_fb_builder.Release(); }

public:
// We keep this public since they are considered thread safe
// IMPORTANT: Avoid declaring variables public, since this structure carries various entries and try to work in
// lockless way. As a result, we keep only those which are considered thread safe and others are accessed with
// methods.
folly::Promise< folly::Unit > m_data_received_promise; // Promise to be fulfilled when data is received
folly::Promise< folly::Unit > m_data_written_promise; // Promise to be fulfilled when data is written
sisl::io_blob_list_t m_pkts; // Pkts used for sending data
Expand Down
4 changes: 2 additions & 2 deletions src/include/homestore/replication_service.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ class ReplicationService {
virtual AsyncReplResult< shared< ReplDev > > create_repl_dev(group_id_t group_id,
std::set< replica_id_t > const& members) = 0;

/// @brief Removes the entire Repl Device and completely the replica device. While the underlying replica group is
/// destroyed, all its resources are not released until garbage collection of repl devices kick in.
/// @brief Removes the entire Repl Device. The underlying replica group is marked as destroy_pending and all its
/// resources are not released until garbage collection of repl devices kick in.
/// @param group_id Group ID to be removed
/// @return A Future which gets called after schedule to release (before garbage collection is kicked in)
virtual folly::SemiFuture< ReplServiceError > remove_repl_dev(group_id_t group_id) = 0;
Expand Down
5 changes: 3 additions & 2 deletions src/lib/replication/log_store/repl_log_store.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,9 @@ void ReplLogStore::end_of_append_batch(ulong start_lsn, ulong count) {
auto rreq = m_sm.lsn_to_req(lsn);
// Skip this call in proposer, since this method will synchronously flush the data, which is not required for
// leader. Proposer will call the flush as part of commit after receiving quorum, upon which time, there is a
// high possibility the log entry is already flushed.
if (rreq && rreq->is_proposer()) { continue; }
// high possibility the log entry is already flushed. Skip it for rreq == nullptr which is the case for raft
// config entries.
if ((rreq == nullptr) || rreq->is_proposer()) { continue; }
reqs->emplace_back(std::move(rreq));
}

Expand Down
2 changes: 1 addition & 1 deletion src/lib/replication/repl_dev/raft_repl_dev.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,7 @@ folly::Future< folly::Unit > RaftReplDev::notify_after_data_written(std::vector<

// Walk through the list of requests and wait for the data to be received and written
for (auto const& rreq : *rreqs) {
if ((rreq == nullptr) || (!rreq->has_linked_data())) { continue; }
if (!rreq->has_linked_data()) { continue; }
auto const status = uint32_cast(rreq->state());
if (status & uint32_cast(repl_req_state_t::DATA_WRITTEN)) {
RD_LOGD("Raft Channel: Data write completed and blkid mapped: rreq=[{}]", rreq->to_compact_string());
Expand Down

0 comments on commit 1da162c

Please sign in to comment.