From 2c500c573fa2c5733218e0cbdd44226fb3e6504f Mon Sep 17 00:00:00 2001 From: Sanal Date: Mon, 11 Mar 2024 12:41:21 -0700 Subject: [PATCH] Fix log store read across chunks and device manager load of chunks. (#321) Chunks getting duplciate start offset while recovery. Add or cleanup logs for debugging. Add test case. Add more logdev for logstore test. Enable logstore test. Move rollback test to new logdev test. --- conanfile.py | 2 +- src/include/homestore/logstore/log_store.hpp | 5 +- .../homestore/logstore/log_store_internal.hpp | 1 + src/lib/device/device.h | 1 + src/lib/device/device_manager.cpp | 10 + src/lib/device/hs_super_blk.h | 11 +- src/lib/device/journal_vdev.cpp | 138 ++++-- src/lib/device/journal_vdev.hpp | 20 +- src/lib/device/physical_dev.cpp | 8 +- src/lib/device/physical_dev.hpp | 1 + src/lib/logstore/log_dev.cpp | 119 +++--- src/lib/logstore/log_dev.hpp | 25 +- src/lib/logstore/log_group.cpp | 3 +- src/lib/logstore/log_store.cpp | 38 +- src/lib/logstore/log_store_service.cpp | 9 +- src/lib/logstore/log_stream.cpp | 80 ++-- .../log_store/home_raft_log_store.cpp | 4 +- .../replication/repl_dev/raft_repl_dev.cpp | 8 +- src/tests/CMakeLists.txt | 8 +- src/tests/log_store_benchmark.cpp | 2 +- .../test_common/homestore_test_common.hpp | 22 + src/tests/test_device_manager.cpp | 56 +++ src/tests/test_log_dev.cpp | 396 +++++++++++++----- src/tests/test_log_store.cpp | 179 ++++---- 24 files changed, 788 insertions(+), 358 deletions(-) diff --git a/conanfile.py b/conanfile.py index ab2de30ce..efe16b56a 100644 --- a/conanfile.py +++ b/conanfile.py @@ -5,7 +5,7 @@ class HomestoreConan(ConanFile): name = "homestore" - version = "5.1.12" + version = "5.1.14" homepage = "https://github.com/eBay/Homestore" description = "HomeStore Storage Engine" diff --git a/src/include/homestore/logstore/log_store.hpp b/src/include/homestore/logstore/log_store.hpp index dfb725bb5..71a1cdcda 100644 --- a/src/include/homestore/logstore/log_store.hpp +++ b/src/include/homestore/logstore/log_store.hpp @@ -110,7 +110,8 @@ class HomeLogStore : public std::enable_shared_from_this< HomeLogStore > { * @param cookie : Any cookie or context which will passed back in the callback * @param cb Callback upon completion which is called with the status, seq_num and cookie that was passed. */ - void write_async(logstore_seq_num_t seq_num, const sisl::io_blob& b, void* cookie, const log_write_comp_cb_t& cb); + void write_async(logstore_seq_num_t seq_num, const sisl::io_blob& b, void* cookie, const log_write_comp_cb_t& cb, + bool flush_wait = false); /** * @brief This method appends the blob into the log and it returns the generated seq number @@ -210,6 +211,8 @@ class HomeLogStore : public std::enable_shared_from_this< HomeLogStore > { return (ts == std::numeric_limits< logstore_seq_num_t >::max()) ? 
-1 : ts; } + sisl::StreamTracker< logstore_record >& log_records() { return m_records; } + /** * @brief iterator to get all the log buffers; * diff --git a/src/include/homestore/logstore/log_store_internal.hpp b/src/include/homestore/logstore/log_store_internal.hpp index 6a2103d01..125e35daa 100644 --- a/src/include/homestore/logstore/log_store_internal.hpp +++ b/src/include/homestore/logstore/log_store_internal.hpp @@ -125,6 +125,7 @@ struct logstore_req { bool is_internal_req; // If the req is created internally by HomeLogStore itself log_req_comp_cb_t cb; // Callback upon completion of write (overridden than default) Clock::time_point start_time; + bool flush_wait{false}; // Wait for the flush to happen logstore_req(const logstore_req&) = delete; logstore_req& operator=(const logstore_req&) = delete; diff --git a/src/lib/device/device.h b/src/lib/device/device.h index cbcdde8ea..a69e740dd 100644 --- a/src/lib/device/device.h +++ b/src/lib/device/device.h @@ -173,6 +173,7 @@ class DeviceManager { std::vector< PhysicalDev* > get_pdevs_by_dev_type(HSDevType dtype) const; std::vector< shared< VirtualDev > > get_vdevs() const; + std::vector< shared< Chunk > > get_chunks() const; uint64_t total_capacity() const; uint64_t total_capacity(HSDevType dtype) const; diff --git a/src/lib/device/device_manager.cpp b/src/lib/device/device_manager.cpp index ad450910e..6c7c59ef1 100644 --- a/src/lib/device/device_manager.cpp +++ b/src/lib/device/device_manager.cpp @@ -543,6 +543,16 @@ std::vector< shared< VirtualDev > > DeviceManager::get_vdevs() const { return ret_v; } +std::vector< shared< Chunk > > DeviceManager::get_chunks() const { + std::unique_lock lg{m_vdev_mutex}; + std::vector< shared< Chunk > > res; + res.reserve(m_chunks.size()); + for (auto& chunk : m_chunks) { + if (chunk) res.push_back(chunk); + } + return res; +} + // Some of the hs_super_blk details uint64_t hs_super_blk::vdev_super_block_size() { return (hs_super_blk::MAX_VDEVS_IN_SYSTEM * vdev_info::size); } diff --git a/src/lib/device/hs_super_blk.h b/src/lib/device/hs_super_blk.h index e156ba94a..a2df05f3d 100644 --- a/src/lib/device/hs_super_blk.h +++ b/src/lib/device/hs_super_blk.h @@ -17,6 +17,7 @@ #include #include +#include #include #include #include @@ -193,9 +194,13 @@ class hs_super_blk { return (dinfo.dev_type == HSDevType::Fast) ? EXTRA_SB_SIZE_FOR_FAST_DEVICE : EXTRA_SB_SIZE_FOR_DATA_DEVICE; } static uint32_t max_chunks_in_pdev(const dev_info& dinfo) { - return (dinfo.dev_size - 1) / - ((dinfo.dev_type == HSDevType::Fast) ? MIN_CHUNK_SIZE_FAST_DEVICE : MIN_CHUNK_SIZE_DATA_DEVICE) + - 1; + uint64_t min_chunk_size = + (dinfo.dev_type == HSDevType::Fast) ? MIN_CHUNK_SIZE_FAST_DEVICE : MIN_CHUNK_SIZE_DATA_DEVICE; +#ifdef _PRERELEASE + auto chunk_size = iomgr_flip::instance()->get_test_flip< long >("set_minimum_chunk_size"); + if (chunk_size) { min_chunk_size = chunk_size.get(); } +#endif + return (dinfo.dev_size - 1) / min_chunk_size + 1; } }; diff --git a/src/lib/device/journal_vdev.cpp b/src/lib/device/journal_vdev.cpp index 87eab5f72..d6063ae54 100644 --- a/src/lib/device/journal_vdev.cpp +++ b/src/lib/device/journal_vdev.cpp @@ -39,14 +39,15 @@ JournalVirtualDev::JournalVirtualDev(DeviceManager& dmgr, const vdev_info& vinfo VirtualDev{dmgr, vinfo, std::move(event_cb), false /* is_auto_recovery */} { // Private data stored when chunks are created. 
- init_private_data = std::make_shared< JournalChunkPrivate >(); + m_init_private_data = std::make_shared< JournalChunkPrivate >(); m_chunk_pool = std::make_unique< ChunkPool >( dmgr, ChunkPool::Params{ HS_DYNAMIC_CONFIG(generic.journal_chunk_pool_capacity), [this]() { - init_private_data->created_at = get_time_since_epoch_ms(); - sisl::blob private_blob{r_cast< uint8_t* >(init_private_data.get()), sizeof(JournalChunkPrivate)}; + m_init_private_data->created_at = get_time_since_epoch_ms(); + m_init_private_data->end_of_chunk = m_vdev_info.chunk_size; + sisl::blob private_blob{r_cast< uint8_t* >(m_init_private_data.get()), sizeof(JournalChunkPrivate)}; return private_blob; }, m_vdev_info.hs_dev_type, m_vdev_info.vdev_id, m_vdev_info.chunk_size}); @@ -84,17 +85,17 @@ void JournalVirtualDev::init() { // Create descriptor for each logdev_id auto journal_desc = std::make_shared< JournalVirtualDev::Descriptor >(*this, logdev_id); m_journal_descriptors.emplace(logdev_id, journal_desc); - LOGDEBUGMOD(journalvdev, "Loading descriptor {}", logdev_id); + LOGINFOMOD(journalvdev, "Loading descriptor log_dev={}", logdev_id); // Traverse the list starting from the head and add those chunks // in order to the journal descriptor. next_chunk is stored in private_data. // Last chunk will have next_chunk as 0. auto chunk_num = head.chunk_num; while (chunk_num != 0) { auto& c = chunk_map[chunk_num]; - RELEASE_ASSERT(c, "Invalid chunk found logdev {} chunk {}", logdev_id, chunk_num); + RELEASE_ASSERT(c, "Invalid chunk found log_dev={} chunk={}", logdev_id, c->to_string()); journal_desc->m_journal_chunks.push_back(c); visited_chunks.insert(chunk_num); - LOGDEBUGMOD(journalvdev, "Loading chunk {} descriptor {}", chunk_num, logdev_id); + LOGINFOMOD(journalvdev, "Loading log_dev={} chunk={}", logdev_id, c->to_string()); // Increase the the total size. journal_desc->m_total_size += c->size(); @@ -121,8 +122,8 @@ void JournalVirtualDev::init() { *data = JournalChunkPrivate{}; update_chunk_private(chunk, data); - LOGDEBUGMOD(journalvdev, "Removing orphan chunk {} found for logdev {} next {}.", chunk_id, logdev_id, - next_chunk); + LOGINFOMOD(journalvdev, "Removing orphan chunk {} found for logdev {} next {}.", chunk_id, logdev_id, + next_chunk); m_dmgr.remove_chunk_locked(chunk); } @@ -149,7 +150,11 @@ shared< JournalVirtualDev::Descriptor > JournalVirtualDev::open(logdev_id_t logd return journal_desc; } - LOGDEBUGMOD(journalvdev, "Opened log device descriptor {}", logdev_id); + LOGINFOMOD(journalvdev, "Opened journal vdev descriptor log_dev={}", logdev_id); + for (auto& chunk : it->second->m_journal_chunks) { + LOGINFOMOD(journalvdev, " log_dev={} end_of_chunk={} chunk={}", logdev_id, get_end_of_chunk(chunk), + chunk->to_string()); + } return it->second; } @@ -181,18 +186,19 @@ void JournalVirtualDev::Descriptor::append_chunk() { last_chunk_private->end_of_chunk = offset_in_chunk; } m_vdev.update_chunk_private(last_chunk, last_chunk_private); - LOGDEBUGMOD(journalvdev, "Added chunk new {} last {} desc {}", new_chunk->chunk_id(), last_chunk->chunk_id(), - to_string()); + LOGINFOMOD(journalvdev, "Added chunk new {} last {} desc {}", new_chunk->to_string(), last_chunk->chunk_id(), + to_string()); } else { // If the list is empty, update the new chunk as the head. Only head chunk contains the logdev_id. 
auto* new_chunk_private = r_cast< JournalChunkPrivate* >(const_cast< uint8_t* >(new_chunk->user_private())); new_chunk_private->is_head = true; new_chunk_private->logdev_id = m_logdev_id; + new_chunk_private->end_of_chunk = m_vdev.info().chunk_size; // Append the new chunk m_journal_chunks.push_back(new_chunk); m_vdev.update_chunk_private(new_chunk, new_chunk_private); - LOGDEBUGMOD(journalvdev, "Added head chunk {} desc {}", new_chunk->chunk_id(), to_string()); + LOGINFOMOD(journalvdev, "Added head chunk={} desc {}", new_chunk->to_string(), to_string()); } } @@ -231,7 +237,8 @@ off_t JournalVirtualDev::Descriptor::alloc_next_append_blk(size_t sz) { // assert that returnning logical offset is in good range HS_DBG_ASSERT_LE(tail_off, m_end_offset); - LOGDEBUGMOD(journalvdev, "returned tail_off 0x{} desc {}", to_hex(tail_off), to_string()); + LOGDEBUGMOD(journalvdev, "returned tail_off 0x{} tail_off {} size {} desc {}", to_hex(tail_off), tail_off, sz, + to_string()); return tail_off; } @@ -336,30 +343,45 @@ void JournalVirtualDev::Descriptor::sync_pwritev(const iovec* iov, int iovcnt, o size_t JournalVirtualDev::Descriptor::sync_next_read(uint8_t* buf, size_t size_rd) { if (m_journal_chunks.empty()) { return 0; } + HS_REL_ASSERT_LE(m_seek_cursor, m_end_offset, "seek_cursor {} exceeded end_offset {}", m_seek_cursor, m_end_offset); + if (m_seek_cursor >= m_end_offset) { + LOGTRACEMOD(journalvdev, "sync_next_read reached end of chunks"); + return 0; + } + auto [chunk, _, offset_in_chunk] = offset_to_chunk(m_seek_cursor); auto const end_of_chunk = m_vdev.get_end_of_chunk(chunk); auto const chunk_size = std::min< uint64_t >(end_of_chunk, chunk->size()); bool across_chunk{false}; + // LOGINFO("sync_next_read size_rd {} chunk {} seek_cursor {} end_of_chunk {} {}", size_rd, chunk->to_string(), + // m_seek_cursor, end_of_chunk, chunk_size); + HS_REL_ASSERT_LE((uint64_t)end_of_chunk, chunk->size(), "Invalid end of chunk: {} detected on chunk num: {}", end_of_chunk, chunk->chunk_id()); HS_REL_ASSERT_LE((uint64_t)offset_in_chunk, chunk->size(), "Invalid m_seek_cursor: {} which falls in beyond end of chunk: {}!", m_seek_cursor, end_of_chunk); // if read size is larger then what's left in this chunk - if (size_rd >= (chunk->size() - offset_in_chunk)) { + if (size_rd >= (end_of_chunk - offset_in_chunk)) { // truncate size to what is left; - size_rd = chunk->size() - offset_in_chunk; + size_rd = end_of_chunk - offset_in_chunk; across_chunk = true; } + if (buf == nullptr) { return size_rd; } + auto ec = sync_pread(buf, size_rd, m_seek_cursor); // TODO: Check if we can have tolerate this error and somehow start homestore without replaying or in degraded mode? 
HS_REL_ASSERT(!ec, "Error in reading next stream of bytes, proceeding could cause some inconsistency, exiting"); // Update seek cursor after read; m_seek_cursor += size_rd; - if (across_chunk) { m_seek_cursor += (chunk->size() - end_of_chunk); } + if (across_chunk) { + m_seek_cursor += (chunk->size() - end_of_chunk); + LOGTRACEMOD(journalvdev, "Across size_rd {} chunk {} seek_cursor {} end_of_chunk {}", size_rd, + chunk->to_string(), m_seek_cursor, end_of_chunk); + } return size_rd; } @@ -412,6 +434,7 @@ off_t JournalVirtualDev::Descriptor::lseek(off_t offset, int whence) { break; } + LOGINFOMOD(journalvdev, "lseek desc {} offset 0x{} whence {} ", to_string(), to_hex(offset), whence); return m_seek_cursor; } @@ -419,30 +442,43 @@ off_t JournalVirtualDev::Descriptor::lseek(off_t offset, int whence) { * @brief :- it returns the vdev offset after nbytes from start offset */ off_t JournalVirtualDev::Descriptor::dev_offset(off_t nbytes) const { - if (m_journal_chunks.empty()) { return data_start_offset(); } + if (nbytes == 0 || m_journal_chunks.empty()) { + // If no chunks return start offset. + return data_start_offset(); + } off_t vdev_offset = data_start_offset(); - uint32_t dev_id{0}, chunk_id{0}; - off_t offset_in_chunk{0}; - off_t cur_read_cur{0}; - - while (cur_read_cur != nbytes) { - auto [chunk, _, offset_in_chunk] = offset_to_chunk(vdev_offset); - - auto const end_of_chunk = m_vdev.get_end_of_chunk(chunk); - auto const chunk_size = std::min< uint64_t >(end_of_chunk, chunk->size()); - auto const remaining = nbytes - cur_read_cur; - if (remaining >= (static_cast< off_t >(chunk->size()) - offset_in_chunk)) { - cur_read_cur += (chunk->size() - offset_in_chunk); - vdev_offset += (chunk->size() - offset_in_chunk); - } else { + auto chunk_size = m_vdev.info().chunk_size; + uint64_t remaining = nbytes; + auto start_offset = data_start_offset() % chunk_size; + + // data_start_offset coulde be anywhere in the first chunk. + // because when we truncate and data_start_offset lies in first chunk + // we dont delete that first chunk. other chunks will have start_offset as 0. 
+ for (auto chunk : m_journal_chunks) { + uint64_t end_of_chunk = std::min< uint64_t >(m_vdev.get_end_of_chunk(chunk), chunk_size); + + auto num_data_bytes = end_of_chunk - start_offset; + if (remaining < num_data_bytes) { vdev_offset += remaining; - cur_read_cur = nbytes; + break; } + + remaining -= num_data_bytes; + vdev_offset += (chunk_size - start_offset); + start_offset = 0; } return vdev_offset; } +void JournalVirtualDev::Descriptor::update_data_start_offset(off_t offset) { + m_data_start_offset = offset; + auto data_start_offset_aligned = sisl::round_down(m_data_start_offset, m_vdev.info().chunk_size); + m_end_offset = data_start_offset_aligned + m_journal_chunks.size() * m_vdev.info().chunk_size; + LOGINFOMOD(journalvdev, "Updated data start offset off 0x{} {}", to_hex(offset), to_string()); + RELEASE_ASSERT_EQ(m_end_offset - data_start_offset_aligned, m_total_size, "offset size mismatch {}", to_string()); +} + off_t JournalVirtualDev::Descriptor::tail_offset(bool reserve_space_include) const { off_t tail = static_cast< off_t >(data_start_offset() + m_write_sz_in_total.load(std::memory_order_relaxed)); if (reserve_space_include) { tail += m_reserved_sz; } @@ -456,13 +492,13 @@ void JournalVirtualDev::Descriptor::update_tail_offset(off_t tail) { if (tail >= start) { m_write_sz_in_total.store(tail - start, std::memory_order_relaxed); - } else { - RELEASE_ASSERT(false, "tail {} less than start offset {}", tail, start); + } else if (tail != 0) { + LOGERROR("tail {} less than start offset {} desc {}", tail, start, to_string()); + RELEASE_ASSERT(false, "Invalid tail offset"); } lseek(tail); - LOGDEBUGMOD(journalvdev, "tail arg 0x{} desc {} ", to_hex(tail), to_string()); - HS_REL_ASSERT(tail_offset() == tail, "tail offset mismatch after calculation 0x{} : {}", tail_offset(), tail); + LOGINFOMOD(journalvdev, "Updated tail offset arg 0x{} desc {} ", to_hex(tail), to_string()); } void JournalVirtualDev::Descriptor::truncate(off_t truncate_offset) { @@ -510,8 +546,11 @@ void JournalVirtualDev::Descriptor::truncate(off_t truncate_offset) { *data = JournalChunkPrivate{}; m_vdev.update_chunk_private(chunk, data); + // We ideally want to zero out chunks as chunks are reused after free across + // logdev's. But zero out chunk is very expensive, We look at crc mismatches + // to know the end offset of the log dev during recovery. + // Format and add back to pool. 
m_vdev.m_chunk_pool->enqueue(chunk); - HS_PERIODIC_LOG(TRACE, journalvdev, "adding chunk {} back to pool desc {}", chunk->chunk_id(), to_string()); } // Update our start offset, to keep track of actual size @@ -561,7 +600,8 @@ uint64_t JournalVirtualDev::Descriptor::logical_to_dev_offset(off_t log_offset, } #endif -std::tuple< shared< Chunk >, uint32_t, off_t > JournalVirtualDev::Descriptor::offset_to_chunk(off_t log_offset) const { +std::tuple< shared< Chunk >, uint32_t, off_t > JournalVirtualDev::Descriptor::offset_to_chunk(off_t log_offset, + bool check) const { uint64_t chunk_aligned_offset = sisl::round_down(m_data_start_offset, m_vdev.info().chunk_size); uint64_t off_l{static_cast< uint64_t >(log_offset) - chunk_aligned_offset}; uint32_t index = 0; @@ -574,10 +614,17 @@ std::tuple< shared< Chunk >, uint32_t, off_t > JournalVirtualDev::Descriptor::of } } - HS_DBG_ASSERT(false, "Input log_offset is invalid: {}", log_offset); + if (check) { HS_DBG_ASSERT(false, "Input log_offset is invalid: {} {}", log_offset, to_string()); } return {nullptr, 0L, 0L}; } +bool JournalVirtualDev::Descriptor::is_offset_at_last_chunk(off_t bytes_offset) { + auto [chunk, chunk_index, _] = offset_to_chunk(bytes_offset, false); + if (chunk == nullptr) return true; + if (chunk_index == m_journal_chunks.size() - 1) { return true; } + return false; +} + void JournalVirtualDev::Descriptor::high_watermark_check() { if (resource_mgr().check_journal_size(used_size(), size())) { COUNTER_INCREMENT(m_vdev.m_metrics, vdev_high_watermark_count, 1); @@ -598,7 +645,7 @@ bool JournalVirtualDev::Descriptor::is_alloc_accross_chunk(size_t size) const { nlohmann::json JournalVirtualDev::Descriptor::get_status(int log_level) const { nlohmann::json j; - j["logdev_id"] = m_logdev_id; + j["logdev"] = m_logdev_id; j["seek_cursor"] = m_seek_cursor; j["data_start_offset"] = m_data_start_offset; j["end_offset"] = m_end_offset; @@ -613,7 +660,7 @@ nlohmann::json JournalVirtualDev::Descriptor::get_status(int log_level) const { nlohmann::json c; auto* private_data = r_cast< JournalChunkPrivate* >(const_cast< uint8_t* >(chunk->user_private())); c["chunk_id"] = chunk->chunk_id(); - c["logdev_id"] = private_data->logdev_id; + c["logdev"] = private_data->logdev_id; c["is_head"] = private_data->is_head; c["end_of_chunk"] = private_data->end_of_chunk; c["next_chunk"] = private_data->next_chunk; @@ -627,12 +674,13 @@ nlohmann::json JournalVirtualDev::Descriptor::get_status(int log_level) const { } std::string JournalVirtualDev::Descriptor::to_string() const { - std::string str{fmt::format("id={};ds=0x{};end=0x{};writesz={};tail=0x{};" + off_t tail = + static_cast< off_t >(data_start_offset() + m_write_sz_in_total.load(std::memory_order_relaxed)) + m_reserved_sz; + std::string str{fmt::format("log_dev={};ds=0x{};end=0x{};writesz={};tail=0x{};" "rsvdsz={};chunks={};trunc={};total={};seek=0x{} ", m_logdev_id, to_hex(m_data_start_offset), to_hex(m_end_offset), - m_write_sz_in_total.load(std::memory_order_relaxed), to_hex(tail_offset()), - m_reserved_sz, m_journal_chunks.size(), m_truncate_done, m_total_size, - to_hex(m_seek_cursor))}; + m_write_sz_in_total.load(std::memory_order_relaxed), to_hex(tail), m_reserved_sz, + m_journal_chunks.size(), m_truncate_done, m_total_size, to_hex(m_seek_cursor))}; return str; } diff --git a/src/lib/device/journal_vdev.hpp b/src/lib/device/journal_vdev.hpp index 13ec18c65..18bc9608d 100644 --- a/src/lib/device/journal_vdev.hpp +++ b/src/lib/device/journal_vdev.hpp @@ -233,12 +233,7 @@ class JournalVirtualDev : public 
VirtualDev { * * @param offset : the start logical offset to be persisted */ - void update_data_start_offset(off_t offset) { - m_data_start_offset = offset; - auto data_start_offset_aligned = sisl::round_down(m_data_start_offset, m_vdev.info().chunk_size); - m_end_offset = data_start_offset_aligned + m_journal_chunks.size() * m_vdev.info().chunk_size; - RELEASE_ASSERT_EQ(m_end_offset - data_start_offset_aligned, m_total_size, "offset size mismatch"); - } + void update_data_start_offset(off_t offset); /** * @brief : get the logical tail offset; @@ -302,6 +297,13 @@ class JournalVirtualDev : public VirtualDev { */ uint64_t available_blks() const { return available_size() / m_vdev.block_size(); } + /** + * @brief : Check if the offset_bytes lies at the last chunk. + * + * @return : check if last chunk or not. + */ + bool is_offset_at_last_chunk(off_t offset_bytes); + /** * @brief Get the status of the journal vdev and its internal structures * @param log_level: Log level to do verbosity. @@ -309,6 +311,8 @@ class JournalVirtualDev : public VirtualDev { */ nlohmann::json get_status(int log_level) const; + logdev_id_t logdev_id() const { return m_logdev_id; } + std::string to_string() const; private: @@ -361,7 +365,7 @@ class JournalVirtualDev : public VirtualDev { // off_t& offset_in_chunk) const; // Return the chunk, its index and offset in the chunk list. - std::tuple< shared< Chunk >, uint32_t, off_t > offset_to_chunk(off_t log_offset) const; + std::tuple< shared< Chunk >, uint32_t, off_t > offset_to_chunk(off_t log_offset, bool check = true) const; bool validate_append_size(size_t count) const; @@ -409,7 +413,7 @@ class JournalVirtualDev : public VirtualDev { // Cache the chunks. Getting a chunk from the pool causes a single write of the // last chunk in the list to update its end_of_chunk and next_chunk. 
std::unique_ptr< ChunkPool > m_chunk_pool; - std::shared_ptr< JournalChunkPrivate > init_private_data; + std::shared_ptr< JournalChunkPrivate > m_init_private_data; }; } // namespace homestore diff --git a/src/lib/device/physical_dev.cpp b/src/lib/device/physical_dev.cpp index 9517c4422..b431cc384 100644 --- a/src/lib/device/physical_dev.cpp +++ b/src/lib/device/physical_dev.cpp @@ -247,12 +247,13 @@ std::vector< shared< Chunk > > PhysicalDev::create_chunks(const std::vector< uin auto chunk = std::make_shared< Chunk >(this, *cinfo, cslot); ret_chunks.push_back(chunk); get_stream(chunk).m_chunks_map.insert(std::pair{chunk_ids[cit], std::move(chunk)}); - + HS_LOG(INFO, device, "Creating chunk {}", chunk->to_string()); cinfo->~chunk_info(); } m_chunk_info_slots->set_bits(b.start_bit, b.nbits); write_super_block(buf, chunk_info::size * b.nbits, chunk_info_offset_nth(b.start_bit)); + hs_utils::iobuf_free(buf, sisl::buftag::superblk); chunks_remaining -= b.nbits; @@ -296,6 +297,7 @@ shared< Chunk > PhysicalDev::create_chunk(uint32_t chunk_id, uint32_t vdev_id, u auto bitmap_mem = m_chunk_info_slots->serialize(m_pdev_info.dev_attr.align_size); write_super_block(bitmap_mem->cbytes(), bitmap_mem->size(), hs_super_blk::chunk_sb_offset()); + HS_LOG(INFO, device, "Created chunk {}", chunk->to_string()); cinfo->~chunk_info(); hs_utils::iobuf_free(buf, sisl::buftag::superblk); @@ -397,6 +399,7 @@ void PhysicalDev::do_remove_chunk(cshared< Chunk >& chunk) { get_stream(chunk).m_chunks_map.erase(chunk->chunk_id()); cinfo->~chunk_info(); hs_utils::iobuf_free(buf, sisl::buftag::superblk); + HS_LOG(DEBUG, device, "Removed chunk {}", chunk->to_string()); } uint64_t PhysicalDev::chunk_info_offset_nth(uint32_t slot) const { @@ -418,11 +421,14 @@ void PhysicalDev::populate_chunk_info(chunk_info* cinfo, uint32_t vdev_id, uint6 cinfo->set_allocated(); cinfo->set_user_private(private_data); cinfo->compute_checksum(); + auto [_, inserted] = m_chunk_start.insert(cinfo->chunk_start_offset); + RELEASE_ASSERT(inserted, "Duplicate start offset {} for chunk {}", cinfo->chunk_start_offset, cinfo->chunk_id); } void PhysicalDev::free_chunk_info(chunk_info* cinfo) { auto ival = ChunkInterval::right_open(cinfo->chunk_start_offset, cinfo->chunk_start_offset + cinfo->chunk_size); m_chunk_data_area.erase(ival); + m_chunk_start.erase(cinfo->chunk_start_offset); cinfo->set_free(); cinfo->checksum = 0; diff --git a/src/lib/device/physical_dev.hpp b/src/lib/device/physical_dev.hpp index 2987fb45e..41eb9221d 100644 --- a/src/lib/device/physical_dev.hpp +++ b/src/lib/device/physical_dev.hpp @@ -136,6 +136,7 @@ class PhysicalDev { ChunkIntervalSet m_chunk_data_area; // Range of chunks data area created std::unique_ptr< sisl::Bitset > m_chunk_info_slots; // Slots to write the chunk info uint32_t m_chunk_sb_size{0}; // Total size of the chunk sb at present + std::unordered_set< uint64_t > m_chunk_start; // Store and verify start offset of all chunks for debugging. public: PhysicalDev(const dev_info& dinfo, int oflags, const pdev_info_header& pinfo); diff --git a/src/lib/logstore/log_dev.cpp b/src/lib/logstore/log_dev.cpp index 90cc8f2e7..541c54768 100644 --- a/src/lib/logstore/log_dev.cpp +++ b/src/lib/logstore/log_dev.cpp @@ -35,9 +35,9 @@ namespace homestore { SISL_LOGGING_DECL(logstore) -#define THIS_LOGDEV_LOG(level, msg, ...) HS_SUBMOD_LOG(level, logstore, , "logdev", m_logdev_id, msg, __VA_ARGS__) +#define THIS_LOGDEV_LOG(level, msg, ...) 
HS_SUBMOD_LOG(level, logstore, , "log_dev", m_logdev_id, msg, __VA_ARGS__) #define THIS_LOGDEV_PERIODIC_LOG(level, msg, ...) \ - HS_PERIODIC_DETAILED_LOG(level, logstore, "logdev", m_logdev_id, , , msg, __VA_ARGS__) + HS_PERIODIC_DETAILED_LOG(level, logstore, "log_dev", m_logdev_id, , , msg, __VA_ARGS__) static bool has_data_service() { return HomeStore::instance()->has_data_service(); } // static BlkDataService& data_service() { return HomeStore::instance()->data_service(); } @@ -87,12 +87,7 @@ void LogDev::start(bool format, JournalVirtualDev* vdev) { m_last_flush_idx = m_log_idx - 1; } - iomanager.run_on_wait(logstore_service().flush_thread(), [this]() { - m_flush_timer_hdl = iomanager.schedule_thread_timer(HS_DYNAMIC_CONFIG(logstore.flush_timer_frequency_us) * 1000, - true /* recurring */, nullptr /* cookie */, - [this](void*) { flush_if_needed(); }); - }); - + start_timer(); handle_unopened_log_stores(format); { @@ -130,9 +125,7 @@ void LogDev::stop() { m_block_flush_q_cv.wait(lk, [&] { return m_stopped; }); } - // cancel the timer - iomanager.run_on_wait(logstore_service().flush_thread(), - [this]() { iomanager.cancel_timer(m_flush_timer_hdl, true); }); + stop_timer(); { folly::SharedMutexWritePriority::WriteHolder holder(m_store_map_mtx); @@ -158,12 +151,31 @@ void LogDev::stop() { m_hs.reset(); } +void LogDev::start_timer() { + // Currently only tests set it to 0. + if (HS_DYNAMIC_CONFIG(logstore.flush_timer_frequency_us) == 0) { return; } + + iomanager.run_on_wait(logstore_service().flush_thread(), [this]() { + m_flush_timer_hdl = iomanager.schedule_thread_timer(HS_DYNAMIC_CONFIG(logstore.flush_timer_frequency_us) * 1000, + true /* recurring */, nullptr /* cookie */, + [this](void*) { flush_if_needed(); }); + }); +} + +void LogDev::stop_timer() { + if (m_flush_timer_hdl != iomgr::null_timer_handle) { + // cancel the timer + iomanager.run_on_wait(logstore_service().flush_thread(), + [this]() { iomanager.cancel_timer(m_flush_timer_hdl, true); }); + } +} + void LogDev::do_load(const off_t device_cursor) { log_stream_reader lstream{device_cursor, m_vdev, m_vdev_jd, m_flush_size_multiple}; logid_t loaded_from{-1}; off_t group_dev_offset = 0; - THIS_LOGDEV_LOG(TRACE, "LogDev::do_load start {} ", m_logdev_id); + THIS_LOGDEV_LOG(TRACE, "LogDev::do_load start log_dev={} ", m_logdev_id); do { const auto buf = lstream.next_group(&group_dev_offset); @@ -180,6 +192,7 @@ void LogDev::do_load(const off_t device_cursor) { break; } + THIS_LOGDEV_LOG(INFO, "Found log group header offset=0x{} header {}", to_hex(group_dev_offset), *header); HS_REL_ASSERT_EQ(header->start_idx(), m_log_idx.load(), "log indx is not the expected one"); if (loaded_from == -1) { loaded_from = header->start_idx(); } @@ -239,15 +252,16 @@ void LogDev::assert_next_pages(log_stream_reader& lstream) { } int64_t LogDev::append_async(const logstore_id_t store_id, const logstore_seq_num_t seq_num, const sisl::io_blob& data, - void* cb_context) { + void* cb_context, bool flush_wait) { auto prev_size = m_pending_flush_size.fetch_add(data.size(), std::memory_order_relaxed); const auto idx = m_log_idx.fetch_add(1, std::memory_order_acq_rel); auto threshold_size = LogDev::flush_data_threshold_size(); m_log_records->create(idx, store_id, seq_num, data, cb_context); - if (prev_size < threshold_size && ((prev_size + data.size()) >= threshold_size) && - !m_is_flushing.load(std::memory_order_relaxed)) { - flush_if_needed(); + if (flush_wait || + ((prev_size < threshold_size && ((prev_size + data.size()) >= threshold_size) && + 
!m_is_flushing.load(std::memory_order_relaxed)))) { + flush_if_needed(flush_wait ? 1 : -1); } return idx; } @@ -257,11 +271,17 @@ log_buffer LogDev::read(const logdev_key& key, serialized_log_record& return_rec m_vdev_jd->sync_pread(buf->bytes(), initial_read_size, key.dev_offset); auto* header = r_cast< const log_group_header* >(buf->cbytes()); - HS_REL_ASSERT_EQ(header->magic_word(), LOG_GROUP_HDR_MAGIC, "Log header corrupted with magic mismatch!"); - HS_REL_ASSERT_EQ(header->get_version(), log_group_header::header_version, "Log header version mismatch!"); - HS_REL_ASSERT_LE(header->start_idx(), key.idx, "log key offset does not match with log_idx"); - HS_REL_ASSERT_GT((header->start_idx() + header->nrecords()), key.idx, "log key offset does not match with log_idx"); - HS_LOG_ASSERT_GE(header->total_size(), header->_inline_data_offset(), "Inconsistent size data in log group"); + // THIS_LOGDEV_LOG(TRACE, "Logdev read log group header {}", *header); + HS_REL_ASSERT_EQ(header->magic_word(), LOG_GROUP_HDR_MAGIC, "Log header corrupted with magic mismatch! {} {}", + m_logdev_id, *header); + HS_REL_ASSERT_EQ(header->get_version(), log_group_header::header_version, "Log header version mismatch! {} {}", + m_logdev_id, *header); + HS_REL_ASSERT_LE(header->start_idx(), key.idx, "log key offset does not match with log_idx {} }{}", m_logdev_id, + *header); + HS_REL_ASSERT_GT((header->start_idx() + header->nrecords()), key.idx, + "log key offset does not match with log_idx {} {}", m_logdev_id, *header); + HS_LOG_ASSERT_GE(header->total_size(), header->_inline_data_offset(), "Inconsistent size data in log group {} {}", + m_logdev_id, *header); // We can only do crc match in read if we have read all the blocks. We don't want to aggressively read more data // than we need to just to compare CRC for read operation. It can be done during recovery. 
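
For context (not part of the patch): read() skips the full-group CRC comparison because recovery already walks the log groups in order and uses the CRC chain in each group header to find the end of valid data; that is also how stale groups left behind in a reused chunk get rejected. A minimal sketch of that chain check under this assumption, using hypothetical names (group_hdr_view, group_follows) rather than Homestore types; the real logic is in log_stream_reader::next_group later in this patch:

    #include <cstdint>

    // Sketch only: each group header stores the CRC of the previous group
    // (prev_grp_crc). Walking forward, a mismatch means we have moved past the
    // last fully written group, or into leftover data from a chunk that was
    // reused by another logdev.
    struct group_hdr_view {
        uint32_t magic;
        uint32_t prev_grp_crc;
        uint32_t cur_grp_crc;
    };

    bool group_follows(const group_hdr_view& hdr, uint32_t expected_magic, uint32_t prev_crc) {
        if (hdr.magic != expected_magic) { return false; }                       // not a log group header
        if ((prev_crc != 0) && (hdr.prev_grp_crc != prev_crc)) { return false; } // chain broken: end of log
        return true;
    }

With is_offset_at_last_chunk() added in this patch, such a break is tolerated only on the last chunk of the descriptor; anywhere else it is treated as corruption.
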
@@ -336,7 +356,7 @@ LogGroup* LogDev::prepare_flush(const int32_t estimated_records) { } }); - lg->finish(get_prev_crc()); + lg->finish(m_logdev_id, get_prev_crc()); if (sisl_unlikely(flushing_upto_idx == -1)) { return nullptr; } lg->m_flush_log_idx_from = m_last_flush_idx + 1; lg->m_flush_log_idx_upto = flushing_upto_idx; @@ -345,7 +365,6 @@ LogGroup* LogDev::prepare_flush(const int32_t estimated_records) { HS_DBG_ASSERT_GT(lg->header()->oob_data_offset, 0); THIS_LOGDEV_LOG(DEBUG, "Flushing upto log_idx={}", flushing_upto_idx); - THIS_LOGDEV_LOG(DEBUG, "Log Group: {}", *lg); return lg; } @@ -371,7 +390,8 @@ bool LogDev::flush_if_needed(int64_t threshold_size) { if (flush_by_size || flush_by_time) { // First off, check if we can flush in this thread itself, if not, schedule it into different thread if (!can_flush_in_this_thread()) { - iomanager.run_on_forget(logstore_service().flush_thread(), [this]() { flush_if_needed(); }); + iomanager.run_on_forget(logstore_service().flush_thread(), + [this, threshold_size]() { flush_if_needed(threshold_size); }); return false; } @@ -406,8 +426,9 @@ bool LogDev::flush_if_needed(int64_t threshold_size) { off_t offset = m_vdev_jd->alloc_next_append_blk(lg->header()->total_size()); lg->m_log_dev_offset = offset; HS_REL_ASSERT_NE(lg->m_log_dev_offset, INVALID_OFFSET, "log dev is full"); - THIS_LOGDEV_LOG(TRACE, "Flush prepared, flushing data size={} at offset={}", lg->actual_data_size(), offset); - + THIS_LOGDEV_LOG(TRACE, "Flushing log group data size={} at offset=0x{} log_group={}", lg->actual_data_size(), + to_hex(offset), *lg); + // THIS_LOGDEV_LOG(DEBUG, "Log Group: {}", *lg); do_flush(lg); return true; } else { @@ -508,8 +529,8 @@ void LogDev::unlock_flush(bool do_flush) { } if (!cb()) { // NOTE: Under this if condition DO NOT ASSUME flush lock is still being held. This is because - // callee is saying, I will unlock the flush lock on my own and before returning from cb to here, the - // callee could have schedule a job in other thread and unlock the flush. + // callee is saying, I will unlock the flush lock on my own and before returning from cb to here, + // the callee could have schedule a job in other thread and unlock the flush. 
std::unique_lock lk{m_block_flush_q_mutex}; THIS_LOGDEV_LOG(DEBUG, "flush cb wanted to hold onto the flush lock, so putting the {} remaining entries back " @@ -539,14 +560,14 @@ void LogDev::unlock_flush(bool do_flush) { uint64_t LogDev::truncate(const logdev_key& key) { HS_DBG_ASSERT_GE(key.idx, m_last_truncate_idx); uint64_t const num_records_to_truncate = static_cast< uint64_t >(key.idx - m_last_truncate_idx); - LOGINFO("LogDev::truncate {}", num_records_to_truncate); + THIS_LOGDEV_LOG(DEBUG, "LogDev::truncate num {} idx {}", num_records_to_truncate, key.idx); if (num_records_to_truncate > 0) { HS_PERIODIC_LOG(INFO, logstore, - "Truncating log device upto logdev {} log_id={} vdev_offset={} truncated {} log records", + "Truncating log device upto log_dev={} log_id={} vdev_offset={} truncated {} log records", m_logdev_id, key.idx, key.dev_offset, num_records_to_truncate); m_log_records->truncate(key.idx); m_vdev_jd->truncate(key.dev_offset); - LOGINFO("LogDev::truncate {}", key.idx); + THIS_LOGDEV_LOG(DEBUG, "LogDev::truncate done {} ", key.idx); m_last_truncate_idx = key.idx; { @@ -562,8 +583,8 @@ uint64_t LogDev::truncate(const logdev_key& key) { for (auto it{std::cbegin(m_garbage_store_ids)}; it != std::cend(m_garbage_store_ids);) { if (it->first > key.idx) break; - HS_PERIODIC_LOG(INFO, logstore, "Garbage collecting the log store id {} log_idx={}", it->second, - it->first); + HS_PERIODIC_LOG(DEBUG, logstore, "Garbage collecting the log_dev={} log_store={} log_idx={}", + m_logdev_id, it->second, it->first); m_logdev_meta.unreserve_store(it->second, false /* persist_now */); it = m_garbage_store_ids.erase(it); #ifdef _PRERELEASE @@ -573,11 +594,11 @@ uint64_t LogDev::truncate(const logdev_key& key) { // We can remove the rollback records of those upto which logid is getting truncated m_logdev_meta.remove_rollback_record_upto(key.idx, false /* persist_now */); - LOGINFO("LogDev::truncate remove rollback {}", key.idx); + THIS_LOGDEV_LOG(DEBUG, "LogDev::truncate remove rollback {}", key.idx); m_logdev_meta.persist(); #ifdef _PRERELEASE if (garbage_collect && iomgr_flip::instance()->test_flip("logdev_abort_after_garbage")) { - LOGINFO("logdev aborting after unreserving garbage ids"); + THIS_LOGDEV_LOG(INFO, "logdev aborting after unreserving garbage ids"); raise(SIGKILL); } #endif @@ -604,9 +625,9 @@ void LogDev::handle_unopened_log_stores(bool format) { } m_unopened_store_io.clear(); - // If there are any unopened storeids found, loop and check again if they are indeed open later. Unopened log store - // could be possible if the ids are deleted, but it is delayed to remove from store id reserver. In that case, - // do the remove from store id reserver now. + // If there are any unopened storeids found, loop and check again if they are indeed open later. Unopened log + // store could be possible if the ids are deleted, but it is delayed to remove from store id reserver. In that + // case, do the remove from store id reserver now. // TODO: At present we are assuming all unopened store ids could be removed. In future have a callback to this // start routine, which takes the list of unopened store ids and can return a new set, which can be removed. 
{ @@ -632,7 +653,7 @@ std::shared_ptr< HomeLogStore > LogDev::create_new_log_store(bool append_mode) { HS_REL_ASSERT((it == m_id_logstore_map.end()), "store_id {}-{} already exists", m_logdev_id, store_id); m_id_logstore_map.insert(std::pair(store_id, logstore_info{.log_store = lstore, .append_mode = append_mode})); } - LOGINFO("Created log store id {}-{}", m_logdev_id, store_id); + LOGINFO("Created log store log_dev={} log_store={}", m_logdev_id, store_id); return lstore; } @@ -652,7 +673,7 @@ folly::Future< shared< HomeLogStore > > LogDev::open_log_store(logstore_id_t sto } void LogDev::remove_log_store(logstore_id_t store_id) { - LOGINFO("Removing log store id {}-{}", m_logdev_id, store_id); + LOGINFO("Removing log_dev={} log_store={}", m_logdev_id, store_id); { folly::SharedMutexWritePriority::WriteHolder holder(m_store_map_mtx); auto ret = m_id_logstore_map.erase(store_id); @@ -694,8 +715,8 @@ void LogDev::on_log_store_found(logstore_id_t store_id, const logstore_superblk& return; } - LOGINFO("Found a logstore store_id={}-{} with start seq_num={}, Creating a new HomeLogStore instance", m_logdev_id, - store_id, sb.m_first_seq_num); + LOGINFO("Found a logstore log_dev={} log_store={} with start seq_num={}, Creating a new HomeLogStore instance", + m_logdev_id, store_id, sb.m_first_seq_num); logstore_info& info = it->second; info.log_store = std::make_shared< HomeLogStore >(shared_from_this(), store_id, info.append_mode, sb.m_first_seq_num); @@ -800,17 +821,17 @@ logdev_key LogDev::do_device_truncate(bool dry_run) { } if ((min_safe_ld_key == logdev_key::out_of_bound_ld_key()) || (min_safe_ld_key.idx < 0)) { - HS_PERIODIC_LOG( - INFO, logstore, - "[Logdev={}] No log store append on any log stores, skipping device truncation, all_logstore_info:<{}>", - m_logdev_id, dbg_str); + HS_PERIODIC_LOG(INFO, logstore, + "[log_dev={}] No log store append on any log stores, skipping device truncation, " + "all_logstore_info:<{}>", + m_logdev_id, dbg_str); return min_safe_ld_key; } // Got the safest log id to truncate and actually truncate upto the safe log idx to the log device if (!dry_run) { truncate(min_safe_ld_key); } HS_PERIODIC_LOG(INFO, logstore, - "[Logdev={}] LogDevice truncate, all_logstore_info:<{}> safe log dev key to truncate={}", + "[log_dev={}] LogDevice truncate, all_logstore_info:<{}> safe log dev key to truncate={}", m_logdev_id, dbg_str, min_safe_ld_key); // We call post device truncation only to the log stores whose prepared truncation points are fully @@ -1054,7 +1075,7 @@ void LogDevMetadata::remove_rollback_record_upto(logid_t upto_id, bool persist_n uint32_t n_removed{0}; for (auto i = m_rollback_sb->num_records; i > 0; --i) { auto& rec = m_rollback_sb->at(i - 1); - LOGINFO("Removing record sb {} {}", rec.idx_range.second, upto_id); + HS_LOG(TRACE, logstore, "Removing record sb {} {}", rec.idx_range.second, upto_id); if (rec.idx_range.second <= upto_id) { m_rollback_sb->remove_ith_record(i - 1); ++n_removed; @@ -1063,7 +1084,7 @@ void LogDevMetadata::remove_rollback_record_upto(logid_t upto_id, bool persist_n if (n_removed) { for (auto it = m_rollback_info.begin(); it != m_rollback_info.end();) { - LOGINFO("Removing info {} {}", it->second.second, upto_id); + HS_LOG(TRACE, logstore, "Removing info {} {}", it->second.second, upto_id); if (it->second.second <= upto_id) { it = m_rollback_info.erase(it); } else { diff --git a/src/lib/logstore/log_dev.hpp b/src/lib/logstore/log_dev.hpp index bfb2afb1a..f356102a0 100644 --- a/src/lib/logstore/log_dev.hpp +++ 
b/src/lib/logstore/log_dev.hpp @@ -136,6 +136,7 @@ struct log_group_header { uint32_t footer_offset; // offset of where footer starts crc32_t prev_grp_crc; // Checksum of the previous group that was written crc32_t cur_grp_crc; // Checksum of the current group record + logdev_id_t logdev_id; // Logdev id log_group_header() : magic{LOG_GROUP_HDR_MAGIC}, version{header_version} {} log_group_header(const log_group_header&) = delete; @@ -195,9 +196,10 @@ struct fmt::formatter< homestore::log_group_header > { return fmt::format_to( ctx.out(), "magic = {} version={} n_log_records = {} start_log_idx = {} group_size = {} inline_data_offset = {} " - "oob_data_offset = {} prev_grp_crc = {} cur_grp_crc = {}", + "oob_data_offset = {} prev_grp_crc = {} cur_grp_crc = {} logdev = {}", header.magic, header.version, header.n_log_records, header.start_log_idx, header.group_size, - header.inline_data_offset, header.oob_data_offset, header.prev_grp_crc, header.cur_grp_crc); + header.inline_data_offset, header.oob_data_offset, header.prev_grp_crc, header.cur_grp_crc, + header.logdev_id); } }; @@ -253,7 +255,7 @@ class LogGroup { bool add_record(log_record& record, const int64_t log_idx); bool can_accomodate(const log_record& record) const { return (m_nrecords <= m_max_records); } - const iovec_array& finish(const crc32_t prev_crc); + const iovec_array& finish(logdev_id_t logdev_id, const crc32_t prev_crc); crc32_t compute_crc(); log_group_header* header() { return reinterpret_cast< log_group_header* >(m_cur_log_buf); } @@ -634,6 +636,18 @@ class LogDev : public std::enable_shared_from_this< LogDev > { */ void stop(); + /** + * @brief Start the flush timer. + * + */ + void start_timer(); + + /** + * @brief Stop the flush timer. + * + */ + void stop_timer(); + /** * @brief Append the data to the log device asynchronously. The buffer that is passed is expected to be valid, till * the append callback is done. @@ -648,7 +662,7 @@ class LogDev : public std::enable_shared_from_this< LogDev > { * @return logid_t : log_idx of the log of the data. 
*/ logid_t append_async(logstore_id_t store_id, logstore_seq_num_t seq_num, const sisl::io_blob& data, - void* cb_context); + void* cb_context, bool flush_wait = false); /** * @brief Read the log id from the device offset @@ -781,7 +795,6 @@ class LogDev : public std::enable_shared_from_this< LogDev > { void device_truncate_under_lock(const std::shared_ptr< truncate_req >& treq); logdev_key do_device_truncate(bool dry_run = false); void handle_unopened_log_stores(bool format); - logdev_id_t get_id() { return m_logdev_id; } private: @@ -858,7 +871,7 @@ class LogDev : public std::enable_shared_from_this< LogDev > { uint32_t m_log_group_idx{0}; std::atomic< bool > m_flush_status = false; // Timer handle - iomgr::timer_handle_t m_flush_timer_hdl; + iomgr::timer_handle_t m_flush_timer_hdl{iomgr::null_timer_handle}; }; // LogDev } // namespace homestore diff --git a/src/lib/logstore/log_group.cpp b/src/lib/logstore/log_group.cpp index 2f54da8d8..597d97849 100644 --- a/src/lib/logstore/log_group.cpp +++ b/src/lib/logstore/log_group.cpp @@ -117,13 +117,14 @@ bool LogGroup::new_iovec_for_footer() const { return ((m_inline_data_pos + sizeof(log_group_footer)) >= m_cur_buf_len || m_oob_data_pos != 0); } -const iovec_array& LogGroup::finish(const crc32_t prev_crc) { +const iovec_array& LogGroup::finish(logdev_id_t logdev_id, const crc32_t prev_crc) { // add footer auto footer = add_and_get_footer(); m_iovecs[0].iov_len = sisl::round_up(m_iovecs[0].iov_len, m_flush_multiple_size); log_group_header* hdr = new (header()) log_group_header{}; + hdr->logdev_id = logdev_id; hdr->n_log_records = m_nrecords; hdr->prev_grp_crc = prev_crc; hdr->inline_data_offset = sizeof(log_group_header) + (m_max_records * sizeof(serialized_log_record)); diff --git a/src/lib/logstore/log_store.cpp b/src/lib/logstore/log_store.cpp index 089bb3286..48023f0e5 100644 --- a/src/lib/logstore/log_store.cpp +++ b/src/lib/logstore/log_store.cpp @@ -28,9 +28,9 @@ namespace homestore { SISL_LOGGING_DECL(logstore) -#define THIS_LOGSTORE_LOG(level, msg, ...) HS_SUBMOD_LOG(level, logstore, , "store", m_fq_name, msg, __VA_ARGS__) +#define THIS_LOGSTORE_LOG(level, msg, ...) HS_SUBMOD_LOG(level, logstore, , "log_store", m_fq_name, msg, __VA_ARGS__) #define THIS_LOGSTORE_PERIODIC_LOG(level, msg, ...) 
\ - HS_PERIODIC_DETAILED_LOG(level, logstore, "store", m_fq_name, , , msg, __VA_ARGS__) + HS_PERIODIC_DETAILED_LOG(level, logstore, "log_store", m_fq_name, , , msg, __VA_ARGS__) HomeLogStore::HomeLogStore(std::shared_ptr< LogDev > logdev, logstore_id_t id, bool append_mode, logstore_seq_num_t start_lsn) : @@ -39,7 +39,7 @@ HomeLogStore::HomeLogStore(std::shared_ptr< LogDev > logdev, logstore_id_t id, b m_records{"HomeLogStoreRecords", start_lsn - 1}, m_append_mode{append_mode}, m_seq_num{start_lsn}, - m_fq_name{fmt::format("{}.{}", logdev->get_id(), id)}, + m_fq_name{fmt::format("{} log_dev={}", id, logdev->get_id())}, m_metrics{logstore_service().metrics()} { m_truncation_barriers.reserve(10000); m_safe_truncation_boundary.ld_key = m_logdev->get_last_flush_ld_key(); @@ -57,18 +57,19 @@ bool HomeLogStore::write_sync(logstore_seq_num_t seq_num, const sisl::io_blob& b bool ret{false}; }; auto ctx = std::make_shared< Context >(); - this->write_async(seq_num, b, nullptr, - [seq_num, this, ctx](homestore::logstore_seq_num_t seq_num_cb, - [[maybe_unused]] const sisl::io_blob& b, homestore::logdev_key ld_key, - [[maybe_unused]] void* cb_ctx) { - HS_DBG_ASSERT((ld_key && seq_num == seq_num_cb), "Write_Async failed or corrupted"); - { - std::unique_lock< std::mutex > lk{ctx->write_mutex}; - ctx->write_done = true; - ctx->ret = true; - } - ctx->write_cv.notify_one(); - }); + this->write_async( + seq_num, b, nullptr, + [seq_num, this, ctx](homestore::logstore_seq_num_t seq_num_cb, [[maybe_unused]] const sisl::io_blob& b, + homestore::logdev_key ld_key, [[maybe_unused]] void* cb_ctx) { + HS_DBG_ASSERT((ld_key && seq_num == seq_num_cb), "Write_Async failed or corrupted"); + { + std::unique_lock< std::mutex > lk{ctx->write_mutex}; + ctx->write_done = true; + ctx->ret = true; + } + ctx->write_cv.notify_one(); + }, + true /* flush_wait */); { std::unique_lock< std::mutex > lk{ctx->write_mutex}; @@ -95,14 +96,15 @@ void HomeLogStore::write_async(logstore_req* req, const log_req_comp_cb_t& cb) { m_records.create(req->seq_num); COUNTER_INCREMENT(m_metrics, logstore_append_count, 1); HISTOGRAM_OBSERVE(m_metrics, logstore_record_size, req->data.size()); - m_logdev->append_async(m_store_id, req->seq_num, req->data, static_cast< void* >(req)); + m_logdev->append_async(m_store_id, req->seq_num, req->data, static_cast< void* >(req), req->flush_wait); } void HomeLogStore::write_async(logstore_seq_num_t seq_num, const sisl::io_blob& b, void* cookie, - const log_write_comp_cb_t& cb) { + const log_write_comp_cb_t& cb, bool flush_wait) { // Form an internal request and issue the write auto* req = logstore_req::make(this, seq_num, b, true /* is_write_req */); req->cookie = cookie; + req->flush_wait = flush_wait; write_async(req, [cb](logstore_req* req, logdev_key written_lkey) { if (cb) { cb(req->seq_num, req->data, written_lkey, req->cookie); } @@ -121,7 +123,7 @@ log_buffer HomeLogStore::read_sync(logstore_seq_num_t seq_num) { // If seq_num has not been flushed yet, but issued, then we flush them before reading auto const s = m_records.status(seq_num); if (s.is_out_of_range || s.is_hole) { - // THIS_LOGSTORE_LOG(DEBUG, "ld_key not valid {}", seq_num); + // THIS_LOGSTORE_LOG(ERROR, "ld_key not valid {}", seq_num); throw std::out_of_range("key not valid"); } else if (!s.is_completed) { THIS_LOGSTORE_LOG(TRACE, "Reading lsn={}:{} before flushed, doing flush first", m_store_id, seq_num); diff --git a/src/lib/logstore/log_store_service.cpp b/src/lib/logstore/log_store_service.cpp index 90e35cf06..68f08d275 100644 --- 
a/src/lib/logstore/log_store_service.cpp +++ b/src/lib/logstore/log_store_service.cpp @@ -60,6 +60,14 @@ folly::Future< std::error_code > LogStoreService::create_vdev(uint64_t size, uin hs_vdev_context hs_ctx; hs_ctx.type = hs_vdev_type_t::LOGDEV_VDEV; +#ifdef _PRERELEASE + auto min_size = iomgr_flip::instance()->get_test_flip< long >("set_minimum_chunk_size"); + if (min_size) { + chunk_size = min_size.get(); + LOGINFO("Flip set_minimum_chunk_size is enabled, min_chunk_size now is {}", chunk_size); + } +#endif + // reason we set alloc_type/chunk_sel_type here instead of by homestore logstore service consumer is because // consumer doesn't care or understands the underlying alloc/chunkSel for this service, if this changes in the // future, we can let consumer set it by then; @@ -115,7 +123,6 @@ logdev_id_t LogStoreService::create_new_logdev() { auto logdev = create_new_logdev_internal(logdev_id); logdev->start(true /* format */, m_logdev_vdev.get()); COUNTER_INCREMENT(m_metrics, logdevs_count, 1); - LOGINFO("Created log dev id {}", logdev_id); return logdev_id; } diff --git a/src/lib/logstore/log_stream.cpp b/src/lib/logstore/log_stream.cpp index afa7e67e9..105a7714e 100644 --- a/src/lib/logstore/log_stream.cpp +++ b/src/lib/logstore/log_stream.cpp @@ -29,6 +29,8 @@ log_stream_reader::log_stream_reader(off_t device_cursor, JournalVirtualDev* vde m_vdev_jd{std::move(vdev_jd)}, m_first_group_cursor{device_cursor}, m_read_size_multiple{read_size_multiple} { + // We set the journal descriptor seek_cursor here so that + // sync_next_read reads from the seek_cursor. m_vdev_jd->lseek(m_first_group_cursor); } @@ -44,7 +46,7 @@ sisl::byte_view log_stream_reader::next_group(off_t* out_dev_offset) { do { m_cur_log_buf = read_next_bytes(std::max(min_needed, bulk_read_size)); if (m_cur_log_buf.size() == 0) { - LOGINFOMOD(logstore, "Logdev data empty"); + LOGINFOMOD(logstore, "Logdev data empty log_dev={}", m_vdev_jd->logdev_id()); return {}; } } while (m_cur_log_buf.size() < sizeof(log_group_header)); @@ -54,56 +56,65 @@ sisl::byte_view log_stream_reader::next_group(off_t* out_dev_offset) { HS_REL_ASSERT_GE(m_cur_log_buf.size(), m_read_size_multiple); const auto* header = r_cast< log_group_header const* >(m_cur_log_buf.bytes()); if (header->magic_word() != LOG_GROUP_HDR_MAGIC) { - LOGINFOMOD(logstore, "Logdev data not seeing magic at pos {}, must have come to end of logdev", - m_vdev_jd->dev_offset(m_cur_read_bytes)); + LOGINFOMOD(logstore, "Logdev data not seeing magic at pos {}, must have come to end of log_dev={}", + m_vdev_jd->dev_offset(m_cur_read_bytes), m_vdev_jd->logdev_id()); *out_dev_offset = m_vdev_jd->dev_offset(m_cur_read_bytes); - // move it by dma boundary if header is not valid m_prev_crc = 0; m_cur_read_bytes += m_read_size_multiple; return ret_buf; } - if (header->total_size() > m_cur_log_buf.size()) { - LOGINFOMOD(logstore, "Logstream group size {} is more than available buffer size {}, reading from store", - header->total_size(), m_cur_log_buf.size()); - // Bigger group size than needed bytes, read again - min_needed = sisl::round_up(header->total_size(), m_read_size_multiple); - goto read_again; - } - - LOGTRACEMOD(logstore, - "Logstream read log group of size={} nrecords={} m_cur_log_dev_offset {} buf size " - "remaining {} ", - header->total_size(), header->nrecords(), m_vdev_jd->dev_offset(m_cur_read_bytes), - m_cur_log_buf.size()); - + // Because reuse chunks without cleaning up, we could get chunks used by other logdev's + // and it can happen that log group headers couldnt 
match. In that case check we dont error + // if its the last chunk or not with is_offset_at_last_chunk else raise assert. // compare it with prev crc if (m_prev_crc != 0 && m_prev_crc != header->prev_grp_crc) { // we reached at the end - LOGINFOMOD(logstore, "we have reached the end. crc doesn't match with the prev crc {}", - m_vdev_jd->dev_offset(m_cur_read_bytes)); + LOGINFOMOD(logstore, + "we have reached the end. crc doesn't match offset {} prev crc {} header prev crc {} log_dev={}", + m_vdev_jd->dev_offset(m_cur_read_bytes), header->prev_grp_crc, m_prev_crc, m_vdev_jd->logdev_id()); *out_dev_offset = m_vdev_jd->dev_offset(m_cur_read_bytes); - + if (!m_vdev_jd->is_offset_at_last_chunk(m_cur_read_bytes)) { + HS_REL_ASSERT(0, "data is corrupted {}", m_vdev_jd->logdev_id()); + } // move it by dma boundary if header is not valid m_prev_crc = 0; m_cur_read_bytes += m_read_size_multiple; return ret_buf; } + if (header->total_size() > m_cur_log_buf.size()) { + LOGTRACEMOD(logstore, "Logstream group size {} is more than available buffer size {}, reading from log_dev={}", + header->total_size(), m_cur_log_buf.size(), m_vdev_jd->logdev_id()); + // Bigger group size than needed bytes, read again + min_needed = sisl::round_up(header->total_size(), m_read_size_multiple); + goto read_again; + } + + LOGTRACEMOD(logstore, + "Logstream read log group of size={} nrecords={} journal_dev_offset {} cur_read_bytes {} buf size " + "remaining {} log_dev={}", + header->total_size(), header->nrecords(), m_vdev_jd->dev_offset(m_cur_read_bytes), m_cur_read_bytes, + m_cur_log_buf.size(), m_vdev_jd->logdev_id()); + // At this point data seems to be valid. Lets see if a data is written completely by comparing the footer const auto* footer = r_cast< log_group_footer* >((uint64_t)m_cur_log_buf.bytes() + header->footer_offset); if (footer->magic != LOG_GROUP_FOOTER_MAGIC || footer->start_log_idx != header->start_log_idx) { LOGINFOMOD(logstore, - "last write is not completely written. footer magic {} footer start_log_idx {} header log indx {}", - footer->magic, footer->start_log_idx, header->start_log_idx); + "last write is not completely written. 
footer magic {} footer start_log_idx {} header log indx {} " + "log_dev={}", + footer->magic, footer->start_log_idx, header->start_log_idx, m_vdev_jd->logdev_id()); *out_dev_offset = m_vdev_jd->dev_offset(m_cur_read_bytes); - // move it by dma boundary if header is not valid m_prev_crc = 0; m_cur_read_bytes += m_read_size_multiple; + if (!m_vdev_jd->is_offset_at_last_chunk(m_cur_read_bytes)) { + HS_REL_ASSERT(0, "data is corrupted {}", m_vdev_jd->logdev_id()); + } return ret_buf; } + HS_DBG_ASSERT_EQ(footer->version, log_group_footer::footer_version, "Log footer version mismatch"); // verify crc with data @@ -112,8 +123,9 @@ sisl::byte_view log_stream_reader::next_group(off_t* out_dev_offset) { (header->total_size() - sizeof(log_group_header))); if (cur_crc != header->cur_grp_crc) { /* This is a valid entry so crc should match */ - HS_REL_ASSERT(0, "data is corrupted"); - LOGINFOMOD(logstore, "crc doesn't match {}", m_vdev_jd->dev_offset(m_cur_read_bytes)); + LOGERRORMOD(logstore, "crc doesn't match {} log_dev={}", m_vdev_jd->dev_offset(m_cur_read_bytes), + m_vdev_jd->logdev_id()); + HS_REL_ASSERT(0, "data is corrupted {}", m_vdev_jd->logdev_id()); *out_dev_offset = m_vdev_jd->dev_offset(m_cur_read_bytes); // move it by dma boundary if header is not valid @@ -141,16 +153,20 @@ sisl::byte_view log_stream_reader::group_in_next_page() { sisl::byte_view log_stream_reader::read_next_bytes(uint64_t nbytes) { // TO DO: Might need to address alignment based on data or fast type + const auto prev_pos = m_vdev_jd->seeked_pos(); + auto sz_to_read = m_vdev_jd->sync_next_read(nullptr, nbytes); + if (sz_to_read == 0) { return sisl::byte_view{m_cur_log_buf}; } + auto out_buf = - hs_utils::make_byte_array(nbytes + m_cur_log_buf.size(), true, sisl::buftag::logread, m_vdev->align_size()); + hs_utils::make_byte_array(sz_to_read + m_cur_log_buf.size(), true, sisl::buftag::logread, m_vdev->align_size()); if (m_cur_log_buf.size()) { memcpy(out_buf->bytes(), m_cur_log_buf.bytes(), m_cur_log_buf.size()); } - const auto prev_pos = m_vdev_jd->seeked_pos(); - auto sz_read = m_vdev_jd->sync_next_read(out_buf->bytes() + m_cur_log_buf.size(), nbytes); - if (sz_read == 0) { return {}; } + auto sz_read = m_vdev_jd->sync_next_read(out_buf->bytes() + m_cur_log_buf.size(), sz_to_read); + assert(sz_read == sz_to_read); - LOGINFOMOD(logstore, "LogStream read {} bytes from vdev offset {} and vdev cur offset {}", nbytes, prev_pos, - m_vdev_jd->seeked_pos()); + LOGTRACEMOD(logstore, + "LogStream read {} bytes req bytes {} from vdev prev offset {} and vdev cur offset {} log_dev={}", + sz_read, nbytes, prev_pos, m_vdev_jd->seeked_pos(), m_vdev_jd->logdev_id()); return sisl::byte_view{out_buf}; } } // namespace homestore diff --git a/src/lib/replication/log_store/home_raft_log_store.cpp b/src/lib/replication/log_store/home_raft_log_store.cpp index 7444169ee..41b2ba1c4 100644 --- a/src/lib/replication/log_store/home_raft_log_store.cpp +++ b/src/lib/replication/log_store/home_raft_log_store.cpp @@ -64,11 +64,11 @@ HomeRaftLogStore::HomeRaftLogStore(logdev_id_t logdev_id, logstore_id_t logstore m_log_store = logstore_service().create_new_log_store(m_logdev_id, true); if (!m_log_store) { throw std::runtime_error("Failed to create log store"); } m_logstore_id = m_log_store->get_store_id(); - LOGDEBUGMOD(replication, "Opened new home log dev = {} store id={}", m_logdev_id, m_logstore_id); + LOGDEBUGMOD(replication, "Opened new home log_dev={} log_store={}", m_logdev_id, m_logstore_id); } else { m_logdev_id = logdev_id; m_logstore_id = 
logstore_id; - LOGDEBUGMOD(replication, "Opening existing home log dev = {} store id={}", m_logdev_id, logstore_id); + LOGDEBUGMOD(replication, "Opening existing home log_dev={} log_store={}", m_logdev_id, logstore_id); logstore_service().open_logdev(m_logdev_id); logstore_service().open_log_store(m_logdev_id, logstore_id, true).thenValue([this](auto log_store) { m_log_store = std::move(log_store); diff --git a/src/lib/replication/repl_dev/raft_repl_dev.cpp b/src/lib/replication/repl_dev/raft_repl_dev.cpp index 327660270..73ebf9fd1 100644 --- a/src/lib/replication/repl_dev/raft_repl_dev.cpp +++ b/src/lib/replication/repl_dev/raft_repl_dev.cpp @@ -68,9 +68,11 @@ RaftReplDev::RaftReplDev(RaftReplService& svc, superblk< raft_repl_dev_superblk m_rd_sb.write(); } - RD_LOG(INFO, "Started {} RaftReplDev group_id={}, replica_id={}, raft_server_id={} commited_lsn={} next_dsn={}", + RD_LOG(INFO, + "Started {} RaftReplDev group_id={}, replica_id={}, raft_server_id={} commited_lsn={} next_dsn={} " + "log_dev={} log_store={}", (load_existing ? "Existing" : "New"), group_id_str(), my_replica_id_str(), m_raft_server_id, - m_commit_upto_lsn.load(), m_next_dsn.load()); + m_commit_upto_lsn.load(), m_next_dsn.load(), m_rd_sb->logdev_id, m_rd_sb->logstore_id); m_msg_mgr.bind_data_service_request(PUSH_DATA, m_group_id, bind_this(RaftReplDev::on_push_data_received, 1)); m_msg_mgr.bind_data_service_request(FETCH_DATA, m_group_id, bind_this(RaftReplDev::on_fetch_data_received, 1)); @@ -466,7 +468,7 @@ void RaftReplDev::check_and_fetch_remote_data(std::vector< repl_req_ptr_t >* rre void RaftReplDev::fetch_data_from_remote(std::vector< repl_req_ptr_t > rreqs) { if (rreqs.size() == 0) { return; } - std::vector<::flatbuffers::Offset< RequestEntry > > entries; + std::vector< ::flatbuffers::Offset< RequestEntry > > entries; entries.reserve(rreqs.size()); shared< flatbuffers::FlatBufferBuilder > builder = std::make_shared< flatbuffers::FlatBufferBuilder >(); diff --git a/src/tests/CMakeLists.txt b/src/tests/CMakeLists.txt index e5f4ea0b2..e3b40fc90 100644 --- a/src/tests/CMakeLists.txt +++ b/src/tests/CMakeLists.txt @@ -85,6 +85,10 @@ if (${io_tests}) target_sources(test_log_store PRIVATE test_log_store.cpp) target_link_libraries(test_log_store hs_logdev homestore ${COMMON_TEST_DEPS} ) + add_executable(test_log_dev) + target_sources(test_log_dev PRIVATE test_log_dev.cpp) + target_link_libraries(test_log_dev hs_logdev homestore ${COMMON_TEST_DEPS} ) + set(TEST_METABLK_SOURCE_FILES test_meta_blk_mgr.cpp) add_executable(test_meta_blk_mgr ${TEST_METABLK_SOURCE_FILES}) target_link_libraries(test_meta_blk_mgr homestore ${COMMON_TEST_DEPS} GTest::gmock) @@ -108,7 +112,8 @@ if (${io_tests}) can_build_epoll_io_tests(epoll_tests) if(${epoll_tests}) - # add_test(NAME LogStore-Epoll COMMAND ${CMAKE_BINARY_DIR}/bin/test_log_store) + add_test(NAME LogDev-Epoll COMMAND ${CMAKE_BINARY_DIR}/bin/test_log_dev) + add_test(NAME LogStore-Epoll COMMAND ${CMAKE_BINARY_DIR}/bin/test_log_store) add_test(NAME MetaBlkMgr-Epoll COMMAND ${CMAKE_BINARY_DIR}/bin/test_meta_blk_mgr) add_test(NAME DataService-Epoll COMMAND ${CMAKE_BINARY_DIR}/bin/test_data_service) @@ -120,6 +125,7 @@ if (${io_tests}) can_build_spdk_io_tests(spdk_tests) if(${spdk_tests}) add_test(NAME LogStore-Spdk COMMAND ${CMAKE_BINARY_DIR}/bin/test_log_store -- --spdk "true") + add_test(NAME LogDev-Spdk COMMAND ${CMAKE_BINARY_DIR}/bin/test_log_dev -- --spdk "true") add_test(NAME MetaBlkMgr-Spdk COMMAND ${CMAKE_BINARY_DIR}/bin/test_meta_blk_mgr -- --spdk "true") add_test(NAME 
DataSerice-Spdk COMMAND ${CMAKE_BINARY_DIR}/bin/test_data_service -- --spdk "true")
add_test(NAME SoloReplDev-Spdk COMMAND ${CMAKE_BINARY_DIR}/bin/test_solo_repl_dev -- --spdk "true")
diff --git a/src/tests/log_store_benchmark.cpp b/src/tests/log_store_benchmark.cpp
index d6fb50a4c..20b81f302 100644
--- a/src/tests/log_store_benchmark.cpp
+++ b/src/tests/log_store_benchmark.cpp
@@ -169,7 +169,7 @@ static void test_append(benchmark::State& state) {
static void setup() {
test_common::HSTestHelper::start_homestore(
- "test_log_store", {{HS_SERVICE::META, {.size_pct = 5.0}}, {HS_SERVICE::LOG, {.size_pct = 87.0}}});
+ "test_log_store_bench", {{HS_SERVICE::META, {.size_pct = 5.0}}, {HS_SERVICE::LOG, {.size_pct = 87.0}}});
}
static void teardown() { test_common::HSTestHelper::shutdown_homestore(); }
diff --git a/src/tests/test_common/homestore_test_common.hpp b/src/tests/test_common/homestore_test_common.hpp
index 33b6d63a6..1890339bf 100644
--- a/src/tests/test_common/homestore_test_common.hpp
+++ b/src/tests/test_common/homestore_test_common.hpp
@@ -23,6 +23,7 @@
#include #include #include
+#include
#include #include #include
@@ -170,6 +171,7 @@ class HSTestHelper {
shared< ReplApplication > repl_app{nullptr};
chunk_num_t num_chunks{1};
uint64_t chunk_size{32 * 1024 * 1024}; // Chunk size in MB.
+ uint64_t min_chunk_size{0};
vdev_size_type_t vdev_size_type{vdev_size_type_t::VDEV_SIZE_STATIC};
};
@@ -246,6 +248,11 @@ class HSTestHelper {
bool need_format = hsi->start(hs_input_params{.devices = device_info, .app_mem_size = app_mem_size}, std::move(cb));
+ // We need to set the min chunk size before homestore format.
+ if (svc_params[HS_SERVICE::LOG].min_chunk_size != 0) {
+ set_min_chunk_size(svc_params[HS_SERVICE::LOG].min_chunk_size);
+ }
+
if (need_format) {
hsi->format_and_start({{HS_SERVICE::META, {.size_pct = svc_params[HS_SERVICE::META].size_pct}},
{HS_SERVICE::LOG,
@@ -278,6 +285,21 @@ class HSTestHelper {
s_dev_names.clear();
}
+ static void set_min_chunk_size(uint64_t chunk_size) {
+#ifdef _PRERELEASE
+ LOGINFO("Set minimum chunk size {}", chunk_size);
+ flip::FlipClient* fc = iomgr_flip::client_instance();
+
+ flip::FlipFrequency freq;
+ freq.set_count(2000000);
+ freq.set_percent(100);
+
+ flip::FlipCondition dont_care_cond;
+ fc->create_condition("", flip::Operator::DONT_CARE, (int)1, &dont_care_cond);
+ fc->inject_retval_flip< long >("set_minimum_chunk_size", {dont_care_cond}, freq, chunk_size);
+#endif
+ }
+
static void fill_data_buf(uint8_t* buf, uint64_t size, uint64_t pattern = 0) {
uint64_t* ptr = r_cast< uint64_t* >(buf);
for (uint64_t i = 0ul; i < size / sizeof(uint64_t); ++i) {
diff --git a/src/tests/test_device_manager.cpp b/src/tests/test_device_manager.cpp
index 13bef9d6b..6e1cdcfcc 100644
--- a/src/tests/test_device_manager.cpp
+++ b/src/tests/test_device_manager.cpp
@@ -174,6 +174,62 @@ TEST_F(DeviceMgrTest, StripedVDevCreation) {
this->validate_striped_vdevs();
}
+TEST_F(DeviceMgrTest, CreateChunk) {
+ // Dynamically create chunks and verify that no two chunks have the same start offset. 
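+ // The test collects every chunk_start_offset handed out into an unordered_set and asserts that each insert
+ // is unique, both for chunks created before the restart and for chunks created after it. After the restart
+ // it also compares each chunk's persisted start offset against the value recorded at creation time, so both
+ // fresh allocation and recovery of chunk metadata are covered.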
+ uint64_t avail_size{0}; + for (auto& pdev : m_pdevs) { + avail_size += pdev->data_size(); + } + + LOGINFO("Step 1: Creating test_vdev with size={}", in_bytes(avail_size)); + auto vdev = + m_dmgr->create_vdev(homestore::vdev_parameters{.vdev_name = "test_vdev", + .size_type = vdev_size_type_t::VDEV_SIZE_DYNAMIC, + .vdev_size = avail_size, + .num_chunks = 0, + .blk_size = 512, + .chunk_size = 512 * 1024, + .dev_type = HSDevType::Data, + .alloc_type = blk_allocator_type_t::none, + .chunk_sel_type = chunk_selector_type_t::NONE, + .multi_pdev_opts = vdev_multi_pdev_opts_t::ALL_PDEV_STRIPED, + .context_data = sisl::blob{}}); + + auto num_chunks = 10; + LOGINFO("Step 2: Creating {} chunks", num_chunks); + std::unordered_map< uint32_t, chunk_info > chunk_info_map; + std::unordered_set< uint64_t > chunk_start; + + for (int i = 0; i < num_chunks; i++) { + auto chunk = m_dmgr->create_chunk(HSDevType::Data, vdev->info().vdev_id, 512 * 1024, {}); + chunk_info_map[chunk->chunk_id()] = chunk->info(); + auto [_, inserted] = chunk_start.insert(chunk->info().chunk_start_offset); + ASSERT_EQ(inserted, true) << "chunk start duplicate " << chunk->info().chunk_start_offset; + } + + LOGINFO("Step 3: Restarting homestore"); + this->restart(); + + LOGINFO("Step 4: Creating additional {} chunks", num_chunks); + for (int i = 0; i < num_chunks; i++) { + auto chunk = m_dmgr->create_chunk(HSDevType::Data, vdev->info().vdev_id, 512 * 1024, {}); + chunk_info_map[chunk->chunk_id()] = chunk->info(); + auto [_, inserted] = chunk_start.insert(chunk->info().chunk_start_offset); + ASSERT_EQ(inserted, true) << "chunk start duplicate " << chunk->info().chunk_start_offset; + } + + chunk_start.clear(); + auto chunk_vec = m_dmgr->get_chunks(); + ASSERT_EQ(chunk_vec.size(), num_chunks * 2); + for (const auto& chunk : chunk_vec) { + ASSERT_EQ(chunk->info().chunk_start_offset, chunk_info_map[chunk->chunk_id()].chunk_start_offset) + << "Chunk offsets mismatch"; + auto [_, inserted] = chunk_start.insert(chunk->info().chunk_start_offset); + ASSERT_EQ(inserted, true) << "chunk start duplicate " << chunk->info().chunk_start_offset; + } + vdev.reset(); +} + int main(int argc, char* argv[]) { SISL_OPTIONS_LOAD(argc, argv, logging, test_device_manager, iomgr); ::testing::InitGoogleTest(&argc, argv); diff --git a/src/tests/test_log_dev.cpp b/src/tests/test_log_dev.cpp index b41a5f9a5..2a1bba56b 100644 --- a/src/tests/test_log_dev.cpp +++ b/src/tests/test_log_dev.cpp @@ -13,135 +13,339 @@ * specific language governing permissions and limitations under the License. 
* *********************************************************************************/ -#include + +#include #include #include -#include -#include #include #include -#include +#include +#include +#include +#include #include +#include +#include #include #include +#include +#include +#include +#include "common/homestore_utils.hpp" +#include "common/homestore_assert.hpp" #include "logstore/log_dev.hpp" #include "test_common/homestore_test_common.hpp" using namespace homestore; + RCU_REGISTER_INIT SISL_LOGGING_INIT(HOMESTORE_LOG_MODS) +SISL_OPTIONS_ENABLE(logging, test_log_dev, iomgr, test_common_setup) +SISL_LOGGING_DECL(test_log_dev) -std::vector< logdev_key > s_logdev_keys; -static uint64_t first_offset{~static_cast< uint64_t >(0)}; +std::vector< std::string > test_common::HSTestHelper::s_dev_names; -static void on_append_completion(const logdev_key lkey, void* const ctx) { - s_logdev_keys.push_back(lkey); - LOGINFO("Append completed with log_idx = {} offset = {}", lkey.idx, lkey.dev_offset); - if (first_offset == ~static_cast< uint64_t >(0)) { first_offset = lkey.dev_offset; } -} +struct test_log_data { + test_log_data() = default; + test_log_data(const test_log_data&) = delete; + test_log_data(test_log_data&&) noexcept = delete; + test_log_data& operator=(const test_log_data&) = delete; + test_log_data& operator=(test_log_data&&) noexcept = delete; + ~test_log_data() = default; -static void on_log_found(const logdev_key lkey, const log_buffer buf) { - s_logdev_keys.push_back(lkey); - LOGINFO("Found a log with log_idx = {} offset = {}", lkey.idx, lkey.dev_offset); -} + uint32_t size; -[[nodiscard]] static std::shared_ptr< iomgr::ioMgr > start_homestore(const uint32_t ndevices, const uint64_t dev_size, - const uint32_t nthreads) { - std::vector< dev_info > device_info; - // these should be static so that they stay in scope in the lambda in case function ends before lambda completes - static std::mutex start_mutex; - static std::condition_variable cv; - static bool inited; - - inited = false; - LOGINFO("creating {} device files with each of size {} ", ndevices, dev_size); - for (uint32_t i{0}; i < ndevices; ++i) { - const std::filesystem::path fpath{"/tmp/" + std::to_string(i + 1)}; - std::ofstream ofs{fpath.string(), std::ios::binary | std::ios::out}; - std::filesystem::resize_file(fpath, dev_size); - device_info.emplace_back(std::filesystem::canonical(fpath).string(), HSDevType::Data); + uint8_t* get_data() { return uintptr_cast(this) + sizeof(test_log_data); }; + uint8_t const* get_data_const() const { return r_cast< uint8_t const* >(this) + sizeof(test_log_data); } + const uint8_t* get_data() const { return r_cast< const uint8_t* >(this) + sizeof(test_log_data); } + uint32_t total_size() const { return sizeof(test_log_data) + size; } + std::string get_data_str() const { + return std::string(r_cast< const char* >(get_data_const()), static_cast< size_t >(size)); } +}; - LOGINFO("Creating iomgr with {} threads", nthreads); - auto iomgr_obj{std::make_shared< iomgr::ioMgr >(2, nthreads)}; - - const uint64_t cache_size{((ndevices * dev_size) * 10) / 100}; - LOGINFO("Initialize and start HomeBlks with cache_size = {}", cache_size); - - boost::uuids::string_generator gen; - init_params params; - params.open_flags = homestore::io_flag::DIRECT_IO; - params.min_virtual_page_size = 4096; - params.cache_size = cache_size; - params.devices = device_info; - params.iomgr = iomgr_obj; - params.init_done_cb = [&iomgr_obj, &tl_start_mutex = start_mutex, &tl_cv = cv, - &tl_inited = 
inited](std::error_condition err, const out_params& params) { - iomgr_obj->start(); - LOGINFO("HomeBlks Init completed"); - { - std::unique_lock< std::mutex > lk{tl_start_mutex}; - tl_inited = true; - } - tl_cv.notify_one(); - }; - params.vol_mounted_cb = [](const VolumePtr& vol_obj, vol_state state) {}; - params.vol_state_change_cb = [](const VolumePtr& vol, vol_state old_state, vol_state new_state) {}; - params.vol_found_cb = [](boost::uuids::uuid uuid) -> bool { return true; }; +class LogDevTest : public ::testing::Test { +public: + const std::map< uint32_t, test_common::HSTestHelper::test_params > svc_params = {}; + static constexpr uint32_t max_data_size = 1024; + uint64_t s_max_flush_multiple = 0; - test_common::set_random_http_port(); - VolInterface::init(params); + virtual void SetUp() override { start_homestore(); } - { - std::unique_lock< std::mutex > lk{start_mutex}; - cv.wait(lk, [] { return inited; }); + void start_homestore(bool restart = false, hs_before_services_starting_cb_t starting_cb = nullptr) { + auto const ndevices = SISL_OPTIONS["num_devs"].as< uint32_t >(); + auto const dev_size = SISL_OPTIONS["dev_size_mb"].as< uint64_t >() * 1024 * 1024; + test_common::HSTestHelper::start_homestore("test_log_dev", + { + {HS_SERVICE::META, {.size_pct = 15.0}}, + {HS_SERVICE::LOG, + {.size_pct = 50.0, + .chunk_size = 8 * 1024 * 1024, + .min_chunk_size = 8 * 1024 * 1024, + .vdev_size_type = vdev_size_type_t::VDEV_SIZE_DYNAMIC}}, + }, + starting_cb, restart); } - return iomgr_obj; -} -SISL_OPTIONS_ENABLE(logging, test_log_dev) -SISL_OPTION_GROUP(test_log_dev, - (num_threads, "", "num_threads", "number of threads", - ::cxxopts::value< uint32_t >()->default_value("2"), "number"), - (num_devs, "", "num_devs", "number of devices to create", - ::cxxopts::value< uint32_t >()->default_value("2"), "number"), - (dev_size_mb, "", "dev_size_mb", "size of each device in MB", - ::cxxopts::value< uint64_t >()->default_value("5120"), "number")); + virtual void TearDown() override { test_common::HSTestHelper::shutdown_homestore(); } -int main(int argc, char* argv[]) { - SISL_OPTIONS_LOAD(argc, argv, logging, test_log_dev); - sisl::logging::SetLogger("test_log_dev"); - spdlog::set_pattern("[%D %T%z] [%^%l%$] [%n] [%t] %v"); + test_log_data* prepare_data(const logstore_seq_num_t lsn, bool& io_memory) { + static thread_local std::random_device rd{}; + static thread_local std::default_random_engine re{rd()}; + uint32_t sz{0}; + uint8_t* raw_buf{nullptr}; + + // Generate buffer of random size and fill with specific data + std::uniform_int_distribution< uint8_t > gen_percentage{0, 99}; + std::uniform_int_distribution< uint32_t > gen_data_size{0, max_data_size - 1}; + if (gen_percentage(re) < static_cast< uint8_t >(10)) { + // 10% of data is dma'ble aligned boundary + const auto alloc_sz = sisl::round_up(gen_data_size(re) + sizeof(test_log_data), s_max_flush_multiple); + raw_buf = iomanager.iobuf_alloc(dma_address_boundary, alloc_sz); + sz = alloc_sz - sizeof(test_log_data); + io_memory = true; + } else { + sz = gen_data_size(re); + raw_buf = static_cast< uint8_t* >(std::malloc(sizeof(test_log_data) + sz)); + io_memory = false; + } - auto iomgr_obj{start_homestore(SISL_OPTIONS["num_devs"].as< uint32_t >(), - SISL_OPTIONS["dev_size_mb"].as< uint64_t >() * 1024 * 1024, - SISL_OPTIONS["num_threads"].as< uint32_t >())}; + test_log_data* d = new (raw_buf) test_log_data(); + d->size = sz; - std::array< std::string, 1024 > s; - auto ld{LogDev::instance()}; - ld->register_append_cb(on_append_completion); - 
ld->register_logfound_cb(on_log_found);
+ assert(uintptr_cast(d) == raw_buf);
- for (
- size_t i{0}; (i < std::min< size_t >(195, s.size()); ++i) {
- s[i] = std::to_string(i);
- ld->append_async(0, 0, {reinterpret_cast< const uint8_t* >(s[i].c_str()), s[i].size() + 1}, nullptr);
+ const char c = static_cast< char >((lsn % 94) + 33);
+ std::memset(voidptr_cast(d->get_data()), c, static_cast< size_t >(d->size));
+ return d;
}
- size_t i{0};
- for (const auto& lk : s_logdev_keys) {
- const auto b{ld->read(lk)};
- const auto exp_val{std::to_string(i)};
- const auto actual_val{reinterpret_cast< const char* >(b.data()), static_cast< size_t >(b.size())};
- if (actual_val != exp_val) {
- LOGERROR("Error in reading value for log_idx {} actual_val={} expected_val={}", i, actual_val, exp_val);
+ void validate_data(std::shared_ptr< HomeLogStore > log_store, const test_log_data* d,
+ const logstore_seq_num_t lsn) {
+ const char c = static_cast< char >((lsn % 94) + 33);
+ const std::string actual = d->get_data_str();
+ const std::string expected(static_cast< size_t >(d->size),
+ c); // needs to be () because of same reason as vector
+ ASSERT_EQ(actual, expected) << "Data mismatch for LSN=" << log_store->get_store_id() << ":" << lsn
+ << " size=" << d->size;
+ }
+
+ void insert_sync(std::shared_ptr< HomeLogStore > log_store, logstore_seq_num_t lsn) {
+ bool io_memory{false};
+ auto* d = prepare_data(lsn, io_memory);
+ const bool succ = log_store->write_sync(lsn, {uintptr_cast(d), d->total_size(), false});
+ EXPECT_TRUE(succ);
+ LOGINFO("Written sync data for LSN -> {}:{}", log_store->get_store_id(), lsn);
+ if (io_memory) {
+ iomanager.iobuf_free(uintptr_cast(d));
} else {
- LOGINFO("Read value {} for log_idx {}", actual_val, i);
+ std::free(voidptr_cast(d));
+ }
+ }
+
+ void kickstart_inserts(std::shared_ptr< HomeLogStore > log_store, logstore_seq_num_t& cur_lsn, int64_t batch) {
+ auto last = cur_lsn + batch;
+ for (; cur_lsn < last; cur_lsn++) {
+ insert_sync(log_store, cur_lsn);
+ }
+ }
+
+ void read_verify(std::shared_ptr< HomeLogStore > log_store, logstore_seq_num_t lsn) {
+ auto b = log_store->read_sync(lsn);
+ auto* d = r_cast< test_log_data const* >(b.bytes());
+ ASSERT_EQ(d->total_size(), b.size()) << "Size Mismatch for lsn=" << log_store->get_store_id() << ":" << lsn;
+ validate_data(log_store, d, lsn);
+ }
+
+ void read_all_verify(std::shared_ptr< HomeLogStore > log_store) {
+ const auto trunc_upto = log_store->truncated_upto();
+ const auto upto = log_store->get_contiguous_completed_seq_num(-1);
+
+ for (std::remove_const_t< decltype(trunc_upto) > i{0}; i <= trunc_upto; ++i) {
+ ASSERT_THROW(log_store->read_sync(i), std::out_of_range)
+ << "Expected std::out_of_range exception for lsn=" << log_store->get_store_id() << ":" << i
+ << " but not thrown";
+ }
+
+ for (auto lsn = trunc_upto + 1; lsn < upto; lsn++) {
+ try {
+ read_verify(log_store, lsn);
+ } catch (const std::exception& ex) {
+ logstore_seq_num_t trunc_upto = 0;
+ std::mutex mtx;
+ std::condition_variable cv;
+ bool get_trunc_upto = false;
+ log_store->get_logdev()->run_under_flush_lock(
+ [this, log_store, &trunc_upto, &get_trunc_upto, &mtx, &cv] {
+ // In case we run truncation in parallel to read, it is possible truncate moved, so adjust
+ // the truncated_upto accordingly. 
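+ // Re-reading truncated_upto() under run_under_flush_lock is presumably what guarantees a stable
+ // value while a concurrent flush/truncate is in flight; the loop below then skips ahead to the new
+ // truncation point instead of treating the failed read as an error.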
+ trunc_upto = log_store->truncated_upto(); + std::unique_lock lock(mtx); + get_trunc_upto = true; + cv.notify_one(); + return true; + }); + std::unique_lock lock(mtx); + cv.wait(lock, [&get_trunc_upto] { return get_trunc_upto == true; }); + if (lsn <= trunc_upto) { + lsn = trunc_upto; + continue; + } + LOGFATAL("Failed to read at upto {} lsn {} trunc_upto {}", upto, lsn, trunc_upto); + } + } + } + + void rollback_validate(std::shared_ptr< HomeLogStore > log_store, logstore_seq_num_t& cur_lsn, + uint32_t num_lsns_to_rollback) { + std::mutex mtx; + std::condition_variable cv; + bool rollback_done = false; + cur_lsn -= num_lsns_to_rollback; + auto const upto_lsn = cur_lsn - 1; + log_store->rollback_async(upto_lsn, [&](logstore_seq_num_t) { + ASSERT_EQ(log_store->get_contiguous_completed_seq_num(-1), upto_lsn) + << "Last completed seq num is not reset after rollback"; + ASSERT_EQ(log_store->get_contiguous_issued_seq_num(-1), upto_lsn) + << "Last issued seq num is not reset after rollback"; + read_all_verify(log_store); + { + std::unique_lock lock(mtx); + rollback_done = true; + } + cv.notify_one(); + }); + + // We wait till async rollback is finished as we do validation. + std::unique_lock lock(mtx); + cv.wait(lock, [&rollback_done] { return rollback_done == true; }); + } + + void truncate_validate(std::shared_ptr< HomeLogStore > log_store) { + auto upto = log_store->get_contiguous_completed_seq_num(-1); + LOGINFO("truncate_validate upto {}", upto); + log_store->truncate(upto); + read_all_verify(log_store); + logstore_service().device_truncate(nullptr /* cb */, true /* wait_till_done */); + } + + void rollback_records_validate(std::shared_ptr< HomeLogStore > log_store, uint32_t expected_count) { + auto actual_count = log_store->get_logdev()->log_dev_meta().num_rollback_records(log_store->get_store_id()); + ASSERT_EQ(actual_count, expected_count); + } +}; + +TEST_F(LogDevTest, WriteSyncThenRead) { + const auto iterations = SISL_OPTIONS["iterations"].as< uint32_t >(); + + for (uint32_t iteration{0}; iteration < iterations; ++iteration) { + LOGINFO("Iteration {}", iteration); + auto logdev_id = logstore_service().create_new_logdev(); + s_max_flush_multiple = logstore_service().get_logdev(logdev_id)->get_flush_size_multiple(); + auto log_store = logstore_service().create_new_log_store(logdev_id, false); + const auto store_id = log_store->get_store_id(); + LOGINFO("Created new log store -> id {}", store_id); + const unsigned count{10}; + for (unsigned i{0}; i < count; ++i) { + // Insert new entry. + insert_sync(log_store, i); + // Verify the entry. + read_verify(log_store, i); } - ++i; + + logstore_service().remove_log_store(logdev_id, store_id); + LOGINFO("Remove logstore -> i {}", store_id); } +} + +TEST_F(LogDevTest, Rollback) { + LOGINFO("Step 1: Create a single logstore to start rollback test"); + auto logdev_id = logstore_service().create_new_logdev(); + s_max_flush_multiple = logstore_service().get_logdev(logdev_id)->get_flush_size_multiple(); + auto log_store = logstore_service().create_new_log_store(logdev_id, false); + auto store_id = log_store->get_store_id(); + + auto restart = [&]() { + std::promise< bool > p; + auto starting_cb = [&]() { + HS_SETTINGS_FACTORY().modifiable_settings([](auto& s) { + // Disable flush timer in UT. 
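+ // A frequency of 0 appears to disable the periodic background flush entirely, so entries reach the
+ // journal only through the explicit sync writes issued by the test; that keeps the rollback and
+ // recovery assertions below deterministic about what has actually been persisted.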
+ s.logstore.flush_timer_frequency_us = 0;
+ });
+ HS_SETTINGS_FACTORY().save();
+ logstore_service().open_logdev(logdev_id);
+ logstore_service().open_log_store(logdev_id, store_id, false /* append_mode */).thenValue([&](auto store) {
+ log_store = store;
+ p.set_value(true);
+ });
+ };
+ start_homestore(true /* restart */, starting_cb);
+ p.get_future().get();
+ };
+
+ LOGINFO("Step 2: Issue sequential inserts with q depth of 10");
+ logstore_seq_num_t cur_lsn = 0;
+ kickstart_inserts(log_store, cur_lsn, 500);
+
+ LOGINFO("Step 3: Rollback last 50 entries and validate if pre-rollback entries are intact");
+ rollback_validate(log_store, cur_lsn, 50); // Last entry = 450
+
+ LOGINFO("Step 4: Append 25 entries after rollback is completed");
+ kickstart_inserts(log_store, cur_lsn, 25); // Last entry = 475
+
+ LOGINFO("Step 5: Rollback again for 75 entries even before previous rollback entry");
+ rollback_validate(log_store, cur_lsn, 75); // Last entry = 400
+
+ LOGINFO("Step 6: Append 25 entries after second rollback is completed");
+ kickstart_inserts(log_store, cur_lsn, 25); // Last entry = 425
+
+ LOGINFO("Step 7: Restart homestore and ensure all rollbacks are effectively validated");
+ restart();
+
+ LOGINFO("Step 8: Post recovery, append another 25 entries");
+ kickstart_inserts(log_store, cur_lsn, 25); // Last entry = 450
+
+ LOGINFO("Step 9: Rollback again for 75 entries even before previous rollback entry");
+ rollback_validate(log_store, cur_lsn, 75); // Last entry = 375
+
+ LOGINFO("Step 10: After 3rd rollback, append another 25 entries");
+ kickstart_inserts(log_store, cur_lsn, 25); // Last entry = 400
+
+ LOGINFO("Step 11: Truncate all entries");
+ truncate_validate(log_store);
+
+ LOGINFO("Step 12: Restart homestore and ensure all truncations after rollbacks are effectively validated");
+ restart();
+
+ LOGINFO("Step 13: Append 25 entries after truncation is completed");
+ kickstart_inserts(log_store, cur_lsn, 25); // Last entry = 425
+
+ LOGINFO("Step 14: Do another truncation to effectively truncate previous records");
+ truncate_validate(log_store);
+
+ LOGINFO("Step 15: Validate if there are no rollback records");
+ rollback_records_validate(log_store, 0 /* expected_count */);
+}
+
+SISL_OPTION_GROUP(test_log_dev,
+ (num_logdevs, "", "num_logdevs", "number of log devs",
+ ::cxxopts::value< uint32_t >()->default_value("4"), "number"),
+ (num_logstores, "", "num_logstores", "number of log stores",
+ ::cxxopts::value< uint32_t >()->default_value("16"), "number"),
+ (num_records, "", "num_records", "number of records to test",
+ ::cxxopts::value< uint32_t >()->default_value("10000"), "number"),
+ (iterations, "", "iterations", "Iterations", ::cxxopts::value< uint32_t >()->default_value("1"),
+ "the number of iterations to run each test"));
+
+int main(int argc, char* argv[]) {
+ int parsed_argc = argc;
+ ::testing::InitGoogleTest(&parsed_argc, argv);
+ SISL_OPTIONS_LOAD(parsed_argc, argv, logging, test_log_dev, iomgr, test_common_setup);
+ sisl::logging::SetLogger("test_log_dev");
+ spdlog::set_pattern("[%D %T%z] [%^%l%$] [%t] %v");
- ld->load(first_offset);
+ const int ret = RUN_ALL_TESTS();
+ return ret;
}
diff --git a/src/tests/test_log_store.cpp b/src/tests/test_log_store.cpp
index 262270c88..586268eaf 100644
--- a/src/tests/test_log_store.cpp
+++ b/src/tests/test_log_store.cpp
@@ -164,6 +164,9 @@ class SampleLogStoreClient {
});
}
}
+
+ // Because of restart in tests, we have to force the flush of log entries. 
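+ // The argument to flush_if_needed() looks like a flush-threshold override: passing 1 should make the
+ // flush effectively unconditional, so every appended entry is durable before the test restarts
+ // homestore (assumption from the call site; the exact semantics live in LogDev::flush_if_needed).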
+ m_log_store->get_logdev()->flush_if_needed(1); } void iterate_validate(const bool expect_all_completed = false) { @@ -254,22 +257,43 @@ class SampleLogStoreClient { validate_data(tl, i); } } catch (const std::exception& e) { + logstore_seq_num_t trunc_upto = get_truncated_upto(); if (!expect_all_completed) { - // In case we run truncation in parallel to read, it is possible truncate moved, so adjust - // the truncated_upto accordingly. - const auto trunc_upto = m_log_store->truncated_upto(); + LOGINFO("got store {} trunc_upto {} {} {}", m_log_store->get_store_id(), trunc_upto, i, + m_log_store->log_records().get_status(2).dump(' ', 2)); if (i <= trunc_upto) { i = trunc_upto; continue; } } - LOGFATAL("Unexpected out_of_range exception for lsn={}:{} upto {}", m_log_store->get_store_id(), i, - upto); + LOGINFO("Unexpected out_of_range exception for lsn={}:{} upto {} trunc_upto {}", + m_log_store->get_store_id(), i, upto, trunc_upto); + LOGFATAL(""); } } } } + logstore_seq_num_t get_truncated_upto() { + std::mutex mtx; + std::condition_variable cv; + bool get_trunc_upto = false; + logstore_seq_num_t trunc_upto = 0; + m_log_store->get_logdev()->run_under_flush_lock([this, &trunc_upto, &get_trunc_upto, &mtx, &cv]() { + // In case we run truncation in parallel to read, it is possible truncate moved, so adjust + // the truncated_upto accordingly. + trunc_upto = m_log_store->truncated_upto(); + std::unique_lock lock(mtx); + get_trunc_upto = true; + cv.notify_one(); + return true; + }); + + std::unique_lock lock(mtx); + cv.wait(lock, [&get_trunc_upto] { return get_trunc_upto == true; }); + return trunc_upto; + } + void fill_hole_and_validate() { const auto start = m_log_store->truncated_upto(); m_hole_lsns.withWLock([&](auto& holes_list) { @@ -295,6 +319,7 @@ class SampleLogStoreClient { } void recovery_validate() { + LOGINFO("Truncated upto {}", get_truncated_upto()); LOGINFO("Totally recovered {} non-truncated lsns and {} truncated lsns for store {}", m_n_recovered_lsns, m_n_recovered_truncated_lsns, m_log_store->get_store_id()); if (m_n_recovered_lsns != (m_cur_lsn.load() - m_truncated_upto_lsn.load() - 1)) { @@ -438,17 +463,31 @@ class SampleDB { } void start_homestore(bool restart = false) { + auto n_log_devs = SISL_OPTIONS["num_logdevs"].as< uint32_t >(); auto n_log_stores = SISL_OPTIONS["num_logstores"].as< uint32_t >(); + if (n_log_stores < 4u) { LOGINFO("Log store test needs minimum 4 log stores for testing, setting them to 4"); n_log_stores = 4u; } + if (restart) { + for (auto& lsc : m_log_store_clients) { + lsc->flush(); + } + } + test_common::HSTestHelper::start_homestore( "test_log_store", {{HS_SERVICE::META, {.size_pct = 5.0}}, - {HS_SERVICE::LOG, {.size_pct = 84.0, .chunk_size = 32 * 1024 * 1024}}}, + {HS_SERVICE::LOG, {.size_pct = 84.0, .chunk_size = 8 * 1024 * 1024, .min_chunk_size = 8 * 1024 * 1024}}}, [this, restart, n_log_stores]() { + HS_SETTINGS_FACTORY().modifiable_settings([](auto& s) { + // Disable flush timer in UT. 
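+ // Same idea as in test_log_dev: with the timer off, flushes happen only via the explicit
+ // flush()/flush_if_needed() calls the test issues. Applying the setting in the before-services-starting
+ // callback presumably ensures it is already in effect when the logstore service comes up.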
+ s.logstore.flush_timer_frequency_us = 0; + }); + HS_SETTINGS_FACTORY().save(); + if (restart) { for (uint32_t i{0}; i < n_log_stores; ++i) { SampleLogStoreClient* client = m_log_store_clients[i].get(); @@ -462,19 +501,27 @@ class SampleDB { restart); if (!restart) { - auto logdev_id = logstore_service().create_new_logdev(); + std::vector< logdev_id_t > logdev_id_vec; + for (uint32_t i{0}; i < n_log_devs; ++i) { + logdev_id_vec.push_back(logstore_service().create_new_logdev()); + } + for (uint32_t i{0}; i < n_log_stores; ++i) { + auto logdev_id = logdev_id_vec[rand() % logdev_id_vec.size()]; m_log_store_clients.push_back(std::make_unique< SampleLogStoreClient >( logdev_id, bind_this(SampleDB::on_log_insert_completion, 3))); } SampleLogStoreClient::s_max_flush_multiple = - logstore_service().get_logdev(logdev_id)->get_flush_size_multiple(); + logstore_service().get_logdev(logdev_id_vec[0])->get_flush_size_multiple(); } } void shutdown(bool cleanup = true) { test_common::HSTestHelper::shutdown_homestore(cleanup); - if (cleanup) { m_log_store_clients.clear(); } + if (cleanup) { + m_log_store_clients.clear(); + m_highest_log_idx.clear(); + } } void on_log_insert_completion(logdev_id_t fid, logstore_seq_num_t lsn, logdev_key ld_key) { @@ -609,7 +656,7 @@ class LogStoreTest : public ::testing::Test { nlohmann::json json_dump = logdev->dump_log_store(dump_req); dump_sz += json_dump.size(); - LOGINFO("Printing json dump of all logstores in logdev {}. \n {}", logdev->get_id(), json_dump.dump()); + LOGDEBUG("Printing json dump of all logstores in logdev {}. \n {}", logdev->get_id(), json_dump.dump()); for (const auto& logdump : json_dump) { const auto itr = logdump.find("log_records"); if (itr != std::end(logdump)) { rec_count += static_cast< int64_t >(logdump["log_records"].size()); } @@ -619,14 +666,13 @@ class LogStoreTest : public ::testing::Test { EXPECT_EQ(expected_num_records, rec_count); } - void dump_validate_filter(logstore_id_t id, logstore_seq_num_t start_seq, logstore_seq_num_t end_seq, - bool print_content = false) { + void dump_validate_filter(bool print_content = false) { for (const auto& lsc : SampleDB::instance().m_log_store_clients) { - if (lsc->m_log_store->get_store_id() != id) { continue; } - - log_dump_req dump_req; + logstore_id_t id = lsc->m_log_store->get_store_id(); const auto fid = lsc->m_logdev_id; - + logstore_seq_num_t start_seq = lsc->m_log_store->truncated_upto() + 1; + logstore_seq_num_t end_seq = lsc->m_log_store->get_contiguous_completed_seq_num(-1); + log_dump_req dump_req; if (print_content) dump_req.verbosity_level = log_dump_verbosity::CONTENT; dump_req.log_store = lsc->m_log_store; dump_req.start_seq_num = start_seq; @@ -635,8 +681,8 @@ class LogStoreTest : public ::testing::Test { // must use operator= construction as copy construction results in error auto logdev = lsc->m_log_store->get_logdev(); nlohmann::json json_dump = logdev->dump_log_store(dump_req); - LOGINFO("Printing json dump of logdev={} logstore id {}, start_seq {}, end_seq {}, \n\n {}", fid, id, - start_seq, end_seq, json_dump.dump()); + LOGDEBUG("Printing json dump of log_dev={} logstore id {}, start_seq {}, end_seq {}, \n\n {}", fid, id, + start_seq, end_seq, json_dump.dump()); const auto itr_id = json_dump.find(std::to_string(id)); if (itr_id != std::end(json_dump)) { const auto itr_records = itr_id->find("log_records"); @@ -648,9 +694,8 @@ class LogStoreTest : public ::testing::Test { } else { EXPECT_FALSE(true); } - - return; } + return; } int find_garbage_upto(logdev_id_t logdev_id, 
logid_t idx) {
@@ -791,7 +836,8 @@ class LogStoreTest : public ::testing::Test {
}
}
ASSERT_EQ(actual_valid_ids, SampleDB::instance().m_log_store_clients.size());
- ASSERT_EQ(actual_garbage_ids, exp_garbage_store_count);
+ // Because we randomly assign logstores to logdevs, some logdevs may be empty.
+ // ASSERT_EQ(actual_garbage_ids, exp_garbage_store_count);
}
void delete_validate(uint32_t idx) {
@@ -890,12 +936,26 @@ TEST_F(LogStoreTest, BurstRandInsertThenTruncate) {
this->dump_validate(num_records);
LOGINFO("Step 4.2: Read some specific interval/filter of seq number in one logstore and dump it into json");
- this->dump_validate_filter(0, 10, 100, true);
+ this->dump_validate_filter(true);
}
LOGINFO("Step 5: Truncate all of the inserts one log store at a time and validate log dev truncation is marked "
"correctly and also validate if all data prior to truncation return exception");
this->truncate_validate();
+
+ LOGINFO("Step 6: Restart homestore");
+ SampleDB::instance().start_homestore(true /* restart */);
+ this->recovery_validate();
+ this->init(num_records);
+
+ LOGINFO("Step 7: Issue more sequential inserts after restarts with q depth of 15");
+ this->kickstart_inserts(1, 15);
+
+ LOGINFO("Step 8: Wait for the previous Inserts to complete");
+ this->wait_for_inserts();
+
+ LOGINFO("Step 9: Read all the inserts one by one for each log store to validate if what is written is valid");
+ this->read_validate(true);
}
}
@@ -1057,6 +1117,9 @@ TEST_F(LogStoreTest, ThrottleSeqInsertThenRecover) {
this->recovery_validate();
this->init(num_records);
+ LOGINFO("Step 5a: Read all the inserts one by one for each log store to validate if what is written is valid");
+ this->read_validate(true);
+
LOGINFO("Step 6: Restart homestore again to validate recovery on consecutive restarts");
SampleDB::instance().start_homestore(true /* restart */);
this->recovery_validate();
@@ -1083,15 +1146,15 @@ TEST_F(LogStoreTest, ThrottleSeqInsertThenRecover) {
this->truncate_validate();
}
}
+
TEST_F(LogStoreTest, FlushSync) {
-#ifdef _PRERELEASE
LOGINFO("Step 1: Delay the flush threshold and flush timer to very high value to ensure flush works fine")
HS_SETTINGS_FACTORY().modifiable_settings([](auto& s) {
s.logstore.max_time_between_flush_us = 5000000ul; // 5 seconds
s.logstore.flush_threshold_size = 1048576ul; // 1MB
});
HS_SETTINGS_FACTORY().save();
-#endif
+
LOGINFO("Step 2: Reinit the 10 records to start sequential write test");
this->init(1000);
@@ -1101,14 +1164,12 @@ TEST_F(LogStoreTest, FlushSync) {
LOGINFO("Step 4: Do a sync flush");
this->flush();
-#ifdef _PRERELEASE
LOGINFO("Step 5: Reset the settings back")
HS_SETTINGS_FACTORY().modifiable_settings([](auto& s) {
s.logstore.max_time_between_flush_us = 300ul;
s.logstore.flush_threshold_size = 64ul;
});
HS_SETTINGS_FACTORY().save();
-#endif
LOGINFO("Step 6: Wait for the Inserts to complete");
this->wait_for_inserts();
@@ -1142,68 +1203,6 @@ TEST_F(LogStoreTest, FlushSync) {
#endif
}
-TEST_F(LogStoreTest, Rollback) {
- LOGINFO("Step 1: Reinit the 500 records on a single logstore to start rollback test");
- this->init(500, {std::make_pair(1ull, 100)}); // Last entry = 500
-
- LOGINFO("Step 2: Issue sequential inserts with q depth of 10");
- this->kickstart_inserts(1, 10);
-
- LOGINFO("Step 3: Wait for the Inserts to complete");
- this->wait_for_inserts();
-
- LOGINFO("Step 4: Rollback last 50 entries and validate if pre-rollback entries are intact");
- this->rollback_validate(50); // Last entry = 450
-
- LOGINFO("Step 5: Append 25 entries after rollback is 
completed"); - this->init(25, {std::make_pair(1ull, 100)}); - this->kickstart_inserts(1, 10); - this->wait_for_inserts(); // Last entry = 475 - - LOGINFO("Step 7: Rollback again for 75 entries even before previous rollback entry"); - this->rollback_validate(75); // Last entry = 400 - - LOGINFO("Step 8: Append 25 entries after second rollback is completed"); - this->init(25, {std::make_pair(1ull, 100)}); - this->kickstart_inserts(1, 10); - this->wait_for_inserts(); // Last entry = 425 - - LOGINFO("Step 9: Restart homestore and ensure all rollbacks are effectively validated"); - SampleDB::instance().start_homestore(true /* restart */); - this->recovery_validate(); - - LOGINFO("Step 10: Post recovery, append another 25 entries"); - this->init(25, {std::make_pair(1ull, 100)}); - this->kickstart_inserts(1, 5); - this->wait_for_inserts(); // Last entry = 450 - - LOGINFO("Step 11: Rollback again for 75 entries even before previous rollback entry"); - this->rollback_validate(75); // Last entry = 375 - - LOGINFO("Step 12: After 3rd rollback, append another 25 entries"); - this->init(25, {std::make_pair(1ull, 100)}); - this->kickstart_inserts(1, 5); - this->wait_for_inserts(); // Last entry = 400 - - LOGINFO("Step 13: Truncate all entries"); - this->truncate_validate(); - - LOGINFO("Step 14: Restart homestore and ensure all truncations after rollbacks are effectively validated"); - SampleDB::instance().start_homestore(true /* restart */); - this->recovery_validate(); - - LOGINFO("Step 15: Append 25 entries after truncation is completed"); - this->init(25, {std::make_pair(1ull, 100)}); - this->kickstart_inserts(1, 10); - this->wait_for_inserts(); // Last entry = 425 - - LOGINFO("Step 16: Do another truncation to effectively truncate previous records"); - this->truncate_validate(); - - LOGINFO("Step 17: Validate if there are no rollback records"); - this->post_truncate_rollback_validate(); -} - TEST_F(LogStoreTest, DeleteMultipleLogStores) { const auto nrecords = (SISL_OPTIONS["num_records"].as< uint32_t >() * 5) / 100; @@ -1282,8 +1281,10 @@ TEST_F(LogStoreTest, WriteSyncThenRead) { SISL_OPTIONS_ENABLE(logging, test_log_store, iomgr, test_common_setup) SISL_OPTION_GROUP(test_log_store, - (num_logstores, "", "num_logstores", "number of log stores", + (num_logdevs, "", "num_logdevs", "number of log devs", ::cxxopts::value< uint32_t >()->default_value("4"), "number"), + (num_logstores, "", "num_logstores", "number of log stores", + ::cxxopts::value< uint32_t >()->default_value("16"), "number"), (num_records, "", "num_records", "number of record to test", ::cxxopts::value< uint32_t >()->default_value("10000"), "number"), (iterations, "", "iterations", "Iterations", ::cxxopts::value< uint32_t >()->default_value("1"),