Skip to content

Commit

Permalink
Generalize and introduce Sealer into CP.
Browse files Browse the repository at this point in the history
Sealer is a special consumer that provides information regarding where the cp is up to.
It will be the first one during cp switch over , as a conservative marker of everything
before or equals to this point, should be in current cp, possibly some consumer are above this point which is fine.
And Sealer is the last one during cp flush after all other services flushed successfully.

Signed-off-by: Xiaoxi Chen <[email protected]>
  • Loading branch information
xiaoxichen committed Sep 28, 2024
1 parent 9fc0ed9 commit fc7b272
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 11 deletions.
5 changes: 5 additions & 0 deletions src/include/homestore/checkpoint/cp.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,11 @@ class CPContext;
class CPManager;

VENUM(cp_consumer_t, uint8_t,
// Sealer is a special consumer that provides information regarding where the cp is up to.
// It will be the first one during cp switch over , as a conservative marker of everything
// before or equals to this point, should be in current cp, possibly some consumer are above this point which is fine.
// And Sealer is the last one during cp flush after all other services flushed successfully.
SEALER = 3,
HS_CLIENT = 0, // Client of the homestore module
INDEX_SVC = 1, // Index service module
BLK_DATA_SVC = 2, // Block data service module
Expand Down
27 changes: 16 additions & 11 deletions src/lib/checkpoint/cp_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -184,10 +184,16 @@ folly::Future< bool > CPManager::do_trigger_cp_flush(bool force, bool flush_on_s
new_cp->m_cp_id = cur_cp->m_cp_id + 1;

HS_PERIODIC_LOG(DEBUG, cp, "Create New CP session", new_cp->id());
size_t idx{0};
for (auto& consumer : m_cp_cb_table) {
if (consumer) { new_cp->m_contexts[idx] = std::move(consumer->on_switchover_cp(cur_cp.get(), new_cp)); }
++idx;
// sealer should be the first one to switch over
auto& sealer_cp = m_cp_cb_table[(size_t)cp_consumer_t::SEALER];
if (sealer_cp) {
new_cp->m_contexts[(size_t)cp_consumer_t::SEALER] = std::move(sealer_cp->on_switchover_cp(cur_cp.get(), new_cp));
}
// switch over other consumers
for (size_t svcid = 0; svcid < (size_t)cp_consumer_t::SENTINEL; svcid++) {
if (svcid == (size_t)cp_consumer_t::SEALER) { continue; }
auto& consumer = m_cp_cb_table[svcid];
if (consumer) { new_cp->m_contexts[svcid] = std::move(consumer->on_switchover_cp(cur_cp.get(), new_cp)); }
}

HS_PERIODIC_LOG(DEBUG, cp, "CP Attached completed, proceed to exit cp critical section");
Expand Down Expand Up @@ -219,18 +225,17 @@ void CPManager::cp_start_flush(CP* cp) {
HS_PERIODIC_LOG(INFO, cp, "Starting CP {} flush", cp->id());
cp->m_cp_status = cp_status_t::cp_flushing;
for (size_t svcid = 0; svcid < (size_t)cp_consumer_t::SENTINEL; svcid++) {
if (svcid == (size_t)cp_consumer_t::REPLICATION_SVC) {
continue;
}
if (svcid == (size_t)cp_consumer_t::SEALER) { continue; }
auto& consumer = m_cp_cb_table[svcid];
if (consumer) { futs.emplace_back(std::move(consumer->cp_flush(cp))); }
}

folly::collectAllUnsafe(futs).thenValue([this, cp](auto) {
// Sync flushing replication svc at last as the cp_lsn updated here
// other component should at least flushed to cp_lsn
auto& repl_cp = m_cp_cb_table[(size_t)cp_consumer_t::REPLICATION_SVC];
if (repl_cp) {repl_cp->cp_flush(cp).wait();}
// Sync flushing SEALER svc which is the replication service
// at last as the cp_lsn updated here. Other component should
// at least flushed to cp_lsn.
auto& sealer_cp = m_cp_cb_table[(size_t)cp_consumer_t::SEALER];
if (sealer_cp) { sealer_cp->cp_flush(cp).wait(); }
// All consumers have flushed for the cp
on_cp_flush_done(cp);
});
Expand Down

0 comments on commit fc7b272

Please sign in to comment.