Create repl req details for all IO types. (#336)
The following significant changes are made in this commit:

* Until now, a repl req was not created for header_only (inline) requests, which led to divergent
nullptr handling of the request everywhere and resulted in crashes. The behavior is now uniform
across all IO types (see the first sketch after this list).

* Previously, the replica set's is_leader() was used to drive decisions on some commit and append-entry
code paths. This is potentially dangerous: if the leader switches at an inopportune time and the request
somehow survives, we can run into corruption. With this change there is no special handling of the leader;
there are only two roles, proposer and applier (second sketch below). In most use cases the proposer is
the leader and the applier is a follower, but that requirement is removed.

* Because of the above assumption, there was a bug where, on the leader, non-app entries (conf entries)
were not guaranteed to be flushed before commit was called. That behavior is fixed as well.

* The HomeLogStore had a limitation (exposed once the above issue was fixed): on the proposer, the data
write completes first and only then is the entry proposed to raft. RAFT calls end_of_append_batch in the
same thread, which does flush_sync. The data-write completion, however, happens on an IO thread, so there
is a potential deadlock (flush_sync takes a giant lock). Fibers could overcome this, but instead of forcing
worker threads to have fibers, all logstore flushes are now isolated to a dedicated flush thread (last
sketch below).
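
To make the first point concrete, here is a minimal sketch of how a caller of transform_journal_entry() reads after this change. Only transform_journal_entry, value_inlined and local_blkid come from this commit; the branch bodies are placeholder comments:

    // rreq is now valid for every IO type, so callers no longer special-case
    // a nullptr request for header-only entries.
    repl_req_ptr_t rreq = m_sm.transform_journal_entry(entry);
    if (rreq->value_inlined) {
        // Inline (header-only) request: the journal entry is self-contained.
    } else {
        // Request carries out-of-band data, tracked via rreq->local_blkid.
    }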
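
The role split in the second point amounts to per-request checks like the sketch below (illustrative only; is_proposer is the field introduced in this commit, the branch bodies are placeholders). Unlike is_leader(), the request's role is fixed at creation and cannot change under us if leadership moves mid-flight:

    if (rreq->is_proposer) {
        // Proposer path: the data was already written locally before it was
        // proposed to raft, so a synchronous flush here is unnecessary.
    } else {
        // Applier path: data may still need to be fetched and written before
        // the entry can be committed.
    }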
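
The last point follows the pattern visible in the log_dev.cpp hunk below: flush-related work is hopped onto the dedicated flush thread rather than running on the caller's thread. A rough sketch, with names taken from the diff and the lambda body reduced to a placeholder:

    // Breaking the cycle: flush_sync's giant lock is only ever taken on the
    // flush thread, never on a data-IO worker or raft callback thread.
    iomanager.run_on_wait(logstore_service().flush_thread(), [this]() {
        flush_if_needed();  // all logdev flushes funnel through here
    });
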
hkadayam authored Feb 27, 2024
1 parent 2301a6d commit 13e235d
Showing 11 changed files with 276 additions and 214 deletions.
11 changes: 6 additions & 5 deletions src/include/homestore/replication/repl_dev.h
@@ -71,13 +71,14 @@ struct repl_req_ctx : public boost::intrusive_ref_counter< repl_req_ctx, boost::
     repl_key rkey;             // Unique key for the request
     sisl::blob header;         // User header
     sisl::blob key;            // User supplied key for this req
-    int64_t lsn{0};            // Lsn for this replication req
+    int64_t lsn{-1};           // Lsn for this replication req
     bool is_proposer{false};   // Is the repl_req proposed by this node

     //////////////// Value related section /////////////////
-    sisl::sg_list value;      // Raw value - applicable only to leader req
-    MultiBlkId local_blkid;   // Local BlkId for the value
-    RemoteBlkId remote_blkid; // Corresponding remote blkid for the value
+    sisl::sg_list value;       // Raw value - applicable only to leader req
+    MultiBlkId local_blkid;    // Local BlkId for the value
+    RemoteBlkId remote_blkid;  // Corresponding remote blkid for the value
+    bool value_inlined{false}; // Is the value inlined in the header itself

     //////////////// Journal/Buf related section /////////////////
     std::variant< std::unique_ptr< uint8_t[] >, raft_buf_ptr_t > journal_buf; // Buf for the journal entry
@@ -244,7 +245,7 @@ class ReplDev {

     /// @brief get replication status. If called on follower member
     /// this API can return empty result.
-    virtual std::vector<peer_info> get_replication_status() const = 0;
+    virtual std::vector< peer_info > get_replication_status() const = 0;

     /// @brief Gets the group_id this repldev is working for
     /// @return group_id
2 changes: 1 addition & 1 deletion src/lib/common/homestore_config.fbs
@@ -111,7 +111,7 @@ table LogStore {

     // Logdev will flush the logs only in a dedicated thread. Turn this on, if flush IO doesn't want to
     // intervene with data IO path.
-    flush_only_in_dedicated_thread: bool = false;
+    flush_only_in_dedicated_thread: bool = true;
 }

 table Generic {
15 changes: 7 additions & 8 deletions src/lib/logstore/log_dev.cpp
@@ -87,13 +87,11 @@ void LogDev::start(bool format, JournalVirtualDev* vdev) {
         m_last_flush_idx = m_log_idx - 1;
     }

-    m_flush_timer_hdl = iomanager.schedule_global_timer(
-        HS_DYNAMIC_CONFIG(logstore.flush_timer_frequency_us) * 1000, true, nullptr /* cookie */,
-        iomgr::reactor_regex::all_worker,
-        [this](void*) {
-            if (m_pending_flush_size.load() && !m_is_flushing.load(std::memory_order_relaxed)) { flush_if_needed(); }
-        },
-        true /* wait_to_schedule */);
+    iomanager.run_on_wait(logstore_service().flush_thread(), [this]() {
+        m_flush_timer_hdl = iomanager.schedule_thread_timer(HS_DYNAMIC_CONFIG(logstore.flush_timer_frequency_us) * 1000,
+                                                            true /* recurring */, nullptr /* cookie */,
+                                                            [this](void*) { flush_if_needed(); });
+    });

     handle_unopened_log_stores(format);

@@ -133,7 +131,8 @@ void LogDev::stop() {
     }

     // cancel the timer
-    iomanager.cancel_timer(m_flush_timer_hdl, true);
+    iomanager.run_on_wait(logstore_service().flush_thread(),
+                          [this]() { iomanager.cancel_timer(m_flush_timer_hdl, true); });

     {
         folly::SharedMutexWritePriority::WriteHolder holder(m_store_map_mtx);
61 changes: 40 additions & 21 deletions src/lib/replication/log_store/repl_log_store.cpp
@@ -7,42 +7,62 @@
 namespace homestore {

 uint64_t ReplLogStore::append(nuraft::ptr< nuraft::log_entry >& entry) {
     // We don't want to transform anything that is not an app log
     if (entry->get_val_type() != nuraft::log_val_type::app_log) { return HomeRaftLogStore::append(entry); }

     repl_req_ptr_t rreq = m_sm.transform_journal_entry(entry);
     ulong lsn;
-    if (rreq) {
-        lsn = HomeRaftLogStore::append(rreq->raft_journal_buf());
-        m_sm.link_lsn_to_req(rreq, int64_cast(lsn));
-        RD_LOG(INFO, "Raft Channel: Received log entry rreq=[{}]", rreq->to_compact_string());
-    } else {
+    if (rreq->is_proposer || rreq->value_inlined) {
+        // No need of any transformation for proposer or inline data, since the entry is already meaningful
         lsn = HomeRaftLogStore::append(entry);
+    } else {
+        lsn = HomeRaftLogStore::append(rreq->raft_journal_buf());
     }
+    m_sm.link_lsn_to_req(rreq, int64_cast(lsn));
+    RD_LOG(DEBUG, "Raft Channel: Received append log entry rreq=[{}]", rreq->to_compact_string());
     return lsn;
 }

 void ReplLogStore::write_at(ulong index, nuraft::ptr< nuraft::log_entry >& entry) {
     // We don't want to transform anything that is not an app log
     if (entry->get_val_type() != nuraft::log_val_type::app_log) {
         HomeRaftLogStore::write_at(index, entry);
         return;
     }

     repl_req_ptr_t rreq = m_sm.transform_journal_entry(entry);
-    if (rreq) {
-        HomeRaftLogStore::write_at(index, rreq->raft_journal_buf());
-        m_sm.link_lsn_to_req(rreq, int64_cast(index));
-        RD_LOG(INFO, "Raft Channel: Received log entry rreq=[{}]", rreq->to_compact_string());
-    } else {
+    if (rreq->is_proposer || rreq->value_inlined) {
+        // No need of any transformation for proposer or inline data, since the entry is already meaningful
         HomeRaftLogStore::write_at(index, entry);
+    } else {
+        HomeRaftLogStore::write_at(index, rreq->raft_journal_buf());
     }
+    m_sm.link_lsn_to_req(rreq, int64_cast(index));
+    RD_LOG(DEBUG, "Raft Channel: Received write_at log entry rreq=[{}]", rreq->to_compact_string());
 }

 void ReplLogStore::end_of_append_batch(ulong start_lsn, ulong count) {
-    // Skip this call in leader, since this method will synchronously flush the data, which is not required for
-    // leader. Leader will call the flush as part of commit after receiving quorum, upon which time, there is a high
-    // possibility the log entry is already flushed.
-    if (!m_rd.is_leader()) {
-        int64_t end_lsn = int64_cast(start_lsn + count - 1);
+    int64_t end_lsn = int64_cast(start_lsn + count - 1);

-        // Start fetch the batch of data for this lsn range from remote if its not available yet.
-        auto reqs = sisl::VectorPool< repl_req_ptr_t >::alloc();
-        for (int64_t lsn = int64_cast(start_lsn); lsn <= end_lsn; ++lsn) {
-            reqs->emplace_back(m_sm.lsn_to_req(lsn));
-        }
+    // Start fetch the batch of data for this lsn range from remote if its not available yet.
+    auto reqs = sisl::VectorPool< repl_req_ptr_t >::alloc();
+    for (int64_t lsn = int64_cast(start_lsn); lsn <= end_lsn; ++lsn) {
+        auto rreq = m_sm.lsn_to_req(lsn);
+        // Skip this call in proposer, since this method will synchronously flush the data, which is not required for
+        // leader. Proposer will call the flush as part of commit after receiving quorum, upon which time, there is a
+        // high possibility the log entry is already flushed.
+        if (rreq && rreq->is_proposer) {
+            RD_LOG(TRACE, "Raft Channel: Ignoring to flush proposer request rreq=[{}]", rreq->to_compact_string());
+            continue;
+        }
+        reqs->emplace_back(std::move(rreq));
+    }

+    RD_LOG(TRACE, "Raft Channel: end_of_append_batch start_lsn={} count={} num_data_to_be_written={}", start_lsn, count,
+           reqs->size());
+
+    // All requests are from proposer for data write, so as mentioned above we can skip the flush for now
+    if (!reqs->empty()) {
         // Check the map if data corresponding to all of these requsts have been received and written. If not, schedule
         // a fetch and write. Once all requests are completed and written, these requests are poped out of the map and
         // the future will be ready.
@@ -60,9 +80,8 @@ void ReplLogStore::end_of_append_batch(ulong start_lsn, ulong count) {
         for (auto const& rreq : *reqs) {
             if (rreq) { rreq->state.fetch_or(uint32_cast(repl_req_state_t::LOG_FLUSHED)); }
         }
-
-        sisl::VectorPool< repl_req_ptr_t >::free(reqs);
     }
+    sisl::VectorPool< repl_req_ptr_t >::free(reqs);
 }

 std::string ReplLogStore::rdev_name() const { return m_rd.rdev_name(); }
4 changes: 2 additions & 2 deletions src/lib/replication/repl_dev/common.cpp
@@ -44,8 +44,8 @@ std::string repl_req_ctx::to_string() const {
 }

 std::string repl_req_ctx::to_compact_string() const {
-    return fmt::format("dsn={} term={} lsn={} state={} ref={}", rkey.dsn, rkey.term, lsn, req_state_name(state.load()),
-                       this->use_count());
+    return fmt::format("dsn={} term={} lsn={} Blkid={} state=[{}]", rkey.dsn, rkey.term, lsn, local_blkid.to_string(),
+                       req_state_name(state.load()));
 }

 } // namespace homestore