From 54dd65ecde3fe0f698860b2ced0ac4fa57e1187b Mon Sep 17 00:00:00 2001
From: Sanal P
Date: Tue, 30 Apr 2024 16:06:49 -0700
Subject: [PATCH] Fix truncation issues and add long running log store tests.

Fix truncation issues on boundary cases.
Release chunks if truncation crosses end-of-chunk boundaries.
Enable the logstore tests except the parallel write-and-truncate test case.
Truncation can move the data start to the next chunk's start offset.
Change the truncate API to return that offset.
---
 conanfile.py                           |   2 +-
 src/lib/device/journal_vdev.cpp        | 151 ++++--
 src/lib/device/journal_vdev.hpp        |   8 +-
 src/lib/logstore/log_dev.cpp           |  16 +-
 src/lib/logstore/log_dev.hpp           |   2 +-
 src/lib/logstore/log_store_service.cpp |  12 +-
 src/lib/logstore/log_stream.cpp        |  39 +-
 src/tests/CMakeLists.txt               |   6 +-
 src/tests/test_journal_vdev.cpp        |  39 +-
 src/tests/test_log_store.cpp           |  11 +-
 src/tests/test_log_store_long_run.cpp  | 635 +++++++++++++++++++++++++
 11 files changed, 824 insertions(+), 97 deletions(-)
 create mode 100644 src/tests/test_log_store_long_run.cpp

diff --git a/conanfile.py b/conanfile.py
index 47c449e25..af88ccdcb 100644
--- a/conanfile.py
+++ b/conanfile.py
@@ -5,7 +5,7 @@
 class HomestoreConan(ConanFile):
     name = "homestore"
-    version = "6.4.8"
+    version = "6.4.9"
     homepage = "https://github.com/eBay/Homestore"
     description = "HomeStore Storage Engine"

diff --git a/src/lib/device/journal_vdev.cpp b/src/lib/device/journal_vdev.cpp
index 247f0a16b..ac59ca258 100644
--- a/src/lib/device/journal_vdev.cpp
+++ b/src/lib/device/journal_vdev.cpp
@@ -45,15 +45,15 @@ JournalVirtualDev::JournalVirtualDev(DeviceManager& dmgr, const vdev_info& vinfo
     m_init_private_data = std::make_shared< JournalChunkPrivate >();
     m_chunk_pool = std::make_unique< ChunkPool >(
         dmgr,
-        ChunkPool::Params{
-            HS_DYNAMIC_CONFIG(generic.journal_chunk_pool_capacity),
-            [this]() {
-                m_init_private_data->created_at = get_time_since_epoch_ms();
-                m_init_private_data->end_of_chunk = m_vdev_info.chunk_size;
-                sisl::blob private_blob{r_cast< uint8_t* >(m_init_private_data.get()), sizeof(JournalChunkPrivate)};
-                return private_blob;
-            },
-            m_vdev_info.hs_dev_type, m_vdev_info.vdev_id, m_vdev_info.chunk_size});
+        ChunkPool::Params{HS_DYNAMIC_CONFIG(generic.journal_chunk_pool_capacity),
+                          [this]() {
+                              m_init_private_data->created_at = get_time_since_epoch_ms();
+                              m_init_private_data->end_of_chunk = m_vdev_info.chunk_size;
+                              sisl::blob private_blob{r_cast< uint8_t* >(m_init_private_data.get()),
+                                                      sizeof(JournalChunkPrivate)};
+                              return private_blob;
+                          },
+                          m_vdev_info.hs_dev_type, m_vdev_info.vdev_id, m_vdev_info.chunk_size});

     resource_mgr().register_journal_vdev_exceed_cb([this]([[maybe_unused]] int64_t dirty_buf_count, bool critical) {
         // either it is critical or non-critical, call cp_flush;
@@ -152,6 +152,19 @@ void JournalVirtualDev::remove_journal_chunks(std::vector< shared< Chunk > >& ch
     }
 }

+void JournalVirtualDev::release_chunk_to_pool(shared< Chunk > chunk) {
+    // Clear the private chunk data before adding to pool.
+    auto* data = r_cast< JournalChunkPrivate* >(const_cast< uint8_t* >(chunk->user_private()));
+    *data = JournalChunkPrivate{};
+    update_chunk_private(chunk, data);
+
+    // We ideally want to zero out chunks, as chunks are reused after free across
+    // logdevs. But zeroing out a chunk is very expensive, so instead we rely on crc
+    // mismatches to find the end offset of the log dev during recovery.
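+    // (During recovery, log_stream_reader::next_group() compares each group header's
+    // prev_grp_crc with the crc of the previously read group and treats the first
+    // mismatch as the end of that logdev's data; see log_stream.cpp below.)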
+ m_chunk_pool->enqueue(chunk); + LOGINFOMOD(journalvdev, "Released chunk to pool {}", chunk->to_string()); +} + void JournalVirtualDev::update_chunk_private(shared< Chunk >& chunk, JournalChunkPrivate* private_data) { sisl::blob private_blob{r_cast< uint8_t* >(private_data), sizeof(JournalChunkPrivate)}; chunk->set_user_private(private_blob); @@ -172,7 +185,7 @@ shared< JournalVirtualDev::Descriptor > JournalVirtualDev::open(logdev_id_t logd LOGINFOMOD(journalvdev, "Opened journal vdev descriptor log_dev={}", logdev_id); for (auto& chunk : it->second->m_journal_chunks) { - LOGINFOMOD(journalvdev, " log_dev={} end_of_chunk={} chunk={}", logdev_id, get_end_of_chunk(chunk), + LOGINFOMOD(journalvdev, "log_dev={} end_of_chunk={} chunk {}", logdev_id, to_hex(get_end_of_chunk(chunk)), chunk->to_string()); } return it->second; @@ -242,7 +255,7 @@ off_t JournalVirtualDev::Descriptor::alloc_next_append_blk(size_t sz) { if ((tail_offset() + static_cast< off_t >(sz)) >= m_end_offset) { // not enough space left, add a new chunk. - LOGDEBUGMOD(journalvdev, "No space left for size {} Creating chunk desc {}", sz, to_string()); + LOGINFOMOD(journalvdev, "No space left for size {} Creating chunk desc {}", sz, to_string()); #ifdef _PRERELEASE iomgr_flip::test_and_abort("abort_before_update_eof_cur_chunk"); @@ -373,13 +386,13 @@ void JournalVirtualDev::Descriptor::sync_pwritev(const iovec* iov, int iovcnt, o } /////////////////////////////// Read Section ////////////////////////////////// -size_t JournalVirtualDev::Descriptor::sync_next_read(uint8_t* buf, size_t size_rd) { - if (m_journal_chunks.empty()) { return 0; } +int64_t JournalVirtualDev::Descriptor::sync_next_read(uint8_t* buf, size_t size_rd) { + if (m_journal_chunks.empty()) { return -1; } HS_REL_ASSERT_LE(m_seek_cursor, m_end_offset, "seek_cursor {} exceeded end_offset {}", m_seek_cursor, m_end_offset); if (m_seek_cursor >= m_end_offset) { LOGTRACEMOD(journalvdev, "sync_next_read reached end of chunks"); - return 0; + return -1; } auto [chunk, _, offset_in_chunk] = offset_to_chunk(m_seek_cursor); @@ -400,6 +413,10 @@ size_t JournalVirtualDev::Descriptor::sync_next_read(uint8_t* buf, size_t size_r // truncate size to what is left; size_rd = end_of_chunk - offset_in_chunk; across_chunk = true; + if (size_rd == 0) { + // If there are no more data in the current chunk, move the seek_cursor to the next chunk. + m_seek_cursor += (chunk->size() - end_of_chunk); + } } if (buf == nullptr) { return size_rd; } @@ -542,11 +559,9 @@ void JournalVirtualDev::Descriptor::update_tail_offset(off_t tail) { LOGINFOMOD(journalvdev, "Updated tail offset arg 0x{} desc {} ", to_hex(tail), to_string()); } -void JournalVirtualDev::Descriptor::truncate(off_t truncate_offset) { +off_t JournalVirtualDev::Descriptor::truncate(off_t truncate_offset) { const off_t ds_off = data_start_offset(); - COUNTER_INCREMENT(m_vdev.m_metrics, vdev_truncate_count, 1); - HS_PERIODIC_LOG(DEBUG, journalvdev, "truncating to logical offset: 0x{} desc {}", to_hex(truncate_offset), to_string()); @@ -559,11 +574,26 @@ void JournalVirtualDev::Descriptor::truncate(off_t truncate_offset) { } // Find the chunk which has the truncation offset. This will be the new - // head chunk in the list. We first update the is_head is true of this chunk. + // head chunk in the list. We first update the is_head to true for that new head chunk. 
     // So if a crash happens after this, we could have two chunks which has is_head
     // true in the list and during recovery we select head with the highest creation
-    // timestamp and reuse or cleanup the other.
-    auto [new_head_chunk, _, offset_in_chunk] = offset_to_chunk(truncate_offset);
+    // timestamp and cleanup the other. We also check whether the truncation offset falls
+    // between the end_of_chunk mark and the actual chunk end. In that case there is no data
+    // left to read in that chunk, so we release it and select the next chunk instead.
+    bool update_truncate_offset = false;
+    auto [new_head_chunk, index, offset_in_chunk] = offset_to_chunk(truncate_offset);
+    auto end_new_head_chunk = m_vdev.get_end_of_chunk(new_head_chunk);
+    if (offset_in_chunk >= static_cast< int64_t >(end_new_head_chunk)) {
+        // If the truncation offset is at or past end_of_chunk, we don't have any more
+        // data in this chunk to read. Select the next chunk.
+        if (++index <= (m_journal_chunks.size() - 1)) {
+            // Go to the next chunk to make it the new head chunk.
+            new_head_chunk = m_journal_chunks[index];
+            update_truncate_offset = true;
+            size_to_truncate += (new_head_chunk->size() - end_new_head_chunk);
+        }
+    }
+
     auto* private_data = r_cast< JournalChunkPrivate* >(const_cast< uint8_t* >(new_head_chunk->user_private()));
     private_data->is_head = true;
     private_data->logdev_id = m_logdev_id;
@@ -571,34 +601,63 @@ void JournalVirtualDev::Descriptor::truncate(off_t truncate_offset) {
 
     // Find all chunks which needs to be removed from the start of m_journal_chunks.
     // We stop till the truncation offset. Start from the old data_start_offset.
-    // Align the data_start_offset to the chunk_size as we deleting chunks and
-    // all chunks are same size in a journal vdev.
-    uint32_t start = sisl::round_down(ds_off, m_vdev.info().chunk_size);
+    // All the chunks which are covered inside the truncate offset are released.
+    // cover_offset is a logical offset.
+    index = 0;
+    off_t tail_off =
+        static_cast< off_t >(data_start_offset() + m_write_sz_in_total.load(std::memory_order_relaxed)) + m_reserved_sz;
+    auto chunk_size = m_vdev.info().chunk_size;
+    LOGINFOMOD(journalvdev, "Truncate begin truncate {} desc {}", to_hex(truncate_offset), to_string());
+
+#ifdef _PRERELEASE
+    for (auto it = m_journal_chunks.begin(); it != m_journal_chunks.end(); ++it) {
+        auto chunk = *it;
+        auto* private_data = r_cast< JournalChunkPrivate* >(const_cast< uint8_t* >(chunk->user_private()));
+        LOGINFOMOD(journalvdev, "log_dev={} chunk_id={} is_head={}", m_logdev_id, chunk->chunk_id(),
+                   private_data->is_head)
+    }
+#endif
+
+    off_t cover_offset = sisl::round_down(data_start_offset(), chunk_size);
+    auto total_num_chunks = m_journal_chunks.size();
     for (auto it = m_journal_chunks.begin(); it != m_journal_chunks.end();) {
         auto chunk = *it;
-        start += chunk->size();
+        off_t end_of_chunk = m_vdev.get_end_of_chunk(chunk);
+        off_t chunk_hole = 0;
+        if (index == total_num_chunks - 1) {
+            // If it's the last chunk, only count up to the tail_offset;
+            // there are no holes in the last chunk.
+            cover_offset += (tail_off % chunk_size);
+        } else {
+            // For other chunks take the whole size.
+            cover_offset += chunk_size;
+            chunk_hole = (chunk_size - end_of_chunk);
+        }
 
-        // Also if its the last chunk and there is no data after truncate, we release chunk.
-        auto write_sz_in_total = m_write_sz_in_total.load(std::memory_order_relaxed);
-        if (start >= truncate_offset) { break; }
+        index++;
 
-        m_total_size -= chunk->size();
-        it = m_journal_chunks.erase(it);
+        // Check if this chunk is fully covered by the truncate offset. We also check if the
+        // truncate offset lies inside the chunk hole between end_of_chunk and chunk_size.
+        // If either condition is satisfied, we release the chunk.
+        if (cover_offset <= truncate_offset || ((cover_offset - chunk_hole) <= truncate_offset)) {
+            m_total_size -= chunk->size();
+            it = m_journal_chunks.erase(it);
 
-        // Clear the private chunk data before adding to pool.
-        auto* data = r_cast< JournalChunkPrivate* >(const_cast< uint8_t* >(chunk->user_private()));
-        *data = JournalChunkPrivate{};
-        m_vdev.update_chunk_private(chunk, data);
+            // Release the chunk back to pool.
+            LOGINFOMOD(journalvdev,
+                       "Released chunk_id={} log_dev={} cover={} truncate_offset={} tail={} end_of_chunk={} desc {}",
+                       chunk->chunk_id(), m_logdev_id, to_hex(cover_offset), to_hex(truncate_offset), to_hex(tail_off),
+                       m_vdev.get_end_of_chunk(chunk), to_string());
+            m_vdev.release_chunk_to_pool(chunk);
+        } else {
+            ++it;
+        }
+    }
 
-        // We ideally want to zero out chunks as chunks are reused after free across
-        // logdev's. But zero out chunk is very expensive, We look at crc mismatches
-        // to know the end offset of the log dev during recovery.
-        // Format and add back to pool.
-        m_vdev.m_chunk_pool->enqueue(chunk);
-        LOGINFOMOD(journalvdev, "After truncate released chunk {}", chunk->to_string());
+    if (update_truncate_offset) {
+        // Update the truncate offset to align with the chunk size.
+        truncate_offset = sisl::round_up(truncate_offset, m_vdev.info().chunk_size);
     }
-
-    // Update our start offset, to keep track of actual size
     HS_REL_ASSERT_LE(truncate_offset, m_end_offset, "truncate offset less than end offset");
     update_data_start_offset(truncate_offset);
@@ -606,7 +665,17 @@ void JournalVirtualDev::Descriptor::truncate(off_t truncate_offset) {
     m_write_sz_in_total.fetch_sub(size_to_truncate, std::memory_order_relaxed);
     m_truncate_done = true;
 
-    HS_PERIODIC_LOG(INFO, journalvdev, "After truncate desc {}", to_string());
+#ifdef _PRERELEASE
+    for (auto it = m_journal_chunks.begin(); it != m_journal_chunks.end(); ++it) {
+        auto chunk = *it;
+        auto* private_data = r_cast< JournalChunkPrivate* >(const_cast< uint8_t* >(chunk->user_private()));
+        LOGINFOMOD(journalvdev, "log_dev={} chunk_id={} is_head={}", m_logdev_id, chunk->chunk_id(),
+                   private_data->is_head)
+    }
+#endif
+
+    LOGINFOMOD(journalvdev, "Truncate end truncate {} desc {}", to_hex(truncate_offset), to_string());
+    return data_start_offset();
 }
 
 #if 0
diff --git a/src/lib/device/journal_vdev.hpp b/src/lib/device/journal_vdev.hpp
index 388799a87..c9bb1dc23 100644
--- a/src/lib/device/journal_vdev.hpp
+++ b/src/lib/device/journal_vdev.hpp
@@ -158,9 +158,9 @@ class JournalVirtualDev : public VirtualDev {
      *
      * @return : On success, the number of bytes read is returned (zero indicates end of file), and the cursor is
      * advanced by this number. it is not an error if this number is smaller than the number requested, because it
-     * can be end of chunk, since read won't across chunk.
+     * can be the end of the chunk, since a read won't cross chunk boundaries. Returns -1 if there are no bytes available to read.
      */
-    size_t sync_next_read(uint8_t* buf, size_t count_in);
+    int64_t sync_next_read(uint8_t* buf, size_t count_in);
 
     /**
      * @brief : reads up to count bytes at offset into the buffer starting at buf.
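A minimal caller sketch of the changed sync_next_read() contract (illustrative only, not
part of the patch; `jd` stands for any JournalVirtualDev::Descriptor, and `buf`/`nbytes`
for a caller-managed buffer and its size):

    int64_t rc = jd->sync_next_read(buf, nbytes);
    if (rc == -1) {
        // No chunks left past the seek cursor: end of stream, stop scanning.
    } else if (rc == 0) {
        // The current chunk had no more data; the seek cursor was advanced past
        // the end-of-chunk hole, so simply issue the read again.
    } else {
        // rc bytes were read into buf; consume them.
    }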
@@ -275,8 +275,9 @@ class JournalVirtualDev : public VirtualDev {
      * 1. update in-memory counter of total write size.
      * 2. update vdev superblock of the new start logical offset that is being truncate to;
      *
+     * @return : the new data start offset after truncation.
      */
-    void truncate(off_t truncate_offset);
+    off_t truncate(off_t truncate_offset);
 
     /**
      * @brief : get the total size in journal
@@ -416,6 +417,7 @@ class JournalVirtualDev : public VirtualDev {
     uint64_t num_descriptors() const { return m_journal_descriptors.size(); }
 
     void remove_journal_chunks(std::vector< shared< Chunk > >& chunks);
+    void release_chunk_to_pool(shared< Chunk > chunk);
     void update_chunk_private(shared< Chunk >& chunk, JournalChunkPrivate* chunk_private);
     uint64_t get_end_of_chunk(shared< Chunk >& chunk) const;
 
diff --git a/src/lib/logstore/log_dev.cpp b/src/lib/logstore/log_dev.cpp
index b09c6eb45..d43ec8b21 100644
--- a/src/lib/logstore/log_dev.cpp
+++ b/src/lib/logstore/log_dev.cpp
@@ -271,7 +271,11 @@ int64_t LogDev::append_async(const logstore_id_t store_id, const logstore_seq_nu
 log_buffer LogDev::read(const logdev_key& key, serialized_log_record& return_record_header) {
     auto buf = sisl::make_byte_array(initial_read_size, m_flush_size_multiple, sisl::buftag::logread);
-    m_vdev_jd->sync_pread(buf->bytes(), initial_read_size, key.dev_offset);
+    auto ec = m_vdev_jd->sync_pread(buf->bytes(), initial_read_size, key.dev_offset);
+    if (ec) {
+        LOGERROR("Failed to read from journal vdev log_dev={} {} {}", m_logdev_id, ec.value(), ec.message());
+        return {};
+    }
 
     auto* header = r_cast< const log_group_header* >(buf->cbytes());
     // THIS_LOGDEV_LOG(TRACE, "Logdev read log group header {}", *header);
@@ -571,15 +575,15 @@ uint64_t LogDev::truncate(const logdev_key& key) {
                     "Truncating log device upto log_dev={} log_id={} vdev_offset={} truncated {} log records",
                     m_logdev_id, key.idx, key.dev_offset, num_records_to_truncate);
     m_log_records->truncate(key.idx);
-    m_vdev_jd->truncate(key.dev_offset);
-    THIS_LOGDEV_LOG(DEBUG, "LogDev::truncate done {} ", key.idx);
+    off_t new_offset = m_vdev_jd->truncate(key.dev_offset);
+    THIS_LOGDEV_LOG(DEBUG, "LogDev::truncate done {} offset old {} new {}", key.idx, key.dev_offset, new_offset);
     m_last_truncate_idx = key.idx;
 
     {
         std::unique_lock< std::mutex > lk{m_meta_mutex};
 
         // Update the start offset to be read upon restart
-        m_logdev_meta.set_start_dev_offset(key.dev_offset, key.idx + 1, false /* persist_now */);
+        m_logdev_meta.set_start_dev_offset(new_offset, key.idx + 1, false /* persist_now */);
 
         // Now that store is truncated, we can reclaim the store ids which are garbaged (if any) earlier
 #ifdef _PRERELEASE
@@ -658,7 +662,7 @@ std::shared_ptr< HomeLogStore > LogDev::create_new_log_store(bool append_mode) {
         HS_REL_ASSERT((it == m_id_logstore_map.end()), "store_id {}-{} already exists", m_logdev_id, store_id);
         m_id_logstore_map.insert(std::pair(store_id, logstore_info{.log_store = lstore, .append_mode = append_mode}));
     }
-    LOGINFO("Created log store log_dev={} log_store={}", m_logdev_id, store_id);
+    HS_LOG(INFO, logstore, "Created log store log_dev={} log_store={}", m_logdev_id, store_id);
     return lstore;
 }
 
@@ -934,7 +938,9 @@ void LogDevMetadata::reset() {
 }
 
 void LogDevMetadata::logdev_super_blk_found(const sisl::byte_view& buf, void* meta_cookie) {
+
     m_sb.load(buf, meta_cookie);
+    LOGINFO("Logdev superblk found log_dev={}", m_sb->logdev_id);
     HS_REL_ASSERT_EQ(m_sb->get_magic(), logdev_superblk::LOGDEV_SB_MAGIC, "Invalid logdev metablk, magic mismatch");
HS_REL_ASSERT_EQ(m_sb->get_version(), logdev_superblk::LOGDEV_SB_VERSION, "Invalid version of logdev metablk"); } diff --git a/src/lib/logstore/log_dev.hpp b/src/lib/logstore/log_dev.hpp index 7affcce96..2614ba25c 100644 --- a/src/lib/logstore/log_dev.hpp +++ b/src/lib/logstore/log_dev.hpp @@ -569,7 +569,7 @@ class log_stream_reader { sisl::byte_view group_in_next_page(); private: - sisl::byte_view read_next_bytes(uint64_t nbytes); + sisl::byte_view read_next_bytes(uint64_t nbytes, bool& end_of_stream); private: JournalVirtualDev* m_vdev; diff --git a/src/lib/logstore/log_store_service.cpp b/src/lib/logstore/log_store_service.cpp index 148363325..46e55655f 100644 --- a/src/lib/logstore/log_store_service.cpp +++ b/src/lib/logstore/log_store_service.cpp @@ -125,7 +125,7 @@ logdev_id_t LogStoreService::create_new_logdev() { auto logdev = create_new_logdev_internal(logdev_id); logdev->start(true /* format */); COUNTER_INCREMENT(m_metrics, logdevs_count, 1); - LOGINFO("Created log_dev={}", logdev_id); + HS_LOG(INFO, logstore, "Created log_dev={}", logdev_id); return logdev_id; } @@ -149,12 +149,12 @@ void LogStoreService::destroy_log_dev(logdev_id_t logdev_id) { m_id_logdev_map.erase(it); COUNTER_DECREMENT(m_metrics, logdevs_count, 1); - LOGINFO("Removed log_dev={}", logdev_id); + HS_LOG(INFO, logstore, "Removed log_dev={}", logdev_id); } void LogStoreService::delete_unopened_logdevs() { for (auto logdev_id : m_unopened_logdev) { - LOGINFO("Deleting unopened log_dev={}", logdev_id); + HS_LOG(INFO, logstore, "Deleting unopened log_dev={}", logdev_id); destroy_log_dev(logdev_id); } m_unopened_logdev.clear(); @@ -178,7 +178,7 @@ void LogStoreService::open_logdev(logdev_id_t logdev_id) { LOGDEBUGMOD(logstore, "log_dev={} does not exist, created!", logdev_id); } m_unopened_logdev.erase(logdev_id); - LOGDEBUGMOD(logstore, "Opened log_dev={}", logdev_id); + HS_LOG(INFO, logstore, "Opened log_dev={}", logdev_id); } std::vector< std::shared_ptr< LogDev > > LogStoreService::get_all_logdevs() { @@ -206,7 +206,7 @@ void LogStoreService::logdev_super_blk_found(const sisl::byte_view& buf, void* m folly::SharedMutexWritePriority::WriteHolder holder(m_logdev_map_mtx); std::shared_ptr< LogDev > logdev; auto id = sb->logdev_id; - LOGDEBUGMOD(logstore, "Log dev superblk found logdev={}", id); + HS_LOG(DEBUG, logstore, "Log dev superblk found logdev={}", id); const auto it = m_id_logdev_map.find(id); // We could update the logdev map either with logdev or rollback superblks found callbacks. 
if (it != m_id_logdev_map.end()) { @@ -235,7 +235,7 @@ void LogStoreService::rollback_super_blk_found(const sisl::byte_view& buf, void* folly::SharedMutexWritePriority::WriteHolder holder(m_logdev_map_mtx); std::shared_ptr< LogDev > logdev; auto id = rollback_sb->logdev_id; - LOGDEBUGMOD(logstore, "Log dev rollback superblk found logdev={}", id); + HS_LOG(DEBUG, logstore, "Log dev rollback superblk found logdev={}", id); const auto it = m_id_logdev_map.find(id); HS_REL_ASSERT((it != m_id_logdev_map.end()), "found a rollback_super_blk of logdev id {}, but the logdev with id {} doesnt exist", id); diff --git a/src/lib/logstore/log_stream.cpp b/src/lib/logstore/log_stream.cpp index 931e03eef..2a00cd31c 100644 --- a/src/lib/logstore/log_stream.cpp +++ b/src/lib/logstore/log_stream.cpp @@ -44,10 +44,15 @@ sisl::byte_view log_stream_reader::next_group(off_t* out_dev_offset) { read_again: if (m_cur_log_buf.size() < min_needed) { do { - m_cur_log_buf = read_next_bytes(std::max(min_needed, bulk_read_size)); + bool end_of_stream{false}; + m_cur_log_buf = read_next_bytes(std::max(min_needed, bulk_read_size), end_of_stream); + if (end_of_stream) { + LOGDEBUGMOD(logstore, "Logdev reached end of stream {} {}", m_vdev_jd->to_string(), m_cur_read_bytes); + return ret_buf; + } if (m_cur_log_buf.size() == 0) { - LOGINFOMOD(logstore, "Logdev data empty log_dev={}", m_vdev_jd->logdev_id()); - return {}; + LOGDEBUGMOD(logstore, "Logdev data empty {}", m_vdev_jd->logdev_id()); + continue; } } while (m_cur_log_buf.size() < sizeof(log_group_header)); min_needed = 0; @@ -56,8 +61,8 @@ sisl::byte_view log_stream_reader::next_group(off_t* out_dev_offset) { HS_REL_ASSERT_GE(m_cur_log_buf.size(), m_read_size_multiple); const auto* header = r_cast< log_group_header const* >(m_cur_log_buf.bytes()); if (header->magic_word() != LOG_GROUP_HDR_MAGIC) { - LOGINFOMOD(logstore, "Logdev data not seeing magic at pos {}, must have come to end of log_dev={}", - m_vdev_jd->dev_offset(m_cur_read_bytes), m_vdev_jd->logdev_id()); + LOGERROR("Logdev data not seeing magic at pos {}, must have come to end of log_dev={}", + m_vdev_jd->dev_offset(m_cur_read_bytes), m_vdev_jd->logdev_id()); *out_dev_offset = m_vdev_jd->dev_offset(m_cur_read_bytes); // move it by dma boundary if header is not valid m_prev_crc = 0; @@ -71,9 +76,8 @@ sisl::byte_view log_stream_reader::next_group(off_t* out_dev_offset) { // compare it with prev crc if (m_prev_crc != 0 && m_prev_crc != header->prev_grp_crc) { // we reached at the end - LOGINFOMOD(logstore, - "we have reached the end. crc doesn't match offset {} prev crc {} header prev crc {} log_dev={}", - m_vdev_jd->dev_offset(m_cur_read_bytes), header->prev_grp_crc, m_prev_crc, m_vdev_jd->logdev_id()); + LOGERROR("we have reached the end. 
crc doesn't match offset {} prev crc {} header prev crc {} log_dev={}", + m_vdev_jd->dev_offset(m_cur_read_bytes), header->prev_grp_crc, m_prev_crc, m_vdev_jd->logdev_id()); *out_dev_offset = m_vdev_jd->dev_offset(m_cur_read_bytes); if (!m_vdev_jd->is_offset_at_last_chunk(*out_dev_offset)) { HS_REL_ASSERT(0, "data is corrupted {}", m_vdev_jd->logdev_id()); @@ -92,7 +96,7 @@ sisl::byte_view log_stream_reader::next_group(off_t* out_dev_offset) { goto read_again; } - LOGTRACEMOD(logstore, + LOGDEBUGMOD(logstore, "Logstream read log group of size={} nrecords={} journal_dev_offset {} cur_read_bytes {} buf size " "remaining {} log_dev={}", header->total_size(), header->nrecords(), m_vdev_jd->dev_offset(m_cur_read_bytes), m_cur_read_bytes, @@ -101,10 +105,9 @@ sisl::byte_view log_stream_reader::next_group(off_t* out_dev_offset) { // At this point data seems to be valid. Lets see if a data is written completely by comparing the footer const auto* footer = r_cast< log_group_footer* >((uint64_t)m_cur_log_buf.bytes() + header->footer_offset); if (footer->magic != LOG_GROUP_FOOTER_MAGIC || footer->start_log_idx != header->start_log_idx) { - LOGINFOMOD(logstore, - "last write is not completely written. footer magic {} footer start_log_idx {} header log indx {} " - "log_dev={}", - footer->magic, footer->start_log_idx, header->start_log_idx, m_vdev_jd->logdev_id()); + LOGERROR("last write is not completely written. footer magic {} footer start_log_idx {} header log indx {} " + "log_dev={}", + footer->magic, footer->start_log_idx, header->start_log_idx, m_vdev_jd->logdev_id()); *out_dev_offset = m_vdev_jd->dev_offset(m_cur_read_bytes); // move it by dma boundary if header is not valid m_prev_crc = 0; @@ -123,8 +126,7 @@ sisl::byte_view log_stream_reader::next_group(off_t* out_dev_offset) { (header->total_size() - sizeof(log_group_header))); if (cur_crc != header->cur_grp_crc) { /* This is a valid entry so crc should match */ - LOGERRORMOD(logstore, "crc doesn't match {} log_dev={}", m_vdev_jd->dev_offset(m_cur_read_bytes), - m_vdev_jd->logdev_id()); + LOGERROR("crc doesn't match {} log_dev={}", m_vdev_jd->dev_offset(m_cur_read_bytes), m_vdev_jd->logdev_id()); HS_REL_ASSERT(0, "data is corrupted {}", m_vdev_jd->logdev_id()); *out_dev_offset = m_vdev_jd->dev_offset(m_cur_read_bytes); @@ -151,10 +153,15 @@ sisl::byte_view log_stream_reader::group_in_next_page() { return next_group(&dev_offset); } -sisl::byte_view log_stream_reader::read_next_bytes(uint64_t nbytes) { +sisl::byte_view log_stream_reader::read_next_bytes(uint64_t nbytes, bool& end_of_stream) { // TO DO: Might need to address alignment based on data or fast type const auto prev_pos = m_vdev_jd->seeked_pos(); auto sz_to_read = m_vdev_jd->sync_next_read(nullptr, nbytes); + if (sz_to_read == -1) { + end_of_stream = true; + return sisl::byte_view{m_cur_log_buf}; + } + if (sz_to_read == 0) { return sisl::byte_view{m_cur_log_buf}; } auto out_buf = diff --git a/src/tests/CMakeLists.txt b/src/tests/CMakeLists.txt index 7365d88c5..c10b09dcb 100644 --- a/src/tests/CMakeLists.txt +++ b/src/tests/CMakeLists.txt @@ -89,6 +89,10 @@ if (${io_tests}) target_sources(test_log_dev PRIVATE test_log_dev.cpp) target_link_libraries(test_log_dev hs_logdev homestore ${COMMON_TEST_DEPS} ) + add_executable(test_log_store_long_run) + target_sources(test_log_store_long_run PRIVATE test_log_store_long_run.cpp) + target_link_libraries(test_log_store_long_run hs_logdev homestore ${COMMON_TEST_DEPS}) + set(TEST_METABLK_SOURCE_FILES test_meta_blk_mgr.cpp) 
add_executable(test_meta_blk_mgr ${TEST_METABLK_SOURCE_FILES}) target_link_libraries(test_meta_blk_mgr homestore ${COMMON_TEST_DEPS} GTest::gmock) @@ -113,7 +117,7 @@ if (${io_tests}) can_build_epoll_io_tests(epoll_tests) if(${epoll_tests}) add_test(NAME LogDev-Epoll COMMAND ${CMAKE_BINARY_DIR}/bin/test_log_dev) - #add_test(NAME LogStore-Epoll COMMAND ${CMAKE_BINARY_DIR}/bin/test_log_store) + add_test(NAME LogStore-Epoll COMMAND ${CMAKE_BINARY_DIR}/bin/test_log_store) add_test(NAME MetaBlkMgr-Epoll COMMAND ${CMAKE_BINARY_DIR}/bin/test_meta_blk_mgr) add_test(NAME DataService-Epoll COMMAND ${CMAKE_BINARY_DIR}/bin/test_data_service) diff --git a/src/tests/test_journal_vdev.cpp b/src/tests/test_journal_vdev.cpp index cfd4d6500..ce0b302ca 100644 --- a/src/tests/test_journal_vdev.cpp +++ b/src/tests/test_journal_vdev.cpp @@ -202,10 +202,7 @@ class JournalDescriptorTest { m_vdev_jd->truncate(off_to_truncate); auto tail_after = m_vdev_jd->tail_offset(); - if (m_vdev_jd->num_chunks_used()) { - HS_DBG_ASSERT_EQ(tail_before, tail_after); - HS_DBG_ASSERT_EQ(off_to_truncate, m_vdev_jd->data_start_offset()); - } + if (m_vdev_jd->num_chunks_used()) { HS_DBG_ASSERT_EQ(tail_before, tail_after); } if (off_to_truncate > m_start_off) { // remove the offsets before truncate offset, since they are not valid for read anymore; @@ -251,12 +248,10 @@ class JournalDescriptorTest { void space_usage_asserts() { auto used_space = m_vdev_jd->used_size(); - auto start_off = m_vdev_jd->data_start_offset(); if (m_vdev_jd->num_chunks_used() != 0) { HS_DBG_ASSERT_GT(m_vdev_jd->size(), 0); HS_DBG_ASSERT_LE(used_space, m_vdev_jd->size()); - HS_DBG_ASSERT_EQ(start_off, m_start_off); } else { HS_DBG_ASSERT_EQ(m_vdev_jd->size(), 0); } @@ -293,7 +288,6 @@ class JournalDescriptorTest { auto tail_offset = m_vdev_jd->tail_offset(); auto start_offset = m_vdev_jd->data_start_offset(); - HS_DBG_ASSERT_EQ(m_start_off, start_offset); if (start_offset < tail_offset) { HS_DBG_ASSERT_GE(off, start_offset, "Wrong offset: {}, start_off: {}", off, start_offset); HS_DBG_ASSERT_LE(off, tail_offset, "Wrong offset: {}, tail_offset: {}", off, tail_offset); @@ -529,10 +523,10 @@ TEST_F(VDevJournalIOTest, MultipleChunkTest) { LOGINFO("Inserting two entries"); for (int i = 0; i < 2; i++) { test.fixed_write(data_size); + writesz += data_size; } // Verify write size has two data entries and one chunk. - writesz = 2 * data_size; test.verify_journal_descriptor(log_dev_jd, {.ds = 0x0, .end = chunk_size, .writesz = writesz, .rsvdsz = 0, .chunks = 1, .trunc = false, .total = chunk_size, .seek = 0x0}); @@ -572,35 +566,42 @@ TEST_F(VDevJournalIOTest, MultipleChunkTest) { test.verify_journal_descriptor(log_dev_jd, {.ds = 0x0, .end = 4 * chunk_size, .writesz = writesz, .rsvdsz = 0, .chunks = 4, .trunc = false, .total = 4 * chunk_size, .seek = 0x0}); - // Truncate two data entries. No change in chunk count, only write size and data start changed. + // Truncate two data entries. Num chunks reduced to 3. Write and total size reduce by 1 chunk. 
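+    // (Both entries live in the first chunk, so truncating past them releases that
+    // whole chunk and the data start jumps from 0x0 to chunk_size.)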
LOGINFO("Truncating two entries"); - uint64_t trunc_sz = 2 * data_size; uint64_t trunc_offset = 2 * data_size; + writesz -= chunk_size; test.truncate(trunc_offset); - test.verify_journal_descriptor(log_dev_jd, {.ds = trunc_offset, .end = 4 * chunk_size, .writesz = writesz - trunc_sz, - .rsvdsz = 0, .chunks = 4, .trunc = true, .total = 4 * chunk_size, .seek = 0x0}); + test.verify_journal_descriptor(log_dev_jd, {.ds = chunk_size, .end = 4 * chunk_size, .writesz = writesz, + .rsvdsz = 0, .chunks = 3, .trunc = true, .total = 3 * chunk_size, .seek = 0x0}); - // Truncate one more entry. Release one chunk back and reduce chunk count. Increase the data start. + // Truncate one more entry. LOGINFO("Truncating one entry"); trunc_offset = chunk_size + data_size; - trunc_sz = chunk_size + data_size; test.truncate(trunc_offset); - test.verify_journal_descriptor(log_dev_jd, {.ds = trunc_offset, .end = 4 * chunk_size, .writesz = writesz - trunc_sz, + test.verify_journal_descriptor(log_dev_jd, {.ds = trunc_offset, .end = 4 * chunk_size, .writesz = writesz - data_size, .rsvdsz = 0, .chunks = 3, .trunc = true, .total = 3 * chunk_size, .seek = 0x0}); // Restart homestore and restore the offsets. LOGINFO("Restart homestore"); restart_restore(); - test.verify_journal_descriptor(log_dev_jd, {.ds = trunc_offset, .end = 4 * chunk_size, .writesz = writesz - trunc_sz, + test.verify_journal_descriptor(log_dev_jd, {.ds = trunc_offset, .end = 4 * chunk_size, .writesz = writesz - data_size, .rsvdsz = 0, .chunks = 3, .trunc = false, .total = 3 * chunk_size, .seek = 0x0}); test.read_all(); - // Truncate all entries. Num chunks 1, write sz should be 0. + // Truncate one more entry. This will release one more chunk. + LOGINFO("Truncating one entry"); + trunc_offset = chunk_size + 2 * data_size; + writesz -= chunk_size; + test.truncate(trunc_offset); + test.verify_journal_descriptor(log_dev_jd, {.ds = 2 * chunk_size, .end = 4 * chunk_size, .writesz = writesz, + .rsvdsz = 0, .chunks = 2, .trunc = true, .total = 2 * chunk_size, .seek = 0x0}); + + // Truncate all entries. Release all chunks. Num chunks 0, write sz should be 0. 
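+    // (Truncating at tail_offset covers every chunk, including the one holding the
+    // tail, so the data start is rounded up to 4 * chunk_size and total size drops to zero.)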
LOGINFO("Truncating all entries"); trunc_offset = log_dev_jd->tail_offset(); test.truncate(trunc_offset); - test.verify_journal_descriptor(log_dev_jd, {.ds = trunc_offset, .end = 4 * chunk_size, .writesz = 0, .rsvdsz = 0, - .chunks = 1, .trunc = true, .total = chunk_size, .seek = 0x0}); + test.verify_journal_descriptor(log_dev_jd, {.ds = 4 * chunk_size, .end = 4 * chunk_size, .writesz = 0, .rsvdsz = 0, + .chunks = 0, .trunc = true, .total = 0, .seek = 0x0}); // clang-format on } diff --git a/src/tests/test_log_store.cpp b/src/tests/test_log_store.cpp index 8d5d6c17d..1858236f3 100644 --- a/src/tests/test_log_store.cpp +++ b/src/tests/test_log_store.cpp @@ -257,7 +257,7 @@ class SampleLogStoreClient { validate_data(tl, i); } } catch (const std::exception& e) { - logstore_seq_num_t trunc_upto = get_truncated_upto(); + logstore_seq_num_t trunc_upto = m_log_store->truncated_upto(); if (!expect_all_completed) { LOGINFO("got store {} trunc_upto {} {} {}", m_log_store->get_store_id(), trunc_upto, i, m_log_store->log_records().get_status(2).dump(' ', 2)); @@ -319,7 +319,6 @@ class SampleLogStoreClient { } void recovery_validate() { - LOGINFO("Truncated upto {}", get_truncated_upto()); LOGINFO("Totally recovered {} non-truncated lsns and {} truncated lsns for store {} log_dev {}", m_n_recovered_lsns, m_n_recovered_truncated_lsns, m_log_store->get_store_id(), m_log_store->get_logdev()->get_id()); @@ -503,6 +502,7 @@ class SampleDB { restart); if (!restart) { + // Create one logdev for each logstore. std::vector< logdev_id_t > logdev_id_vec; for (uint32_t i{0}; i < n_log_stores; ++i) { logdev_id_vec.push_back(logstore_service().create_new_logdev()); @@ -569,8 +569,8 @@ class LogStoreTest : public ::testing::Test { virtual ~LogStoreTest() override = default; protected: - virtual void SetUp() override{}; - virtual void TearDown() override{}; + virtual void SetUp() override {}; + virtual void TearDown() override {}; void init(uint64_t n_total_records, const std::vector< std::pair< size_t, int > >& inp_freqs = {}) { // m_nrecords_waiting_to_issue = std::lround(n_total_records / _batch_size) * _batch_size; @@ -960,6 +960,8 @@ TEST_F(LogStoreTest, BurstRandInsertThenTruncate) { } } +// TODO evaluate this test is valid and enable after fixing the flush lock. +#if 0 TEST_F(LogStoreTest, BurstSeqInsertAndTruncateInParallel) { const auto num_records = SISL_OPTIONS["num_records"].as< uint32_t >(); const auto iterations = SISL_OPTIONS["iterations"].as< uint32_t >(); @@ -997,6 +999,7 @@ TEST_F(LogStoreTest, BurstSeqInsertAndTruncateInParallel) { this->truncate_validate(); } } +#endif TEST_F(LogStoreTest, RandInsertsWithHoles) { const auto num_records = SISL_OPTIONS["num_records"].as< uint32_t >(); diff --git a/src/tests/test_log_store_long_run.cpp b/src/tests/test_log_store_long_run.cpp new file mode 100644 index 000000000..e5f15961f --- /dev/null +++ b/src/tests/test_log_store_long_run.cpp @@ -0,0 +1,635 @@ +/********************************************************************************* + * Modifications Copyright 2017-2019 eBay Inc. + * + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed + * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations under the License. + * + *********************************************************************************/ +#include // std::shuffle +#include +#include +#include // std::chrono::system_clock +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include // std::default_random_engine +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +#include "logstore/log_dev.hpp" +#include "test_common/homestore_test_common.hpp" + +using namespace homestore; +RCU_REGISTER_INIT +SISL_LOGGING_INIT(HOMESTORE_LOG_MODS) +std::vector< std::string > test_common::HSTestHelper::s_dev_names; + +struct test_log_data { + test_log_data() = default; + test_log_data(const test_log_data&) = delete; + test_log_data(test_log_data&&) noexcept = delete; + test_log_data& operator=(const test_log_data&) = delete; + test_log_data& operator=(test_log_data&&) noexcept = delete; + ~test_log_data() = default; + + uint32_t size; + + uint8_t* get_data() { return uintptr_cast(this) + sizeof(test_log_data); }; + uint8_t const* get_data_const() const { return r_cast< uint8_t const* >(this) + sizeof(test_log_data); } + const uint8_t* get_data() const { return r_cast< const uint8_t* >(this) + sizeof(test_log_data); } + uint32_t total_size() const { return sizeof(test_log_data) + size; } + std::string get_data_str() const { + return std::string(r_cast< const char* >(get_data_const()), static_cast< size_t >(size)); + } +}; + +typedef std::function< void(logdev_id_t, logstore_seq_num_t, logdev_key) > test_log_store_comp_cb_t; +class SampleLogStoreClient { +public: + SampleLogStoreClient(std::shared_ptr< HomeLogStore > store, const logdev_id_t logdev_id, + const test_log_store_comp_cb_t& cb) : + m_store_id{store->get_store_id()}, m_comp_cb{cb}, m_logdev_id{logdev_id} { + set_log_store(store); + } + + explicit SampleLogStoreClient(const logdev_id_t logdev_id, const test_log_store_comp_cb_t& cb) : + SampleLogStoreClient(logstore_service().create_new_log_store(logdev_id, false /* append_mode */), logdev_id, + cb) {} + + SampleLogStoreClient(const SampleLogStoreClient&) = delete; + SampleLogStoreClient(SampleLogStoreClient&&) noexcept = delete; + SampleLogStoreClient& operator=(const SampleLogStoreClient&) = delete; + SampleLogStoreClient& operator=(SampleLogStoreClient&&) noexcept = delete; + ~SampleLogStoreClient() = default; + + void set_log_store(std::shared_ptr< HomeLogStore > store) { + m_log_store = store; + m_log_store->register_log_found_cb(bind_this(SampleLogStoreClient::on_log_found, 3)); + } + + void reset_recovery() { + m_n_recovered_lsns = 0; + m_n_recovered_truncated_lsns = 0; + } + + void insert_next_batch(uint32_t batch_size) { + const auto cur_lsn = m_cur_lsn.fetch_add(batch_size); + insert(cur_lsn, batch_size, false); + } + + void insert(logstore_seq_num_t start_lsn, int64_t nparallel_count, bool wait_for_completion = true) { + std::vector< logstore_seq_num_t > lsns; + lsns.reserve(nparallel_count); + + for (auto lsn{start_lsn}; lsn < start_lsn + nparallel_count; ++lsn) { + lsns.push_back(lsn); + } + + ASSERT_LT(m_log_store->get_contiguous_issued_seq_num(-1), start_lsn + nparallel_count); + ASSERT_LT(m_log_store->get_contiguous_completed_seq_num(-1), start_lsn + nparallel_count); + for (const auto lsn : lsns) { + bool io_memory{false}; + auto* d = prepare_data(lsn, 
io_memory);
+            m_log_store->write_async(
+                lsn, {uintptr_cast(d), d->total_size(), false}, nullptr,
+                [io_memory, d, this](logstore_seq_num_t seq_num, const sisl::io_blob& b, logdev_key ld_key, void* ctx) {
+                    assert(ld_key);
+                    if (io_memory) {
+                        iomanager.iobuf_free(uintptr_cast(d));
+                    } else {
+                        std::free(voidptr_cast(d));
+                    }
+                    m_comp_cb(m_logdev_id, seq_num, ld_key);
+                });
+        }
+
+        // Because of restarts in tests, we have to force the flush of log entries.
+        m_log_store->get_logdev()->flush_if_needed(1);
+    }
+
+    void read_validate(const bool expect_all_completed = false) {
+        const auto trunc_upto = m_log_store->truncated_upto();
+        for (std::remove_const_t< decltype(trunc_upto) > i{0}; i <= trunc_upto; ++i) {
+            ASSERT_THROW(m_log_store->read_sync(i), std::out_of_range)
+                << "Expected std::out_of_range exception for lsn=" << m_log_store->get_store_id() << ":" << i
+                << " but not thrown";
+        }
+
+        const auto upto =
+            expect_all_completed ? m_cur_lsn.load() - 1 : m_log_store->get_contiguous_completed_seq_num(-1);
+        for (auto i = m_log_store->truncated_upto() + 1; i < upto; ++i) {
+            try {
+                const auto b = m_log_store->read_sync(i);
+                auto* tl = r_cast< test_log_data const* >(b.bytes());
+                ASSERT_EQ(tl->total_size(), b.size())
+                    << "Size Mismatch for lsn=" << m_log_store->get_store_id() << ":" << i;
+                validate_data(tl, i);
+
+            } catch (const std::exception& e) {
+                LOGFATAL("Unexpected exception for lsn={}:{} upto {} trunc_upto {}",
+                         m_log_store->get_store_id(), i, upto, trunc_upto);
+            }
+        }
+    }
+
+    void rollback_validate(uint32_t num_lsns_to_rollback) {
+        std::mutex mtx;
+        std::condition_variable cv;
+        bool rollback_done = false;
+
+        if (m_log_store->truncated_upto() == m_log_store->get_contiguous_completed_seq_num(-1)) {
+            // No records to rollback.
+            return;
+        }
+
+        auto const upto_lsn = m_cur_lsn.fetch_sub(num_lsns_to_rollback) - num_lsns_to_rollback - 1;
+        m_log_store->rollback_async(upto_lsn, [&](logstore_seq_num_t) {
+            ASSERT_EQ(m_log_store->get_contiguous_completed_seq_num(-1), upto_lsn)
+                << "Last completed seq num is not reset after rollback";
+            ASSERT_EQ(m_log_store->get_contiguous_issued_seq_num(-1), upto_lsn)
+                << "Last issued seq num is not reset after rollback";
+            read_validate(true);
+            {
+                std::unique_lock lock(mtx);
+                rollback_done = true;
+            }
+            cv.notify_one();
+        });
+
+        // We wait till the async rollback has finished, since validation runs in the callback.
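+        // (rollback_async may complete on a different thread, hence the mutex and
+        // condition variable instead of a simple flag.)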
+ std::unique_lock lock(mtx); + cv.wait(lock, [&rollback_done] { return rollback_done == true; }); + } + + void recovery_validate() { + LOGINFO( + "Totally recovered {} non-truncated lsns and {} truncated lsns for store {} log_dev {} truncated_upto {}", + m_n_recovered_lsns, m_n_recovered_truncated_lsns, m_log_store->get_store_id(), + m_log_store->get_logdev()->get_id(), m_truncated_upto_lsn.load()); + if (m_n_recovered_lsns != (m_cur_lsn.load() - m_truncated_upto_lsn.load() - 1)) { + EXPECT_EQ(m_n_recovered_lsns, m_cur_lsn.load() - m_truncated_upto_lsn.load() - 1) + << "Recovered " << m_n_recovered_lsns << " valid lsns for store " << m_log_store->get_store_id() + << " Expected to have " << m_cur_lsn.load() - m_truncated_upto_lsn.load() - 1 + << " lsns: m_cur_lsn=" << m_cur_lsn.load() << " truncated_upto_lsn=" << m_truncated_upto_lsn; + assert(false); + } + } + + void on_log_found(const logstore_seq_num_t lsn, const log_buffer buf, void* ctx) { + LOGDEBUG("Recovered lsn {}:{} with log data of size {}", m_log_store->get_store_id(), lsn, buf.size()) + EXPECT_LE(lsn, m_cur_lsn.load()) << "Recovered incorrect lsn " << m_log_store->get_store_id() << ":" << lsn + << "Expected less than cur_lsn " << m_cur_lsn.load(); + auto* tl = r_cast< test_log_data const* >(buf.bytes()); + validate_data(tl, lsn); + + // Count only the ones which are after truncated, because recovery could receive even truncated lsns + (lsn > m_truncated_upto_lsn) ? ++m_n_recovered_lsns : ++m_n_recovered_truncated_lsns; + } + + void truncate(const logstore_seq_num_t lsn) { + m_log_store->truncate(lsn); + m_truncated_upto_lsn = lsn; + } + + void flush() { m_log_store->flush_sync(); } + + bool has_all_lsns_truncated() const { return (m_truncated_upto_lsn.load() == (m_cur_lsn.load() - 1)); } + + static test_log_data* prepare_data(const logstore_seq_num_t lsn, bool& io_memory) { + static thread_local std::random_device rd{}; + static thread_local std::default_random_engine re{rd()}; + uint32_t sz{0}; + uint8_t* raw_buf{nullptr}; + + // Generate buffer of random size and fill with specific data + std::uniform_int_distribution< uint8_t > gen_percentage{0, 99}; + std::uniform_int_distribution< uint32_t > gen_data_size{0, max_data_size - 1}; + if (gen_percentage(re) < static_cast< uint8_t >(10)) { + // 10% of data is dma'ble aligned boundary + const auto alloc_sz = sisl::round_up(gen_data_size(re) + sizeof(test_log_data), s_max_flush_multiple); + raw_buf = iomanager.iobuf_alloc(dma_address_boundary, alloc_sz); + sz = alloc_sz - sizeof(test_log_data); + io_memory = true; + } else { + sz = gen_data_size(re); + raw_buf = static_cast< uint8_t* >(std::malloc(sizeof(test_log_data) + sz)); + io_memory = false; + } + + test_log_data* d = new (raw_buf) test_log_data(); + d->size = sz; + + assert(uintptr_cast(d) == raw_buf); + + const char c = static_cast< char >((lsn % 94) + 33); + std::memset(voidptr_cast(d->get_data()), c, static_cast< size_t >(d->size)); + return d; + } + +private: + void validate_data(const test_log_data* d, const logstore_seq_num_t lsn) { + const char c = static_cast< char >((lsn % 94) + 33); + const std::string actual = d->get_data_str(); + const std::string expected(static_cast< size_t >(d->size), + c); // needs to be () because of same reason as vector + ASSERT_EQ(actual, expected) << "Data mismatch for LSN=" << m_log_store->get_store_id() << ":" << lsn + << " size=" << d->size; + } + + friend class LogStoreLongRun; + +private: + static constexpr uint32_t max_data_size = 1024; + static uint64_t s_max_flush_multiple; + + 
logstore_id_t m_store_id; + test_log_store_comp_cb_t m_comp_cb; + std::atomic< logstore_seq_num_t > m_truncated_upto_lsn = -1; + std::atomic< logstore_seq_num_t > m_cur_lsn = 0; + std::shared_ptr< HomeLogStore > m_log_store; + int64_t m_n_recovered_lsns = 0; + int64_t m_n_recovered_truncated_lsns = 0; + logdev_id_t m_logdev_id; +}; + +uint64_t SampleLogStoreClient::s_max_flush_multiple = 0; + +class LogStoreLongRun : public ::testing::Test { +public: + void start_homestore(bool restart = false) { + auto n_log_stores = SISL_OPTIONS["num_logstores"].as< uint32_t >(); + if (n_log_stores < 4u) { + LOGINFO("Log store test needs minimum 4 log stores for testing, setting them to 4"); + n_log_stores = 4u; + } + + if (restart) { + for (auto& lsc : m_log_store_clients) { + lsc->flush(); + } + } + + test_common::HSTestHelper::start_homestore( + "test_log_store", + {{HS_SERVICE::META, {.size_pct = 5.0}}, + {HS_SERVICE::LOG, {.size_pct = 84.0, .chunk_size = 8 * 1024 * 1024, .min_chunk_size = 8 * 1024 * 1024}}}, + [this, restart, n_log_stores]() { + HS_SETTINGS_FACTORY().modifiable_settings([](auto& s) { + // Disable flush and resource mgr timer in UT. + s.logstore.flush_timer_frequency_us = 0; + s.resource_limits.resource_audit_timer_ms = 0; + }); + HS_SETTINGS_FACTORY().save(); + + if (restart) { + for (uint32_t i{0}; i < n_log_stores; ++i) { + SampleLogStoreClient* client = m_log_store_clients[i].get(); + logstore_service().open_logdev(client->m_logdev_id); + logstore_service() + .open_log_store(client->m_logdev_id, client->m_store_id, false /* append_mode */) + .thenValue([i, this, client](auto log_store) { client->set_log_store(log_store); }); + } + } + }, + restart); + + if (!restart) { + logdev_id_t logdev_id; + for (uint32_t i{0}; i < n_log_stores; ++i) { + logdev_id = logstore_service().create_new_logdev(); + m_log_store_clients.push_back(std::make_unique< SampleLogStoreClient >( + logdev_id, bind_this(LogStoreLongRun::on_log_insert_completion, 3))); + } + SampleLogStoreClient::s_max_flush_multiple = + logstore_service().get_logdev(logdev_id)->get_flush_size_multiple(); + } + } + + void shutdown(bool cleanup = true) { + test_common::HSTestHelper::shutdown_homestore(cleanup); + if (cleanup) { + m_log_store_clients.clear(); + m_highest_log_idx.clear(); + } + } + + bool delete_log_store(logstore_id_t store_id) { + bool removed{false}; + for (auto it = std::begin(m_log_store_clients); it != std::end(m_log_store_clients); ++it) { + if ((*it)->m_log_store->get_store_id() == store_id) { + logstore_service().remove_log_store((*it)->m_logdev_id, store_id); + logstore_service().destroy_log_dev((*it)->m_logdev_id); + m_log_store_clients.erase(it); + removed = true; + break; + } + } + return removed; + } + + logid_t highest_log_idx(logdev_id_t fid) { + return m_highest_log_idx.count(fid) ? m_highest_log_idx[fid].load() : -1; + } + + void init() { + for (auto& lsc : m_log_store_clients) { + lsc->reset_recovery(); + } + } + + void kickstart_inserts(uint64_t n_total_records, uint32_t batch_size, uint32_t q_depth) { + m_nrecords_waiting_to_issue = m_log_store_clients.size() * n_total_records; + m_nrecords_waiting_to_complete = 0; + + m_batch_size = batch_size; + m_q_depth = (m_log_store_clients.size() + 1) * q_depth; + iomanager.run_on_forget(iomgr::reactor_regex::all_io, [this]() { do_insert(); }); + } + + void do_insert() { + // We insert batch of records for each logstore in round robin fashion. 
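+        // m_pending_mtx guards the issue/complete counters: a batch is issued only
+        // while m_nrecords_waiting_to_complete stays below m_q_depth, which bounds
+        // the number of outstanding write_async calls across all stores.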
+ auto it = m_log_store_clients.begin(); + for (;;) { + uint32_t batch_size{0}; + { + std::unique_lock< std::mutex > lock{m_pending_mtx}; + const bool insert = (m_nrecords_waiting_to_issue > 0) && (m_nrecords_waiting_to_complete < m_q_depth); + if (insert) { + batch_size = std::min< uint32_t >(m_batch_size, m_nrecords_waiting_to_issue); + m_nrecords_waiting_to_issue -= batch_size; + m_nrecords_waiting_to_complete += batch_size; + } else { + break; + } + } + + (*it)->insert_next_batch(batch_size); + it++; + if (it == m_log_store_clients.end()) { it = m_log_store_clients.begin(); } + } + } + + void on_log_insert_completion(logdev_id_t fid, logstore_seq_num_t lsn, logdev_key ld_key) { + if (m_highest_log_idx.count(fid) == 0) { m_highest_log_idx[fid] = std::atomic{-1}; } + atomic_update_max(m_highest_log_idx[fid], ld_key.idx); + on_insert_completion(fid, lsn, ld_key); + } + + void on_insert_completion([[maybe_unused]] logdev_id_t fid, logstore_seq_num_t lsn, logdev_key ld_key) { + bool notify{false}; + uint64_t waiting_to_issue{0}; + { + std::unique_lock< std::mutex > lock{m_pending_mtx}; + waiting_to_issue = m_nrecords_waiting_to_issue; + if ((--m_nrecords_waiting_to_complete == 0) && (waiting_to_issue == 0)) { notify = true; } + } + if (notify) { + m_pending_cv.notify_all(); + } else if (waiting_to_issue > 0) { + do_insert(); + } + } + + void wait_for_inserts() { + std::unique_lock< std::mutex > lk{m_pending_mtx}; + m_pending_cv.wait(lk, + [&] { return (m_nrecords_waiting_to_issue == 0) && (m_nrecords_waiting_to_complete == 0); }); + } + + void read_validate(bool expect_all_completed = false) { + for (const auto& lsc : m_log_store_clients) { + lsc->read_validate(expect_all_completed); + } + } + + void rollback_validate() { + for (const auto& lsc : m_log_store_clients) { + lsc->rollback_validate(1); + } + } + + void truncate_validate(bool is_parallel_to_write = false) { + int skip_truncation = 0; + + for (size_t i{0}; i < m_log_store_clients.size(); ++i) { + const auto& lsc = m_log_store_clients[i]; + + // lsc->truncate(lsc->m_cur_lsn.load() - 1); + const auto t_seq_num = lsc->m_log_store->truncated_upto(); + const auto c_seq_num = lsc->m_log_store->get_contiguous_completed_seq_num(-1); + if (t_seq_num == c_seq_num) { + ++skip_truncation; + continue; + } + + // Truncate at random point between last truncated and completed seq num. + std::uniform_int_distribution< int64_t > gen_data_size{t_seq_num, c_seq_num}; + lsc->truncate(gen_data_size(rd)); + lsc->read_validate(); + } + + if (skip_truncation) { + /* not needed to call device truncate as one log store is not truncated */ + return; + } + + logstore_service().device_truncate( + [this](auto& trunc_lds) { + for (auto [fid, trunc_loc] : trunc_lds) { + m_truncate_log_idx[fid].store(trunc_loc.idx); + } + }, + true /* wait_till_done */); + validate_num_stores(); + } + + void flush() { + for (auto& lsc : m_log_store_clients) { + lsc->flush(); + } + } + + void recovery_validate() { + for (size_t i{0}; i < m_log_store_clients.size(); ++i) { + const auto& lsc = m_log_store_clients[i]; + lsc->recovery_validate(); + } + } + + void delete_create_logstore() { + // Delete a random logstore. + std::uniform_int_distribution< uint64_t > gen{0, m_log_store_clients.size() - 1}; + uint64_t idx = gen(rd); + auto fid = m_log_store_clients[idx]->m_logdev_id; + + delete_log_store(m_log_store_clients[idx]->m_store_id); + validate_num_stores(); + + // Create a new logstore. 
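+        // Each client owns a dedicated logdev, and delete_log_store() above also
+        // destroyed that logdev, so the replacement client gets a fresh logdev here.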
+        auto logdev_id = logstore_service().create_new_logdev();
+        m_log_store_clients.push_back(std::make_unique< SampleLogStoreClient >(
+            logdev_id, bind_this(LogStoreLongRun::on_log_insert_completion, 3)));
+        validate_num_stores();
+    }
+
+    void validate_num_stores() {
+        size_t actual_valid_ids{0};
+        size_t actual_garbage_ids{0};
+        size_t exp_garbage_store_count{0};
+        size_t actual_logdevs{0};
+
+        for (auto& logdev : logstore_service().get_all_logdevs()) {
+            auto fid = logdev->get_id();
+            std::vector< logstore_id_t > reg_ids, garbage_ids;
+
+            logdev->get_registered_store_ids(reg_ids, garbage_ids);
+            actual_valid_ids += reg_ids.size() - garbage_ids.size();
+            actual_logdevs++;
+        }
+
+        ASSERT_EQ(actual_logdevs, m_log_store_clients.size());
+        ASSERT_EQ(actual_valid_ids, m_log_store_clients.size());
+    }
+
+    virtual void SetUp() override { start_homestore(); }
+    virtual void TearDown() override { shutdown(true /* cleanup*/); }
+
+    uint64_t get_elapsed_time(Clock::time_point start) {
+        std::chrono::seconds sec = std::chrono::duration_cast< std::chrono::seconds >(Clock::now() - start);
+        return sec.count();
+    }
+
+    bool elapsed(uint64_t print_every_n_secs) {
+        static Clock::time_point start = Clock::now();
+        auto elapsed_time = get_elapsed_time(start);
+        if (elapsed_time > print_every_n_secs) {
+            start = Clock::now();
+            return true;
+        }
+        return false;
+    }
+
+private:
+    std::vector< std::unique_ptr< SampleLogStoreClient > > m_log_store_clients;
+    std::map< logdev_id_t, std::atomic< logid_t > > m_highest_log_idx;
+    uint64_t m_nrecords_waiting_to_issue{0};
+    uint64_t m_nrecords_waiting_to_complete{0};
+    std::mutex m_pending_mtx;
+    std::condition_variable m_pending_cv;
+    std::map< logdev_id_t, std::atomic< logid_t > > m_truncate_log_idx;
+    uint32_t m_q_depth{64};
+    uint32_t m_batch_size{1};
+    std::random_device rd{};
+    std::default_random_engine re{rd()};
+};
+
+TEST_F(LogStoreLongRun, LongRunning) {
+    auto run_time = SISL_OPTIONS["run_time"].as< uint64_t >();
+    auto num_iterations = SISL_OPTIONS["num_iterations"].as< uint32_t >();
+    auto start_time = Clock::now();
+    uint32_t iterations = 1;
+
+    init();
+
+    while (true) {
+        // Insert a burst of 100 log entries on every logstore, issued in parallel batches of 10.
+        kickstart_inserts(100 /* num_records */, 10 /* batch */, 5000 /* q_depth */);
+
+        // Wait for inserts.
+        wait_for_inserts();
+
+        if (iterations % 60 == 0) {
+            // Validate all the logstores.
+            read_validate(true);
+        }
+
+        if (iterations % 15 == 0) {
+            // Truncate at a random lsn every 15 iterations.
+            truncate_validate();
+        }
+
+        if (iterations % 10 == 0) {
+            // Restart homestore every 10 iterations.
+            LOGDEBUG("Restart homestore");
+            start_homestore(true /* restart */);
+            recovery_validate();
+            init();
+        }
+
+        if (iterations % 60 == 0) {
+            // Add and validate rollback records.
+            rollback_validate();
+        }
+
+        if (iterations % 10 == 0) {
+            // Delete a random logstore and create a new one.
+            delete_create_logstore();
+        }
+
+        if (iterations % 10 == 0) { LOGINFO("Iterations completed {}", iterations); }
+
+        auto elapsed = get_elapsed_time(start_time);
+        if (elapsed >= run_time && iterations >= num_iterations) {
+            LOGINFO("Finished test. Num iterations {} Elapsed {}", iterations, elapsed);
+            break;
+        }
+
+        iterations++;
+    }
+}
+
+SISL_OPTIONS_ENABLE(logging, test_log_store, iomgr, test_common_setup)
+SISL_OPTION_GROUP(test_log_store,
+                  (num_logstores, "", "num_logstores", "number of log stores",
+                   ::cxxopts::value< uint32_t >()->default_value("1000"), "number"),
+                  (num_records, "", "num_records", "number of records to test",
+                   ::cxxopts::value< uint32_t >()->default_value("10000"), "number"),
+                  (num_iterations, "", "num_iterations", "Iterations",
+                   ::cxxopts::value< uint32_t >()->default_value("1"), "the number of iterations to run each test"),
+                  (run_time, "", "run_time", "running time in seconds",
+                   ::cxxopts::value< uint64_t >()->default_value("600"), "number"));
+
+int main(int argc, char* argv[]) {
+    int parsed_argc = argc;
+    ::testing::InitGoogleTest(&parsed_argc, argv);
+    SISL_OPTIONS_LOAD(parsed_argc, argv, logging, test_log_store, iomgr, test_common_setup);
+    sisl::logging::SetLogger("test_log_store");
+    spdlog::set_pattern("[%D %T%z] [%^%l%$] [%t] %v");
+    sisl::logging::SetModuleLogLevel("logstore", spdlog::level::level_enum::debug);
+    sisl::logging::SetModuleLogLevel("journalvdev", spdlog::level::level_enum::info);
+
+    const int ret = RUN_ALL_TESTS();
+    return ret;
+}
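For reference, an illustrative invocation of the new long-running test (the option names
are the ones registered above; the bin/ path assumes the same build layout the add_test
entries in CMakeLists.txt use):

    ./bin/test_log_store_long_run --num_logstores 8 --num_iterations 100 --run_time 600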