
Commit

run append log callback asynchronously and add cache for home_raft_log_store
JacksonYao287 committed Jul 31, 2024
1 parent f064bc6 commit 54c4f81
Showing 11 changed files with 162 additions and 145 deletions.
2 changes: 1 addition & 1 deletion conanfile.py
@@ -9,7 +9,7 @@

class HomestoreConan(ConanFile):
name = "homestore"
version = "6.4.32"
version = "6.4.33"
homepage = "https://github.com/eBay/Homestore"
description = "HomeStore Storage Engine"
topics = ("ebay", "nublox")
2 changes: 2 additions & 0 deletions src/include/homestore/logstore/log_store.hpp
@@ -254,6 +254,8 @@ class HomeLogStore : public std::enable_shared_from_this< HomeLogStore > {

std::shared_ptr< LogDev > get_logdev() { return m_logdev; }

auto get_comp_cb() const { return m_comp_cb; }

private:
logstore_id_t m_store_id;
std::shared_ptr< LogDev > m_logdev;
3 changes: 1 addition & 2 deletions src/include/homestore/logstore/log_store_internal.hpp
@@ -106,9 +106,8 @@ struct log_dump_req {

struct logstore_record {
logdev_key m_dev_key;
// indicates the safe truncation point of the log store
logdev_key m_trunc_key;
// m_dev_key and m_trunc_key differ only when an out-of-order write causes a higher lsn to be flushed in a
// lower LogGroup while a lower lsn is flushed in a higher LogGroup

logstore_record() = default;
logstore_record(const logdev_key& key, const logdev_key& trunc_key) : m_dev_key{key}, m_trunc_key{trunc_key} {}
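
For illustration, here is a minimal, self-contained sketch of the two keys carried by logstore_record, using simplified stand-in types (not the real homestore definitions) and hypothetical values; it only mirrors the out-of-order scenario described in the comment above, where m_dev_key and m_trunc_key diverge.

#include <cstdint>
#include <iostream>

// Simplified stand-ins for the real homestore types (illustration only).
struct logdev_key {
    int64_t idx{-1};        // log id assigned by the LogDev
    uint64_t dev_offset{0}; // offset of the containing LogGroup on the journal vdev
};

struct logstore_record {
    logdev_key m_dev_key;   // where this record was actually flushed
    logdev_key m_trunc_key; // safe truncation point of the log store for this record
};

int main() {
    // Normal case: the record's own location doubles as its safe truncation point.
    logstore_record in_order{{100, 4096}, {100, 4096}};

    // Out-of-order case (hypothetical values): a higher lsn was flushed in a lower
    // LogGroup while a lower lsn landed in a higher LogGroup, so the two keys differ.
    logstore_record out_of_order{{101, 4096}, {103, 12288}};

    std::cout << std::boolalpha
              << (in_order.m_dev_key.idx == in_order.m_trunc_key.idx) << '\n'        // true
              << (out_of_order.m_dev_key.idx == out_of_order.m_trunc_key.idx) << '\n'; // false
}
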
26 changes: 13 additions & 13 deletions src/lib/device/journal_vdev.cpp
@@ -46,15 +46,15 @@ JournalVirtualDev::JournalVirtualDev(DeviceManager& dmgr, const vdev_info& vinfo
m_init_private_data = std::make_shared< JournalChunkPrivate >();
m_chunk_pool = std::make_unique< ChunkPool >(
dmgr,
ChunkPool::Params{HS_DYNAMIC_CONFIG(generic.journal_chunk_pool_capacity),
[this]() {
m_init_private_data->created_at = get_time_since_epoch_ms();
m_init_private_data->end_of_chunk = m_vdev_info.chunk_size;
sisl::blob private_blob{r_cast< uint8_t* >(m_init_private_data.get()),
sizeof(JournalChunkPrivate)};
return private_blob;
},
m_vdev_info.hs_dev_type, m_vdev_info.vdev_id, m_vdev_info.chunk_size});
ChunkPool::Params{
HS_DYNAMIC_CONFIG(generic.journal_chunk_pool_capacity),
[this]() {
m_init_private_data->created_at = get_time_since_epoch_ms();
m_init_private_data->end_of_chunk = m_vdev_info.chunk_size;
sisl::blob private_blob{r_cast< uint8_t* >(m_init_private_data.get()), sizeof(JournalChunkPrivate)};
return private_blob;
},
m_vdev_info.hs_dev_type, m_vdev_info.vdev_id, m_vdev_info.chunk_size});

resource_mgr().register_journal_vdev_exceed_cb([this]([[maybe_unused]] int64_t dirty_buf_count, bool critical) {
// either it is critical or non-critical, call cp_flush;
@@ -370,16 +370,16 @@ folly::Future< std::error_code > JournalVirtualDev::Descriptor::async_pwritev(co
return m_vdev.async_writev(iov, iovcnt, chunk, offset_in_chunk);
}

void JournalVirtualDev::Descriptor::sync_pwrite(const uint8_t* buf, size_t size, off_t offset) {
std::error_code JournalVirtualDev::Descriptor::sync_pwrite(const uint8_t* buf, size_t size, off_t offset) {

HS_REL_ASSERT_LE(size, m_reserved_sz, "Write size: larger than reserved size is not allowed!");
m_reserved_sz -= size; // update reserved size

auto const [chunk, index, offset_in_chunk] = process_pwrite_offset(size, offset);
m_vdev.sync_write(r_cast< const char* >(buf), size, chunk, offset_in_chunk);
return m_vdev.sync_write(r_cast< const char* >(buf), size, chunk, offset_in_chunk);
}

void JournalVirtualDev::Descriptor::sync_pwritev(const iovec* iov, int iovcnt, off_t offset) {
std::error_code JournalVirtualDev::Descriptor::sync_pwritev(const iovec* iov, int iovcnt, off_t offset) {
auto const size = VirtualDev::get_len(iov, iovcnt);

// if size is smaller than reserved size, it means write will never be overlapping start offset;
@@ -388,7 +388,7 @@ void JournalVirtualDev::Descriptor::sync_pwritev(const iovec* iov, int iovcnt, o

m_reserved_sz -= size;
auto const [chunk, _, offset_in_chunk] = process_pwrite_offset(size, offset);
m_vdev.sync_writev(iov, iovcnt, chunk, offset_in_chunk);
return m_vdev.sync_writev(iov, iovcnt, chunk, offset_in_chunk);
}

/////////////////////////////// Read Section //////////////////////////////////
15 changes: 12 additions & 3 deletions src/lib/device/journal_vdev.hpp
@@ -144,10 +144,19 @@ class JournalVirtualDev : public VirtualDev {
/// @param buf : buffer pointing to the data being written
/// @param size : size of buffer to be written
/// @param offset : offset to be written
/// @return : On success, the number of bytes written is returned, or -1 on error.
void sync_pwrite(const uint8_t* buf, size_t size, off_t offset);
/// @return : error_code
std::error_code sync_pwrite(const uint8_t* buf, size_t size, off_t offset);

void sync_pwritev(const iovec* iov, int iovcnt, off_t offset);
/// @brief writes the buffers in the iovec to the device starting at the given offset. The cursor is not
/// changed. pwritev always uses an offset returned from alloc_next_append_blk to do the write; the write
/// never crosses a chunk boundary because alloc_next_append_blk guarantees the returned offset does not
/// cross a chunk boundary.
///
/// @param iov : the iovec that holds the vector of data buffers
/// @param iovcnt : number of entries in the iovec
/// @param offset : offset to be written
/// @return : error_code
std::error_code sync_pwritev(const iovec* iov, int iovcnt, off_t offset);

/**
* @brief : read up to count bytes into the buffer starting at buf.
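
Since sync_pwrite() and sync_pwritev() now return std::error_code instead of void, a caller can detect and propagate journal write failures. A hedged usage sketch follows, mirroring the check LogDev::flush() now performs; the function name write_log_group and the way the Descriptor pointer is obtained are illustrative assumptions, not part of this change.

// Sketch only: assumes the declarations from src/lib/device/journal_vdev.hpp and the
// logging macros used elsewhere in homestore are in scope.
bool write_log_group(JournalVirtualDev::Descriptor* jd, const iovec* iov, int iovcnt, off_t offset) {
    std::error_code ec = jd->sync_pwritev(iov, iovcnt, offset);
    if (ec) {
        // An engaged error_code converts to true; report it and let the caller fail or retry the flush.
        LOGERROR("Journal write failed {} {}", ec.value(), ec.message());
        return false;
    }
    return true;
}
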
115 changes: 69 additions & 46 deletions src/lib/logstore/log_dev.cpp
@@ -85,7 +85,6 @@ void LogDev::start(bool format, std::shared_ptr< JournalVirtualDev > vdev) {
m_log_idx = m_logdev_meta.get_start_log_idx();
do_load(m_logdev_meta.get_start_dev_offset());
m_log_records->reinit(m_log_idx);
m_log_records_last_truncate_to_idx = m_log_idx - 1;
m_last_flush_idx = m_log_idx - 1;
}

@@ -253,14 +252,13 @@ void LogDev::assert_next_pages(log_stream_reader& lstream) {
int64_t LogDev::append_async(logstore_id_t store_id, logstore_seq_num_t seq_num, const sisl::io_blob& data,
void* cb_context) {
const auto idx = m_log_idx.fetch_add(1, std::memory_order_acq_rel);
m_log_records->create(idx, store_id, seq_num, data, cb_context);
m_pending_flush_size.fetch_add(data.size(), std::memory_order_relaxed);
m_log_records->create(idx, store_id, seq_num, data, cb_context);
if (allow_inline_flush()) flush_if_necessary();
return idx;
}

log_buffer LogDev::read(const logdev_key& key, serialized_log_record& return_record_header) {

log_buffer LogDev::read(const logdev_key& key) {
auto buf = sisl::make_byte_array(initial_read_size, m_flush_size_multiple, sisl::buftag::logread);
auto ec = m_vdev_jd->sync_pread(buf->bytes(), initial_read_size, key.dev_offset);
if (ec) {
@@ -269,25 +267,7 @@ log_buffer LogDev::read(const logdev_key& key, serialized_log_record& return_rec
}

auto* header = r_cast< const log_group_header* >(buf->cbytes());
// THIS_LOGDEV_LOG(TRACE, "Logdev read log group header {}", *header);
HS_REL_ASSERT_EQ(header->magic_word(), LOG_GROUP_HDR_MAGIC, "Log header corrupted with magic mismatch! {} {}",
m_logdev_id, *header);
HS_REL_ASSERT_EQ(header->get_version(), log_group_header::header_version, "Log header version mismatch! {} {}",
m_logdev_id, *header);
HS_REL_ASSERT_LE(header->start_idx(), key.idx, "log key offset does not match with log_idx {} }{}", m_logdev_id,
*header);
HS_REL_ASSERT_GT((header->start_idx() + header->nrecords()), key.idx,
"log key offset does not match with log_idx {} {}", m_logdev_id, *header);
HS_LOG_ASSERT_GE(header->total_size(), header->_inline_data_offset(), "Inconsistent size data in log group {} {}",
m_logdev_id, *header);

// We can only do crc match in read if we have read all the blocks. We don't want to aggressively read more data
// than we need to just to compare CRC for read operation. It can be done during recovery.
if (header->total_size() <= initial_read_size) {
crc32_t const crc = crc32_ieee(init_crc32, (buf->cbytes() + sizeof(log_group_header)),
header->total_size() - sizeof(log_group_header));
HS_REL_ASSERT_EQ(header->this_group_crc(), crc, "CRC mismatch on read data");
}
verify_log_group_header(key.idx, header);
auto record_header = header->nth_record(key.idx - header->start_log_idx);
uint32_t const data_offset = (record_header->offset + (record_header->get_inlined() ? 0 : header->oob_data_offset));

@@ -303,11 +283,42 @@ log_buffer LogDev::read(const logdev_key& key, serialized_log_record& return_rec
ret_view = sisl::byte_view{new_buf, s_cast< uint32_t >(data_offset - rounded_data_offset), record_header->size};
}

return ret_view;
}

void LogDev::read_record_header(const logdev_key& key, serialized_log_record& return_record_header) {
auto buf = sisl::make_byte_array(initial_read_size, m_flush_size_multiple, sisl::buftag::logread);
auto ec = m_vdev_jd->sync_pread(buf->bytes(), initial_read_size, key.dev_offset);
if (ec) LOGERROR("Failed to read from Journal vdev log_dev={} {} {}", m_logdev_id, ec.value(), ec.message());

auto* header = r_cast< const log_group_header* >(buf->cbytes());
verify_log_group_header(key.idx, header);

auto record_header = header->nth_record(key.idx - header->start_log_idx);
return_record_header =
serialized_log_record(record_header->size, record_header->offset, record_header->get_inlined(),
record_header->store_seq_num, record_header->store_id);
}

return ret_view;
void LogDev::verify_log_group_header(const logid_t idx, const log_group_header* header) {
HS_REL_ASSERT_EQ(header->magic_word(), LOG_GROUP_HDR_MAGIC, "Log header corrupted with magic mismatch! {} {}",
m_logdev_id, *header);
HS_REL_ASSERT_EQ(header->get_version(), log_group_header::header_version, "Log header version mismatch! {} {}",
m_logdev_id, *header);
HS_REL_ASSERT_LE(header->start_idx(), idx, "log key offset does not match with log_idx {} {}", m_logdev_id,
*header);
HS_REL_ASSERT_GT((header->start_idx() + header->nrecords()), idx,
"log key offset does not match with log_idx {} {}", m_logdev_id, *header);
HS_LOG_ASSERT_GE(header->total_size(), header->_inline_data_offset(), "Inconsistent size data in log group {} {}",
m_logdev_id, *header);

// We can only do crc match in read if we have read all the blocks. We don't want to aggressively read more data
// than we need to just to compare CRC for read operation. It can be done during recovery.
if (header->total_size() <= initial_read_size) {
crc32_t const crc = crc32_ieee(init_crc32, (r_cast< const uint8_t* >(header) + sizeof(log_group_header)),
header->total_size() - sizeof(log_group_header));
HS_REL_ASSERT_EQ(header->this_group_crc(), crc, "CRC mismatch on read data");
}
}

logstore_id_t LogDev::reserve_store_id() {
@@ -399,6 +410,15 @@ bool LogDev::flush_if_necessary(int64_t threshold_size) {

bool LogDev::flush_under_guard() {
std::unique_lock lg = flush_guard();

#ifdef _PRERELEASE
if (iomgr_flip::instance()->delay_flip< int >(
"simulate_log_flush_delay", [this]() { return flush(); }, m_logdev_id)) {
THIS_LOGDEV_LOG(INFO, "Delaying flush by rescheduling the async write");
return true;
}
#endif

return flush();
}

@@ -416,7 +436,7 @@ bool LogDev::flush() {
return false;
}
auto sz = m_pending_flush_size.fetch_sub(lg->actual_data_size(), std::memory_order_relaxed);
HS_REL_ASSERT_GE((sz - lg->actual_data_size()), 0, "size {} lg size{}", sz, lg->actual_data_size());
HS_REL_ASSERT_GE((sz - lg->actual_data_size()), 0, "size {} lg size {}", sz, lg->actual_data_size());
off_t offset = m_vdev_jd->alloc_next_append_blk(lg->header()->total_size());
lg->m_log_dev_offset = offset;

Expand All @@ -427,8 +447,8 @@ bool LogDev::flush() {
HISTOGRAM_OBSERVE(logstore_service().m_metrics, logdev_flush_records_distribution, lg->nrecords());
HISTOGRAM_OBSERVE(logstore_service().m_metrics, logdev_flush_size_distribution, lg->actual_data_size());

// write log
m_vdev_jd->sync_pwritev(lg->iovecs().data(), int_cast(lg->iovecs().size()), lg->m_log_dev_offset);
// FIXME:: add logic to handle this error in upper layer
if (m_vdev_jd->sync_pwritev(lg->iovecs().data(), int_cast(lg->iovecs().size()), lg->m_log_dev_offset)) return false;

on_flush_completion(lg);
return true;
@@ -441,6 +461,7 @@ void LogDev::on_flush_completion(LogGroup* lg) {
m_log_records->complete(lg->m_flush_log_idx_from, lg->m_flush_log_idx_upto);
m_last_flush_idx = lg->m_flush_log_idx_upto;
m_last_crc = lg->header()->cur_grp_crc;
std::unordered_map< logid_t, std::pair< logstore_req*, HomeLogStore* > > req_map;

auto from_indx = lg->m_flush_log_idx_from;
auto upto_indx = lg->m_flush_log_idx_upto;
@@ -451,31 +472,33 @@ void LogDev::on_flush_completion(LogGroup* lg) {
HomeLogStore* log_store = req->log_store;
HS_LOG_ASSERT_EQ(log_store->get_store_id(), record.store_id,
"Expecting store id in log store and flush completion to match");

HISTOGRAM_OBSERVE(logstore_service().m_metrics, logstore_append_latency, get_elapsed_time_us(req->start_time));
log_store->on_write_completion(req, logdev_key{idx, dev_offset});
req_map[idx] = std::make_pair(req, log_store);
}
if (m_log_records_safe_truncate_to_idx < upto_indx) m_log_records_safe_truncate_to_idx = upto_indx;

HISTOGRAM_OBSERVE(logstore_service().m_metrics, logdev_flush_time_us,
get_elapsed_time_us(m_last_flush_time, done_time));
HISTOGRAM_OBSERVE(logstore_service().m_metrics, logdev_post_flush_processing_latency,
get_elapsed_time_us(done_time));
free_log_group(lg);

// Truncate the log records, as they no longer need to be kept in memory. Every time there is a read, we actually
// read from the drives.
// TODO:: we can keep this in memory to accelerate reads until it is truncated
if (from_indx == m_log_records_last_truncate_to_idx + 1) {
m_log_records->truncate(m_log_records_safe_truncate_to_idx);
m_log_records_last_truncate_to_idx = m_log_records_safe_truncate_to_idx;
}
m_log_records->truncate(upto_indx);

// Since we support out-of-order lsn writes, there is no need to guarantee the order of logstore write completions.
// TODO:: add some logic to guarantee all the callbacks are done when stopping.
for (auto const& [idx, req_store_pair] : req_map)
iomanager.run_on_forget(iomgr::reactor_regex::random_worker, iomgr::fiber_regex::syncio_only, [=]() {
auto ld_key = logdev_key{idx, dev_offset};
auto req = req_store_pair.first;
auto comp_cb = req_store_pair.second->get_comp_cb();
(req->cb) ? req->cb(req, ld_key) : comp_cb(req, ld_key);
});
}

uint64_t LogDev::truncate() {
// Order of this lock has to be preserved. We take externally visible lock which is flush lock first. This prevents
// any further update to tail_lsn and also flushes conurrently with truncation. Then we take the store map lock,
// which is contained in this class and then meta_mutex. Reason for this is, we take meta_mutex under other
// store_map lock, so reversing could cause deadlock
// Order of this lock has to be preserved. We take the externally visible lock, which is the flush lock,
// first. This prevents any further update to tail_lsn and also prevents flushes running concurrently with
// truncation. Then we take the store map lock, which is contained in this class, and then meta_mutex. The
// reason for this is that we take meta_mutex under the store_map lock elsewhere, so reversing could cause deadlock
std::unique_lock fg = flush_guard();
folly::SharedMutexWritePriority::ReadHolder holder(m_store_map_mtx);
std::unique_lock mg{m_meta_mutex};
@@ -499,7 +522,7 @@ uint64_t LogDev::truncate() {
}

// There are no writes or no truncation called for any of the store, so we can't truncate anything
if (min_safe_ld_key == logdev_key::out_of_bound_ld_key()) { return 0; }
if (min_safe_ld_key == logdev_key::out_of_bound_ld_key() || min_safe_ld_key.idx <= m_last_truncate_idx) return 0;

uint64_t const num_records_to_truncate = uint64_cast(min_safe_ld_key.idx - m_last_truncate_idx);
HS_PERIODIC_LOG(INFO, logstore, "Truncating log_dev={} log_id={} vdev_offset={} truncated {} log records",
Expand All @@ -515,8 +538,8 @@ uint64_t LogDev::truncate() {
m_stopped /* persist_now */);

// When a logstore is removed, it unregisters the store and keeps the store id in the garbage list. We can capture
// these store_ids upto the log_idx which is truncated and then unreserve those. Now on we can re-use the store_id
// on new store creation
// these store_ids up to the log_idx which is truncated and then unreserve those. From then on we can re-use the
// store_id on new store creation
for (auto it{std::cbegin(m_garbage_store_ids)}; it != std::cend(m_garbage_store_ids);) {
if (it->first > min_safe_ld_key.idx) { break; }

Expand Down Expand Up @@ -793,7 +816,7 @@ void LogDevMetadata::unreserve_store(logstore_id_t store_id, bool persist_now) {
remove_all_rollback_records(store_id, persist_now);

resize_logdev_sb_if_needed();
if (store_id < *m_store_info.rbegin()) {
if (!m_store_info.empty() && store_id < *m_store_info.rbegin()) {
HS_LOG(DEBUG, logstore, "logdev meta not shrunk log_idx={} highest indx {}", store_id, *m_store_info.rbegin(),
m_sb->num_stores);
// We have not shrunk the store info, so we need to explicitly clear the store meta in on-disk meta
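The central behavioral change of this commit is in LogDev::on_flush_completion(): per-record completion callbacks are no longer invoked inline on the flush path; they are collected into req_map and dispatched to worker fibers, using the get_comp_cb() accessor added to HomeLogStore above. A condensed restatement of that pattern follows as a sketch; dispatch_completions is a hypothetical wrapper name, and the homestore/iomgr headers used by log_dev.cpp are assumed to be in scope.

// Sketch, condensed from LogDev::on_flush_completion(). req_map is filled while scanning the
// flushed LogGroup: one (logstore_req*, HomeLogStore*) pair per completed record.
void dispatch_completions(const std::unordered_map< logid_t, std::pair< logstore_req*, HomeLogStore* > >& req_map,
                          off_t dev_offset) {
    // Out-of-order lsn writes are supported, so completion order across log stores need not be
    // preserved; each callback runs on a random syncio-capable worker fiber instead of the flush fiber.
    for (auto const& [idx, req_store_pair] : req_map) {
        iomanager.run_on_forget(iomgr::reactor_regex::random_worker, iomgr::fiber_regex::syncio_only, [=]() {
            auto ld_key = logdev_key{idx, dev_offset};
            auto* req = req_store_pair.first;
            auto comp_cb = req_store_pair.second->get_comp_cb(); // accessor added in this commit
            (req->cb) ? req->cb(req, ld_key) : comp_cb(req, ld_key);
        });
    }
}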
