diff --git a/conanfile.py b/conanfile.py
index 47c449e25..af88ccdcb 100644
--- a/conanfile.py
+++ b/conanfile.py
@@ -5,7 +5,7 @@
 class HomestoreConan(ConanFile):
     name = "homestore"
-    version = "6.4.8"
+    version = "6.4.9"
     homepage = "https://github.com/eBay/Homestore"
     description = "HomeStore Storage Engine"
diff --git a/src/lib/device/journal_vdev.cpp b/src/lib/device/journal_vdev.cpp
index 247f0a16b..ac59ca258 100644
--- a/src/lib/device/journal_vdev.cpp
+++ b/src/lib/device/journal_vdev.cpp
@@ -45,15 +45,15 @@ JournalVirtualDev::JournalVirtualDev(DeviceManager& dmgr, const vdev_info& vinfo
     m_init_private_data = std::make_shared< JournalChunkPrivate >();
     m_chunk_pool = std::make_unique< ChunkPool >(
         dmgr,
-        ChunkPool::Params{
-            HS_DYNAMIC_CONFIG(generic.journal_chunk_pool_capacity),
-            [this]() {
-                m_init_private_data->created_at = get_time_since_epoch_ms();
-                m_init_private_data->end_of_chunk = m_vdev_info.chunk_size;
-                sisl::blob private_blob{r_cast< uint8_t* >(m_init_private_data.get()), sizeof(JournalChunkPrivate)};
-                return private_blob;
-            },
-            m_vdev_info.hs_dev_type, m_vdev_info.vdev_id, m_vdev_info.chunk_size});
+        ChunkPool::Params{HS_DYNAMIC_CONFIG(generic.journal_chunk_pool_capacity),
+                          [this]() {
+                              m_init_private_data->created_at = get_time_since_epoch_ms();
+                              m_init_private_data->end_of_chunk = m_vdev_info.chunk_size;
+                              sisl::blob private_blob{r_cast< uint8_t* >(m_init_private_data.get()),
+                                                      sizeof(JournalChunkPrivate)};
+                              return private_blob;
+                          },
+                          m_vdev_info.hs_dev_type, m_vdev_info.vdev_id, m_vdev_info.chunk_size});

     resource_mgr().register_journal_vdev_exceed_cb([this]([[maybe_unused]] int64_t dirty_buf_count, bool critical) {
         // either it is critical or non-critical, call cp_flush;
@@ -152,6 +152,19 @@ void JournalVirtualDev::remove_journal_chunks(std::vector< shared< Chunk > >& ch
     }
 }

+void JournalVirtualDev::release_chunk_to_pool(shared< Chunk > chunk) {
+    // Clear the private chunk data before adding to pool.
+    auto* data = r_cast< JournalChunkPrivate* >(const_cast< uint8_t* >(chunk->user_private()));
+    *data = JournalChunkPrivate{};
+    update_chunk_private(chunk, data);
+
+    // We would ideally zero out chunks here, because chunks are reused across logdevs
+    // after being freed. But zeroing a chunk is very expensive, so instead we rely on
+    // crc mismatches to find the end offset of the logdev during recovery.
+    m_chunk_pool->enqueue(chunk);
+    LOGINFOMOD(journalvdev, "Released chunk to pool {}", chunk->to_string());
+}
+
 void JournalVirtualDev::update_chunk_private(shared< Chunk >& chunk, JournalChunkPrivate* private_data) {
     sisl::blob private_blob{r_cast< uint8_t* >(private_data), sizeof(JournalChunkPrivate)};
     chunk->set_user_private(private_blob);
@@ -172,7 +185,7 @@ shared< JournalVirtualDev::Descriptor > JournalVirtualDev::open(logdev_id_t logd
     LOGINFOMOD(journalvdev, "Opened journal vdev descriptor log_dev={}", logdev_id);
     for (auto& chunk : it->second->m_journal_chunks) {
-        LOGINFOMOD(journalvdev, " log_dev={} end_of_chunk={} chunk={}", logdev_id, get_end_of_chunk(chunk),
+        LOGINFOMOD(journalvdev, "log_dev={} end_of_chunk={} chunk {}", logdev_id, to_hex(get_end_of_chunk(chunk)),
                    chunk->to_string());
     }
     return it->second;
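A note on the hunk below: `alloc_next_append_blk()` decides to grow the descriptor whenever the requested append would run past `m_end_offset`. A minimal sketch of that decision, with illustrative names rather than the real member functions:

```cpp
#include <cstddef>
#include <sys/types.h> // off_t

// Sketch: would appending `sz` bytes at `tail` run past `end`?
// Mirrors the `tail_offset() + sz >= m_end_offset` check in the hunk below.
bool needs_new_chunk(off_t tail, std::size_t sz, off_t end) {
    return (tail + static_cast< off_t >(sz)) >= end;
}
```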
@@ -242,7 +255,7 @@ off_t JournalVirtualDev::Descriptor::alloc_next_append_blk(size_t sz) {
     if ((tail_offset() + static_cast< off_t >(sz)) >= m_end_offset) {
         // not enough space left, add a new chunk.
-        LOGDEBUGMOD(journalvdev, "No space left for size {} Creating chunk desc {}", sz, to_string());
+        LOGINFOMOD(journalvdev, "No space left for size {} Creating chunk desc {}", sz, to_string());

 #ifdef _PRERELEASE
         iomgr_flip::test_and_abort("abort_before_update_eof_cur_chunk");
@@ -373,13 +386,13 @@ void JournalVirtualDev::Descriptor::sync_pwritev(const iovec* iov, int iovcnt, o
 }

 /////////////////////////////// Read Section //////////////////////////////////
-size_t JournalVirtualDev::Descriptor::sync_next_read(uint8_t* buf, size_t size_rd) {
-    if (m_journal_chunks.empty()) { return 0; }
+int64_t JournalVirtualDev::Descriptor::sync_next_read(uint8_t* buf, size_t size_rd) {
+    if (m_journal_chunks.empty()) { return -1; }

     HS_REL_ASSERT_LE(m_seek_cursor, m_end_offset, "seek_cursor {} exceeded end_offset {}", m_seek_cursor, m_end_offset);
     if (m_seek_cursor >= m_end_offset) {
         LOGTRACEMOD(journalvdev, "sync_next_read reached end of chunks");
-        return 0;
+        return -1;
     }

     auto [chunk, _, offset_in_chunk] = offset_to_chunk(m_seek_cursor);
@@ -400,6 +413,10 @@ size_t JournalVirtualDev::Descriptor::sync_next_read(uint8_t* buf, size_t size_r
         // truncate size to what is left;
         size_rd = end_of_chunk - offset_in_chunk;
         across_chunk = true;
+        if (size_rd == 0) {
+            // If there is no more data in the current chunk, move the seek_cursor to the next chunk.
+            m_seek_cursor += (chunk->size() - end_of_chunk);
+        }
     }
     if (buf == nullptr) { return size_rd; }
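The hunks above turn `sync_next_read()` into a three-way contract: a positive byte count, 0 for an empty read that merely skipped the hole at the end of a chunk, and -1 for end of stream. A hedged, self-contained sketch of how a caller can consume it (`Desc` is a stand-in template parameter for `JournalVirtualDev::Descriptor`; the real consumer is `log_stream_reader::read_next_bytes()` further down):

```cpp
#include <cstddef>
#include <cstdint>

// Returns false once the journal stream is exhausted.
template < typename Desc >
bool scan_step(Desc& desc, uint8_t* buf, std::size_t bufsz) {
    int64_t const n = desc.sync_next_read(buf, bufsz);
    if (n < 0) { return false; } // -1: no chunks, or the seek cursor reached m_end_offset
    if (n == 0) { return true; } // empty chunk tail: cursor already moved to the next chunk, call again
    // n > 0: process n bytes from buf; a single read never crosses a chunk boundary.
    return true;
}
```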
@@ -542,11 +559,9 @@ void JournalVirtualDev::Descriptor::update_tail_offset(off_t tail) {
     LOGINFOMOD(journalvdev, "Updated tail offset arg 0x{} desc {} ", to_hex(tail), to_string());
 }

-void JournalVirtualDev::Descriptor::truncate(off_t truncate_offset) {
+off_t JournalVirtualDev::Descriptor::truncate(off_t truncate_offset) {
     const off_t ds_off = data_start_offset();

-    COUNTER_INCREMENT(m_vdev.m_metrics, vdev_truncate_count, 1);
-
     HS_PERIODIC_LOG(DEBUG, journalvdev, "truncating to logical offset: 0x{} desc {}", to_hex(truncate_offset),
                     to_string());
@@ -559,11 +574,26 @@ void JournalVirtualDev::Descriptor::truncate(off_t truncate_offset) {
     }

     // Find the chunk which has the truncation offset. This will be the new
-    // head chunk in the list. We first update the is_head is true of this chunk.
+    // head chunk in the list. We first update is_head to true for that new head chunk.
     // So if a crash happens after this, we could have two chunks which has is_head
     // true in the list and during recovery we select head with the highest creation
-    // timestamp and reuse or cleanup the other.
-    auto [new_head_chunk, _, offset_in_chunk] = offset_to_chunk(truncate_offset);
+    // timestamp and cleanup the other. We check if the truncation offset happens to be
+    // between the end_of_chunk mark and the actual chunk end. In that case there is no
+    // data left in that chunk, so we release it and select the next chunk.
+    bool update_truncate_offset = false;
+    auto [new_head_chunk, index, offset_in_chunk] = offset_to_chunk(truncate_offset);
+    auto end_new_head_chunk = m_vdev.get_end_of_chunk(new_head_chunk);
+    if (offset_in_chunk >= static_cast< int64_t >(end_new_head_chunk)) {
+        // If the truncation offset is at or past end_of_chunk, we don't have any more
+        // data in this chunk to read. Select the next chunk.
+        if (++index <= (m_journal_chunks.size() - 1)) {
+            // Go to the next chunk to make it the new head chunk.
+            new_head_chunk = m_journal_chunks[index];
+            update_truncate_offset = true;
+            size_to_truncate += (new_head_chunk->size() - end_new_head_chunk);
+        }
+    }
+
     auto* private_data = r_cast< JournalChunkPrivate* >(const_cast< uint8_t* >(new_head_chunk->user_private()));
     private_data->is_head = true;
     private_data->logdev_id = m_logdev_id;
@@ -571,34 +601,63 @@ void JournalVirtualDev::Descriptor::truncate(off_t truncate_offset) {

     // Find all chunks which needs to be removed from the start of m_journal_chunks.
     // We stop till the truncation offset. Start from the old data_start_offset.
-    // Align the data_start_offset to the chunk_size as we deleting chunks and
-    // all chunks are same size in a journal vdev.
-    uint32_t start = sisl::round_down(ds_off, m_vdev.info().chunk_size);
+    // All the chunks which are covered inside the truncate offset are released.
+    // cover_offset is a logical offset.
+    index = 0;
+    off_t tail_off =
+        static_cast< off_t >(data_start_offset() + m_write_sz_in_total.load(std::memory_order_relaxed)) + m_reserved_sz;
+    auto chunk_size = m_vdev.info().chunk_size;
+    LOGINFOMOD(journalvdev, "Truncate begin truncate {} desc {}", to_hex(truncate_offset), to_string());
+
+#ifdef _PRERELEASE
+    for (auto it = m_journal_chunks.begin(); it != m_journal_chunks.end(); ++it) {
+        auto chunk = *it;
+        auto* private_data = r_cast< JournalChunkPrivate* >(const_cast< uint8_t* >(chunk->user_private()));
+        LOGINFOMOD(journalvdev, "log_dev={} chunk_id={} is_head={}", m_logdev_id, chunk->chunk_id(),
+                   private_data->is_head)
+    }
+#endif
+
+    off_t cover_offset = sisl::round_down(data_start_offset(), chunk_size);
+    auto total_num_chunks = m_journal_chunks.size();
     for (auto it = m_journal_chunks.begin(); it != m_journal_chunks.end();) {
         auto chunk = *it;
-        start += chunk->size();
+        off_t end_of_chunk = m_vdev.get_end_of_chunk(chunk);
+        off_t chunk_hole = 0;
+        if (index == total_num_chunks - 1) {
+            // If it's the last chunk, only cover up to the tail_offset.
+            // There are no holes in the last chunk.
+            cover_offset += (tail_off % chunk_size);
+        } else {
+            // For other chunks take the whole size.
+            cover_offset += chunk_size;
+            chunk_hole = (chunk_size - end_of_chunk);
+        }

-        // Also if its the last chunk and there is no data after truncate, we release chunk.
-        auto write_sz_in_total = m_write_sz_in_total.load(std::memory_order_relaxed);
-        if (start >= truncate_offset) { break; }
+        index++;

-        m_total_size -= chunk->size();
-        it = m_journal_chunks.erase(it);
+        // Check if the chunk is fully covered by the truncate offset. We also check if the
+        // truncate offset lies in the hole between end_of_chunk and the chunk end. If either
+        // condition is satisfied, we release the chunk.
+        if (cover_offset <= truncate_offset || ((cover_offset - chunk_hole) <= truncate_offset)) {
+            m_total_size -= chunk->size();
+            it = m_journal_chunks.erase(it);

-        // Clear the private chunk data before adding to pool.
-        auto* data = r_cast< JournalChunkPrivate* >(const_cast< uint8_t* >(chunk->user_private()));
-        *data = JournalChunkPrivate{};
-        m_vdev.update_chunk_private(chunk, data);
+            // Release the chunk back to pool.
+            LOGINFOMOD(journalvdev,
+                       "Released chunk_id={} log_dev={} cover={} truncate_offset={} tail={} end_of_chunk={} desc {}",
+                       chunk->chunk_id(), m_logdev_id, to_hex(cover_offset), to_hex(truncate_offset), to_hex(tail_off),
+                       m_vdev.get_end_of_chunk(chunk), to_string());
+            m_vdev.release_chunk_to_pool(chunk);
+        } else {
+            ++it;
+        }
+    }

-        // We ideally want to zero out chunks as chunks are reused after free across
-        // logdev's. But zero out chunk is very expensive, We look at crc mismatches
-        // to know the end offset of the log dev during recovery.
-        // Format and add back to pool.
-        m_vdev.m_chunk_pool->enqueue(chunk);
-        LOGINFOMOD(journalvdev, "After truncate released chunk {}", chunk->to_string());
+    if (update_truncate_offset) {
+        // Update the truncate offset to align with the chunk size.
+        truncate_offset = sisl::round_up(truncate_offset, m_vdev.info().chunk_size);
     }

-    // Update our start offset, to keep track of actual size
     HS_REL_ASSERT_LE(truncate_offset, m_end_offset, "truncate offset less than end offset");
     update_data_start_offset(truncate_offset);
@@ -606,7 +665,17 @@ void JournalVirtualDev::Descriptor::truncate(off_t truncate_offset) {
     m_write_sz_in_total.fetch_sub(size_to_truncate, std::memory_order_relaxed);
     m_truncate_done = true;

-    HS_PERIODIC_LOG(INFO, journalvdev, "After truncate desc {}", to_string());
+#ifdef _PRERELEASE
+    for (auto it = m_journal_chunks.begin(); it != m_journal_chunks.end(); ++it) {
+        auto chunk = *it;
+        auto* private_data = r_cast< JournalChunkPrivate* >(const_cast< uint8_t* >(chunk->user_private()));
+        LOGINFOMOD(journalvdev, "log_dev={} chunk_id={} is_head={}", m_logdev_id, chunk->chunk_id(),
+                   private_data->is_head)
+    }
+#endif
+
+    LOGINFOMOD(journalvdev, "Truncate end truncate {} desc {}", to_hex(truncate_offset), to_string());
+    return data_start_offset();
 }

 #if 0
diff --git a/src/lib/device/journal_vdev.hpp b/src/lib/device/journal_vdev.hpp
index 388799a87..c9bb1dc23 100644
--- a/src/lib/device/journal_vdev.hpp
+++ b/src/lib/device/journal_vdev.hpp
@@ -158,9 +158,9 @@ class JournalVirtualDev : public VirtualDev {
      *
      * @return : On success, the number of bytes read is returned (zero indicates end of file), and the cursor is
      * advanced by this number. it is not an error if this number is smaller than the number requested, because it
-     * can be end of chunk, since read won't across chunk.
+     * can be the end of a chunk, since a read won't cross a chunk boundary. Returns -1 if there are no bytes
+     * available to read.
      */
-    size_t sync_next_read(uint8_t* buf, size_t count_in);
+    int64_t sync_next_read(uint8_t* buf, size_t count_in);

     /**
      * @brief : reads up to count bytes at offset into the buffer starting at buf.
@@ -275,8 +275,9 @@ class JournalVirtualDev : public VirtualDev {
      * 1. update in-memory counter of total write size.
      * 2. update vdev superblock of the new start logical offset that is being truncate to;
      *
+     * @return : the new data start offset after truncation.
     */
-    void truncate(off_t truncate_offset);
+    off_t truncate(off_t truncate_offset);

     /**
      * @brief : get the total size in journal
@@ -416,6 +417,7 @@ class JournalVirtualDev : public VirtualDev {
     uint64_t num_descriptors() const { return m_journal_descriptors.size(); }

     void remove_journal_chunks(std::vector< shared< Chunk > >& chunks);
+    void release_chunk_to_pool(shared< Chunk > chunk);
     void update_chunk_private(shared< Chunk >& chunk, JournalChunkPrivate* chunk_private);
     uint64_t get_end_of_chunk(shared< Chunk >& chunk) const;
diff --git a/src/lib/logstore/log_dev.cpp b/src/lib/logstore/log_dev.cpp
index b09c6eb45..d43ec8b21 100644
--- a/src/lib/logstore/log_dev.cpp
+++ b/src/lib/logstore/log_dev.cpp
@@ -271,7 +271,11 @@ int64_t LogDev::append_async(const logstore_id_t store_id, const logstore_seq_nu

 log_buffer LogDev::read(const logdev_key& key, serialized_log_record& return_record_header) {
     auto buf = sisl::make_byte_array(initial_read_size, m_flush_size_multiple, sisl::buftag::logread);
-    m_vdev_jd->sync_pread(buf->bytes(), initial_read_size, key.dev_offset);
+    auto ec = m_vdev_jd->sync_pread(buf->bytes(), initial_read_size, key.dev_offset);
+    if (ec) {
+        LOGERROR("Failed to read from journal vdev log_dev={} {} {}", m_logdev_id, ec.value(), ec.message());
+        return {};
+    }

     auto* header = r_cast< const log_group_header* >(buf->cbytes());
     // THIS_LOGDEV_LOG(TRACE, "Logdev read log group header {}", *header);
@@ -571,15 +575,15 @@ uint64_t LogDev::truncate(const logdev_key& key) {
                     "Truncating log device upto log_dev={} log_id={} vdev_offset={} truncated {} log records",
                     m_logdev_id, key.idx, key.dev_offset, num_records_to_truncate);
     m_log_records->truncate(key.idx);
-    m_vdev_jd->truncate(key.dev_offset);
-    THIS_LOGDEV_LOG(DEBUG, "LogDev::truncate done {} ", key.idx);
+    off_t new_offset = m_vdev_jd->truncate(key.dev_offset);
+    THIS_LOGDEV_LOG(DEBUG, "LogDev::truncate done {} offset old {} new {}", key.idx, key.dev_offset, new_offset);
     m_last_truncate_idx = key.idx;

     {
         std::unique_lock< std::mutex > lk{m_meta_mutex};

         // Update the start offset to be read upon restart
-        m_logdev_meta.set_start_dev_offset(key.dev_offset, key.idx + 1, false /* persist_now */);
+        m_logdev_meta.set_start_dev_offset(new_offset, key.idx + 1, false /* persist_now */);

         // Now that store is truncated, we can reclaim the store ids which are garbaged (if any) earlier
 #ifdef _PRERELEASE
@@ -658,7 +662,7 @@ std::shared_ptr< HomeLogStore > LogDev::create_new_log_store(bool append_mode) {
         HS_REL_ASSERT((it == m_id_logstore_map.end()), "store_id {}-{} already exists", m_logdev_id, store_id);
         m_id_logstore_map.insert(std::pair(store_id, logstore_info{.log_store = lstore, .append_mode = append_mode}));
     }
-    LOGINFO("Created log store log_dev={} log_store={}", m_logdev_id, store_id);
+    HS_LOG(INFO, logstore, "Created log store log_dev={} log_store={}", m_logdev_id, store_id);
     return lstore;
 }
@@ -934,7 +938,9 @@ void LogDevMetadata::reset() {
 }

 void LogDevMetadata::logdev_super_blk_found(const sisl::byte_view& buf, void* meta_cookie) {
+
     m_sb.load(buf, meta_cookie);
+    LOGINFO("Logdev superblk found log_dev={}", m_sb->logdev_id);
     HS_REL_ASSERT_EQ(m_sb->get_magic(), logdev_superblk::LOGDEV_SB_MAGIC, "Invalid logdev metablk, magic mismatch");
     HS_REL_ASSERT_EQ(m_sb->get_version(), logdev_superblk::LOGDEV_SB_VERSION, "Invalid version of logdev metablk");
 }
diff --git a/src/lib/logstore/log_dev.hpp b/src/lib/logstore/log_dev.hpp
index 7affcce96..2614ba25c 100644
--- a/src/lib/logstore/log_dev.hpp
+++ b/src/lib/logstore/log_dev.hpp
@@
-569,7 +569,7 @@ class log_stream_reader { sisl::byte_view group_in_next_page(); private: - sisl::byte_view read_next_bytes(uint64_t nbytes); + sisl::byte_view read_next_bytes(uint64_t nbytes, bool& end_of_stream); private: JournalVirtualDev* m_vdev; diff --git a/src/lib/logstore/log_store_service.cpp b/src/lib/logstore/log_store_service.cpp index 148363325..46e55655f 100644 --- a/src/lib/logstore/log_store_service.cpp +++ b/src/lib/logstore/log_store_service.cpp @@ -125,7 +125,7 @@ logdev_id_t LogStoreService::create_new_logdev() { auto logdev = create_new_logdev_internal(logdev_id); logdev->start(true /* format */); COUNTER_INCREMENT(m_metrics, logdevs_count, 1); - LOGINFO("Created log_dev={}", logdev_id); + HS_LOG(INFO, logstore, "Created log_dev={}", logdev_id); return logdev_id; } @@ -149,12 +149,12 @@ void LogStoreService::destroy_log_dev(logdev_id_t logdev_id) { m_id_logdev_map.erase(it); COUNTER_DECREMENT(m_metrics, logdevs_count, 1); - LOGINFO("Removed log_dev={}", logdev_id); + HS_LOG(INFO, logstore, "Removed log_dev={}", logdev_id); } void LogStoreService::delete_unopened_logdevs() { for (auto logdev_id : m_unopened_logdev) { - LOGINFO("Deleting unopened log_dev={}", logdev_id); + HS_LOG(INFO, logstore, "Deleting unopened log_dev={}", logdev_id); destroy_log_dev(logdev_id); } m_unopened_logdev.clear(); @@ -178,7 +178,7 @@ void LogStoreService::open_logdev(logdev_id_t logdev_id) { LOGDEBUGMOD(logstore, "log_dev={} does not exist, created!", logdev_id); } m_unopened_logdev.erase(logdev_id); - LOGDEBUGMOD(logstore, "Opened log_dev={}", logdev_id); + HS_LOG(INFO, logstore, "Opened log_dev={}", logdev_id); } std::vector< std::shared_ptr< LogDev > > LogStoreService::get_all_logdevs() { @@ -206,7 +206,7 @@ void LogStoreService::logdev_super_blk_found(const sisl::byte_view& buf, void* m folly::SharedMutexWritePriority::WriteHolder holder(m_logdev_map_mtx); std::shared_ptr< LogDev > logdev; auto id = sb->logdev_id; - LOGDEBUGMOD(logstore, "Log dev superblk found logdev={}", id); + HS_LOG(DEBUG, logstore, "Log dev superblk found logdev={}", id); const auto it = m_id_logdev_map.find(id); // We could update the logdev map either with logdev or rollback superblks found callbacks. 
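The comment above is the key invariant: during meta replay, either the logdev superblk or the rollback superblk callback can be the first to see a given logdev id, so both paths upsert the shared map under the write lock. A simplified, self-contained sketch of that find-or-create pattern (stub types, not the real `LogDev`):

```cpp
#include <map>
#include <memory>
#include <mutex>

struct LogDevStub {}; // stand-in for LogDev

std::mutex s_map_mtx;
std::map< int, std::shared_ptr< LogDevStub > > s_id_logdev_map;

// Whichever callback runs first creates the entry; the other reuses it.
std::shared_ptr< LogDevStub > find_or_create(int logdev_id) {
    std::lock_guard< std::mutex > lg{s_map_mtx};
    auto it = s_id_logdev_map.find(logdev_id);
    if (it == s_id_logdev_map.end()) {
        it = s_id_logdev_map.emplace(logdev_id, std::make_shared< LogDevStub >()).first;
    }
    return it->second;
}
```

The `if (it != m_id_logdev_map.end())` check that follows is the real implementation of exactly this upsert.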
if (it != m_id_logdev_map.end()) { @@ -235,7 +235,7 @@ void LogStoreService::rollback_super_blk_found(const sisl::byte_view& buf, void* folly::SharedMutexWritePriority::WriteHolder holder(m_logdev_map_mtx); std::shared_ptr< LogDev > logdev; auto id = rollback_sb->logdev_id; - LOGDEBUGMOD(logstore, "Log dev rollback superblk found logdev={}", id); + HS_LOG(DEBUG, logstore, "Log dev rollback superblk found logdev={}", id); const auto it = m_id_logdev_map.find(id); HS_REL_ASSERT((it != m_id_logdev_map.end()), "found a rollback_super_blk of logdev id {}, but the logdev with id {} doesnt exist", id); diff --git a/src/lib/logstore/log_stream.cpp b/src/lib/logstore/log_stream.cpp index 931e03eef..2a00cd31c 100644 --- a/src/lib/logstore/log_stream.cpp +++ b/src/lib/logstore/log_stream.cpp @@ -44,10 +44,15 @@ sisl::byte_view log_stream_reader::next_group(off_t* out_dev_offset) { read_again: if (m_cur_log_buf.size() < min_needed) { do { - m_cur_log_buf = read_next_bytes(std::max(min_needed, bulk_read_size)); + bool end_of_stream{false}; + m_cur_log_buf = read_next_bytes(std::max(min_needed, bulk_read_size), end_of_stream); + if (end_of_stream) { + LOGDEBUGMOD(logstore, "Logdev reached end of stream {} {}", m_vdev_jd->to_string(), m_cur_read_bytes); + return ret_buf; + } if (m_cur_log_buf.size() == 0) { - LOGINFOMOD(logstore, "Logdev data empty log_dev={}", m_vdev_jd->logdev_id()); - return {}; + LOGDEBUGMOD(logstore, "Logdev data empty {}", m_vdev_jd->logdev_id()); + continue; } } while (m_cur_log_buf.size() < sizeof(log_group_header)); min_needed = 0; @@ -56,8 +61,8 @@ sisl::byte_view log_stream_reader::next_group(off_t* out_dev_offset) { HS_REL_ASSERT_GE(m_cur_log_buf.size(), m_read_size_multiple); const auto* header = r_cast< log_group_header const* >(m_cur_log_buf.bytes()); if (header->magic_word() != LOG_GROUP_HDR_MAGIC) { - LOGINFOMOD(logstore, "Logdev data not seeing magic at pos {}, must have come to end of log_dev={}", - m_vdev_jd->dev_offset(m_cur_read_bytes), m_vdev_jd->logdev_id()); + LOGERROR("Logdev data not seeing magic at pos {}, must have come to end of log_dev={}", + m_vdev_jd->dev_offset(m_cur_read_bytes), m_vdev_jd->logdev_id()); *out_dev_offset = m_vdev_jd->dev_offset(m_cur_read_bytes); // move it by dma boundary if header is not valid m_prev_crc = 0; @@ -71,9 +76,8 @@ sisl::byte_view log_stream_reader::next_group(off_t* out_dev_offset) { // compare it with prev crc if (m_prev_crc != 0 && m_prev_crc != header->prev_grp_crc) { // we reached at the end - LOGINFOMOD(logstore, - "we have reached the end. crc doesn't match offset {} prev crc {} header prev crc {} log_dev={}", - m_vdev_jd->dev_offset(m_cur_read_bytes), header->prev_grp_crc, m_prev_crc, m_vdev_jd->logdev_id()); + LOGERROR("we have reached the end. 
crc doesn't match offset {} prev crc {} header prev crc {} log_dev={}", + m_vdev_jd->dev_offset(m_cur_read_bytes), header->prev_grp_crc, m_prev_crc, m_vdev_jd->logdev_id()); *out_dev_offset = m_vdev_jd->dev_offset(m_cur_read_bytes); if (!m_vdev_jd->is_offset_at_last_chunk(*out_dev_offset)) { HS_REL_ASSERT(0, "data is corrupted {}", m_vdev_jd->logdev_id()); @@ -92,7 +96,7 @@ sisl::byte_view log_stream_reader::next_group(off_t* out_dev_offset) { goto read_again; } - LOGTRACEMOD(logstore, + LOGDEBUGMOD(logstore, "Logstream read log group of size={} nrecords={} journal_dev_offset {} cur_read_bytes {} buf size " "remaining {} log_dev={}", header->total_size(), header->nrecords(), m_vdev_jd->dev_offset(m_cur_read_bytes), m_cur_read_bytes, @@ -101,10 +105,9 @@ sisl::byte_view log_stream_reader::next_group(off_t* out_dev_offset) { // At this point data seems to be valid. Lets see if a data is written completely by comparing the footer const auto* footer = r_cast< log_group_footer* >((uint64_t)m_cur_log_buf.bytes() + header->footer_offset); if (footer->magic != LOG_GROUP_FOOTER_MAGIC || footer->start_log_idx != header->start_log_idx) { - LOGINFOMOD(logstore, - "last write is not completely written. footer magic {} footer start_log_idx {} header log indx {} " - "log_dev={}", - footer->magic, footer->start_log_idx, header->start_log_idx, m_vdev_jd->logdev_id()); + LOGERROR("last write is not completely written. footer magic {} footer start_log_idx {} header log indx {} " + "log_dev={}", + footer->magic, footer->start_log_idx, header->start_log_idx, m_vdev_jd->logdev_id()); *out_dev_offset = m_vdev_jd->dev_offset(m_cur_read_bytes); // move it by dma boundary if header is not valid m_prev_crc = 0; @@ -123,8 +126,7 @@ sisl::byte_view log_stream_reader::next_group(off_t* out_dev_offset) { (header->total_size() - sizeof(log_group_header))); if (cur_crc != header->cur_grp_crc) { /* This is a valid entry so crc should match */ - LOGERRORMOD(logstore, "crc doesn't match {} log_dev={}", m_vdev_jd->dev_offset(m_cur_read_bytes), - m_vdev_jd->logdev_id()); + LOGERROR("crc doesn't match {} log_dev={}", m_vdev_jd->dev_offset(m_cur_read_bytes), m_vdev_jd->logdev_id()); HS_REL_ASSERT(0, "data is corrupted {}", m_vdev_jd->logdev_id()); *out_dev_offset = m_vdev_jd->dev_offset(m_cur_read_bytes); @@ -151,10 +153,15 @@ sisl::byte_view log_stream_reader::group_in_next_page() { return next_group(&dev_offset); } -sisl::byte_view log_stream_reader::read_next_bytes(uint64_t nbytes) { +sisl::byte_view log_stream_reader::read_next_bytes(uint64_t nbytes, bool& end_of_stream) { // TO DO: Might need to address alignment based on data or fast type const auto prev_pos = m_vdev_jd->seeked_pos(); auto sz_to_read = m_vdev_jd->sync_next_read(nullptr, nbytes); + if (sz_to_read == -1) { + end_of_stream = true; + return sisl::byte_view{m_cur_log_buf}; + } + if (sz_to_read == 0) { return sisl::byte_view{m_cur_log_buf}; } auto out_buf = diff --git a/src/tests/CMakeLists.txt b/src/tests/CMakeLists.txt index 7365d88c5..c10b09dcb 100644 --- a/src/tests/CMakeLists.txt +++ b/src/tests/CMakeLists.txt @@ -89,6 +89,10 @@ if (${io_tests}) target_sources(test_log_dev PRIVATE test_log_dev.cpp) target_link_libraries(test_log_dev hs_logdev homestore ${COMMON_TEST_DEPS} ) + add_executable(test_log_store_long_run) + target_sources(test_log_store_long_run PRIVATE test_log_store_long_run.cpp) + target_link_libraries(test_log_store_long_run hs_logdev homestore ${COMMON_TEST_DEPS}) + set(TEST_METABLK_SOURCE_FILES test_meta_blk_mgr.cpp) 
add_executable(test_meta_blk_mgr ${TEST_METABLK_SOURCE_FILES}) target_link_libraries(test_meta_blk_mgr homestore ${COMMON_TEST_DEPS} GTest::gmock) @@ -113,7 +117,7 @@ if (${io_tests}) can_build_epoll_io_tests(epoll_tests) if(${epoll_tests}) add_test(NAME LogDev-Epoll COMMAND ${CMAKE_BINARY_DIR}/bin/test_log_dev) - #add_test(NAME LogStore-Epoll COMMAND ${CMAKE_BINARY_DIR}/bin/test_log_store) + add_test(NAME LogStore-Epoll COMMAND ${CMAKE_BINARY_DIR}/bin/test_log_store) add_test(NAME MetaBlkMgr-Epoll COMMAND ${CMAKE_BINARY_DIR}/bin/test_meta_blk_mgr) add_test(NAME DataService-Epoll COMMAND ${CMAKE_BINARY_DIR}/bin/test_data_service) diff --git a/src/tests/test_journal_vdev.cpp b/src/tests/test_journal_vdev.cpp index cfd4d6500..ce0b302ca 100644 --- a/src/tests/test_journal_vdev.cpp +++ b/src/tests/test_journal_vdev.cpp @@ -202,10 +202,7 @@ class JournalDescriptorTest { m_vdev_jd->truncate(off_to_truncate); auto tail_after = m_vdev_jd->tail_offset(); - if (m_vdev_jd->num_chunks_used()) { - HS_DBG_ASSERT_EQ(tail_before, tail_after); - HS_DBG_ASSERT_EQ(off_to_truncate, m_vdev_jd->data_start_offset()); - } + if (m_vdev_jd->num_chunks_used()) { HS_DBG_ASSERT_EQ(tail_before, tail_after); } if (off_to_truncate > m_start_off) { // remove the offsets before truncate offset, since they are not valid for read anymore; @@ -251,12 +248,10 @@ class JournalDescriptorTest { void space_usage_asserts() { auto used_space = m_vdev_jd->used_size(); - auto start_off = m_vdev_jd->data_start_offset(); if (m_vdev_jd->num_chunks_used() != 0) { HS_DBG_ASSERT_GT(m_vdev_jd->size(), 0); HS_DBG_ASSERT_LE(used_space, m_vdev_jd->size()); - HS_DBG_ASSERT_EQ(start_off, m_start_off); } else { HS_DBG_ASSERT_EQ(m_vdev_jd->size(), 0); } @@ -293,7 +288,6 @@ class JournalDescriptorTest { auto tail_offset = m_vdev_jd->tail_offset(); auto start_offset = m_vdev_jd->data_start_offset(); - HS_DBG_ASSERT_EQ(m_start_off, start_offset); if (start_offset < tail_offset) { HS_DBG_ASSERT_GE(off, start_offset, "Wrong offset: {}, start_off: {}", off, start_offset); HS_DBG_ASSERT_LE(off, tail_offset, "Wrong offset: {}, tail_offset: {}", off, tail_offset); @@ -529,10 +523,10 @@ TEST_F(VDevJournalIOTest, MultipleChunkTest) { LOGINFO("Inserting two entries"); for (int i = 0; i < 2; i++) { test.fixed_write(data_size); + writesz += data_size; } // Verify write size has two data entries and one chunk. - writesz = 2 * data_size; test.verify_journal_descriptor(log_dev_jd, {.ds = 0x0, .end = chunk_size, .writesz = writesz, .rsvdsz = 0, .chunks = 1, .trunc = false, .total = chunk_size, .seek = 0x0}); @@ -572,35 +566,42 @@ TEST_F(VDevJournalIOTest, MultipleChunkTest) { test.verify_journal_descriptor(log_dev_jd, {.ds = 0x0, .end = 4 * chunk_size, .writesz = writesz, .rsvdsz = 0, .chunks = 4, .trunc = false, .total = 4 * chunk_size, .seek = 0x0}); - // Truncate two data entries. No change in chunk count, only write size and data start changed. + // Truncate two data entries. Num chunks reduced to 3. Write and total size reduce by 1 chunk. 
LOGINFO("Truncating two entries"); - uint64_t trunc_sz = 2 * data_size; uint64_t trunc_offset = 2 * data_size; + writesz -= chunk_size; test.truncate(trunc_offset); - test.verify_journal_descriptor(log_dev_jd, {.ds = trunc_offset, .end = 4 * chunk_size, .writesz = writesz - trunc_sz, - .rsvdsz = 0, .chunks = 4, .trunc = true, .total = 4 * chunk_size, .seek = 0x0}); + test.verify_journal_descriptor(log_dev_jd, {.ds = chunk_size, .end = 4 * chunk_size, .writesz = writesz, + .rsvdsz = 0, .chunks = 3, .trunc = true, .total = 3 * chunk_size, .seek = 0x0}); - // Truncate one more entry. Release one chunk back and reduce chunk count. Increase the data start. + // Truncate one more entry. LOGINFO("Truncating one entry"); trunc_offset = chunk_size + data_size; - trunc_sz = chunk_size + data_size; test.truncate(trunc_offset); - test.verify_journal_descriptor(log_dev_jd, {.ds = trunc_offset, .end = 4 * chunk_size, .writesz = writesz - trunc_sz, + test.verify_journal_descriptor(log_dev_jd, {.ds = trunc_offset, .end = 4 * chunk_size, .writesz = writesz - data_size, .rsvdsz = 0, .chunks = 3, .trunc = true, .total = 3 * chunk_size, .seek = 0x0}); // Restart homestore and restore the offsets. LOGINFO("Restart homestore"); restart_restore(); - test.verify_journal_descriptor(log_dev_jd, {.ds = trunc_offset, .end = 4 * chunk_size, .writesz = writesz - trunc_sz, + test.verify_journal_descriptor(log_dev_jd, {.ds = trunc_offset, .end = 4 * chunk_size, .writesz = writesz - data_size, .rsvdsz = 0, .chunks = 3, .trunc = false, .total = 3 * chunk_size, .seek = 0x0}); test.read_all(); - // Truncate all entries. Num chunks 1, write sz should be 0. + // Truncate one more entry. This will release one more chunk. + LOGINFO("Truncating one entry"); + trunc_offset = chunk_size + 2 * data_size; + writesz -= chunk_size; + test.truncate(trunc_offset); + test.verify_journal_descriptor(log_dev_jd, {.ds = 2 * chunk_size, .end = 4 * chunk_size, .writesz = writesz, + .rsvdsz = 0, .chunks = 2, .trunc = true, .total = 2 * chunk_size, .seek = 0x0}); + + // Truncate all entries. Release all chunks. Num chunks 0, write sz should be 0. 
LOGINFO("Truncating all entries"); trunc_offset = log_dev_jd->tail_offset(); test.truncate(trunc_offset); - test.verify_journal_descriptor(log_dev_jd, {.ds = trunc_offset, .end = 4 * chunk_size, .writesz = 0, .rsvdsz = 0, - .chunks = 1, .trunc = true, .total = chunk_size, .seek = 0x0}); + test.verify_journal_descriptor(log_dev_jd, {.ds = 4 * chunk_size, .end = 4 * chunk_size, .writesz = 0, .rsvdsz = 0, + .chunks = 0, .trunc = true, .total = 0, .seek = 0x0}); // clang-format on } diff --git a/src/tests/test_log_store.cpp b/src/tests/test_log_store.cpp index 8d5d6c17d..1858236f3 100644 --- a/src/tests/test_log_store.cpp +++ b/src/tests/test_log_store.cpp @@ -257,7 +257,7 @@ class SampleLogStoreClient { validate_data(tl, i); } } catch (const std::exception& e) { - logstore_seq_num_t trunc_upto = get_truncated_upto(); + logstore_seq_num_t trunc_upto = m_log_store->truncated_upto(); if (!expect_all_completed) { LOGINFO("got store {} trunc_upto {} {} {}", m_log_store->get_store_id(), trunc_upto, i, m_log_store->log_records().get_status(2).dump(' ', 2)); @@ -319,7 +319,6 @@ class SampleLogStoreClient { } void recovery_validate() { - LOGINFO("Truncated upto {}", get_truncated_upto()); LOGINFO("Totally recovered {} non-truncated lsns and {} truncated lsns for store {} log_dev {}", m_n_recovered_lsns, m_n_recovered_truncated_lsns, m_log_store->get_store_id(), m_log_store->get_logdev()->get_id()); @@ -503,6 +502,7 @@ class SampleDB { restart); if (!restart) { + // Create one logdev for each logstore. std::vector< logdev_id_t > logdev_id_vec; for (uint32_t i{0}; i < n_log_stores; ++i) { logdev_id_vec.push_back(logstore_service().create_new_logdev()); @@ -569,8 +569,8 @@ class LogStoreTest : public ::testing::Test { virtual ~LogStoreTest() override = default; protected: - virtual void SetUp() override{}; - virtual void TearDown() override{}; + virtual void SetUp() override {}; + virtual void TearDown() override {}; void init(uint64_t n_total_records, const std::vector< std::pair< size_t, int > >& inp_freqs = {}) { // m_nrecords_waiting_to_issue = std::lround(n_total_records / _batch_size) * _batch_size; @@ -960,6 +960,8 @@ TEST_F(LogStoreTest, BurstRandInsertThenTruncate) { } } +// TODO evaluate this test is valid and enable after fixing the flush lock. +#if 0 TEST_F(LogStoreTest, BurstSeqInsertAndTruncateInParallel) { const auto num_records = SISL_OPTIONS["num_records"].as< uint32_t >(); const auto iterations = SISL_OPTIONS["iterations"].as< uint32_t >(); @@ -997,6 +999,7 @@ TEST_F(LogStoreTest, BurstSeqInsertAndTruncateInParallel) { this->truncate_validate(); } } +#endif TEST_F(LogStoreTest, RandInsertsWithHoles) { const auto num_records = SISL_OPTIONS["num_records"].as< uint32_t >(); diff --git a/src/tests/test_log_store_long_run.cpp b/src/tests/test_log_store_long_run.cpp new file mode 100644 index 000000000..e5f15961f --- /dev/null +++ b/src/tests/test_log_store_long_run.cpp @@ -0,0 +1,635 @@ +/********************************************************************************* + * Modifications Copyright 2017-2019 eBay Inc. + * + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed + * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. 
+ * specific language governing permissions and limitations under the License.
+ *
+ *********************************************************************************/
+#include <algorithm> // std::shuffle
+#include <array>
+#include <atomic>
+#include <chrono> // std::chrono::system_clock
+#include <cmath>
+#include <condition_variable>
+#include <cstdint>
+#include <cstdlib>
+#include <cstring>
+#include <filesystem>
+#include <functional>
+#include <iostream>
+#include <iterator>
+#include <limits>
+#include <map>
+#include <memory>
+#include <mutex>
+#include <random> // std::default_random_engine
+#include <string>
+#include <thread>
+#include <unordered_map>
+#include <unordered_set>
+#include <vector>
+
+#include <boost/intrusive_ptr.hpp>
+#include <boost/uuid/random_generator.hpp>
+#include <boost/uuid/uuid_io.hpp>
+#include <folly/Synchronized.h>
+#include <gtest/gtest.h>
+#include <iomgr/io_environment.hpp>
+#include <sisl/logging/logging.h>
+#include <sisl/options/options.h>
+
+#include <homestore/homestore.hpp>
+#include <homestore/logstore_service.hpp>
+
+#include "logstore/log_dev.hpp"
+#include "test_common/homestore_test_common.hpp"
+
+using namespace homestore;
+RCU_REGISTER_INIT
+SISL_LOGGING_INIT(HOMESTORE_LOG_MODS)
+std::vector< std::string > test_common::HSTestHelper::s_dev_names;
+
+struct test_log_data {
+    test_log_data() = default;
+    test_log_data(const test_log_data&) = delete;
+    test_log_data(test_log_data&&) noexcept = delete;
+    test_log_data& operator=(const test_log_data&) = delete;
+    test_log_data& operator=(test_log_data&&) noexcept = delete;
+    ~test_log_data() = default;
+
+    uint32_t size;
+
+    uint8_t* get_data() { return uintptr_cast(this) + sizeof(test_log_data); };
+    uint8_t const* get_data_const() const { return r_cast< uint8_t const* >(this) + sizeof(test_log_data); }
+    const uint8_t* get_data() const { return r_cast< const uint8_t* >(this) + sizeof(test_log_data); }
+    uint32_t total_size() const { return sizeof(test_log_data) + size; }
+    std::string get_data_str() const {
+        return std::string(r_cast< const char* >(get_data_const()), static_cast< size_t >(size));
+    }
+};
+
+typedef std::function< void(logdev_id_t, logstore_seq_num_t, logdev_key) > test_log_store_comp_cb_t;
+class SampleLogStoreClient {
+public:
+    SampleLogStoreClient(std::shared_ptr< HomeLogStore > store, const logdev_id_t logdev_id,
+                         const test_log_store_comp_cb_t& cb) :
+            m_store_id{store->get_store_id()}, m_comp_cb{cb}, m_logdev_id{logdev_id} {
+        set_log_store(store);
+    }
+
+    explicit SampleLogStoreClient(const logdev_id_t logdev_id, const test_log_store_comp_cb_t& cb) :
+            SampleLogStoreClient(logstore_service().create_new_log_store(logdev_id, false /* append_mode */), logdev_id,
+                                 cb) {}
+
+    SampleLogStoreClient(const SampleLogStoreClient&) = delete;
+    SampleLogStoreClient(SampleLogStoreClient&&) noexcept = delete;
+    SampleLogStoreClient& operator=(const SampleLogStoreClient&) = delete;
+    SampleLogStoreClient& operator=(SampleLogStoreClient&&) noexcept = delete;
+    ~SampleLogStoreClient() = default;
+
+    void set_log_store(std::shared_ptr< HomeLogStore > store) {
+        m_log_store = store;
+        m_log_store->register_log_found_cb(bind_this(SampleLogStoreClient::on_log_found, 3));
+    }
+
+    void reset_recovery() {
+        m_n_recovered_lsns = 0;
+        m_n_recovered_truncated_lsns = 0;
+    }
+
+    void insert_next_batch(uint32_t batch_size) {
+        const auto cur_lsn = m_cur_lsn.fetch_add(batch_size);
+        insert(cur_lsn, batch_size, false);
+    }
+
+    void insert(logstore_seq_num_t start_lsn, int64_t nparallel_count, bool wait_for_completion = true) {
+        std::vector< logstore_seq_num_t > lsns;
+        lsns.reserve(nparallel_count);
+
+        for (auto lsn{start_lsn}; lsn < start_lsn + nparallel_count; ++lsn) {
+            lsns.push_back(lsn);
+        }
+
+        ASSERT_LT(m_log_store->get_contiguous_issued_seq_num(-1), start_lsn + nparallel_count);
+        ASSERT_LT(m_log_store->get_contiguous_completed_seq_num(-1), start_lsn + nparallel_count);
+        for (const auto lsn : lsns) {
+            bool io_memory{false};
+            auto* d = prepare_data(lsn, io_memory);
+            m_log_store->write_async(
+                lsn, {uintptr_cast(d), d->total_size(), false}, nullptr,
+                [io_memory, d, this](logstore_seq_num_t seq_num, const sisl::io_blob& b, logdev_key ld_key, void* ctx) {
+                    assert(ld_key);
+                    if (io_memory) {
+                        iomanager.iobuf_free(uintptr_cast(d));
+                    } else {
+                        std::free(voidptr_cast(d));
+                    }
+                    m_comp_cb(m_logdev_id, seq_num, ld_key);
+                });
+        }
+
+        // Because of restarts in tests, we have to force the flush of log entries.
+        m_log_store->get_logdev()->flush_if_needed(1);
+    }
+
+    void read_validate(const bool expect_all_completed = false) {
+        const auto trunc_upto = m_log_store->truncated_upto();
+        for (std::remove_const_t< decltype(trunc_upto) > i{0}; i <= trunc_upto; ++i) {
+            ASSERT_THROW(m_log_store->read_sync(i), std::out_of_range)
+                << "Expected std::out_of_range exception for lsn=" << m_log_store->get_store_id() << ":" << i
+                << " but not thrown";
+        }
+
+        const auto upto =
+            expect_all_completed ? m_cur_lsn.load() - 1 : m_log_store->get_contiguous_completed_seq_num(-1);
+        for (auto i = m_log_store->truncated_upto() + 1; i < upto; ++i) {
+            try {
+                const auto b = m_log_store->read_sync(i);
+                auto* tl = r_cast< test_log_data const* >(b.bytes());
+                ASSERT_EQ(tl->total_size(), b.size())
+                    << "Size Mismatch for lsn=" << m_log_store->get_store_id() << ":" << i;
+                validate_data(tl, i);
+
+            } catch (const std::exception& e) {
+                LOGFATAL("Unexpected out_of_range exception for lsn={}:{} upto {} trunc_upto {}",
+                         m_log_store->get_store_id(), i, upto, trunc_upto);
+            }
+        }
+    }
+
+    void rollback_validate(uint32_t num_lsns_to_rollback) {
+        std::mutex mtx;
+        std::condition_variable cv;
+        bool rollback_done = false;
+
+        if (m_log_store->truncated_upto() == m_log_store->get_contiguous_completed_seq_num(-1)) {
+            // No records to rollback.
+            return;
+        }
+
+        auto const upto_lsn = m_cur_lsn.fetch_sub(num_lsns_to_rollback) - num_lsns_to_rollback - 1;
+        m_log_store->rollback_async(upto_lsn, [&](logstore_seq_num_t) {
+            ASSERT_EQ(m_log_store->get_contiguous_completed_seq_num(-1), upto_lsn)
+                << "Last completed seq num is not reset after rollback";
+            ASSERT_EQ(m_log_store->get_contiguous_issued_seq_num(-1), upto_lsn)
+                << "Last issued seq num is not reset after rollback";
+            read_validate(true);
+            {
+                std::unique_lock lock(mtx);
+                rollback_done = true;
+            }
+            cv.notify_one();
+        });
+
+        // We wait till the async rollback is finished, as we do validation.
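An aside on the payload scheme used throughout this new test: `prepare_data()` fills each record with a single printable character derived from its LSN, and `validate_data()` recomputes the same character on read, which is what makes byte-for-byte verification across restarts possible. The mapping in isolation:

```cpp
#include <cassert>

// Each LSN maps to one printable ASCII fill byte ('!' .. '~'),
// as done by prepare_data()/validate_data() in this file.
char fill_char_for_lsn(long lsn) { return static_cast< char >((lsn % 94) + 33); }

int main() {
    assert(fill_char_for_lsn(0) == '!');
    assert(fill_char_for_lsn(93) == '~');
    assert(fill_char_for_lsn(94) == '!'); // wraps every 94 LSNs
    return 0;
}
```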
+ std::unique_lock lock(mtx); + cv.wait(lock, [&rollback_done] { return rollback_done == true; }); + } + + void recovery_validate() { + LOGINFO( + "Totally recovered {} non-truncated lsns and {} truncated lsns for store {} log_dev {} truncated_upto {}", + m_n_recovered_lsns, m_n_recovered_truncated_lsns, m_log_store->get_store_id(), + m_log_store->get_logdev()->get_id(), m_truncated_upto_lsn.load()); + if (m_n_recovered_lsns != (m_cur_lsn.load() - m_truncated_upto_lsn.load() - 1)) { + EXPECT_EQ(m_n_recovered_lsns, m_cur_lsn.load() - m_truncated_upto_lsn.load() - 1) + << "Recovered " << m_n_recovered_lsns << " valid lsns for store " << m_log_store->get_store_id() + << " Expected to have " << m_cur_lsn.load() - m_truncated_upto_lsn.load() - 1 + << " lsns: m_cur_lsn=" << m_cur_lsn.load() << " truncated_upto_lsn=" << m_truncated_upto_lsn; + assert(false); + } + } + + void on_log_found(const logstore_seq_num_t lsn, const log_buffer buf, void* ctx) { + LOGDEBUG("Recovered lsn {}:{} with log data of size {}", m_log_store->get_store_id(), lsn, buf.size()) + EXPECT_LE(lsn, m_cur_lsn.load()) << "Recovered incorrect lsn " << m_log_store->get_store_id() << ":" << lsn + << "Expected less than cur_lsn " << m_cur_lsn.load(); + auto* tl = r_cast< test_log_data const* >(buf.bytes()); + validate_data(tl, lsn); + + // Count only the ones which are after truncated, because recovery could receive even truncated lsns + (lsn > m_truncated_upto_lsn) ? ++m_n_recovered_lsns : ++m_n_recovered_truncated_lsns; + } + + void truncate(const logstore_seq_num_t lsn) { + m_log_store->truncate(lsn); + m_truncated_upto_lsn = lsn; + } + + void flush() { m_log_store->flush_sync(); } + + bool has_all_lsns_truncated() const { return (m_truncated_upto_lsn.load() == (m_cur_lsn.load() - 1)); } + + static test_log_data* prepare_data(const logstore_seq_num_t lsn, bool& io_memory) { + static thread_local std::random_device rd{}; + static thread_local std::default_random_engine re{rd()}; + uint32_t sz{0}; + uint8_t* raw_buf{nullptr}; + + // Generate buffer of random size and fill with specific data + std::uniform_int_distribution< uint8_t > gen_percentage{0, 99}; + std::uniform_int_distribution< uint32_t > gen_data_size{0, max_data_size - 1}; + if (gen_percentage(re) < static_cast< uint8_t >(10)) { + // 10% of data is dma'ble aligned boundary + const auto alloc_sz = sisl::round_up(gen_data_size(re) + sizeof(test_log_data), s_max_flush_multiple); + raw_buf = iomanager.iobuf_alloc(dma_address_boundary, alloc_sz); + sz = alloc_sz - sizeof(test_log_data); + io_memory = true; + } else { + sz = gen_data_size(re); + raw_buf = static_cast< uint8_t* >(std::malloc(sizeof(test_log_data) + sz)); + io_memory = false; + } + + test_log_data* d = new (raw_buf) test_log_data(); + d->size = sz; + + assert(uintptr_cast(d) == raw_buf); + + const char c = static_cast< char >((lsn % 94) + 33); + std::memset(voidptr_cast(d->get_data()), c, static_cast< size_t >(d->size)); + return d; + } + +private: + void validate_data(const test_log_data* d, const logstore_seq_num_t lsn) { + const char c = static_cast< char >((lsn % 94) + 33); + const std::string actual = d->get_data_str(); + const std::string expected(static_cast< size_t >(d->size), + c); // needs to be () because of same reason as vector + ASSERT_EQ(actual, expected) << "Data mismatch for LSN=" << m_log_store->get_store_id() << ":" << lsn + << " size=" << d->size; + } + + friend class LogStoreLongRun; + +private: + static constexpr uint32_t max_data_size = 1024; + static uint64_t s_max_flush_multiple; + + 
logstore_id_t m_store_id; + test_log_store_comp_cb_t m_comp_cb; + std::atomic< logstore_seq_num_t > m_truncated_upto_lsn = -1; + std::atomic< logstore_seq_num_t > m_cur_lsn = 0; + std::shared_ptr< HomeLogStore > m_log_store; + int64_t m_n_recovered_lsns = 0; + int64_t m_n_recovered_truncated_lsns = 0; + logdev_id_t m_logdev_id; +}; + +uint64_t SampleLogStoreClient::s_max_flush_multiple = 0; + +class LogStoreLongRun : public ::testing::Test { +public: + void start_homestore(bool restart = false) { + auto n_log_stores = SISL_OPTIONS["num_logstores"].as< uint32_t >(); + if (n_log_stores < 4u) { + LOGINFO("Log store test needs minimum 4 log stores for testing, setting them to 4"); + n_log_stores = 4u; + } + + if (restart) { + for (auto& lsc : m_log_store_clients) { + lsc->flush(); + } + } + + test_common::HSTestHelper::start_homestore( + "test_log_store", + {{HS_SERVICE::META, {.size_pct = 5.0}}, + {HS_SERVICE::LOG, {.size_pct = 84.0, .chunk_size = 8 * 1024 * 1024, .min_chunk_size = 8 * 1024 * 1024}}}, + [this, restart, n_log_stores]() { + HS_SETTINGS_FACTORY().modifiable_settings([](auto& s) { + // Disable flush and resource mgr timer in UT. + s.logstore.flush_timer_frequency_us = 0; + s.resource_limits.resource_audit_timer_ms = 0; + }); + HS_SETTINGS_FACTORY().save(); + + if (restart) { + for (uint32_t i{0}; i < n_log_stores; ++i) { + SampleLogStoreClient* client = m_log_store_clients[i].get(); + logstore_service().open_logdev(client->m_logdev_id); + logstore_service() + .open_log_store(client->m_logdev_id, client->m_store_id, false /* append_mode */) + .thenValue([i, this, client](auto log_store) { client->set_log_store(log_store); }); + } + } + }, + restart); + + if (!restart) { + logdev_id_t logdev_id; + for (uint32_t i{0}; i < n_log_stores; ++i) { + logdev_id = logstore_service().create_new_logdev(); + m_log_store_clients.push_back(std::make_unique< SampleLogStoreClient >( + logdev_id, bind_this(LogStoreLongRun::on_log_insert_completion, 3))); + } + SampleLogStoreClient::s_max_flush_multiple = + logstore_service().get_logdev(logdev_id)->get_flush_size_multiple(); + } + } + + void shutdown(bool cleanup = true) { + test_common::HSTestHelper::shutdown_homestore(cleanup); + if (cleanup) { + m_log_store_clients.clear(); + m_highest_log_idx.clear(); + } + } + + bool delete_log_store(logstore_id_t store_id) { + bool removed{false}; + for (auto it = std::begin(m_log_store_clients); it != std::end(m_log_store_clients); ++it) { + if ((*it)->m_log_store->get_store_id() == store_id) { + logstore_service().remove_log_store((*it)->m_logdev_id, store_id); + logstore_service().destroy_log_dev((*it)->m_logdev_id); + m_log_store_clients.erase(it); + removed = true; + break; + } + } + return removed; + } + + logid_t highest_log_idx(logdev_id_t fid) { + return m_highest_log_idx.count(fid) ? m_highest_log_idx[fid].load() : -1; + } + + void init() { + for (auto& lsc : m_log_store_clients) { + lsc->reset_recovery(); + } + } + + void kickstart_inserts(uint64_t n_total_records, uint32_t batch_size, uint32_t q_depth) { + m_nrecords_waiting_to_issue = m_log_store_clients.size() * n_total_records; + m_nrecords_waiting_to_complete = 0; + + m_batch_size = batch_size; + m_q_depth = (m_log_store_clients.size() + 1) * q_depth; + iomanager.run_on_forget(iomgr::reactor_regex::all_io, [this]() { do_insert(); }); + } + + void do_insert() { + // We insert batch of records for each logstore in round robin fashion. 
+ auto it = m_log_store_clients.begin(); + for (;;) { + uint32_t batch_size{0}; + { + std::unique_lock< std::mutex > lock{m_pending_mtx}; + const bool insert = (m_nrecords_waiting_to_issue > 0) && (m_nrecords_waiting_to_complete < m_q_depth); + if (insert) { + batch_size = std::min< uint32_t >(m_batch_size, m_nrecords_waiting_to_issue); + m_nrecords_waiting_to_issue -= batch_size; + m_nrecords_waiting_to_complete += batch_size; + } else { + break; + } + } + + (*it)->insert_next_batch(batch_size); + it++; + if (it == m_log_store_clients.end()) { it = m_log_store_clients.begin(); } + } + } + + void on_log_insert_completion(logdev_id_t fid, logstore_seq_num_t lsn, logdev_key ld_key) { + if (m_highest_log_idx.count(fid) == 0) { m_highest_log_idx[fid] = std::atomic{-1}; } + atomic_update_max(m_highest_log_idx[fid], ld_key.idx); + on_insert_completion(fid, lsn, ld_key); + } + + void on_insert_completion([[maybe_unused]] logdev_id_t fid, logstore_seq_num_t lsn, logdev_key ld_key) { + bool notify{false}; + uint64_t waiting_to_issue{0}; + { + std::unique_lock< std::mutex > lock{m_pending_mtx}; + waiting_to_issue = m_nrecords_waiting_to_issue; + if ((--m_nrecords_waiting_to_complete == 0) && (waiting_to_issue == 0)) { notify = true; } + } + if (notify) { + m_pending_cv.notify_all(); + } else if (waiting_to_issue > 0) { + do_insert(); + } + } + + void wait_for_inserts() { + std::unique_lock< std::mutex > lk{m_pending_mtx}; + m_pending_cv.wait(lk, + [&] { return (m_nrecords_waiting_to_issue == 0) && (m_nrecords_waiting_to_complete == 0); }); + } + + void read_validate(bool expect_all_completed = false) { + for (const auto& lsc : m_log_store_clients) { + lsc->read_validate(expect_all_completed); + } + } + + void rollback_validate() { + for (const auto& lsc : m_log_store_clients) { + lsc->rollback_validate(1); + } + } + + void truncate_validate(bool is_parallel_to_write = false) { + int skip_truncation = 0; + + for (size_t i{0}; i < m_log_store_clients.size(); ++i) { + const auto& lsc = m_log_store_clients[i]; + + // lsc->truncate(lsc->m_cur_lsn.load() - 1); + const auto t_seq_num = lsc->m_log_store->truncated_upto(); + const auto c_seq_num = lsc->m_log_store->get_contiguous_completed_seq_num(-1); + if (t_seq_num == c_seq_num) { + ++skip_truncation; + continue; + } + + // Truncate at random point between last truncated and completed seq num. + std::uniform_int_distribution< int64_t > gen_data_size{t_seq_num, c_seq_num}; + lsc->truncate(gen_data_size(rd)); + lsc->read_validate(); + } + + if (skip_truncation) { + /* not needed to call device truncate as one log store is not truncated */ + return; + } + + logstore_service().device_truncate( + [this](auto& trunc_lds) { + for (auto [fid, trunc_loc] : trunc_lds) { + m_truncate_log_idx[fid].store(trunc_loc.idx); + } + }, + true /* wait_till_done */); + validate_num_stores(); + } + + void flush() { + for (auto& lsc : m_log_store_clients) { + lsc->flush(); + } + } + + void recovery_validate() { + for (size_t i{0}; i < m_log_store_clients.size(); ++i) { + const auto& lsc = m_log_store_clients[i]; + lsc->recovery_validate(); + } + } + + void delete_create_logstore() { + // Delete a random logstore. + std::uniform_int_distribution< uint64_t > gen{0, m_log_store_clients.size() - 1}; + uint64_t idx = gen(rd); + auto fid = m_log_store_clients[idx]->m_logdev_id; + + delete_log_store(m_log_store_clients[idx]->m_store_id); + validate_num_stores(); + + // Create a new logstore. 
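Before the creation code below, it may help to see the whole lifecycle `delete_create_logstore()` drives in one place. A sketch using the service calls as they appear in this file (ids illustrative; the real helper picks a random client and wraps the new logdev in a `SampleLogStoreClient`):

```cpp
#include <homestore/logstore_service.hpp> // assumed header for logstore_service()

// Drop one store and its logdev, then provision a fresh logdev for a replacement.
void churn_one_logstore(homestore::logdev_id_t logdev_id, homestore::logstore_id_t store_id) {
    auto& svc = homestore::logstore_service();
    svc.remove_log_store(logdev_id, store_id); // unregister the store from its logdev
    svc.destroy_log_dev(logdev_id);            // tear down the now-empty logdev
    auto new_logdev = svc.create_new_logdev(); // fresh logdev for the replacement client
    (void)new_logdev; // a new SampleLogStoreClient would open a store on it
}
```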
+        auto logdev_id = logstore_service().create_new_logdev();
+        m_log_store_clients.push_back(std::make_unique< SampleLogStoreClient >(
+            logdev_id, bind_this(LogStoreLongRun::on_log_insert_completion, 3)));
+        validate_num_stores();
+    }
+
+    void validate_num_stores() {
+        size_t actual_valid_ids{0};
+        size_t actual_garbage_ids{0};
+        size_t exp_garbage_store_count{0};
+        size_t actual_logdevs{0};
+
+        for (auto& logdev : logstore_service().get_all_logdevs()) {
+            auto fid = logdev->get_id();
+            std::vector< logstore_id_t > reg_ids, garbage_ids;
+
+            logdev->get_registered_store_ids(reg_ids, garbage_ids);
+            actual_valid_ids += reg_ids.size() - garbage_ids.size();
+            actual_logdevs++;
+        }
+
+        ASSERT_EQ(actual_logdevs, m_log_store_clients.size());
+        ASSERT_EQ(actual_valid_ids, m_log_store_clients.size());
+    }
+
+    virtual void SetUp() override { start_homestore(); }
+    virtual void TearDown() override { shutdown(true /* cleanup*/); }
+
+    uint64_t get_elapsed_time(Clock::time_point start) {
+        std::chrono::seconds sec = std::chrono::duration_cast< std::chrono::seconds >(Clock::now() - start);
+        return sec.count();
+    }
+
+    bool elapsed(uint64_t print_every_n_secs) {
+        static Clock::time_point start = Clock::now();
+        auto elapsed_time = get_elapsed_time(start);
+        if (elapsed_time > print_every_n_secs) {
+            start = Clock::now();
+            return true;
+        }
+        return false;
+    }
+
+private:
+    std::vector< std::unique_ptr< SampleLogStoreClient > > m_log_store_clients;
+    std::map< logdev_id_t, std::atomic< logid_t > > m_highest_log_idx;
+    uint64_t m_nrecords_waiting_to_issue{0};
+    uint64_t m_nrecords_waiting_to_complete{0};
+    std::mutex m_pending_mtx;
+    std::condition_variable m_pending_cv;
+    std::map< logdev_id_t, std::atomic< logid_t > > m_truncate_log_idx;
+    uint32_t m_q_depth{64};
+    uint32_t m_batch_size{1};
+    std::random_device rd{};
+    std::default_random_engine re{rd()};
+};
+
+TEST_F(LogStoreLongRun, LongRunning) {
+    auto run_time = SISL_OPTIONS["run_time"].as< uint64_t >();
+    auto num_iterations = SISL_OPTIONS["num_iterations"].as< uint32_t >();
+    auto start_time = Clock::now();
+    uint32_t iterations = 1;
+
+    init();
+
+    while (true) {
+        // Insert 100 log entries per logstore, in batches of 10, in parallel as a burst.
+        kickstart_inserts(100 /* num_records */, 10 /* batch */, 5000 /* q_depth */);
+
+        // Wait for inserts.
+        wait_for_inserts();
+
+        if (iterations % 60 == 0) {
+            // Validate all the logstores.
+            read_validate(true);
+        }
+
+        if (iterations % 15 == 0) {
+            // Truncate at random lsn every 15 iterations.
+            truncate_validate();
+        }
+
+        if (iterations % 10 == 0) {
+            // Restart homestore every 10 iterations.
+            LOGDEBUG("Restart homestore");
+            start_homestore(true /* restart */);
+            recovery_validate();
+            init();
+        }
+
+        if (iterations % 60 == 0) {
+            // Add and validate rollback records.
+            rollback_validate();
+        }
+
+        if (iterations % 10 == 0) {
+            // Delete a random logstore and create a new one in its place.
+            delete_create_logstore();
+        }
+
+        if (iterations % 10 == 0) { LOGINFO("Iterations completed {}", iterations); }
+
+        auto elapsed = get_elapsed_time(start_time);
+        if (elapsed >= run_time && iterations >= num_iterations) {
+            LOGINFO("Finished test.
Num iterations {} Elapsed {}", iterations, elapsed); + break; + } + + iterations++; + } +} + +SISL_OPTIONS_ENABLE(logging, test_log_store, iomgr, test_common_setup) +SISL_OPTION_GROUP(test_log_store, + (num_logstores, "", "num_logstores", "number of log stores", + ::cxxopts::value< uint32_t >()->default_value("1000"), "number"), + (num_records, "", "num_records", "number of record to test", + ::cxxopts::value< uint32_t >()->default_value("10000"), "number"), + (num_iterations, "", "num_iterations", "Iterations", + ::cxxopts::value< uint32_t >()->default_value("1"), "the number of iterations to run each test"), + (run_time, "", "run_time", "running time in seconds", + ::cxxopts::value< uint64_t >()->default_value("600"), "number")); + +int main(int argc, char* argv[]) { + int parsed_argc = argc; + ::testing::InitGoogleTest(&parsed_argc, argv); + SISL_OPTIONS_LOAD(parsed_argc, argv, logging, test_log_store, iomgr, test_common_setup); + sisl::logging::SetLogger("test_log_store"); + spdlog::set_pattern("[%D %T%z] [%^%l%$] [%t] %v"); + sisl::logging::SetModuleLogLevel("logstore", spdlog::level::level_enum::debug); + sisl::logging::SetModuleLogLevel("journalvdev", spdlog::level::level_enum::info); + + const int ret = RUN_ALL_TESTS(); + return ret; +}
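For reference, the new binary picks up the options declared just above: by default it runs 1000 logstores for at least 600 seconds, so a shorter smoke run would look like `test_log_store_long_run --num_logstores 8 --run_time 60 --num_iterations 10` (illustrative values). Note that the CMakeLists.txt change in this PR builds the target and re-enables the existing `LogStore-Epoll` ctest entry, but does not register the long-run binary with ctest; it is intended to be invoked directly.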