From fc7b272cc55976a6a9d2bdb21dc5b9a48c439693 Mon Sep 17 00:00:00 2001 From: Xiaoxi Chen Date: Tue, 24 Sep 2024 14:32:07 +0800 Subject: [PATCH] Generalize and introduce Sealer into CP. Sealer is a special consumer that provides information regarding where the cp is up to. It will be the first one during cp switch over , as a conservative marker of everything before or equals to this point, should be in current cp, possibly some consumer are above this point which is fine. And Sealer is the last one during cp flush after all other services flushed successfully. Signed-off-by: Xiaoxi Chen --- src/include/homestore/checkpoint/cp.hpp | 5 +++++ src/lib/checkpoint/cp_mgr.cpp | 27 +++++++++++++++---------- 2 files changed, 21 insertions(+), 11 deletions(-) diff --git a/src/include/homestore/checkpoint/cp.hpp b/src/include/homestore/checkpoint/cp.hpp index c15bed87a..e88a9e4e2 100644 --- a/src/include/homestore/checkpoint/cp.hpp +++ b/src/include/homestore/checkpoint/cp.hpp @@ -70,6 +70,11 @@ class CPContext; class CPManager; VENUM(cp_consumer_t, uint8_t, + // Sealer is a special consumer that provides information regarding where the cp is up to. + // It will be the first one during cp switch over , as a conservative marker of everything + // before or equals to this point, should be in current cp, possibly some consumer are above this point which is fine. + // And Sealer is the last one during cp flush after all other services flushed successfully. + SEALER = 3, HS_CLIENT = 0, // Client of the homestore module INDEX_SVC = 1, // Index service module BLK_DATA_SVC = 2, // Block data service module diff --git a/src/lib/checkpoint/cp_mgr.cpp b/src/lib/checkpoint/cp_mgr.cpp index 2913e1da6..7072d7c91 100644 --- a/src/lib/checkpoint/cp_mgr.cpp +++ b/src/lib/checkpoint/cp_mgr.cpp @@ -184,10 +184,16 @@ folly::Future< bool > CPManager::do_trigger_cp_flush(bool force, bool flush_on_s new_cp->m_cp_id = cur_cp->m_cp_id + 1; HS_PERIODIC_LOG(DEBUG, cp, "Create New CP session", new_cp->id()); - size_t idx{0}; - for (auto& consumer : m_cp_cb_table) { - if (consumer) { new_cp->m_contexts[idx] = std::move(consumer->on_switchover_cp(cur_cp.get(), new_cp)); } - ++idx; + // sealer should be the first one to switch over + auto& sealer_cp = m_cp_cb_table[(size_t)cp_consumer_t::SEALER]; + if (sealer_cp) { + new_cp->m_contexts[(size_t)cp_consumer_t::SEALER] = std::move(sealer_cp->on_switchover_cp(cur_cp.get(), new_cp)); + } + // switch over other consumers + for (size_t svcid = 0; svcid < (size_t)cp_consumer_t::SENTINEL; svcid++) { + if (svcid == (size_t)cp_consumer_t::SEALER) { continue; } + auto& consumer = m_cp_cb_table[svcid]; + if (consumer) { new_cp->m_contexts[svcid] = std::move(consumer->on_switchover_cp(cur_cp.get(), new_cp)); } } HS_PERIODIC_LOG(DEBUG, cp, "CP Attached completed, proceed to exit cp critical section"); @@ -219,18 +225,17 @@ void CPManager::cp_start_flush(CP* cp) { HS_PERIODIC_LOG(INFO, cp, "Starting CP {} flush", cp->id()); cp->m_cp_status = cp_status_t::cp_flushing; for (size_t svcid = 0; svcid < (size_t)cp_consumer_t::SENTINEL; svcid++) { - if (svcid == (size_t)cp_consumer_t::REPLICATION_SVC) { - continue; - } + if (svcid == (size_t)cp_consumer_t::SEALER) { continue; } auto& consumer = m_cp_cb_table[svcid]; if (consumer) { futs.emplace_back(std::move(consumer->cp_flush(cp))); } } folly::collectAllUnsafe(futs).thenValue([this, cp](auto) { - // Sync flushing replication svc at last as the cp_lsn updated here - // other component should at least flushed to cp_lsn - auto& repl_cp = m_cp_cb_table[(size_t)cp_consumer_t::REPLICATION_SVC]; - if (repl_cp) {repl_cp->cp_flush(cp).wait();} + // Sync flushing SEALER svc which is the replication service + // at last as the cp_lsn updated here. Other component should + // at least flushed to cp_lsn. + auto& sealer_cp = m_cp_cb_table[(size_t)cp_consumer_t::SEALER]; + if (sealer_cp) { sealer_cp->cp_flush(cp).wait(); } // All consumers have flushed for the cp on_cp_flush_done(cp); });