Skip to content

Commit

Permalink
waiting until all the callbacks are done before stopping (#497)
Browse files Browse the repository at this point in the history
* waiting until all the callbacks are done before stopping

* fix segfault

* add lock for read
  • Loading branch information
JacksonYao287 authored Aug 14, 2024
1 parent b12097d commit 2fa025e
Show file tree
Hide file tree
Showing 6 changed files with 34 additions and 9 deletions.
2 changes: 1 addition & 1 deletion conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

class HomestoreConan(ConanFile):
name = "homestore"
version = "6.4.42"
version = "6.4.43"
homepage = "https://github.com/eBay/Homestore"
description = "HomeStore Storage Engine"
topics = ("ebay", "nublox")
Expand Down
8 changes: 8 additions & 0 deletions src/include/homestore/logstore/log_store.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,14 @@ class HomeLogStore : public std::enable_shared_from_this< HomeLogStore > {
* @param b Blob of data to append
* @param cookie Passed as is to the completion callback
* @param completion_cb Completion callback which contains the seqnum, status and cookie
*
* Note: completion_cb is executed in background fibers, so different completion_cbs may be executed
* concurrently. Also, logstore does not guarantee the order of completion_cb execution. This means the caller
* should:
*
* 1. Take a lock inside the completion_cb if the caller needs it to be safe under concurrent execution.
* 2. Keep in mind that completion_cbs may be executed in a different order than the append order.
*
* @return internally generated sequence number
*/
logstore_seq_num_t append_async(const sisl::io_blob& b, void* cookie, const log_write_comp_cb_t& completion_cb);
Expand Down
12 changes: 10 additions & 2 deletions src/lib/logstore/log_dev.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,12 @@ void LogDev::stop() {
{
std::unique_lock lg = flush_guard();
m_stopped = true;
// waiting under lock to make sure no new flush is started
while (m_pending_callback.load() > 0) {
THIS_LOGDEV_LOG(INFO, "Waiting for pending callbacks to complete, pending callbacks {}",
m_pending_callback.load());
std::this_thread::sleep_for(std::chrono::milliseconds{1000});
}
}
// after we call stop, we need to do any pending device truncations
truncate();
Expand Down Expand Up @@ -488,14 +494,16 @@ void LogDev::on_flush_completion(LogGroup* lg) {
m_last_flush_idx = upto_indx;

// since we support out-of-order lsn write, so no need to guarantee the order of logstore write completion
// TODO:: add some logic to guarantee all the callback is done when stop.
for (auto const& [idx, req] : req_map)
for (auto const& [idx, req] : req_map) {
m_pending_callback++;
iomanager.run_on_forget(iomgr::reactor_regex::random_worker, iomgr::fiber_regex::syncio_only,
[this, dev_offset, idx, req]() {
auto ld_key = logdev_key{idx, dev_offset};
auto comp_cb = req->log_store->get_comp_cb();
(req->cb) ? req->cb(req, ld_key) : comp_cb(req, ld_key);
m_pending_callback--;
});
}
}

uint64_t LogDev::truncate() {
Expand Down
1 change: 1 addition & 0 deletions src/lib/logstore/log_dev.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -810,6 +810,7 @@ class LogDev : public std::enable_shared_from_this< LogDev > {
// callback of the append_async we schedule another flush.), so we need the lock to be locked for multitimes in the
// same thread.
iomgr::FiberManagerLib::mutex m_flush_mtx;
std::atomic_uint64_t m_pending_callback{0};
}; // LogDev

} // namespace homestore
9 changes: 5 additions & 4 deletions src/lib/logstore/log_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@ sisl::byte_view log_stream_reader::next_group(off_t* out_dev_offset) {
HS_REL_ASSERT_GE(m_cur_log_buf.size(), m_read_size_multiple);
const auto* header = r_cast< log_group_header const* >(m_cur_log_buf.bytes());
if (header->magic_word() != LOG_GROUP_HDR_MAGIC) {
LOGERROR("Logdev data not seeing magic at pos {}, must have come to end of log_dev={}",
m_vdev_jd->dev_offset(m_cur_read_bytes), m_vdev_jd->logdev_id());
LOGDEBUGMOD(logstore, "Logdev data not seeing magic at pos {}, must have come to end of log_dev={}",
m_vdev_jd->dev_offset(m_cur_read_bytes), m_vdev_jd->logdev_id());
*out_dev_offset = m_vdev_jd->dev_offset(m_cur_read_bytes);
// move it by dma boundary if header is not valid
m_prev_crc = 0;
Expand All @@ -86,8 +86,9 @@ sisl::byte_view log_stream_reader::next_group(off_t* out_dev_offset) {
// compare it with prev crc
if (m_prev_crc != 0 && m_prev_crc != header->prev_grp_crc) {
// we reached at the end
LOGERROR("we have reached the end. crc doesn't match offset {} prev crc {} header prev crc {} log_dev={}",
m_vdev_jd->dev_offset(m_cur_read_bytes), header->prev_grp_crc, m_prev_crc, m_vdev_jd->logdev_id());
LOGDEBUGMOD(logstore,
"we have reached the end. crc doesn't match offset {} prev crc {} header prev crc {} log_dev={}",
m_vdev_jd->dev_offset(m_cur_read_bytes), header->prev_grp_crc, m_prev_crc, m_vdev_jd->logdev_id());
*out_dev_offset = m_vdev_jd->dev_offset(m_cur_read_bytes);
if (!m_vdev_jd->is_offset_at_last_chunk(*out_dev_offset)) {
HS_REL_ASSERT(0, "data is corrupted {}", m_vdev_jd->logdev_id());
Expand Down
11 changes: 9 additions & 2 deletions src/tests/test_log_store.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -495,13 +495,18 @@ class SampleDB {
m_helper.shutdown_homestore(cleanup);
if (cleanup) {
m_log_store_clients.clear();
std::unique_lock lock{m_completion_mtx};
m_highest_log_idx.clear();
}
}

// Completion hook for a log insert: updates the per-logdev highest-index bookkeeping and then forwards the
// event to the optional IO closure.
void on_log_insert_completion(logdev_id_t fid, logstore_seq_num_t lsn, logdev_key ld_key) {
    {
        // The map is only touched while holding m_completion_mtx.
        std::unique_lock guard{m_completion_mtx};
        // Seed a missing entry with -1 (no-op when fid is already tracked), then hand the entry to
        // atomic_update_max together with the index of this completion.
        auto const entry = m_highest_log_idx.try_emplace(fid, -1).first;
        atomic_update_max(entry->second, ld_key.idx);
    }

    // The closure, when set, is invoked after the lock has been released.
    if (!m_io_closure) { return; }
    m_io_closure(fid, lsn, ld_key);
}

Expand All @@ -519,6 +524,7 @@ class SampleDB {
}

// Returns the value currently stored for the given logdev in m_highest_log_idx, or -1 when the map has no
// entry for it. The lookup is done while holding m_completion_mtx.
logid_t highest_log_idx(logdev_id_t fid) {
    std::unique_lock guard{m_completion_mtx};
    auto const entry = m_highest_log_idx.find(fid);
    if (entry == m_highest_log_idx.end()) { return -1; }
    return entry->second.load();
}

Expand All @@ -529,6 +535,7 @@ class SampleDB {
std::vector< std::unique_ptr< SampleLogStoreClient > > m_log_store_clients;
std::map< logdev_id_t, std::atomic< logid_t > > m_highest_log_idx;
test_common::HSTestHelper m_helper;
std::mutex m_completion_mtx;
};

class LogStoreTest : public ::testing::Test {
Expand Down

0 comments on commit 2fa025e

Please sign in to comment.