diff --git a/conanfile.py b/conanfile.py index 5030e6c2e..948c45701 100644 --- a/conanfile.py +++ b/conanfile.py @@ -9,7 +9,7 @@ class HomestoreConan(ConanFile): name = "homestore" - version = "6.4.32" + version = "6.4.33" homepage = "https://github.com/eBay/Homestore" description = "HomeStore Storage Engine" topics = ("ebay", "nublox") diff --git a/src/include/homestore/logstore/log_store.hpp b/src/include/homestore/logstore/log_store.hpp index 0ec046e66..2f0619f2a 100644 --- a/src/include/homestore/logstore/log_store.hpp +++ b/src/include/homestore/logstore/log_store.hpp @@ -254,6 +254,8 @@ class HomeLogStore : public std::enable_shared_from_this< HomeLogStore > { std::shared_ptr< LogDev > get_logdev() { return m_logdev; } + auto get_comp_cb() const { return m_comp_cb; } + private: logstore_id_t m_store_id; std::shared_ptr< LogDev > m_logdev; diff --git a/src/include/homestore/logstore/log_store_internal.hpp b/src/include/homestore/logstore/log_store_internal.hpp index 50a60720a..551f15ea8 100644 --- a/src/include/homestore/logstore/log_store_internal.hpp +++ b/src/include/homestore/logstore/log_store_internal.hpp @@ -106,9 +106,8 @@ struct log_dump_req { struct logstore_record { logdev_key m_dev_key; + // indicates the safe truncation point of the log store logdev_key m_trunc_key; - // only when out-of-order write happens and the higher lsn is flushed in a lower LogGroup while the lower lsn is - // flushed in a higher LogGroup , they are different logstore_record() = default; logstore_record(const logdev_key& key, const logdev_key& trunc_key) : m_dev_key{key}, m_trunc_key{trunc_key} {} diff --git a/src/lib/device/journal_vdev.cpp b/src/lib/device/journal_vdev.cpp index d526103fc..bcddae40a 100644 --- a/src/lib/device/journal_vdev.cpp +++ b/src/lib/device/journal_vdev.cpp @@ -46,15 +46,15 @@ JournalVirtualDev::JournalVirtualDev(DeviceManager& dmgr, const vdev_info& vinfo m_init_private_data = std::make_shared< JournalChunkPrivate >(); m_chunk_pool = std::make_unique< ChunkPool >( dmgr, - ChunkPool::Params{HS_DYNAMIC_CONFIG(generic.journal_chunk_pool_capacity), - [this]() { - m_init_private_data->created_at = get_time_since_epoch_ms(); - m_init_private_data->end_of_chunk = m_vdev_info.chunk_size; - sisl::blob private_blob{r_cast< uint8_t* >(m_init_private_data.get()), - sizeof(JournalChunkPrivate)}; - return private_blob; - }, - m_vdev_info.hs_dev_type, m_vdev_info.vdev_id, m_vdev_info.chunk_size}); + ChunkPool::Params{ + HS_DYNAMIC_CONFIG(generic.journal_chunk_pool_capacity), + [this]() { + m_init_private_data->created_at = get_time_since_epoch_ms(); + m_init_private_data->end_of_chunk = m_vdev_info.chunk_size; + sisl::blob private_blob{r_cast< uint8_t* >(m_init_private_data.get()), sizeof(JournalChunkPrivate)}; + return private_blob; + }, + m_vdev_info.hs_dev_type, m_vdev_info.vdev_id, m_vdev_info.chunk_size}); resource_mgr().register_journal_vdev_exceed_cb([this]([[maybe_unused]] int64_t dirty_buf_count, bool critical) { // either it is critical or non-critical, call cp_flush; @@ -370,16 +370,16 @@ folly::Future< std::error_code > JournalVirtualDev::Descriptor::async_pwritev(co return m_vdev.async_writev(iov, iovcnt, chunk, offset_in_chunk); } -void JournalVirtualDev::Descriptor::sync_pwrite(const uint8_t* buf, size_t size, off_t offset) { +std::error_code JournalVirtualDev::Descriptor::sync_pwrite(const uint8_t* buf, size_t size, off_t offset) { HS_REL_ASSERT_LE(size, m_reserved_sz, "Write size: larger then reserved size is not allowed!"); m_reserved_sz -= size; // update reserved 
size auto const [chunk, index, offset_in_chunk] = process_pwrite_offset(size, offset); - m_vdev.sync_write(r_cast< const char* >(buf), size, chunk, offset_in_chunk); + return m_vdev.sync_write(r_cast< const char* >(buf), size, chunk, offset_in_chunk); } -void JournalVirtualDev::Descriptor::sync_pwritev(const iovec* iov, int iovcnt, off_t offset) { +std::error_code JournalVirtualDev::Descriptor::sync_pwritev(const iovec* iov, int iovcnt, off_t offset) { auto const size = VirtualDev::get_len(iov, iovcnt); // if size is smaller than reserved size, it means write will never be overlapping start offset; @@ -388,7 +388,7 @@ void JournalVirtualDev::Descriptor::sync_pwritev(const iovec* iov, int iovcnt, o m_reserved_sz -= size; auto const [chunk, _, offset_in_chunk] = process_pwrite_offset(size, offset); - m_vdev.sync_writev(iov, iovcnt, chunk, offset_in_chunk); + return m_vdev.sync_writev(iov, iovcnt, chunk, offset_in_chunk); } /////////////////////////////// Read Section ////////////////////////////////// diff --git a/src/lib/device/journal_vdev.hpp b/src/lib/device/journal_vdev.hpp index c9bb1dc23..460db4012 100644 --- a/src/lib/device/journal_vdev.hpp +++ b/src/lib/device/journal_vdev.hpp @@ -144,10 +144,19 @@ class JournalVirtualDev : public VirtualDev { /// @param buf : buffer pointing to the data being written /// @param size : size of buffer to be written /// @param offset : offset to be written - /// @return : On success, the number of bytes written is returned, or -1 on error. - void sync_pwrite(const uint8_t* buf, size_t size, off_t offset); + /// @return : error_code + std::error_code sync_pwrite(const uint8_t* buf, size_t size, off_t offset); - void sync_pwritev(const iovec* iov, int iovcnt, off_t offset); + /// @brief writes up to count bytes from the iovec vector at the given offset. The cursor is not + /// changed. pwritev always uses the offset returned from alloc_next_append_blk to do the write; the write should + /// not cross chunk boundaries because alloc_next_append_blk guarantees the returned offset never crosses a chunk + /// boundary. + /// + /// @param iov : the iovec that holds the vector of data buffers + /// @param iovcnt : number of entries in the iovec vector + /// @param offset : offset to be written + /// @return : error_code + std::error_code sync_pwritev(const iovec* iov, int iovcnt, off_t offset); /** * @brief : read up to count bytes into the buffer starting at buf. 
diff --git a/src/lib/logstore/log_dev.cpp b/src/lib/logstore/log_dev.cpp index b72e21957..dc311c8f2 100644 --- a/src/lib/logstore/log_dev.cpp +++ b/src/lib/logstore/log_dev.cpp @@ -85,7 +85,6 @@ void LogDev::start(bool format, std::shared_ptr< JournalVirtualDev > vdev) { m_log_idx = m_logdev_meta.get_start_log_idx(); do_load(m_logdev_meta.get_start_dev_offset()); m_log_records->reinit(m_log_idx); - m_log_records_last_truncate_to_idx = m_log_idx - 1; m_last_flush_idx = m_log_idx - 1; } @@ -253,14 +252,13 @@ void LogDev::assert_next_pages(log_stream_reader& lstream) { int64_t LogDev::append_async(logstore_id_t store_id, logstore_seq_num_t seq_num, const sisl::io_blob& data, void* cb_context) { const auto idx = m_log_idx.fetch_add(1, std::memory_order_acq_rel); - m_log_records->create(idx, store_id, seq_num, data, cb_context); m_pending_flush_size.fetch_add(data.size(), std::memory_order_relaxed); + m_log_records->create(idx, store_id, seq_num, data, cb_context); if (allow_inline_flush()) flush_if_necessary(); return idx; } -log_buffer LogDev::read(const logdev_key& key, serialized_log_record& return_record_header) { - +log_buffer LogDev::read(const logdev_key& key) { auto buf = sisl::make_byte_array(initial_read_size, m_flush_size_multiple, sisl::buftag::logread); auto ec = m_vdev_jd->sync_pread(buf->bytes(), initial_read_size, key.dev_offset); if (ec) { @@ -269,25 +267,7 @@ log_buffer LogDev::read(const logdev_key& key, serialized_log_record& return_rec } auto* header = r_cast< const log_group_header* >(buf->cbytes()); - // THIS_LOGDEV_LOG(TRACE, "Logdev read log group header {}", *header); - HS_REL_ASSERT_EQ(header->magic_word(), LOG_GROUP_HDR_MAGIC, "Log header corrupted with magic mismatch! {} {}", - m_logdev_id, *header); - HS_REL_ASSERT_EQ(header->get_version(), log_group_header::header_version, "Log header version mismatch! {} {}", - m_logdev_id, *header); - HS_REL_ASSERT_LE(header->start_idx(), key.idx, "log key offset does not match with log_idx {} }{}", m_logdev_id, - *header); - HS_REL_ASSERT_GT((header->start_idx() + header->nrecords()), key.idx, - "log key offset does not match with log_idx {} {}", m_logdev_id, *header); - HS_LOG_ASSERT_GE(header->total_size(), header->_inline_data_offset(), "Inconsistent size data in log group {} {}", - m_logdev_id, *header); - - // We can only do crc match in read if we have read all the blocks. We don't want to aggressively read more data - // than we need to just to compare CRC for read operation. It can be done during recovery. - if (header->total_size() <= initial_read_size) { - crc32_t const crc = crc32_ieee(init_crc32, (buf->cbytes() + sizeof(log_group_header)), - header->total_size() - sizeof(log_group_header)); - HS_REL_ASSERT_EQ(header->this_group_crc(), crc, "CRC mismatch on read data"); - } + verify_log_group_header(key.idx, header); auto record_header = header->nth_record(key.idx - header->start_log_idx); uint32_t const data_offset = (record_header->offset + (record_header->get_inlined() ? 
0 : header->oob_data_offset)); @@ -303,11 +283,42 @@ log_buffer LogDev::read(const logdev_key& key, serialized_log_record& return_rec ret_view = sisl::byte_view{new_buf, s_cast< uint32_t >(data_offset - rounded_data_offset), record_header->size}; } + return ret_view; +} + +void LogDev::read_record_header(const logdev_key& key, serialized_log_record& return_record_header) { + auto buf = sisl::make_byte_array(initial_read_size, m_flush_size_multiple, sisl::buftag::logread); + auto ec = m_vdev_jd->sync_pread(buf->bytes(), initial_read_size, key.dev_offset); + if (ec) LOGERROR("Failed to read from Journal vdev log_dev={} {} {}", m_logdev_id, ec.value(), ec.message()); + + auto* header = r_cast< const log_group_header* >(buf->cbytes()); + verify_log_group_header(key.idx, header); + + auto record_header = header->nth_record(key.idx - header->start_log_idx); return_record_header = serialized_log_record(record_header->size, record_header->offset, record_header->get_inlined(), record_header->store_seq_num, record_header->store_id); +} - return ret_view; +void LogDev::verify_log_group_header(const logid_t idx, const log_group_header* header) { + HS_REL_ASSERT_EQ(header->magic_word(), LOG_GROUP_HDR_MAGIC, "Log header corrupted with magic mismatch! {} {}", + m_logdev_id, *header); + HS_REL_ASSERT_EQ(header->get_version(), log_group_header::header_version, "Log header version mismatch! {} {}", + m_logdev_id, *header); + HS_REL_ASSERT_LE(header->start_idx(), idx, "log key offset does not match with log_idx {} {}", m_logdev_id, + *header); + HS_REL_ASSERT_GT((header->start_idx() + header->nrecords()), idx, + "log key offset does not match with log_idx {} {}", m_logdev_id, *header); + HS_LOG_ASSERT_GE(header->total_size(), header->_inline_data_offset(), "Inconsistent size data in log group {} {}", + m_logdev_id, *header); + + // We can only do crc match in read if we have read all the blocks. We don't want to aggressively read more data + // than we need to just to compare CRC for read operation. It can be done during recovery. 
+ if (header->total_size() <= initial_read_size) { + crc32_t const crc = crc32_ieee(init_crc32, (r_cast< const uint8_t* >(header) + sizeof(log_group_header)), + header->total_size() - sizeof(log_group_header)); + HS_REL_ASSERT_EQ(header->this_group_crc(), crc, "CRC mismatch on read data"); + } } logstore_id_t LogDev::reserve_store_id() { @@ -399,6 +410,15 @@ bool LogDev::flush_if_necessary(int64_t threshold_size) { bool LogDev::flush_under_guard() { std::unique_lock lg = flush_guard(); + +#ifdef _PRERELEASE + if (iomgr_flip::instance()->delay_flip< int >( + "simulate_log_flush_delay", [this]() { return flush(); }, m_logdev_id)) { + THIS_LOGDEV_LOG(INFO, "Delaying flush by rescheduling the async write"); + return true; + } +#endif + return flush(); } @@ -416,7 +436,7 @@ bool LogDev::flush() { return false; } auto sz = m_pending_flush_size.fetch_sub(lg->actual_data_size(), std::memory_order_relaxed); - HS_REL_ASSERT_GE((sz - lg->actual_data_size()), 0, "size {} lg size{}", sz, lg->actual_data_size()); + HS_REL_ASSERT_GE((sz - lg->actual_data_size()), 0, "size {} lg size {}", sz, lg->actual_data_size()); off_t offset = m_vdev_jd->alloc_next_append_blk(lg->header()->total_size()); lg->m_log_dev_offset = offset; @@ -427,8 +447,8 @@ bool LogDev::flush() { HISTOGRAM_OBSERVE(logstore_service().m_metrics, logdev_flush_records_distribution, lg->nrecords()); HISTOGRAM_OBSERVE(logstore_service().m_metrics, logdev_flush_size_distribution, lg->actual_data_size()); - // write log - m_vdev_jd->sync_pwritev(lg->iovecs().data(), int_cast(lg->iovecs().size()), lg->m_log_dev_offset); + // FIXME:: add logic to handle this error in upper layer + if (m_vdev_jd->sync_pwritev(lg->iovecs().data(), int_cast(lg->iovecs().size()), lg->m_log_dev_offset)) return false; on_flush_completion(lg); return true; @@ -441,6 +461,7 @@ void LogDev::on_flush_completion(LogGroup* lg) { m_log_records->complete(lg->m_flush_log_idx_from, lg->m_flush_log_idx_upto); m_last_flush_idx = lg->m_flush_log_idx_upto; m_last_crc = lg->header()->cur_grp_crc; + std::unordered_map< logid_t, std::pair< logstore_req*, HomeLogStore* > > req_map; auto from_indx = lg->m_flush_log_idx_from; auto upto_indx = lg->m_flush_log_idx_upto; @@ -451,31 +472,33 @@ void LogDev::on_flush_completion(LogGroup* lg) { HomeLogStore* log_store = req->log_store; HS_LOG_ASSERT_EQ(log_store->get_store_id(), record.store_id, "Expecting store id in log store and flush completion to match"); - + HISTOGRAM_OBSERVE(logstore_service().m_metrics, logstore_append_latency, get_elapsed_time_us(req->start_time)); log_store->on_write_completion(req, logdev_key{idx, dev_offset}); + req_map[idx] = std::make_pair(req, log_store); } - if (m_log_records_safe_truncate_to_idx < upto_indx) m_log_records_safe_truncate_to_idx = upto_indx; - HISTOGRAM_OBSERVE(logstore_service().m_metrics, logdev_flush_time_us, get_elapsed_time_us(m_last_flush_time, done_time)); HISTOGRAM_OBSERVE(logstore_service().m_metrics, logdev_post_flush_processing_latency, get_elapsed_time_us(done_time)); free_log_group(lg); - - // Truncate the log records, as it is no longer needed to be kept in memory. Everytime there is a read, we actually - // read from the drives. 
- // TODO:: we can keep this in memory to accelerate read until it is truncated - if (from_indx == m_log_records_last_truncate_to_idx + 1) { - m_log_records->truncate(m_log_records_safe_truncate_to_idx); - m_log_records_last_truncate_to_idx = m_log_records_safe_truncate_to_idx; - } + m_log_records->truncate(upto_indx); + + // Since we support out-of-order lsn writes, there is no need to guarantee the order of logstore write completions. + // TODO:: add some logic to guarantee all the callbacks are done when stopping. + for (auto const& [idx, req_store_pair] : req_map) + iomanager.run_on_forget(iomgr::reactor_regex::random_worker, iomgr::fiber_regex::syncio_only, [=]() { + auto ld_key = logdev_key{idx, dev_offset}; + auto req = req_store_pair.first; + auto comp_cb = req_store_pair.second->get_comp_cb(); + (req->cb) ? req->cb(req, ld_key) : comp_cb(req, ld_key); + }); } uint64_t LogDev::truncate() { - // Order of this lock has to be preserved. We take externally visible lock which is flush lock first. This prevents - // any further update to tail_lsn and also flushes conurrently with truncation. Then we take the store map lock, - // which is contained in this class and then meta_mutex. Reason for this is, we take meta_mutex under other - // store_map lock, so reversing could cause deadlock + // The order of these locks has to be preserved. We take the externally visible lock, the flush lock, first. This + // prevents any further update to tail_lsn and also prevents flushes from running concurrently with truncation. + // Then we take the store map lock, which is contained in this class, and then meta_mutex. The reason is that we + // take meta_mutex under the store_map lock elsewhere, so reversing the order could cause a deadlock. std::unique_lock fg = flush_guard(); folly::SharedMutexWritePriority::ReadHolder holder(m_store_map_mtx); std::unique_lock mg{m_meta_mutex}; @@ -499,7 +522,7 @@ uint64_t LogDev::truncate() { } // There are no writes or no truncation called for any of the store, so we can't truncate anything - if (min_safe_ld_key == logdev_key::out_of_bound_ld_key()) { return 0; } + if (min_safe_ld_key == logdev_key::out_of_bound_ld_key() || min_safe_ld_key.idx <= m_last_truncate_idx) return 0; uint64_t const num_records_to_truncate = uint64_cast(min_safe_ld_key.idx - m_last_truncate_idx); HS_PERIODIC_LOG(INFO, logstore, "Truncating log_dev={} log_id={} vdev_offset={} truncated {} log records", @@ -515,8 +538,8 @@ uint64_t LogDev::truncate() { m_stopped /* persist_now */); // When a logstore is removed, it unregisteres the store and keeps the store id in garbage list. We can capture - // these store_ids upto the log_idx which is truncated and then unreserve those. 
Now on we can re-use the + // store_id on new store creation for (auto it{std::cbegin(m_garbage_store_ids)}; it != std::cend(m_garbage_store_ids);) { if (it->first > min_safe_ld_key.idx) { break; } @@ -793,7 +816,7 @@ void LogDevMetadata::unreserve_store(logstore_id_t store_id, bool persist_now) { remove_all_rollback_records(store_id, persist_now); resize_logdev_sb_if_needed(); - if (store_id < *m_store_info.rbegin()) { + if (!m_store_info.empty() && store_id < *m_store_info.rbegin()) { HS_LOG(DEBUG, logstore, "logdev meta not shrunk log_idx={} highest indx {}", store_id, *m_store_info.rbegin(), m_sb->num_stores); // We have not shrunk the store info, so we need to explicitly clear the store meta in on-disk meta diff --git a/src/lib/logstore/log_dev.hpp b/src/lib/logstore/log_dev.hpp index fda95fbb5..d5a545453 100644 --- a/src/lib/logstore/log_dev.hpp +++ b/src/lib/logstore/log_dev.hpp @@ -261,7 +261,6 @@ class LogGroup { log_group_header* header() { return reinterpret_cast< log_group_header* >(m_cur_log_buf); } const log_group_header* header() const { return reinterpret_cast< const log_group_header* >(m_cur_log_buf); } iovec_array const& iovecs() const { return m_iovecs; } - // uint32_t data_size() const { return header()->group_size - sizeof(log_group_header); } uint32_t actual_data_size() const { return m_actual_data_size; } uint32_t nrecords() const { return m_nrecords; } @@ -639,16 +638,21 @@ class LogDev : public std::enable_shared_from_this< LogDev > { /** * @brief Read the log id from the device offset * - * @param idx log_id to read - * @param dev_offset device offset of the log id which was provided upon append. This is needed to locate the log - * idx within the device. A log data can be looked up only by pair of log_id and dev_offset. - * - * @param record_header Pass the pointer to the header of the read record + * @param logdev_key : log_id and dev_offset pair to read * * @return log_buffer : Opaque structure which contains the data blob and its size. It is safe buffer and hence it * need not be freed and can be cheaply passed it around. */ - log_buffer read(const logdev_key& key, serialized_log_record& record_header); + log_buffer read(const logdev_key& key); + + /** + * @brief Read the log id from the device offset + * + * @param logdev_key : log_id and dev_offset pair to read + * + * @param record_header Pass the pointer to the header of the read record + */ + void read_record_header(const logdev_key& key, serialized_log_record& record_header); /// @brief Flush the log device in case if pending data size is at least the threshold size. This is a blocking call /// and hence it is required to run on thread/fiber which can run blocking io. If not run on such thread, it will @@ -729,6 +733,8 @@ class LogDev : public std::enable_shared_from_this< LogDev > { bool allow_timer_flush() const { return uint32_cast(m_flush_mode) & uint32_cast(flush_mode_t::TIMER); } bool allow_explicit_flush() const { return uint32_cast(m_flush_mode) & uint32_cast(flush_mode_t::EXPLICIT); } + void verify_log_group_header(const logid_t idx, const log_group_header* header); + /** * @brief Reserve logstore id and persist if needed. 
It persists the entire map about the logstore id inside the * @@ -787,37 +793,6 @@ class LogDev : public std::enable_shared_from_this< LogDev > { logid_t m_last_flush_idx{-1}; // Track last flushed, last device offset and truncated log idx logid_t m_last_truncate_idx{std::numeric_limits< logid_t >::min()}; // logdev truncate up to this idx - - // after completing flush, we can not truncate the log records directly. since we might shedule new flush/append in - // the callback of the append_async, which can be found in the logstore test case. - // this means we might schedule call on_flush_completion in another on_flush_completion, which is a recursive call. - // for example: - // on_flush_completion_1 will flush log record 0 - 10 and will truncate to 10 - // on_flush_completion_2 will flush log record 10 - 20 and will truncate to 20 - // on_flush_completion_3 will flush log record 20 - 30 and will truncate to 30 - // on_flush_completion_1 will call on_flush_completion_2 and on_flush_completion_2 will call on_flush_completion_3 - // in this case, on_flush_completion_3 will be completed before on_flush_completion_2, so when on_flush_completion_3 - // is done and on_flush_completion_2 is back to be executed, the log records are already truncated to 30, so the - // truncate to 20 (which will be called in on_flush_completion_2) will be invalid. - // what`s more, `auto& record = m_log_records->at(idx);` this line will be invalid, since the log record in that - // index has already been truncated. - - // so , in the recursive case, we need to keep log records not being truancated when flush is done until it is not - // be used by the lower stack function. coming to the example here, on_flush_completion_3 can not truncate to 30, - // since log records 10 -20 is needed for on_flush_completion_2. for the same reason, on_flush_completion_2 can not - // truncate to 20, since log records 0 -10 is needed for on_flush_completion_1. - - // the solution is we involve the following two member, - // 1 m_log_records_safe_truncate_to_idx , the index where we can truncate to safely when doing real truncation. this - // is updated by the last call in the call stack. in our example, on_flush_completion_3 will update it to 30. when - // calling in on_flush_completion_1 and on_flush_completion_2, this will not be updated since the upto_indx in these - // two will be smaller than that in on_flush_completion_3. - - // m_log_records_last_truncate_to_idx, the index where we can truncate from. 
it is used to keeps all the truncation - // sequentiall - logid_t m_log_records_safe_truncate_to_idx{std::numeric_limits< logid_t >::min()}; - logid_t m_log_records_last_truncate_to_idx{std::numeric_limits< logid_t >::min()}; - crc32_t m_last_crc{INVALID_CRC32_VALUE}; // LogDev Info block related fields diff --git a/src/lib/logstore/log_store.cpp b/src/lib/logstore/log_store.cpp index 09fae7002..80e307913 100644 --- a/src/lib/logstore/log_store.cpp +++ b/src/lib/logstore/log_store.cpp @@ -55,7 +55,6 @@ void HomeLogStore::write_async(logstore_req* req, const log_req_comp_cb_t& cb) { HS_DBG_ASSERT(0, "Assertion"); } #endif - // Todo :: any read after this should get the log m_records.create(req->seq_num); COUNTER_INCREMENT(m_metrics, logstore_append_count, 1); HISTOGRAM_OBSERVE(m_metrics, logstore_record_size, req->data.size()); @@ -96,7 +95,7 @@ log_buffer HomeLogStore::read_sync(logstore_seq_num_t seq_num) { // If seq_num has not been flushed yet, but issued, then we flush them before reading auto const s = m_records.status(seq_num); if (s.is_out_of_range || s.is_hole) { - throw std::out_of_range("key not valid"); + throw std::out_of_range("key not valid since it has been truncated"); } else if (!s.is_completed) { THIS_LOGSTORE_LOG(TRACE, "Reading lsn={}:{} before flushed, doing flush first", m_store_id, seq_num); m_logdev->flush_under_guard(); @@ -110,11 +109,8 @@ log_buffer HomeLogStore::read_sync(logstore_seq_num_t seq_num) { } const auto start_time = Clock::now(); - THIS_LOGSTORE_LOG(TRACE, "Reading lsn={}:{} mapped to logdev_key=[idx={} dev_offset={}]", m_store_id, seq_num, - ld_key.idx, ld_key.dev_offset); COUNTER_INCREMENT(m_metrics, logstore_read_count, 1); - serialized_log_record header; - const auto b = m_logdev->read(ld_key, header); + const auto b = m_logdev->read(ld_key); HISTOGRAM_OBSERVE(m_metrics, logstore_read_latency, get_elapsed_time_us(start_time)); return b; } @@ -160,8 +156,6 @@ void HomeLogStore::on_write_completion(logstore_req* req, const logdev_key& ld_k THIS_LOGSTORE_LOG(TRACE, "Completed write of lsn={} logdev_key:{} tail_lsn={} trunc_key:{}", req->seq_num, ld_key.to_string(), m_tail_lsn.load(std::memory_order_relaxed), trunc_key.to_string()); - HISTOGRAM_OBSERVE(m_metrics, logstore_append_latency, get_elapsed_time_us(req->start_time)); - (req->cb) ? 
req->cb(req, ld_key) : m_comp_cb(req, ld_key); } void HomeLogStore::on_log_found(logstore_seq_num_t seq_num, const logdev_key& ld_key, const logdev_key& flush_ld_key, @@ -235,7 +229,8 @@ nlohmann::json HomeLogStore::dump_log_store(const log_dump_req& dump_req) { nlohmann::json json_val = nlohmann::json::object(); serialized_log_record record_header; - const auto log_buffer = m_logdev->read(rec.m_dev_key, record_header); + const auto log_buffer = m_logdev->read(rec.m_dev_key); + m_logdev->read_record_header(rec.m_dev_key, record_header); try { json_val["size"] = uint32_cast(record_header.size); json_val["offset"] = uint32_cast(record_header.offset); @@ -260,9 +255,7 @@ nlohmann::json HomeLogStore::dump_log_store(const log_dump_req& dump_req) { void HomeLogStore::foreach (int64_t start_idx, const std::function< bool(logstore_seq_num_t, log_buffer) >& cb) { m_records.foreach_all_completed(start_idx, [&](int64_t cur_idx, homestore::logstore_record& record) -> bool { - // do a sync read - serialized_log_record header; - auto log_buf = m_logdev->read(record.m_dev_key, header); + auto log_buf = m_logdev->read(record.m_dev_key); return cb(cur_idx, log_buf); }); } diff --git a/src/lib/replication/log_store/home_raft_log_store.cpp b/src/lib/replication/log_store/home_raft_log_store.cpp index 0798187a8..0e8b2c655 100644 --- a/src/lib/replication/log_store/home_raft_log_store.cpp +++ b/src/lib/replication/log_store/home_raft_log_store.cpp @@ -148,6 +148,11 @@ ulong HomeRaftLogStore::start_index() const { nuraft::ptr< nuraft::log_entry > HomeRaftLogStore::last_entry() const { store_lsn_t max_seq = m_log_store->get_contiguous_issued_seq_num(m_last_durable_lsn); if (max_seq < 0) { return m_dummy_log_entry; } + auto lsn = to_repl_lsn(max_seq); + { + std::shared_lock lk(m_mutex); + if (m_cached_log_entries.count(lsn)) return m_cached_log_entries[lsn]; + } nuraft::ptr< nuraft::log_entry > nle; try { @@ -168,7 +173,12 @@ ulong HomeRaftLogStore::append(nuraft::ptr< nuraft::log_entry >& entry) { auto const next_seq = m_log_store->append_async(sisl::io_blob{buf->data_begin(), uint32_cast(buf->size()), false /* is_aligned */}, nullptr /* cookie */, [buf](int64_t, sisl::io_blob&, logdev_key, void*) {}); - return to_repl_lsn(next_seq); + auto lsn = to_repl_lsn(next_seq); + { + std::unique_lock lk(m_mutex); + m_cached_log_entries[lsn] = entry; + } + return lsn; } void HomeRaftLogStore::write_at(ulong index, nuraft::ptr< nuraft::log_entry >& entry) { @@ -180,14 +190,20 @@ void HomeRaftLogStore::write_at(ulong index, nuraft::ptr< nuraft::log_entry >& e // calls, but it is dangerous to set higher number. 
m_last_durable_lsn = -1; - m_log_store->append_async(sisl::io_blob{buf->data_begin(), uint32_cast(buf->size()), false /* is_aligned */}, - nullptr /* cookie */, [buf](int64_t, sisl::io_blob&, logdev_key, void*) {}); + auto const next_seq = + m_log_store->append_async(sisl::io_blob{buf->data_begin(), uint32_cast(buf->size()), false /* is_aligned */}, + nullptr /* cookie */, [buf](int64_t, sisl::io_blob&, logdev_key, void*) {}); + auto lsn = to_repl_lsn(next_seq); + std::unique_lock lk(m_mutex); + m_cached_log_entries[lsn] = entry; } void HomeRaftLogStore::end_of_append_batch(ulong start, ulong cnt) { auto end_lsn = to_store_lsn(start + cnt - 1); m_log_store->flush(end_lsn); m_last_durable_lsn = end_lsn; + std::unique_lock lk(m_mutex); + m_cached_log_entries.clear(); } nuraft::ptr< std::vector< nuraft::ptr< nuraft::log_entry > > > HomeRaftLogStore::log_entries(ulong start, ulong end) { @@ -201,6 +217,11 @@ nuraft::ptr< std::vector< nuraft::ptr< nuraft::log_entry > > > HomeRaftLogStore: } nuraft::ptr< nuraft::log_entry > HomeRaftLogStore::entry_at(ulong index) { + { + std::shared_lock lk(m_mutex); + if (m_cached_log_entries.count(index)) return m_cached_log_entries[index]; + } + nuraft::ptr< nuraft::log_entry > nle; try { auto log_bytes = m_log_store->read_sync(to_store_lsn(index)); @@ -213,6 +234,11 @@ nuraft::ptr< nuraft::log_entry > HomeRaftLogStore::entry_at(ulong index) { } ulong HomeRaftLogStore::term_at(ulong index) { + { + std::shared_lock lk(m_mutex); + if (m_cached_log_entries.count(index)) return m_cached_log_entries[index]->get_term(); + } + ulong term; try { auto log_bytes = m_log_store->read_sync(to_store_lsn(index)); diff --git a/src/lib/replication/log_store/home_raft_log_store.h b/src/lib/replication/log_store/home_raft_log_store.h index 5e6e589ad..669566424 100644 --- a/src/lib/replication/log_store/home_raft_log_store.h +++ b/src/lib/replication/log_store/home_raft_log_store.h @@ -199,5 +199,7 @@ class HomeRaftLogStore : public nuraft::log_store { nuraft::ptr< nuraft::log_entry > m_dummy_log_entry; store_lsn_t m_last_durable_lsn{-1}; folly::Future< folly::Unit > m_log_store_future; + mutable std::unordered_map< ulong, nuraft::ptr< nuraft::log_entry > > m_cached_log_entries; + mutable std::shared_mutex m_mutex; }; } // namespace homestore diff --git a/src/tests/test_log_store.cpp b/src/tests/test_log_store.cpp index 0f53a24a8..4fa666409 100644 --- a/src/tests/test_log_store.cpp +++ b/src/tests/test_log_store.cpp @@ -35,7 +35,6 @@ #include #include #include -#include #include #include @@ -548,7 +547,6 @@ class LogStoreTest : public ::testing::Test { // m_nrecords_waiting_to_issue = std::lround(n_total_records / _batch_size) * _batch_size; m_nrecords_waiting_to_issue = n_total_records; m_nrecords_waiting_to_complete = 0; - m_schedule_more_inserts.store(true); SampleDB::instance().m_on_schedule_io_cb = std::bind(&LogStoreTest::do_insert, this); SampleDB::instance().m_io_closure = bind_this(LogStoreTest::on_insert_completion, 3); @@ -562,15 +560,6 @@ class LogStoreTest : public ::testing::Test { m_batch_size = batch_size; m_q_depth = q_depth; m_holes_per_batch = holes_per_batch; - std::thread([this]() { - while (m_schedule_more_inserts.load() && m_nrecords_waiting_to_issue) { - lanch_inserts(); - m_pending_smph.try_acquire_for((std::chrono::seconds(1))); - } - }).detach(); - } - - void lanch_inserts() { iomanager.run_on_forget(iomgr::reactor_regex::all_io, []() { if (SampleDB::instance().m_on_schedule_io_cb) SampleDB::instance().m_on_schedule_io_cb(); }); @@ -578,19 +567,21 @@ class 
LogStoreTest : public ::testing::Test { void do_insert() { // Randomly pick a store client and write journal entry batch. - uint32_t batch_size{0}; - { - std::unique_lock< std::mutex > lock{m_pending_mtx}; - const bool insert = (m_nrecords_waiting_to_issue > 0) && (m_nrecords_waiting_to_complete < m_q_depth); - if (insert) { - batch_size = std::min< uint32_t >(m_batch_size, m_nrecords_waiting_to_issue); - m_nrecords_waiting_to_issue -= batch_size; - m_nrecords_waiting_to_complete += batch_size; - } else { - return; + for (;;) { + uint32_t batch_size{0}; + { + std::unique_lock< std::mutex > lock{m_pending_mtx}; + const bool insert = (m_nrecords_waiting_to_issue > 0) && (m_nrecords_waiting_to_complete < m_q_depth); + if (insert) { + batch_size = std::min< uint32_t >(m_batch_size, m_nrecords_waiting_to_issue); + m_nrecords_waiting_to_issue -= batch_size; + m_nrecords_waiting_to_complete += batch_size; + } else { + break; + } } + pick_log_store()->insert_next_batch(batch_size, std::min(batch_size, m_holes_per_batch)); } - pick_log_store()->insert_next_batch(batch_size, std::min(batch_size, m_holes_per_batch)); } void on_insert_completion([[maybe_unused]] logdev_id_t fid, logstore_seq_num_t lsn, logdev_key ld_key) { @@ -602,10 +593,10 @@ class LogStoreTest : public ::testing::Test { if ((--m_nrecords_waiting_to_complete == 0) && (waiting_to_issue == 0)) { notify = true; } } if (notify) { - m_schedule_more_inserts.store(false); m_pending_cv.notify_all(); + } else if (waiting_to_issue > 0) { + do_insert(); } - if (!m_nrecords_waiting_to_complete) m_pending_smph.release(); } void wait_for_inserts() { @@ -881,9 +872,6 @@ class LogStoreTest : public ::testing::Test { std::map< logdev_id_t, std::map< logid_t, uint32_t > > m_garbage_stores_upto; std::array< uint32_t, 100 > m_store_distribution; - std::binary_semaphore m_pending_smph{0}; - std::atomic_bool m_schedule_more_inserts{true}; - uint32_t m_q_depth{64}; uint32_t m_batch_size{1}; uint32_t m_holes_per_batch{0};
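
A possible follow-up to the "FIXME:: add logic to handle this error in upper layer" left in LogDev::flush(): the sketch below shows one way the std::error_code now returned by JournalVirtualDev::Descriptor::sync_pwritev() could be consumed before on_flush_completion() runs. This is only an illustration against names visible in this diff; the log message text and the choice to simply fail the flush (rather than retry or assert) are assumptions, not part of this change.

    // Hypothetical handling inside LogDev::flush(); not part of this PR.
    if (auto const ec = m_vdev_jd->sync_pwritev(lg->iovecs().data(), int_cast(lg->iovecs().size()),
                                                lg->m_log_dev_offset)) {
        // Surface the failure instead of silently dropping it: the error code says why the
        // log group could not be persisted to the journal vdev.
        LOGERROR("Failed to write log group to journal vdev log_dev={} {} {}", m_logdev_id, ec.value(),
                 ec.message());
        // Upper-layer policy is still open (retry, fail the pending logstore_reqs, or crash);
        // returning false here keeps today's behavior of leaving the group unflushed.
        return false;
    }
    on_flush_completion(lg);
    return true;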