diff --git a/src/include/homestore/replication/repl_dev.h b/src/include/homestore/replication/repl_dev.h index 34bc41e49..d42ccc7ee 100644 --- a/src/include/homestore/replication/repl_dev.h +++ b/src/include/homestore/replication/repl_dev.h @@ -71,13 +71,14 @@ struct repl_req_ctx : public boost::intrusive_ref_counter< repl_req_ctx, boost:: repl_key rkey; // Unique key for the request sisl::blob header; // User header sisl::blob key; // User supplied key for this req - int64_t lsn{0}; // Lsn for this replication req + int64_t lsn{-1}; // Lsn for this replication req bool is_proposer{false}; // Is the repl_req proposed by this node //////////////// Value related section ///////////////// - sisl::sg_list value; // Raw value - applicable only to leader req - MultiBlkId local_blkid; // Local BlkId for the value - RemoteBlkId remote_blkid; // Corresponding remote blkid for the value + sisl::sg_list value; // Raw value - applicable only to leader req + MultiBlkId local_blkid; // Local BlkId for the value + RemoteBlkId remote_blkid; // Corresponding remote blkid for the value + bool value_inlined{false}; // Is the value inlined in the header itself //////////////// Journal/Buf related section ///////////////// std::variant< std::unique_ptr< uint8_t[] >, raft_buf_ptr_t > journal_buf; // Buf for the journal entry @@ -244,7 +245,7 @@ class ReplDev { /// @brief get replication status. If called on follower member /// this API can return empty result. - virtual std::vector get_replication_status() const = 0; + virtual std::vector< peer_info > get_replication_status() const = 0; /// @brief Gets the group_id this repldev is working for /// @return group_id diff --git a/src/lib/common/homestore_config.fbs b/src/lib/common/homestore_config.fbs index faa2a1358..7be5f659b 100644 --- a/src/lib/common/homestore_config.fbs +++ b/src/lib/common/homestore_config.fbs @@ -111,7 +111,7 @@ table LogStore { // Logdev will flush the logs only in a dedicated thread. Turn this on, if flush IO doesn't want to // intervene with data IO path. - flush_only_in_dedicated_thread: bool = false; + flush_only_in_dedicated_thread: bool = true; } table Generic { diff --git a/src/lib/logstore/log_dev.cpp b/src/lib/logstore/log_dev.cpp index 83c284c3e..90cc8f2e7 100644 --- a/src/lib/logstore/log_dev.cpp +++ b/src/lib/logstore/log_dev.cpp @@ -87,13 +87,11 @@ void LogDev::start(bool format, JournalVirtualDev* vdev) { m_last_flush_idx = m_log_idx - 1; } - m_flush_timer_hdl = iomanager.schedule_global_timer( - HS_DYNAMIC_CONFIG(logstore.flush_timer_frequency_us) * 1000, true, nullptr /* cookie */, - iomgr::reactor_regex::all_worker, - [this](void*) { - if (m_pending_flush_size.load() && !m_is_flushing.load(std::memory_order_relaxed)) { flush_if_needed(); } - }, - true /* wait_to_schedule */); + iomanager.run_on_wait(logstore_service().flush_thread(), [this]() { + m_flush_timer_hdl = iomanager.schedule_thread_timer(HS_DYNAMIC_CONFIG(logstore.flush_timer_frequency_us) * 1000, + true /* recurring */, nullptr /* cookie */, + [this](void*) { flush_if_needed(); }); + }); handle_unopened_log_stores(format); @@ -133,7 +131,8 @@ void LogDev::stop() { } // cancel the timer - iomanager.cancel_timer(m_flush_timer_hdl, true); + iomanager.run_on_wait(logstore_service().flush_thread(), + [this]() { iomanager.cancel_timer(m_flush_timer_hdl, true); }); { folly::SharedMutexWritePriority::WriteHolder holder(m_store_map_mtx); diff --git a/src/lib/replication/log_store/repl_log_store.cpp b/src/lib/replication/log_store/repl_log_store.cpp index 84a12925d..1020258ba 100644 --- a/src/lib/replication/log_store/repl_log_store.cpp +++ b/src/lib/replication/log_store/repl_log_store.cpp @@ -7,42 +7,62 @@ namespace homestore { uint64_t ReplLogStore::append(nuraft::ptr< nuraft::log_entry >& entry) { + // We don't want to transform anything that is not an app log + if (entry->get_val_type() != nuraft::log_val_type::app_log) { return HomeRaftLogStore::append(entry); } + repl_req_ptr_t rreq = m_sm.transform_journal_entry(entry); ulong lsn; - if (rreq) { - lsn = HomeRaftLogStore::append(rreq->raft_journal_buf()); - m_sm.link_lsn_to_req(rreq, int64_cast(lsn)); - RD_LOG(INFO, "Raft Channel: Received log entry rreq=[{}]", rreq->to_compact_string()); - } else { + if (rreq->is_proposer || rreq->value_inlined) { + // No need of any transformation for proposer or inline data, since the entry is already meaningful lsn = HomeRaftLogStore::append(entry); + } else { + lsn = HomeRaftLogStore::append(rreq->raft_journal_buf()); } + m_sm.link_lsn_to_req(rreq, int64_cast(lsn)); + RD_LOG(DEBUG, "Raft Channel: Received append log entry rreq=[{}]", rreq->to_compact_string()); return lsn; } void ReplLogStore::write_at(ulong index, nuraft::ptr< nuraft::log_entry >& entry) { + // We don't want to transform anything that is not an app log + if (entry->get_val_type() != nuraft::log_val_type::app_log) { + HomeRaftLogStore::write_at(index, entry); + return; + } + repl_req_ptr_t rreq = m_sm.transform_journal_entry(entry); - if (rreq) { - HomeRaftLogStore::write_at(index, rreq->raft_journal_buf()); - m_sm.link_lsn_to_req(rreq, int64_cast(index)); - RD_LOG(INFO, "Raft Channel: Received log entry rreq=[{}]", rreq->to_compact_string()); - } else { + if (rreq->is_proposer || rreq->value_inlined) { + // No need of any transformation for proposer or inline data, since the entry is already meaningful HomeRaftLogStore::write_at(index, entry); + } else { + HomeRaftLogStore::write_at(index, rreq->raft_journal_buf()); } + m_sm.link_lsn_to_req(rreq, int64_cast(index)); + RD_LOG(DEBUG, "Raft Channel: Received write_at log entry rreq=[{}]", rreq->to_compact_string()); } void ReplLogStore::end_of_append_batch(ulong start_lsn, ulong count) { - // Skip this call in leader, since this method will synchronously flush the data, which is not required for - // leader. Leader will call the flush as part of commit after receiving quorum, upon which time, there is a high - // possibility the log entry is already flushed. - if (!m_rd.is_leader()) { - int64_t end_lsn = int64_cast(start_lsn + count - 1); + int64_t end_lsn = int64_cast(start_lsn + count - 1); - // Start fetch the batch of data for this lsn range from remote if its not available yet. - auto reqs = sisl::VectorPool< repl_req_ptr_t >::alloc(); - for (int64_t lsn = int64_cast(start_lsn); lsn <= end_lsn; ++lsn) { - reqs->emplace_back(m_sm.lsn_to_req(lsn)); + // Start fetch the batch of data for this lsn range from remote if its not available yet. + auto reqs = sisl::VectorPool< repl_req_ptr_t >::alloc(); + for (int64_t lsn = int64_cast(start_lsn); lsn <= end_lsn; ++lsn) { + auto rreq = m_sm.lsn_to_req(lsn); + // Skip this call in proposer, since this method will synchronously flush the data, which is not required for + // leader. Proposer will call the flush as part of commit after receiving quorum, upon which time, there is a + // high possibility the log entry is already flushed. + if (rreq && rreq->is_proposer) { + RD_LOG(TRACE, "Raft Channel: Ignoring to flush proposer request rreq=[{}]", rreq->to_compact_string()); + continue; } + reqs->emplace_back(std::move(rreq)); + } + + RD_LOG(TRACE, "Raft Channel: end_of_append_batch start_lsn={} count={} num_data_to_be_written={}", start_lsn, count, + reqs->size()); + // All requests are from proposer for data write, so as mentioned above we can skip the flush for now + if (!reqs->empty()) { // Check the map if data corresponding to all of these requsts have been received and written. If not, schedule // a fetch and write. Once all requests are completed and written, these requests are poped out of the map and // the future will be ready. @@ -60,9 +80,8 @@ void ReplLogStore::end_of_append_batch(ulong start_lsn, ulong count) { for (auto const& rreq : *reqs) { if (rreq) { rreq->state.fetch_or(uint32_cast(repl_req_state_t::LOG_FLUSHED)); } } - - sisl::VectorPool< repl_req_ptr_t >::free(reqs); } + sisl::VectorPool< repl_req_ptr_t >::free(reqs); } std::string ReplLogStore::rdev_name() const { return m_rd.rdev_name(); } diff --git a/src/lib/replication/repl_dev/common.cpp b/src/lib/replication/repl_dev/common.cpp index db5540d61..2d2e8a122 100644 --- a/src/lib/replication/repl_dev/common.cpp +++ b/src/lib/replication/repl_dev/common.cpp @@ -44,8 +44,8 @@ std::string repl_req_ctx::to_string() const { } std::string repl_req_ctx::to_compact_string() const { - return fmt::format("dsn={} term={} lsn={} state={} ref={}", rkey.dsn, rkey.term, lsn, req_state_name(state.load()), - this->use_count()); + return fmt::format("dsn={} term={} lsn={} Blkid={} state=[{}]", rkey.dsn, rkey.term, lsn, local_blkid.to_string(), + req_state_name(state.load())); } } // namespace homestore \ No newline at end of file diff --git a/src/lib/replication/repl_dev/raft_repl_dev.cpp b/src/lib/replication/repl_dev/raft_repl_dev.cpp index d03e0563d..30f98570c 100644 --- a/src/lib/replication/repl_dev/raft_repl_dev.cpp +++ b/src/lib/replication/repl_dev/raft_repl_dev.cpp @@ -95,11 +95,15 @@ void RaftReplDev::async_alloc_write(sisl::blob const& header, sisl::blob const& rreq->header = header; rreq->key = key; rreq->value = value; + rreq->rkey = repl_key{.server_id = server_id(), .term = raft_server()->get_term(), .dsn = m_next_dsn.fetch_add(1)}; + + // Add the request to the repl_dev_rreq map, it will be accessed throughout the life cycle of this request + auto const [it, happened] = m_repl_key_req_map.emplace(rreq->rkey, rreq); + RD_DBG_ASSERT(happened, "Duplicate repl_key={} found in the map", rreq->rkey.to_string()); // If it is header only entry, directly propose to the raft if (rreq->value.size) { - rreq->rkey = - repl_key{.server_id = server_id(), .term = raft_server()->get_term(), .dsn = m_next_dsn.fetch_add(1)}; + rreq->value_inlined = false; push_data_to_all_followers(rreq); // Step 1: Alloc Blkid @@ -116,7 +120,6 @@ void RaftReplDev::async_alloc_write(sisl::blob const& header, sisl::blob const& // Write the data data_service().async_write(rreq->value, rreq->local_blkid).thenValue([this, rreq](auto&& err) { if (!err) { - rreq->state.fetch_or(uint32_cast(repl_req_state_t::DATA_WRITTEN)); auto raft_status = m_state_machine->propose_to_raft(std::move(rreq)); if (raft_status != ReplServiceError::OK) { handle_error(rreq, raft_status); } } else { @@ -125,8 +128,8 @@ void RaftReplDev::async_alloc_write(sisl::blob const& header, sisl::blob const& } }); } else { - RD_LOG(INFO, "Skipping data channel send since value size is 0"); - rreq->state.fetch_or(uint32_cast(repl_req_state_t::DATA_WRITTEN)); + rreq->value_inlined = true; + RD_LOG(DEBUG, "Skipping data channel send since value size is 0"); auto raft_status = m_state_machine->propose_to_raft(std::move(rreq)); if (raft_status != ReplServiceError::OK) { handle_error(rreq, raft_status); } } @@ -160,7 +163,7 @@ void RaftReplDev::push_data_to_all_followers(repl_req_ptr_t rreq) { return; } // Release the buffer which holds the packets - RD_LOG(INFO, "Data Channel: Data push completed for rreq=[{}]", rreq->to_compact_string()); + RD_LOG(DEBUG, "Data Channel: Data push completed for rreq=[{}]", rreq->to_compact_string()); rreq->fb_builder.Release(); rreq->pkts.clear(); }); @@ -170,7 +173,7 @@ void RaftReplDev::on_fetch_data_received(intrusive< sisl::GenericRpcData >& rpc_ auto const& incoming_buf = rpc_data->request_blob(); auto fetch_req = GetSizePrefixedFetchData(incoming_buf.cbytes()); - RD_LOG(INFO, "Data Channel: FetchData received: fetch_req.size={}", fetch_req->request()->entries()->size()); + RD_LOG(DEBUG, "Data Channel: FetchData received: fetch_req.size={}", fetch_req->request()->entries()->size()); std::vector< sisl::sg_list > sgs_vec; @@ -254,13 +257,13 @@ void RaftReplDev::handle_error(repl_req_ptr_t const& rreq, ReplServiceError err) auto s = rreq->state.load(); if ((s & uint32_cast(repl_req_state_t::ERRORED)) || !(rreq->state.compare_exchange_strong(s, s | uint32_cast(repl_req_state_t::ERRORED)))) { - RD_LOG(INFO, "Raft Channel: Error in processing rreq=[{}] error={} already errored", rreq->to_compact_string(), + RD_LOG(ERROR, "Raft Channel: Error in processing rreq=[{}] error={} already errored", rreq->to_compact_string(), err); return; } // Free the blks which is allocated already - RD_LOG(INFO, "Raft Channel: Error in processing rreq=[{}] error={}", rreq->to_compact_string(), err); + RD_LOG(ERROR, "Raft Channel: Error in processing rreq=[{}] error={}", rreq->to_compact_string(), err); if (rreq->state.load() & uint32_cast(repl_req_state_t::BLK_ALLOCATED)) { auto blkid = rreq->local_blkid; data_service().async_free_blk(blkid).thenValue([blkid](auto&& err) { @@ -296,7 +299,7 @@ void RaftReplDev::on_push_data_received(intrusive< sisl::GenericRpcData >& rpc_d sisl::blob header = sisl::blob{push_req->user_header()->Data(), push_req->user_header()->size()}; sisl::blob key = sisl::blob{push_req->user_key()->Data(), push_req->user_key()->size()}; - auto rreq = follower_create_req( + auto rreq = applier_create_req( repl_key{.server_id = push_req->issuer_replica_id(), .term = push_req->raft_term(), .dsn = push_req->dsn()}, header, key, push_req->data_size()); rreq->rpc_data = rpc_data; @@ -308,7 +311,7 @@ void RaftReplDev::on_push_data_received(intrusive< sisl::GenericRpcData >& rpc_d return; } #endif - RD_LOG(INFO, "Data Channel: Received data rreq=[{}]", rreq->to_compact_string()); + RD_LOG(DEBUG, "Data Channel: Received data rreq=[{}]", rreq->to_compact_string()); if (rreq->state.fetch_or(uint32_cast(repl_req_state_t::DATA_RECEIVED)) & uint32_cast(repl_req_state_t::DATA_RECEIVED)) { @@ -339,7 +342,7 @@ void RaftReplDev::on_push_data_received(intrusive< sisl::GenericRpcData >& rpc_d } else { rreq->state.fetch_or(uint32_cast(repl_req_state_t::DATA_WRITTEN)); rreq->data_written_promise.setValue(); - RD_LOG(INFO, "Data Channel: Data Write completed rreq=[{}]", rreq->to_compact_string()); + RD_LOG(DEBUG, "Data Channel: Data Write completed rreq=[{}]", rreq->to_compact_string()); } }); } @@ -356,12 +359,27 @@ static MultiBlkId do_alloc_blk(uint32_t size, blk_alloc_hints const& hints) { return blkid; } -repl_req_ptr_t RaftReplDev::follower_create_req(repl_key const& rkey, sisl::blob const& user_header, - sisl::blob const& user_key, uint32_t data_size) { +repl_req_ptr_t RaftReplDev::repl_key_to_req(repl_key const& rkey) const { + auto const it = m_repl_key_req_map.find(rkey); + if (it == m_repl_key_req_map.cend()) { return nullptr; } + return it->second; +} + +repl_req_ptr_t RaftReplDev::applier_create_req(repl_key const& rkey, sisl::blob const& user_header, + sisl::blob const& user_key, uint32_t data_size) { auto const [it, happened] = m_repl_key_req_map.try_emplace(rkey, repl_req_ptr_t(new repl_req_ctx())); RD_DBG_ASSERT((it != m_repl_key_req_map.end()), "Unexpected error in map_repl_key_to_req"); auto rreq = it->second; + // There is no data portion, so there is not requied to allocate + if (data_size == 0) { + rreq->rkey = rkey; + rreq->header = user_header; + rreq->key = user_key; + rreq->value_inlined = true; + return rreq; + } + if (!happened) { // We already have the entry in the map, check if we are already allocated the blk by previous caller, in that // case we need to return the req. @@ -370,7 +388,7 @@ repl_req_ptr_t RaftReplDev::follower_create_req(repl_key const& rkey, sisl::blob RD_REL_ASSERT(blob_equals(user_header, rreq->header), "User header mismatch for repl_key={}", rkey.to_string()); RD_REL_ASSERT(blob_equals(user_key, rreq->key), "User key mismatch for repl_key={}", rkey.to_string()); - RD_LOG(INFO, "Repl_key=[{}] already received ", rkey.to_string()); + RD_LOG(DEBUG, "Repl_key=[{}] already received ", rkey.to_string()); return rreq; } } @@ -382,15 +400,16 @@ repl_req_ptr_t RaftReplDev::follower_create_req(repl_key const& rkey, sisl::blob rreq->rkey = rkey; rreq->header = user_header; rreq->key = user_key; + rreq->value_inlined = false; rreq->local_blkid = do_alloc_blk(data_size, m_listener->get_blk_alloc_hints(user_header, data_size)); rreq->state.fetch_or(uint32_cast(repl_req_state_t::BLK_ALLOCATED)); - RD_LOG(INFO, "in follower_create_req: rreq={}, addr={}", rreq->to_compact_string(), - reinterpret_cast< uintptr_t >(rreq.get())); + RD_LOG(DEBUG, "in applier_create_req: rreq={}", rreq->to_compact_string()); + return rreq; } -auto RaftReplDev::get_max_data_fetch_size() const { +static auto get_max_data_fetch_size() { #ifdef _PRERELEASE if (iomgr_flip::instance()->test_flip("simulate_staging_fetch_data")) { LOGINFO("Flip simulate_staging_fetch_data is enabled, return max_data_fetch_size: 16K"); @@ -402,21 +421,18 @@ auto RaftReplDev::get_max_data_fetch_size() const { void RaftReplDev::check_and_fetch_remote_data(std::vector< repl_req_ptr_t >* rreqs) { // Pop any entries that are already completed - from the entries list as well as from map - rreqs->erase(std::remove_if( - rreqs->begin(), rreqs->end(), - [this](repl_req_ptr_t const& rreq) { - if (rreq == nullptr) { return true; } - - if (rreq->state.load() & uint32_cast(repl_req_state_t::DATA_WRITTEN)) { - m_repl_key_req_map.erase(rreq->rkey); // Remove=Pop from map as well, since it is completed - RD_LOG(INFO, - "Raft Channel: Data write completed and blkid mapped, removing from map: rreq=[{}]", - rreq->to_compact_string()); - return true; // Remove from the pending list - } else { - return false; - } - }), + rreqs->erase(std::remove_if(rreqs->begin(), rreqs->end(), + [this](repl_req_ptr_t const& rreq) { + if (rreq == nullptr) { return true; } + + if (rreq->state.load() & uint32_cast(repl_req_state_t::DATA_WRITTEN)) { + RD_LOG(DEBUG, "Raft Channel: Data write completed and blkid mapped: rreq=[{}]", + rreq->to_compact_string()); + return true; // Remove from the pending list + } else { + return false; + } + }), rreqs->end()); if (rreqs->size()) { @@ -444,7 +460,7 @@ void RaftReplDev::check_and_fetch_remote_data(std::vector< repl_req_ptr_t >* rre void RaftReplDev::fetch_data_from_remote(std::vector< repl_req_ptr_t > rreqs) { if (rreqs.size() == 0) { return; } - std::vector<::flatbuffers::Offset< RequestEntry > > entries; + std::vector< ::flatbuffers::Offset< RequestEntry > > entries; entries.reserve(rreqs.size()); shared< flatbuffers::FlatBufferBuilder > builder = std::make_shared< flatbuffers::FlatBufferBuilder >(); @@ -483,7 +499,7 @@ void RaftReplDev::fetch_data_from_remote(std::vector< repl_req_ptr_t > rreqs) { if (!e) { // if we are here, it means the original who sent the log entries are down. // we need to handle error and when the other member becomes leader, it will resend the log entries; - RD_LOG(INFO, + RD_LOG(ERROR, "Not able to fetching data from originator={}, error={}, probably originator is down. Will " "retry when new leader start appending log entries", rreqs.front()->remote_blkid.server_id, e.error()); @@ -499,7 +515,7 @@ void RaftReplDev::fetch_data_from_remote(std::vector< repl_req_ptr_t > rreqs) { RD_DBG_ASSERT_GT(total_size, 0, "Empty response from remote"); RD_DBG_ASSERT(raw_data, "Empty response from remote"); - RD_LOG(INFO, "Data Channel: FetchData completed for reques.size()={} ", rreqs.size()); + RD_LOG(DEBUG, "Data Channel: FetchData completed for reques.size()={} ", rreqs.size()); thread_local std::vector< folly::Future< std::error_code > > futs; // static is impplied futs.clear(); @@ -520,7 +536,7 @@ void RaftReplDev::fetch_data_from_remote(std::vector< repl_req_ptr_t > rreqs) { "Data size mismatch for rreq={} blkid={}, remote size: {}, local size: {}", rreq->to_compact_string(), rreq->local_blkid.to_string(), data_size, local_size); - RD_LOG(INFO, "Data Channel: Data already received for rreq=[{}], skip and move on to next rreq.", + RD_LOG(DEBUG, "Data Channel: Data already received for rreq=[{}], skip and move on to next rreq.", rreq->to_compact_string()); continue; } else { @@ -565,7 +581,7 @@ void RaftReplDev::fetch_data_from_remote(std::vector< repl_req_ptr_t > rreqs) { raw_data += data_size; total_size -= data_size; - RD_LOG(INFO, + RD_LOG(DEBUG, "Data Channel: Data fetched from remote: rreq=[{}], data_size: {}, total_size: {}, " "local_blkid: {}", rreq->to_compact_string(), data_size, total_size, rreq->local_blkid.to_string()); @@ -599,22 +615,21 @@ AsyncNotify RaftReplDev::notify_after_data_written(std::vector< repl_req_ptr_t > futs.reserve(rreqs->size()); // Pop any entries that are already completed - from the entries list as well as from map - rreqs->erase(std::remove_if( - rreqs->begin(), rreqs->end(), - [this, &futs](repl_req_ptr_t const& rreq) { - if (rreq == nullptr) { return true; } - - if (rreq->state.load() & uint32_cast(repl_req_state_t::DATA_WRITTEN)) { - m_repl_key_req_map.erase(rreq->rkey); // Remove=Pop from map as well, since it is completed - RD_LOG(INFO, - "Raft Channel: Data write completed and blkid mapped, removing from map: rreq=[{}]", - rreq->to_compact_string()); - return true; // Remove from the pending list - } else { - futs.emplace_back(rreq->data_written_promise.getSemiFuture()); - return false; - } - }), + rreqs->erase(std::remove_if(rreqs->begin(), rreqs->end(), + [this, &futs](repl_req_ptr_t const& rreq) { + if ((rreq == nullptr) || (rreq->value_inlined)) { return true; } + + if (rreq->state.load() & uint32_cast(repl_req_state_t::DATA_WRITTEN)) { + RD_LOG(DEBUG, "Raft Channel: Data write completed and blkid mapped: rreq=[{}]", + rreq->to_compact_string()); + return true; // Remove from the pending list + } else { + RD_LOG(TRACE, "Data Channel: Data write pending rreq=[{}]", + rreq->to_compact_string()); + futs.emplace_back(rreq->data_written_promise.getSemiFuture()); + return false; + } + }), rreqs->end()); // All the entries are done already, no need to wait @@ -630,7 +645,7 @@ AsyncNotify RaftReplDev::notify_after_data_written(std::vector< repl_req_ptr_t > m_wait_data_timer_hdl = iomanager.schedule_global_timer( // timer wakes up in current thread; HS_DYNAMIC_CONFIG(consensus.wait_data_write_timer_sec) * 1000 * 1000 * 1000, false /* recurring */, nullptr /* cookie */, iomgr::reactor_regex::all_worker, [this, rreqs](auto /*cookie*/) { - RD_LOG(INFO, "Data Channel: Wait data write timer fired, checking if data is written"); + RD_LOG(DEBUG, "Data Channel: Wait data write timer fired, checking if data is written"); check_and_fetch_remote_data(rreqs); }); } @@ -641,10 +656,9 @@ AsyncNotify RaftReplDev::notify_after_data_written(std::vector< repl_req_ptr_t > HS_DBG_ASSERT(rreq->state.load() & uint32_cast(repl_req_state_t::DATA_WRITTEN), "Data written promise raised without updating DATA_WRITTEN state for rkey={}", rreq->rkey.to_string()); - RD_LOG(INFO, "Raft Channel: Data write completed and blkid mapped, removing from map: rreq=[{}]", - rreq->to_compact_string()); - m_repl_key_req_map.erase(rreq->rkey); // Remove from map as well, since it is completed + RD_LOG(DEBUG, "Raft Channel: Data write completed and blkid mapped: rreq=[{}]", rreq->to_compact_string()); } + RD_LOG(TRACE, "Data Channel: {} pending reqs's data are written", rreqs->size()); return folly::makeSemiFuture< folly::Unit >(folly::Unit{}); }); } @@ -817,10 +831,13 @@ void RaftReplDev::leave() { void RaftReplDev::report_committed(repl_req_ptr_t rreq) { if (rreq->local_blkid.is_valid()) { data_service().commit_blk(rreq->local_blkid); } + // Remove the request from repl_key map. + m_repl_key_req_map.erase(rreq->rkey); + auto prev_lsn = m_commit_upto_lsn.exchange(rreq->lsn); RD_DBG_ASSERT_GT(rreq->lsn, prev_lsn, "Out of order commit of lsns, it is not expected in RaftReplDev"); - RD_LOG(INFO, "Raft channel: Commit rreq=[{}]", rreq->to_compact_string()); + RD_LOG(DEBUG, "Raft channel: Commit rreq=[{}]", rreq->to_compact_string()); m_listener->on_commit(rreq->lsn, rreq->header, rreq->key, rreq->local_blkid, rreq); if (!rreq->is_proposer) { @@ -843,6 +860,7 @@ void RaftReplDev::cp_flush(CP*) { m_rd_sb->commit_lsn = lsn; m_rd_sb->checkpoint_lsn = lsn; + m_rd_sb->last_applied_dsn = m_next_dsn.load(); m_rd_sb.write(); m_last_flushed_commit_lsn = lsn; } diff --git a/src/lib/replication/repl_dev/raft_repl_dev.h b/src/lib/replication/repl_dev/raft_repl_dev.h index 257598a32..5c5f80627 100644 --- a/src/lib/replication/repl_dev/raft_repl_dev.h +++ b/src/lib/replication/repl_dev/raft_repl_dev.h @@ -97,8 +97,9 @@ class RaftReplDev : public ReplDev, //////////////// Methods needed for other Raft classes to access ///////////////// void use_config(json_superblk raft_config_sb); void report_committed(repl_req_ptr_t rreq); - repl_req_ptr_t follower_create_req(repl_key const& rkey, sisl::blob const& user_header, sisl::blob const& user_key, - uint32_t data_size); + repl_req_ptr_t repl_key_to_req(repl_key const& rkey) const; + repl_req_ptr_t applier_create_req(repl_key const& rkey, sisl::blob const& user_header, sisl::blob const& user_key, + uint32_t data_size); AsyncNotify notify_after_data_written(std::vector< repl_req_ptr_t >* rreqs); void cp_flush(CP* cp); void cp_cleanup(CP* cp); @@ -126,7 +127,6 @@ class RaftReplDev : public ReplDev, void on_fetch_data_received(intrusive< sisl::GenericRpcData >& rpc_data); void check_and_fetch_remote_data(std::vector< repl_req_ptr_t >* rreqs); void fetch_data_from_remote(std::vector< repl_req_ptr_t > rreqs); - auto get_max_data_fetch_size() const; bool is_resync_mode() { return m_resync_mode; } void handle_error(repl_req_ptr_t const& rreq, ReplServiceError err); }; diff --git a/src/lib/replication/repl_dev/raft_state_machine.cpp b/src/lib/replication/repl_dev/raft_state_machine.cpp index ea3cba12d..98f63af71 100644 --- a/src/lib/replication/repl_dev/raft_state_machine.cpp +++ b/src/lib/replication/repl_dev/raft_state_machine.cpp @@ -16,55 +16,22 @@ RaftStateMachine::RaftStateMachine(RaftReplDev& rd) : m_rd{rd} { m_success_ptr->put(0); } -raft_buf_ptr_t RaftStateMachine::pre_commit_ext(nuraft::state_machine::ext_op_params const& params) { - // Leader precommit is processed in next callback, because this callback doesn't provide a way to stick a context - // which could contain the req structure in it. - if (!m_rd.is_leader()) { - int64_t lsn = s_cast< int64_t >(params.log_idx); - raft_buf_ptr_t data = params.data; - - repl_req_ptr_t rreq = lsn_to_req(lsn); - RD_LOG(INFO, "Raft channel: Precommit rreq=[{}]", rreq->to_compact_string()); - m_rd.m_listener->on_pre_commit(rreq->lsn, rreq->header, rreq->key, rreq); - } - return m_success_ptr; -} - -void RaftStateMachine::after_precommit_in_leader(nuraft::raft_server::req_ext_cb_params const& params) { - repl_req_ptr_t rreq = repl_req_ptr_t(r_cast< repl_req_ctx* >(params.context)); - link_lsn_to_req(rreq, int64_cast(params.log_idx)); - - RD_LOG(INFO, "Raft Channel: Proposed rreq=[{}]", rreq->to_compact_string()); - m_rd.m_listener->on_pre_commit(rreq->lsn, rreq->header, rreq->key, rreq); +static std::pair< sisl::blob, sisl::blob > header_only_extract(nuraft::buffer& buf) { + repl_journal_entry* jentry = r_cast< repl_journal_entry* >(buf.data_begin()); + RELEASE_ASSERT_EQ(jentry->major_version, repl_journal_entry::JOURNAL_ENTRY_MAJOR, + "Mismatched version of journal entry received from RAFT peer"); + RELEASE_ASSERT_EQ(jentry->code, journal_type_t::HS_HEADER_ONLY, + "Trying to extract header on non-header only entry"); + sisl::blob const header = sisl::blob{uintptr_cast(jentry) + sizeof(repl_journal_entry), jentry->user_header_size}; + sisl::blob const key = sisl::blob{header.cbytes() + header.size(), jentry->key_size}; + return {header, key}; } -raft_buf_ptr_t RaftStateMachine::commit_ext(nuraft::state_machine::ext_op_params const& params) { - int64_t lsn = s_cast< int64_t >(params.log_idx); - raft_buf_ptr_t data = params.data; - - repl_req_ptr_t rreq = lsn_to_req(lsn); - if (rreq == nullptr) { return m_success_ptr; } - - RD_LOG(INFO, "Raft channel: Received Commit message rreq=[{}]", rreq->to_compact_string()); - if (m_rd.is_leader()) { - // This is the time to ensure flushing of journal happens in leader - if (m_rd.m_data_journal->last_durable_index() < uint64_cast(lsn)) { m_rd.m_data_journal->flush(); } - rreq->state.fetch_or(uint32_cast(repl_req_state_t::LOG_FLUSHED)); - } - if (rreq->state.load() & uint32_cast(repl_req_state_t::DATA_WRITTEN)) { - m_lsn_req_map.erase(rreq->lsn); - m_rd.report_committed(rreq); - } - return m_success_ptr; -} - -uint64_t RaftStateMachine::last_commit_index() { return uint64_cast(m_rd.get_last_commit_lsn()); } - ReplServiceError RaftStateMachine::propose_to_raft(repl_req_ptr_t rreq) { - uint32_t val_size = rreq->value.size ? rreq->local_blkid.serialized_size() : 0; + uint32_t val_size = rreq->value_inlined ? 0 : rreq->local_blkid.serialized_size(); uint32_t entry_size = sizeof(repl_journal_entry) + rreq->header.size() + rreq->key.size() + val_size; rreq->alloc_journal_entry(entry_size, true /* raft_buf */); - rreq->journal_entry->code = (rreq->value.size) ? journal_type_t::HS_LARGE_DATA : journal_type_t::HS_HEADER_ONLY; + rreq->journal_entry->code = (rreq->value_inlined) ? journal_type_t::HS_HEADER_ONLY : journal_type_t::HS_LARGE_DATA; rreq->journal_entry->server_id = m_rd.server_id(); rreq->journal_entry->dsn = rreq->dsn(); rreq->journal_entry->user_header_size = rreq->header.size(); @@ -92,14 +59,9 @@ ReplServiceError RaftStateMachine::propose_to_raft(repl_req_ptr_t rreq) { auto* vec = sisl::VectorPool< raft_buf_ptr_t >::alloc(); vec->push_back(rreq->raft_journal_buf()); - nuraft::raft_server::req_ext_params param; - param.after_precommit_ = bind_this(RaftStateMachine::after_precommit_in_leader, 1); - param.expected_term_ = 0; - param.context_ = voidptr_cast(rreq.get()); - RD_LOG(TRACE, "Raft Channel: journal_entry=[{}] ", rreq->journal_entry->to_string()); - auto append_status = m_rd.raft_server()->append_entries_ext(*vec, param); + auto append_status = m_rd.raft_server()->append_entries(*vec); sisl::VectorPool< raft_buf_ptr_t >::free(vec); if (append_status && !append_status->get_accepted()) { @@ -111,28 +73,29 @@ ReplServiceError RaftStateMachine::propose_to_raft(repl_req_ptr_t rreq) { } repl_req_ptr_t RaftStateMachine::transform_journal_entry(nuraft::ptr< nuraft::log_entry >& lentry) { - // Leader has nothing to transform or process - if (m_rd.is_leader()) { return nullptr; } - - // We don't want to transform anything that is not an app log - if (lentry->get_val_type() != nuraft::log_val_type::app_log) { return nullptr; } - - // Validate the journal entry and see if it needs to be processed - { - repl_journal_entry* tmp_jentry = r_cast< repl_journal_entry* >(lentry->get_buf().data_begin()); - RELEASE_ASSERT_EQ(tmp_jentry->major_version, repl_journal_entry::JOURNAL_ENTRY_MAJOR, - "Mismatched version of journal entry received from RAFT peer"); - - RD_LOG(TRACE, "Received Raft log_entry=[term={}], journal_entry=[{}] ", lentry->get_term(), - tmp_jentry->to_string()); - - // For inline data we don't need to transform anything - if (tmp_jentry->code != journal_type_t::HS_LARGE_DATA) { return nullptr; } - - DEBUG_ASSERT_GT(tmp_jentry->value_size, 0, "Entry marked as large data, but value size is notified as 0"); + // Validate the journal entry and see if it needs to be transformed + + repl_journal_entry* tmp_jentry = r_cast< repl_journal_entry* >(lentry->get_buf().data_begin()); + RELEASE_ASSERT_EQ(tmp_jentry->major_version, repl_journal_entry::JOURNAL_ENTRY_MAJOR, + "Mismatched version of journal entry received from RAFT peer"); + + RD_LOG(TRACE, "Received Raft log_entry=[term={}], journal_entry=[{}] ", lentry->get_term(), + tmp_jentry->to_string()); + + if (tmp_jentry->server_id == m_rd.server_id()) { + // We are the proposer for this entry, lets pull the request from the map. We don't need any actual + // transformation here, because the entry is already is local + repl_key rkey{.server_id = tmp_jentry->server_id, .term = lentry->get_term(), .dsn = tmp_jentry->dsn}; + auto rreq = m_rd.repl_key_to_req(rkey); + RELEASE_ASSERT(rreq != nullptr, + "Log entry write with local server_id rkey={} but its corresponding req is missting in map", + rkey.to_string()); + DEBUG_ASSERT(rreq->is_proposer, "Log entry has same server_id={}, but rreq says its not a proposer", + m_rd.server_id()) + return rreq; } - auto log_to_journal_entry = [](raft_buf_ptr_t const& log_buf, auto const log_buf_data_offset) { + auto log_to_journal_entry = [](raft_buf_ptr_t const& log_buf, auto log_buf_data_offset) { repl_journal_entry* jentry = r_cast< repl_journal_entry* >(log_buf->data_begin() + log_buf_data_offset); sisl::blob const header = sisl::blob{uintptr_cast(jentry) + sizeof(repl_journal_entry), jentry->user_header_size}; @@ -141,44 +104,85 @@ repl_req_ptr_t RaftStateMachine::transform_journal_entry(nuraft::ptr< nuraft::lo }; // Serialize the log_entry buffer which returns the actual raft log_entry buffer. - auto log_buf = lentry->serialize(); - auto const log_buf_data_offset = log_buf->size() - lentry->get_buf().size(); + raft_buf_ptr_t log_buf; + size_t log_buf_data_offset; + if (tmp_jentry->code == journal_type_t::HS_LARGE_DATA) { + DEBUG_ASSERT_GT(tmp_jentry->value_size, 0, "Entry marked as large data, but value size is notified as 0"); + log_buf = lentry->serialize(); + log_buf_data_offset = log_buf->size() - lentry->get_buf().size(); + } else { + DEBUG_ASSERT_EQ(tmp_jentry->value_size, 0, "Entry marked as inline data, but value size is not 0"); + log_buf = lentry->get_buf_ptr(); + log_buf_data_offset = 0; + } + auto const [jentry, header, key] = log_to_journal_entry(log_buf, log_buf_data_offset); + RD_LOG(DEBUG, "Received Raft server_id={}, term={}, dsn={}, journal_entry=[{}] ", jentry->server_id, + lentry->get_term(), jentry->dsn, jentry->to_string()); - LOGINFO("Received Raft server_id={}, term={}, dsn={}, journal_entry=[{}] ", jentry->server_id, lentry->get_term(), - jentry->dsn, jentry->to_string()); // From the repl_key, get the repl_req. In cases where log stream got here first, this method will create a new // repl_req and return that back. Fill up all of the required journal entry inside the repl_req - auto rreq = m_rd.follower_create_req( + auto rreq = m_rd.applier_create_req( repl_key{.server_id = jentry->server_id, .term = lentry->get_term(), .dsn = jentry->dsn}, header, key, jentry->value_size); rreq->journal_buf = std::move(log_buf); rreq->journal_entry = jentry; - MultiBlkId entry_blkid; - entry_blkid.deserialize(sisl::blob{key.cbytes() + key.size(), jentry->value_size}, true /* copy */); - rreq->remote_blkid = RemoteBlkId{jentry->server_id, entry_blkid}; - - auto const local_size = rreq->local_blkid.serialized_size(); - auto const remote_size = entry_blkid.serialized_size(); - uint8_t* blkid_location; - if (local_size > remote_size) { - // We need to copy the entire log_entry to accomodate local blkid - auto new_buf = nuraft::buffer::expand(*rreq->raft_journal_buf(), - rreq->raft_journal_buf()->size() + local_size - remote_size); - blkid_location = uintptr_cast(new_buf->data_begin()) + rreq->raft_journal_buf()->size() - jentry->value_size; - std::tie(rreq->journal_entry, rreq->header, rreq->key) = log_to_journal_entry(new_buf, log_buf_data_offset); - rreq->journal_buf = std::move(new_buf); - } else { - // Can do in-place replace of remote blkid with local blkid. - blkid_location = uintptr_cast(rreq->raft_journal_buf()->data_begin()) + rreq->raft_journal_buf()->size() - - jentry->value_size; + if (jentry->value_size > 0) { + MultiBlkId entry_blkid; + entry_blkid.deserialize(sisl::blob{key.cbytes() + key.size(), jentry->value_size}, true /* copy */); + rreq->remote_blkid = RemoteBlkId{jentry->server_id, entry_blkid}; + + auto const local_size = rreq->local_blkid.serialized_size(); + auto const remote_size = entry_blkid.serialized_size(); + uint8_t* blkid_location; + if (local_size > remote_size) { + // We need to copy the entire log_entry to accomodate local blkid + auto new_buf = nuraft::buffer::expand(*rreq->raft_journal_buf(), + rreq->raft_journal_buf()->size() + local_size - remote_size); + blkid_location = + uintptr_cast(new_buf->data_begin()) + rreq->raft_journal_buf()->size() - jentry->value_size; + std::tie(rreq->journal_entry, rreq->header, rreq->key) = log_to_journal_entry(new_buf, log_buf_data_offset); + rreq->journal_buf = std::move(new_buf); + } else { + // Can do in-place replace of remote blkid with local blkid. + blkid_location = uintptr_cast(rreq->raft_journal_buf()->data_begin()) + rreq->raft_journal_buf()->size() - + jentry->value_size; + } + std::memcpy(blkid_location, rreq->local_blkid.serialize().cbytes(), local_size); } - std::memcpy(blkid_location, rreq->local_blkid.serialize().cbytes(), local_size); - return rreq; } +raft_buf_ptr_t RaftStateMachine::pre_commit_ext(nuraft::state_machine::ext_op_params const& params) { + int64_t lsn = s_cast< int64_t >(params.log_idx); + + repl_req_ptr_t rreq = lsn_to_req(lsn); + RD_LOG(DEBUG, "Raft channel: Precommit rreq=[{}]", rreq->to_compact_string()); + m_rd.m_listener->on_pre_commit(rreq->lsn, rreq->header, rreq->key, rreq); + + return m_success_ptr; +} + +raft_buf_ptr_t RaftStateMachine::commit_ext(nuraft::state_machine::ext_op_params const& params) { + int64_t lsn = s_cast< int64_t >(params.log_idx); + + repl_req_ptr_t rreq = lsn_to_req(lsn); + RD_LOG(DEBUG, "Raft channel: Received Commit message rreq=[{}]", rreq->to_compact_string()); + if (rreq->is_proposer) { + // This is the time to ensure flushing of journal happens in the proposer + if (m_rd.m_data_journal->last_durable_index() < uint64_cast(lsn)) { m_rd.m_data_journal->flush(); } + rreq->state.fetch_or(uint32_cast(repl_req_state_t::LOG_FLUSHED)); + } + + m_lsn_req_map.erase(rreq->lsn); + m_rd.report_committed(rreq); + + return m_success_ptr; +} + +uint64_t RaftStateMachine::last_commit_index() { return uint64_cast(m_rd.get_last_commit_lsn()); } + void RaftStateMachine::link_lsn_to_req(repl_req_ptr_t rreq, int64_t lsn) { rreq->lsn = lsn; rreq->state.fetch_or(uint32_cast(repl_req_state_t::LOG_RECEIVED)); diff --git a/src/lib/replication/repl_dev/solo_repl_dev.cpp b/src/lib/replication/repl_dev/solo_repl_dev.cpp index 73348f53e..5f07487d3 100644 --- a/src/lib/replication/repl_dev/solo_repl_dev.cpp +++ b/src/lib/replication/repl_dev/solo_repl_dev.cpp @@ -36,6 +36,8 @@ void SoloReplDev::async_alloc_write(sisl::blob const& header, sisl::blob const& // If it is header only entry, directly write to the journal if (rreq->value.size) { + rreq->value_inlined = false; + // Step 1: Alloc Blkid auto status = data_service().alloc_blks(uint32_cast(rreq->value.size), m_listener->get_blk_alloc_hints(rreq->header, rreq->value.size), @@ -50,6 +52,7 @@ void SoloReplDev::async_alloc_write(sisl::blob const& header, sisl::blob const& write_journal(std::move(rreq)); }); } else { + rreq->value_inlined = true; write_journal(std::move(rreq)); } } diff --git a/src/tests/test_common/hs_repl_test_common.hpp b/src/tests/test_common/hs_repl_test_common.hpp index 6367be928..fbdcd6861 100644 --- a/src/tests/test_common/hs_repl_test_common.hpp +++ b/src/tests/test_common/hs_repl_test_common.hpp @@ -121,7 +121,7 @@ class HSReplTestHelper { public: friend class TestReplApplication; - HSReplTestHelper(std::string const& name, char** argv) : name_{name}, argv_{argv} {} + HSReplTestHelper(std::string const& name, int argc, char** argv) : name_{name}, argc_{argc}, argv_{argv} {} void setup() { replica_num_ = SISL_OPTIONS["replica_num"].as< uint16_t >(); @@ -153,8 +153,13 @@ class HSReplTestHelper { for (uint32_t i{1}; i < num_replicas; ++i) { LOGINFO("Spawning Homestore replica={} instance", i); - boost::process::child c(argv_[0], "--log_mods", "replication:trace", "--replica_num", std::to_string(i), - proc_grp_); + std::string cmd_line; + fmt::format_to(std::back_inserter(cmd_line), "{} --replica_num {}", argv_[0], i); + for (int j{1}; j < argc_; ++j) { + fmt::format_to(std::back_inserter(cmd_line), " {}", argv_[j]); + } + boost::process::child c(boost::process::cmd = cmd_line, proc_grp_); + // boost::process::child c(argv_[0], "--replica_num", std::to_string(i), proc_grp_); c.detach(); } } else { @@ -282,6 +287,7 @@ class HSReplTestHelper { private: uint16_t replica_num_; std::string name_; + int argc_; char** argv_; boost::process::group proc_grp_; diff --git a/src/tests/test_raft_repl_dev.cpp b/src/tests/test_raft_repl_dev.cpp index e31b61a1a..3e8e1ae7e 100644 --- a/src/tests/test_raft_repl_dev.cpp +++ b/src/tests/test_raft_repl_dev.cpp @@ -52,6 +52,8 @@ SISL_OPTION_GROUP(test_raft_repl_dev, SISL_OPTIONS_ENABLE(logging, test_raft_repl_dev, iomgr, config, test_common_setup, test_repl_common_setup) static std::unique_ptr< test_common::HSReplTestHelper > g_helper; +static std::random_device g_rd{}; +static std::default_random_engine g_re{g_rd()}; class TestReplicatedDB : public homestore::ReplDevListener { public: @@ -174,20 +176,25 @@ class TestReplicatedDB : public homestore::ReplDevListener { ++it; } - auto block_size = SISL_OPTIONS["block_size"].as< uint32_t >(); - auto read_sgs = test_common::HSTestHelper::create_sgs(v.data_size_, block_size); - - repl_dev()->async_read(v.blkid_, read_sgs, v.data_size_).thenValue([read_sgs, k, v](auto const ec) { - RELEASE_ASSERT(!ec, "Read of blkid={} for key={} error={}", v.blkid_.to_string(), k.id_, ec.message()); - for (auto const& iov : read_sgs.iovs) { - test_common::HSTestHelper::validate_data_buf(uintptr_cast(iov.iov_base), iov.iov_len, - v.data_pattern_); - iomanager.iobuf_free(uintptr_cast(iov.iov_base)); - } - LOGINFO("Validated successfully key={} value[blkid={} pattern={}]", k.id_, v.blkid_.to_string(), - v.data_pattern_); + if (v.data_size_ != 0) { + auto block_size = SISL_OPTIONS["block_size"].as< uint32_t >(); + auto read_sgs = test_common::HSTestHelper::create_sgs(v.data_size_, block_size); + + repl_dev()->async_read(v.blkid_, read_sgs, v.data_size_).thenValue([read_sgs, k, v](auto const ec) { + RELEASE_ASSERT(!ec, "Read of blkid={} for key={} error={}", v.blkid_.to_string(), k.id_, + ec.message()); + for (auto const& iov : read_sgs.iovs) { + test_common::HSTestHelper::validate_data_buf(uintptr_cast(iov.iov_base), iov.iov_len, + v.data_pattern_); + iomanager.iobuf_free(uintptr_cast(iov.iov_base)); + } + LOGINFO("Validated successfully key={} value[blkid={} pattern={}]", k.id_, v.blkid_.to_string(), + v.data_pattern_); + g_helper->runner().next_task(); + }); + } else { g_helper->runner().next_task(); - }); + } }); g_helper->runner().execute().get(); } @@ -269,7 +276,6 @@ class RaftReplDevTest : public testing::Test { }; TEST_F(RaftReplDevTest, All_Append_Restart_Append) { - LOGINFO("Homestore replica={} setup completed", g_helper->replica_num()); g_helper->sync_for_test_start(); @@ -277,7 +283,10 @@ TEST_F(RaftReplDevTest, All_Append_Restart_Append) { if (g_helper->replica_num() == 0) { auto block_size = SISL_OPTIONS["block_size"].as< uint32_t >(); LOGINFO("Run on worker threads to schedule append on repldev for {} Bytes.", block_size); - g_helper->runner().set_task([this, block_size]() { this->generate_writes(block_size, block_size); }); + g_helper->runner().set_task([this, block_size]() { + static std::normal_distribution<> num_blks_gen{3.0, 2.0}; + this->generate_writes(std::abs(std::round(num_blks_gen(g_re))) * block_size, block_size); + }); g_helper->runner().execute().get(); } this->wait_for_all_writes(exp_entries); @@ -298,7 +307,10 @@ TEST_F(RaftReplDevTest, All_Append_Restart_Append) { LOGINFO("Post restart write the data again"); auto block_size = SISL_OPTIONS["block_size"].as< uint32_t >(); - g_helper->runner().set_task([this, block_size]() { this->generate_writes(block_size, block_size); }); + g_helper->runner().set_task([this, block_size]() { + static std::normal_distribution<> num_blks_gen{3.0, 2.0}; + this->generate_writes(std::abs(std::round(num_blks_gen(g_re))) * block_size, block_size); + }); g_helper->runner().execute().get(); } this->wait_for_all_writes(exp_entries); @@ -467,7 +479,7 @@ int main(int argc, char* argv[]) { test_repl_common_setup); FLAGS_folly_global_cpu_executor_threads = 4; - g_helper = std::make_unique< test_common::HSReplTestHelper >("test_raft_repl_dev", orig_argv); + g_helper = std::make_unique< test_common::HSReplTestHelper >("test_raft_repl_dev", argc, orig_argv); g_helper->setup(); (g_helper->replica_num() == 0) ? ::testing::GTEST_FLAG(filter) = "*Primary_*:*All_*"