Only call cp_flush for consumers that participated in this CP.
If a consumer registers after a CP has entered the flushing state,
the on_switchover_cp callback is not called for that consumer.
In that CP, the context for the consumer is nullptr because the consumer
never participated in the CP.

The previous code called cp_flush for every consumer, leaving it to each
consumer to properly handle the nullptr returned by cp->context(svc_id).
However, none of the existing consumers handled that case.

As a result, we hit an occurrence where Index generated a CP on its own,
but before the CP was fully flushed, another consumer registered and was
called into cp_flush(). The replication service does not handle the
nullptr properly: as shown below, `get_repl_dev_ctx` was called with a null
`this` pointer, which is dangerous because invalid memory gets accessed.

This is a breaking change for consumers such as HO (HomeObject), so bump up the version.
HomeObject participates in the CP as CLIENT. The current implementation of HO always
returns nullptr from `on_switchover_cp`, which will cause the CLIENT to be excluded
from cp_flush once this commit is merged.
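
The participation rule described above can be modeled in a few lines. The following is a minimal, self-contained sketch, not HomeStore code: `Checkpoint`, `Handler`, `Manager`, `consumer_id`, and `run_demo` are hypothetical names invented for illustration, and flushing is synchronous here instead of future-based. It assumes only what this message states: a consumer's context slot stays null when the consumer registers after switchover or returns nullptr from its switchover callback, and such a consumer is skipped at flush time.

```cpp
#include <array>
#include <cassert>
#include <cstddef>
#include <memory>

// Hypothetical, simplified stand-ins; these are NOT the HomeStore types.
enum class consumer_id : std::size_t { INDEX = 0, CLIENT, REPLICATION, SENTINEL };
constexpr std::size_t kNumConsumers = static_cast< std::size_t >(consumer_id::SENTINEL);

struct Context {
    virtual ~Context() = default;
};

struct Checkpoint {
    // One slot per consumer; a null slot means "did not participate".
    std::array< std::unique_ptr< Context >, kNumConsumers > contexts;
};

struct Handler {
    virtual ~Handler() = default;
    virtual std::unique_ptr< Context > on_switchover(Checkpoint& new_cp) = 0;
    virtual void flush(Checkpoint& cp) = 0;
};

// Opts in to every checkpoint by returning a non-null context.
struct ParticipatingHandler : Handler {
    int flush_count = 0;
    std::unique_ptr< Context > on_switchover(Checkpoint&) override { return std::make_unique< Context >(); }
    void flush(Checkpoint&) override { ++flush_count; }
};

// Always returns nullptr on switchover, so it is excluded from flush.
struct NullContextHandler : Handler {
    int flush_count = 0;
    std::unique_ptr< Context > on_switchover(Checkpoint&) override { return nullptr; }
    void flush(Checkpoint&) override { ++flush_count; }
};

class Manager {
public:
    void register_handler(consumer_id id, Handler* h) { table_[static_cast< std::size_t >(id)] = h; }

    // Ask every currently registered handler for its context in the new checkpoint.
    void switchover(Checkpoint& new_cp) {
        for (std::size_t i = 0; i < kNumConsumers; ++i) {
            if (table_[i]) { new_cp.contexts[i] = table_[i]->on_switchover(new_cp); }
        }
    }

    // Flush only handlers that are registered AND hold a context in this checkpoint.
    std::size_t flush(Checkpoint& cp) {
        std::size_t flushed = 0;
        for (std::size_t i = 0; i < kNumConsumers; ++i) {
            const bool participated = (cp.contexts[i] != nullptr);
            if (table_[i] && participated) {
                table_[i]->flush(cp);
                ++flushed;
            }
        }
        return flushed;
    }

private:
    std::array< Handler*, kNumConsumers > table_{};
};

struct DemoResult {
    int index_flushes;
    int client_flushes;
    int late_flushes;
};

DemoResult run_demo() {
    ParticipatingHandler index;
    NullContextHandler client;
    ParticipatingHandler late;

    Manager mgr;
    mgr.register_handler(consumer_id::INDEX, &index);
    mgr.register_handler(consumer_id::CLIENT, &client);

    Checkpoint cp;
    mgr.switchover(cp);

    // Registers after switchover, so it holds no context in this checkpoint.
    mgr.register_handler(consumer_id::REPLICATION, &late);

    const std::size_t flushed = mgr.flush(cp);
    assert(flushed == 1); // only the INDEX handler participated
    (void)flushed;
    return {index.flush_count, client.flush_count, late.flush_count};
}
```

Calling `run_demo()` from a `main` exercises both exclusion paths: the handler that returns nullptr and the handler that registers late are each skipped, while the participating handler is flushed once. In this model, a consumer that wants to be flushed has to return a non-null context from its switchover callback.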

callstack:
```
homestore::ReplSvcCPContext::get_repl_dev_ctx (this=0x0, dev=0x56010ab52b00) at /home/ubuntu/HomeStore/src/lib/replication/service/raft_repl_service.cpp:521
0x0000560106d58f1e in homestore::RaftReplServiceCPHandler::cp_flush (this=<optimized out>, cp=0x56010a467940) at /home/ubuntu/HomeStore/src/lib/replication/service/raft_repl_service.cpp:549
```

code:

```
 auto cp_ctx = s_cast< ReplSvcCPContext* >(cp->context(cp_consumer_t::REPLICATION_SVC));
 ...
 auto dev_ctx = cp_ctx->get_repl_dev_ctx(repl_dev.get());
```
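
For comparison, a consumer-side guard would look roughly like the sketch below. This is not what this commit does (the check is added in the CP manager instead), and `SvcContext`, `DevCtx`, `get_dev_ctx`, and `flush_dev` are hypothetical stand-ins rather than the replication service's real types. The point it illustrates is that the pointer has to be tested before any member function is called through it, because a call through a null pointer is undefined behavior.

```cpp
#include <cassert>
#include <map>

// Hypothetical per-device state tracked inside a service-level context.
struct DevCtx {
    long flushed_lsn = 0;
};

// Hypothetical service-level checkpoint context.
class SvcContext {
public:
    // Returns the per-device context, creating it on first use.
    DevCtx* get_dev_ctx(int dev_id) { return &dev_ctxs_[dev_id]; }

private:
    std::map< int, DevCtx > dev_ctxs_;
};

// Returns true if the device was flushed, false if the service has no
// context in this checkpoint (it did not participate).
bool flush_dev(SvcContext* ctx, int dev_id, long lsn) {
    if (ctx == nullptr) { return false; } // check BEFORE dereferencing
    ctx->get_dev_ctx(dev_id)->flushed_lsn = lsn;
    return true;
}
```

Guarding in the manager, as this commit does, avoids repeating such a check in every consumer.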

Signed-off-by: Xiaoxi Chen <[email protected]>
xiaoxichen committed Dec 23, 2024
1 parent 9f9bd45 commit c388f69
Showing 3 changed files with 10 additions and 5 deletions.
2 changes: 1 addition & 1 deletion conanfile.py
@@ -9,7 +9,7 @@

class HomestoreConan(ConanFile):
name = "homestore"
- version = "6.6.0"
+ version = "6.6.1"

homepage = "https://github.com/eBay/Homestore"
description = "HomeStore Storage Engine"
9 changes: 6 additions & 3 deletions src/lib/checkpoint/cp_mgr.cpp
@@ -187,7 +187,8 @@ folly::Future< bool > CPManager::do_trigger_cp_flush(bool force, bool flush_on_s
// sealer should be the first one to switch over
auto& sealer_cp = m_cp_cb_table[(size_t)cp_consumer_t::SEALER];
if (sealer_cp) {
- new_cp->m_contexts[(size_t)cp_consumer_t::SEALER] = std::move(sealer_cp->on_switchover_cp(cur_cp.get(), new_cp));
+ new_cp->m_contexts[(size_t)cp_consumer_t::SEALER] =
+ std::move(sealer_cp->on_switchover_cp(cur_cp.get(), new_cp));
}
// switch over other consumers
for (size_t svcid = 0; svcid < (size_t)cp_consumer_t::SENTINEL; svcid++) {
@@ -227,15 +228,17 @@ void CPManager::cp_start_flush(CP* cp) {
for (size_t svcid = 0; svcid < (size_t)cp_consumer_t::SENTINEL; svcid++) {
if (svcid == (size_t)cp_consumer_t::SEALER) { continue; }
auto& consumer = m_cp_cb_table[svcid];
- if (consumer) { futs.emplace_back(std::move(consumer->cp_flush(cp))); }
+ bool participated = (cp->m_contexts[svcid] != nullptr);
+ if (consumer && participated) { futs.emplace_back(std::move(consumer->cp_flush(cp))); }
}

folly::collectAllUnsafe(futs).thenValue([this, cp](auto) {
// Sync flushing SEALER svc which is the replication service
// at last as the cp_lsn updated here. Other component should
// at least flushed to cp_lsn.
auto& sealer_cp = m_cp_cb_table[(size_t)cp_consumer_t::SEALER];
- if (sealer_cp) { sealer_cp->cp_flush(cp).wait(); }
+ bool participated = (cp->m_contexts[(size_t)cp_consumer_t::SEALER] != nullptr);
+ if (sealer_cp && participated) { sealer_cp->cp_flush(cp).wait(); }
// All consumers have flushed for the cp
on_cp_flush_done(cp);
});
4 changes: 3 additions & 1 deletion src/lib/replication/service/generic_repl_svc.cpp
@@ -152,7 +152,9 @@ AsyncReplResult<> SoloReplService::replace_member(group_id_t group_id, const rep
return make_async_error<>(ReplServiceError::NOT_IMPLEMENTED);
}

- std::unique_ptr< CPContext > SoloReplServiceCPHandler::on_switchover_cp(CP* cur_cp, CP* new_cp) { return nullptr; }
+ std::unique_ptr< CPContext > SoloReplServiceCPHandler::on_switchover_cp(CP* cur_cp, CP* new_cp) {
+ return std::make_unique< CPContext >(new_cp);
+ }

folly::Future< bool > SoloReplServiceCPHandler::cp_flush(CP* cp) {
repl_service().iterate_repl_devs([cp](cshared< ReplDev >& repl_dev) {