diff --git a/.github/workflows/merge_build_3x.yml b/.github/workflows/merge_build_3x.yml
new file mode 100644
index 000000000..2744dd33d
--- /dev/null
+++ b/.github/workflows/merge_build_3x.yml
@@ -0,0 +1,35 @@
+name: Homestore 3.x Build
+
+on:
+  workflow_dispatch:
+  push:
+    branches:
+      - stable/v3.x
+
+jobs:
+  Build:
+    strategy:
+      fail-fast: false
+      matrix:
+        platform: ["ubuntu-22.04", "ubuntu-20.04"]
+        build-type: ["Debug", "Release"]
+        malloc-impl: ["libc", "tcmalloc"]
+        prerelease: ["True", "False"]
+        exclude:
+          - build-type: Debug
+            platform: ubuntu-20.04
+          - build-type: Debug
+            malloc-impl: tcmalloc
+          - malloc-impl: tcmalloc
+            platform: ubuntu-20.04
+          - malloc-impl: libc
+            build-type: Release
+            platform: ubuntu-22.04
+          - prerelease: "True"
+            platform: ubuntu-20.04
+    uses: ./.github/workflows/build_commit.yml
+    with:
+      platform: ${{ matrix.platform }}
+      build-type: ${{ matrix.build-type }}
+      malloc-impl: ${{ matrix.malloc-impl }}
+      prerelease: ${{ matrix.prerelease }}
diff --git a/conanfile.py b/conanfile.py
index 78381279e..722314053 100644
--- a/conanfile.py
+++ b/conanfile.py
@@ -5,7 +5,7 @@ class HomestoreConan(ConanFile):
     name = "homestore"
-    version = "6.4.25"
+    version = "6.4.27"
 
     homepage = "https://github.com/eBay/Homestore"
     description = "HomeStore Storage Engine"
     topics = ("ebay", "nublox")
 
@@ -31,7 +31,6 @@ class HomestoreConan(ConanFile):
         'skip_testing': False,
     }
 
-    generators = "cmake", "cmake_find_package"
     exports_sources = "cmake/*", "src/*", "CMakeLists.txt", "test_wrap.sh", "LICENSE"
     keep_imports = True
 
diff --git a/src/include/homestore/logstore/log_store.hpp b/src/include/homestore/logstore/log_store.hpp
index 6c0b493ec..eec7eac50 100644
--- a/src/include/homestore/logstore/log_store.hpp
+++ b/src/include/homestore/logstore/log_store.hpp
@@ -368,6 +368,7 @@ class HomeLogStore : public std::enable_shared_from_this< HomeLogStore > {
     // Sync flush sections
     std::atomic< logstore_seq_num_t > m_sync_flush_waiter_lsn{invalid_lsn()};
     std::mutex m_sync_flush_mtx;
+    std::mutex m_single_sync_flush_mtx;
     std::condition_variable m_sync_flush_cv;
     std::vector< seq_ld_key_pair > m_truncation_barriers; // List of truncation barriers
 
diff --git a/src/lib/device/journal_vdev.cpp b/src/lib/device/journal_vdev.cpp
index d82feb5bf..d526103fc 100644
--- a/src/lib/device/journal_vdev.cpp
+++ b/src/lib/device/journal_vdev.cpp
@@ -284,7 +284,8 @@ off_t JournalVirtualDev::Descriptor::alloc_next_append_blk(size_t sz) {
     // update reserved size;
     m_reserved_sz += sz;
 
-    high_watermark_check();
+    // TODO enable after resource manager changes.
+    // high_watermark_check();
 
     // assert that returnning logical offset is in good range
     HS_DBG_ASSERT_LE(tail_off, m_end_offset);
diff --git a/src/lib/logstore/log_dev.cpp b/src/lib/logstore/log_dev.cpp
index 4ae040ae9..12e4de193 100644
--- a/src/lib/logstore/log_dev.cpp
+++ b/src/lib/logstore/log_dev.cpp
@@ -262,6 +262,11 @@ int64_t LogDev::append_async(const logstore_id_t store_id, const logstore_seq_nu
     auto threshold_size = LogDev::flush_data_threshold_size();
     m_log_records->create(idx, store_id, seq_num, data, cb_context);
 
+    if (HS_DYNAMIC_CONFIG(logstore.flush_threshold_size) == 0) {
+        // This is set in tests to disable implicit flush. This will be removed in future.
+        return idx;
+    }
+
     if (flush_wait ||
         ((prev_size < threshold_size && ((prev_size + data.size()) >= threshold_size) &&
           !m_is_flushing.load(std::memory_order_relaxed)))) {
diff --git a/src/lib/logstore/log_store.cpp b/src/lib/logstore/log_store.cpp
index bd60291c6..672f05558 100644
--- a/src/lib/logstore/log_store.cpp
+++ b/src/lib/logstore/log_store.cpp
@@ -173,10 +173,11 @@ void HomeLogStore::read_async(logstore_seq_num_t seq_num, void* cookie, const lo
 #endif
 
 void HomeLogStore::on_write_completion(logstore_req* req, const logdev_key& ld_key) {
+    std::unique_lock lk(m_sync_flush_mtx);
     // Upon completion, create the mapping between seq_num and log dev key
     m_records.update(req->seq_num, [&](logstore_record& rec) -> bool {
         rec.m_dev_key = ld_key;
-        // THIS_LOGSTORE_LOG(DEBUG, "Completed write of lsn {} logdev_key={}", req->seq_num, ld_key);
+        THIS_LOGSTORE_LOG(DEBUG, "Completed write of store lsn {} logdev_key={}", req->seq_num, ld_key);
         return true;
     });
     // assert(flush_ld_key.idx >= m_last_flush_ldkey.idx);
@@ -402,6 +403,7 @@ void HomeLogStore::flush_sync(logstore_seq_num_t upto_seq_num) {
     HS_DBG_ASSERT_EQ(LogDev::can_flush_in_this_thread(), false,
                      "Logstore flush sync cannot be called on same thread which could do logdev flush");
 
+    std::unique_lock lk(m_single_sync_flush_mtx);
     if (upto_seq_num == invalid_lsn()) { upto_seq_num = m_records.active_upto(); }
 
     // if we have flushed already, we are done
@@ -426,6 +428,7 @@ void HomeLogStore::flush_sync(logstore_seq_num_t upto_seq_num) {
 
         // NOTE: We are not resetting the lsn because same seq number should never have 2 completions and thus not
         // doing it saves an atomic instruction
+        THIS_LOGSTORE_LOG(TRACE, "flush_sync over upto_seq_num {}", upto_seq_num);
     }
 }
 
diff --git a/src/lib/replication/log_store/home_raft_log_store.cpp b/src/lib/replication/log_store/home_raft_log_store.cpp
index 80e08d809..2163a886c 100644
--- a/src/lib/replication/log_store/home_raft_log_store.cpp
+++ b/src/lib/replication/log_store/home_raft_log_store.cpp
@@ -130,7 +130,7 @@ ulong HomeRaftLogStore::next_slot() const {
 }
 
 ulong HomeRaftLogStore::last_index() const {
-    uint64_t last_index = m_log_store->get_contiguous_completed_seq_num(m_last_durable_lsn);
+    uint64_t last_index = to_repl_lsn(m_log_store->get_contiguous_completed_seq_num(m_last_durable_lsn));
     return last_index;
 }
 
diff --git a/src/lib/replication/log_store/repl_log_store.cpp b/src/lib/replication/log_store/repl_log_store.cpp
index c678a90d4..4f9ffc00d 100644
--- a/src/lib/replication/log_store/repl_log_store.cpp
+++ b/src/lib/replication/log_store/repl_log_store.cpp
@@ -41,14 +41,20 @@ void ReplLogStore::end_of_append_batch(ulong start_lsn, ulong count) {
 
     // Start fetch the batch of data for this lsn range from remote if its not available yet.
     auto reqs = sisl::VectorPool< repl_req_ptr_t >::alloc();
+    auto proposer_reqs = sisl::VectorPool< repl_req_ptr_t >::alloc();
     for (int64_t lsn = int64_cast(start_lsn); lsn <= end_lsn; ++lsn) {
         auto rreq = m_sm.lsn_to_req(lsn);
         // Skip this call in proposer, since this method will synchronously flush the data, which is not required for
         // leader. Proposer will call the flush as part of commit after receiving quorum, upon which time, there is a
         // high possibility the log entry is already flushed. Skip it for rreq == nullptr which is the case for raft
         // config entries.
-        if ((rreq == nullptr) || rreq->is_proposer()) { continue; }
-        reqs->emplace_back(std::move(rreq));
+        if ((rreq == nullptr) /*|| rreq->is_proposer()*/) {
+            continue;
+        } else if (rreq->is_proposer()) {
+            proposer_reqs->emplace_back(std::move(rreq));
+        } else {
+            reqs->emplace_back(std::move(rreq));
+        }
     }
 
     RD_LOGT("Raft Channel: end_of_append_batch start_lsn={} count={} num_data_to_be_written={}", start_lsn, count,
@@ -73,8 +79,17 @@ void ReplLogStore::end_of_append_batch(ulong start_lsn, ulong count) {
         for (auto const& rreq : *reqs) {
             if (rreq) { rreq->add_state(repl_req_state_t::LOG_FLUSHED); }
         }
+    } else if (!proposer_reqs->empty()) {
+        RD_LOGT("Raft Channel: end_of_append_batch, I am proposer, only flush log s from {} , count {}", start_lsn,
+                count);
+        // Mark all the reqs also completely written
+        HomeRaftLogStore::end_of_append_batch(start_lsn, count);
+        for (auto const& rreq : *proposer_reqs) {
+            if (rreq) { rreq->add_state(repl_req_state_t::LOG_FLUSHED); }
+        }
     }
     sisl::VectorPool< repl_req_ptr_t >::free(reqs);
+    sisl::VectorPool< repl_req_ptr_t >::free(proposer_reqs);
 }
 
 std::string ReplLogStore::rdev_name() const { return m_rd.rdev_name(); }
diff --git a/src/lib/replication/repl_dev/raft_repl_dev.cpp b/src/lib/replication/repl_dev/raft_repl_dev.cpp
index ac02e8cde..f44e85e03 100644
--- a/src/lib/replication/repl_dev/raft_repl_dev.cpp
+++ b/src/lib/replication/repl_dev/raft_repl_dev.cpp
@@ -148,13 +148,15 @@ void RaftReplDev::async_alloc_write(sisl::blob const& header, sisl::blob const&
                                     repl_req_ptr_t rreq) {
     if (!rreq) { auto rreq = repl_req_ptr_t(new repl_req_ctx{}); }
 
-    auto const guard = m_stage.access();
-    if (auto const stage = *guard.get(); stage != repl_dev_stage_t::ACTIVE) {
-        RD_LOGW("Raft channel: Not ready to accept writes, stage={}", enum_name(stage));
-        handle_error(rreq,
-                     (stage == repl_dev_stage_t::INIT) ? ReplServiceError::SERVER_IS_JOINING
-                                                       : ReplServiceError::SERVER_IS_LEAVING);
-        return;
+    {
+        auto const guard = m_stage.access();
+        if (auto const stage = *guard.get(); stage != repl_dev_stage_t::ACTIVE) {
+            RD_LOGW("Raft channel: Not ready to accept writes, stage={}", enum_name(stage));
+            handle_error(rreq,
+                         (stage == repl_dev_stage_t::INIT) ? ReplServiceError::SERVER_IS_JOINING
+                                                           : ReplServiceError::SERVER_IS_LEAVING);
+            return;
+        }
     }
 
     rreq->init(repl_key{.server_id = server_id(), .term = raft_server()->get_term(), .dsn = m_next_dsn.fetch_add(1)},
@@ -249,6 +251,11 @@ void RaftReplDev::push_data_to_all_followers(repl_req_ptr_t rreq, sisl::sg_list
 
 void RaftReplDev::on_push_data_received(intrusive< sisl::GenericRpcData >& rpc_data) {
     auto const& incoming_buf = rpc_data->request_blob();
+    if (!incoming_buf.cbytes()) {
+        RD_LOGW("Data Channel: PushData received with empty buffer, ignoring this call");
+        rpc_data->send_response();
+        return;
+    }
     auto const fb_size = flatbuffers::ReadScalar< flatbuffers::uoffset_t >(incoming_buf.cbytes()) +
         sizeof(flatbuffers::uoffset_t);
     auto push_req = GetSizePrefixedPushDataRequest(incoming_buf.cbytes());
@@ -517,7 +524,7 @@ void RaftReplDev::check_and_fetch_remote_data(std::vector< repl_req_ptr_t > rreq
 
 void RaftReplDev::fetch_data_from_remote(std::vector< repl_req_ptr_t > rreqs) {
     if (rreqs.size() == 0) { return; }
-    std::vector<::flatbuffers::Offset< RequestEntry > > entries;
+    std::vector< ::flatbuffers::Offset< RequestEntry > > entries;
     entries.reserve(rreqs.size());
 
     shared< flatbuffers::FlatBufferBuilder > builder = std::make_shared< flatbuffers::FlatBufferBuilder >();
@@ -597,6 +604,11 @@ void RaftReplDev::fetch_data_from_remote(std::vector< repl_req_ptr_t > rreqs) {
 
 void RaftReplDev::on_fetch_data_received(intrusive< sisl::GenericRpcData >& rpc_data) {
     auto const& incoming_buf = rpc_data->request_blob();
+    if (!incoming_buf.cbytes()) {
+        RD_LOGW("Data Channel: PushData received with empty buffer, ignoring this call");
+        rpc_data->send_response();
+        return;
+    }
     auto fetch_req = GetSizePrefixedFetchData(incoming_buf.cbytes());
 
     RD_LOGD("Data Channel: FetchData received: fetch_req.size={}", fetch_req->request()->entries()->size());
diff --git a/src/lib/replication/repl_dev/raft_state_machine.cpp b/src/lib/replication/repl_dev/raft_state_machine.cpp
index a805b229b..b97b38945 100644
--- a/src/lib/replication/repl_dev/raft_state_machine.cpp
+++ b/src/lib/replication/repl_dev/raft_state_machine.cpp
@@ -7,6 +7,8 @@
 #include "service/raft_repl_service.h"
 #include "repl_dev/raft_state_machine.h"
 #include "repl_dev/raft_repl_dev.h"
+#include
+#include "common/homestore_config.hpp"
 
 SISL_LOGGING_DECL(replication)
 
@@ -187,11 +189,10 @@ raft_buf_ptr_t RaftStateMachine::commit_ext(nuraft::state_machine::ext_op_params
     RD_LOGD("Raft channel: Received Commit message lsn {} store {} logdev {} size {}", lsn,
             m_rd.m_data_journal->logstore_id(), m_rd.m_data_journal->logdev_id(), params.data->size());
     repl_req_ptr_t rreq = lsn_to_req(lsn);
-    if (!rreq) { RD_LOGD("Raft channel got null rreq"); }
+    RD_DBG_ASSERT(rreq != nullptr, "Raft channel got null rreq");
     RD_LOGD("Raft channel: Received Commit message rreq=[{}]", rreq->to_compact_string());
 
     if (rreq->is_proposer()) {
         // This is the time to ensure flushing of journal happens in the proposer
-        if (m_rd.m_data_journal->last_durable_index() < uint64_cast(lsn)) { m_rd.m_data_journal->flush(); }
         rreq->add_state(repl_req_state_t::LOG_FLUSHED);
     }
diff --git a/src/tests/test_raft_repl_dev.cpp b/src/tests/test_raft_repl_dev.cpp
index 8013822bb..3e3075190 100644
--- a/src/tests/test_raft_repl_dev.cpp
+++ b/src/tests/test_raft_repl_dev.cpp
@@ -728,6 +728,10 @@ int main(int argc, char* argv[]) {
         s.consensus.leadership_expiry_ms = -1; // -1 means never expires;
         s.generic.repl_dev_cleanup_interval_sec = 0;
 
+        // Disable implicit flush and timer.
+        s.logstore.flush_threshold_size = 0;
+        s.logstore.flush_timer_frequency_us = 0;
+
         // only reset when user specified the value for test;
         if (SISL_OPTIONS.count("snapshot_distance")) {
             s.consensus.snapshot_freq_distance = SISL_OPTIONS["snapshot_distance"].as< uint32_t >();