From cf006cfc705961b91b7317b095c875f66441abf6 Mon Sep 17 00:00:00 2001 From: Xiaoxi Chen Date: Tue, 16 Jul 2024 07:25:17 -0700 Subject: [PATCH] locking for sync_flush and cp fix Signed-off-by: Xiaoxi Chen --- src/include/homestore/logstore/log_store.hpp | 1 + src/lib/common/homestore_config.fbs | 2 +- src/lib/logstore/log_store.cpp | 4 +++- src/lib/replication/repl_dev/raft_repl_dev.cpp | 18 ++++++++++-------- 4 files changed, 15 insertions(+), 10 deletions(-) diff --git a/src/include/homestore/logstore/log_store.hpp b/src/include/homestore/logstore/log_store.hpp index 6c0b493ec..eec7eac50 100644 --- a/src/include/homestore/logstore/log_store.hpp +++ b/src/include/homestore/logstore/log_store.hpp @@ -368,6 +368,7 @@ class HomeLogStore : public std::enable_shared_from_this< HomeLogStore > { // Sync flush sections std::atomic< logstore_seq_num_t > m_sync_flush_waiter_lsn{invalid_lsn()}; std::mutex m_sync_flush_mtx; + std::mutex m_single_sync_flush_mtx; std::condition_variable m_sync_flush_cv; std::vector< seq_ld_key_pair > m_truncation_barriers; // List of truncation barriers diff --git a/src/lib/common/homestore_config.fbs b/src/lib/common/homestore_config.fbs index c283bc9ba..673c2f570 100644 --- a/src/lib/common/homestore_config.fbs +++ b/src/lib/common/homestore_config.fbs @@ -212,7 +212,7 @@ table Consensus { rpc_backoff_ms: uint32 = 250; // Frequency of Raft heartbeat - heartbeat_period_ms: uint32 = 250; + heartbeat_period_ms: uint32 = 500; // Re-election timeout low and high mark elect_to_low_ms: uint32 = 800; diff --git a/src/lib/logstore/log_store.cpp b/src/lib/logstore/log_store.cpp index 71d564234..75ae07df6 100644 --- a/src/lib/logstore/log_store.cpp +++ b/src/lib/logstore/log_store.cpp @@ -176,10 +176,11 @@ void HomeLogStore::read_async(logstore_seq_num_t seq_num, void* cookie, const lo #endif void HomeLogStore::on_write_completion(logstore_req* req, const logdev_key& ld_key) { + std::unique_lock lk(m_sync_flush_mtx); // Upon completion, create the mapping between seq_num and log dev key m_records.update(req->seq_num, [&](logstore_record& rec) -> bool { rec.m_dev_key = ld_key; - // THIS_LOGSTORE_LOG(DEBUG, "Completed write of lsn {} logdev_key={}", req->seq_num, ld_key); + THIS_LOGSTORE_LOG(DEBUG, "Completed write of store lsn {} logdev_key={}", req->seq_num, ld_key); return true; }); // assert(flush_ld_key.idx >= m_last_flush_ldkey.idx); @@ -405,6 +406,7 @@ void HomeLogStore::flush_sync(logstore_seq_num_t upto_seq_num) { HS_DBG_ASSERT_EQ(LogDev::can_flush_in_this_thread(), false, "Logstore flush sync cannot be called on same thread which could do logdev flush"); + std::unique_lock lk(m_single_sync_flush_mtx); if (upto_seq_num == invalid_lsn()) { upto_seq_num = m_records.active_upto(); } // if we have flushed already, we are done diff --git a/src/lib/replication/repl_dev/raft_repl_dev.cpp b/src/lib/replication/repl_dev/raft_repl_dev.cpp index ac02e8cde..d7d1fac01 100644 --- a/src/lib/replication/repl_dev/raft_repl_dev.cpp +++ b/src/lib/replication/repl_dev/raft_repl_dev.cpp @@ -148,13 +148,15 @@ void RaftReplDev::async_alloc_write(sisl::blob const& header, sisl::blob const& repl_req_ptr_t rreq) { if (!rreq) { auto rreq = repl_req_ptr_t(new repl_req_ctx{}); } - auto const guard = m_stage.access(); - if (auto const stage = *guard.get(); stage != repl_dev_stage_t::ACTIVE) { - RD_LOGW("Raft channel: Not ready to accept writes, stage={}", enum_name(stage)); - handle_error(rreq, - (stage == repl_dev_stage_t::INIT) ? ReplServiceError::SERVER_IS_JOINING - : ReplServiceError::SERVER_IS_LEAVING); - return; + { + auto const guard = m_stage.access(); + if (auto const stage = *guard.get(); stage != repl_dev_stage_t::ACTIVE) { + RD_LOGW("Raft channel: Not ready to accept writes, stage={}", enum_name(stage)); + handle_error(rreq, + (stage == repl_dev_stage_t::INIT) ? ReplServiceError::SERVER_IS_JOINING + : ReplServiceError::SERVER_IS_LEAVING); + return; + } } rreq->init(repl_key{.server_id = server_id(), .term = raft_server()->get_term(), .dsn = m_next_dsn.fetch_add(1)}, @@ -517,7 +519,7 @@ void RaftReplDev::check_and_fetch_remote_data(std::vector< repl_req_ptr_t > rreq void RaftReplDev::fetch_data_from_remote(std::vector< repl_req_ptr_t > rreqs) { if (rreqs.size() == 0) { return; } - std::vector<::flatbuffers::Offset< RequestEntry > > entries; + std::vector< ::flatbuffers::Offset< RequestEntry > > entries; entries.reserve(rreqs.size()); shared< flatbuffers::FlatBufferBuilder > builder = std::make_shared< flatbuffers::FlatBufferBuilder >();