diff --git a/conanfile.py b/conanfile.py index ab2de30ce..efe16b56a 100644 --- a/conanfile.py +++ b/conanfile.py @@ -5,7 +5,7 @@ class HomestoreConan(ConanFile): name = "homestore" - version = "5.1.12" + version = "5.1.14" homepage = "https://github.com/eBay/Homestore" description = "HomeStore Storage Engine" diff --git a/src/include/homestore/logstore/log_store.hpp b/src/include/homestore/logstore/log_store.hpp index dfb725bb5..71a1cdcda 100644 --- a/src/include/homestore/logstore/log_store.hpp +++ b/src/include/homestore/logstore/log_store.hpp @@ -110,7 +110,8 @@ class HomeLogStore : public std::enable_shared_from_this< HomeLogStore > { * @param cookie : Any cookie or context which will be passed back in the callback * @param cb Callback upon completion which is called with the status, seq_num and cookie that was passed. + * @param flush_wait : If true, the write also triggers an immediate flush instead of waiting for the flush timer or size threshold. */ - void write_async(logstore_seq_num_t seq_num, const sisl::io_blob& b, void* cookie, const log_write_comp_cb_t& cb); + void write_async(logstore_seq_num_t seq_num, const sisl::io_blob& b, void* cookie, const log_write_comp_cb_t& cb, + bool flush_wait = false); /** * @brief This method appends the blob into the log and it returns the generated seq number @@ -210,6 +211,8 @@ class HomeLogStore : public std::enable_shared_from_this< HomeLogStore > { return (ts == std::numeric_limits< logstore_seq_num_t >::max()) ? -1 : ts; } + sisl::StreamTracker< logstore_record >& log_records() { return m_records; } + /** * @brief iterator to get all the log buffers; * diff --git a/src/include/homestore/logstore/log_store_internal.hpp b/src/include/homestore/logstore/log_store_internal.hpp index 6a2103d01..125e35daa 100644 --- a/src/include/homestore/logstore/log_store_internal.hpp +++ b/src/include/homestore/logstore/log_store_internal.hpp @@ -125,6 +125,7 @@ struct logstore_req { bool is_internal_req; // If the req is created internally by HomeLogStore itself log_req_comp_cb_t cb; // Callback upon completion of write (overrides the default) Clock::time_point start_time; + bool flush_wait{false}; // Wait for the flush to happen logstore_req(const logstore_req&) = delete; logstore_req& operator=(const logstore_req&) = delete; diff --git a/src/lib/device/device.h b/src/lib/device/device.h index cbcdde8ea..a69e740dd 100644 --- a/src/lib/device/device.h +++ b/src/lib/device/device.h @@ -173,6 +173,7 @@ class DeviceManager { std::vector< PhysicalDev* > get_pdevs_by_dev_type(HSDevType dtype) const; std::vector< shared< VirtualDev > > get_vdevs() const; + std::vector< shared< Chunk > > get_chunks() const; uint64_t total_capacity() const; uint64_t total_capacity(HSDevType dtype) const; diff --git a/src/lib/device/device_manager.cpp b/src/lib/device/device_manager.cpp index ad450910e..6c7c59ef1 100644 --- a/src/lib/device/device_manager.cpp +++ b/src/lib/device/device_manager.cpp @@ -543,6 +543,16 @@ std::vector< shared< VirtualDev > > DeviceManager::get_vdevs() const { return ret_v; } +std::vector< shared< Chunk > > DeviceManager::get_chunks() const { + std::unique_lock lg{m_vdev_mutex}; + std::vector< shared< Chunk > > res; + res.reserve(m_chunks.size()); + for (auto& chunk : m_chunks) { + if (chunk) res.push_back(chunk); + } + return res; +} + // Some of the hs_super_blk details uint64_t hs_super_blk::vdev_super_block_size() { return (hs_super_blk::MAX_VDEVS_IN_SYSTEM * vdev_info::size); } diff --git a/src/lib/device/hs_super_blk.h b/src/lib/device/hs_super_blk.h index e156ba94a..a2df05f3d 100644 --- a/src/lib/device/hs_super_blk.h +++ b/src/lib/device/hs_super_blk.h @@ -17,6 +17,7 @@
#include #include +#include #include #include #include @@ -193,9 +194,13 @@ class hs_super_blk { return (dinfo.dev_type == HSDevType::Fast) ? EXTRA_SB_SIZE_FOR_FAST_DEVICE : EXTRA_SB_SIZE_FOR_DATA_DEVICE; } static uint32_t max_chunks_in_pdev(const dev_info& dinfo) { - return (dinfo.dev_size - 1) / - ((dinfo.dev_type == HSDevType::Fast) ? MIN_CHUNK_SIZE_FAST_DEVICE : MIN_CHUNK_SIZE_DATA_DEVICE) + - 1; + uint64_t min_chunk_size = + (dinfo.dev_type == HSDevType::Fast) ? MIN_CHUNK_SIZE_FAST_DEVICE : MIN_CHUNK_SIZE_DATA_DEVICE; +#ifdef _PRERELEASE + auto chunk_size = iomgr_flip::instance()->get_test_flip< long >("set_minimum_chunk_size"); + if (chunk_size) { min_chunk_size = chunk_size.get(); } +#endif + return (dinfo.dev_size - 1) / min_chunk_size + 1; } }; diff --git a/src/lib/device/journal_vdev.cpp b/src/lib/device/journal_vdev.cpp index 87eab5f72..d6063ae54 100644 --- a/src/lib/device/journal_vdev.cpp +++ b/src/lib/device/journal_vdev.cpp @@ -39,14 +39,15 @@ JournalVirtualDev::JournalVirtualDev(DeviceManager& dmgr, const vdev_info& vinfo VirtualDev{dmgr, vinfo, std::move(event_cb), false /* is_auto_recovery */} { // Private data stored when chunks are created. - init_private_data = std::make_shared< JournalChunkPrivate >(); + m_init_private_data = std::make_shared< JournalChunkPrivate >(); m_chunk_pool = std::make_unique< ChunkPool >( dmgr, ChunkPool::Params{ HS_DYNAMIC_CONFIG(generic.journal_chunk_pool_capacity), [this]() { - init_private_data->created_at = get_time_since_epoch_ms(); - sisl::blob private_blob{r_cast< uint8_t* >(init_private_data.get()), sizeof(JournalChunkPrivate)}; + m_init_private_data->created_at = get_time_since_epoch_ms(); + m_init_private_data->end_of_chunk = m_vdev_info.chunk_size; + sisl::blob private_blob{r_cast< uint8_t* >(m_init_private_data.get()), sizeof(JournalChunkPrivate)}; return private_blob; }, m_vdev_info.hs_dev_type, m_vdev_info.vdev_id, m_vdev_info.chunk_size}); @@ -84,17 +85,17 @@ void JournalVirtualDev::init() { // Create descriptor for each logdev_id auto journal_desc = std::make_shared< JournalVirtualDev::Descriptor >(*this, logdev_id); m_journal_descriptors.emplace(logdev_id, journal_desc); - LOGDEBUGMOD(journalvdev, "Loading descriptor {}", logdev_id); + LOGINFOMOD(journalvdev, "Loading descriptor log_dev={}", logdev_id); // Traverse the list starting from the head and add those chunks // in order to the journal descriptor. next_chunk is stored in private_data. // Last chunk will have next_chunk as 0. auto chunk_num = head.chunk_num; while (chunk_num != 0) { auto& c = chunk_map[chunk_num]; - RELEASE_ASSERT(c, "Invalid chunk found logdev {} chunk {}", logdev_id, chunk_num); + RELEASE_ASSERT(c, "Invalid chunk found log_dev={} chunk={}", logdev_id, chunk_num); journal_desc->m_journal_chunks.push_back(c); visited_chunks.insert(chunk_num); - LOGDEBUGMOD(journalvdev, "Loading chunk {} descriptor {}", chunk_num, logdev_id); + LOGINFOMOD(journalvdev, "Loading log_dev={} chunk={}", logdev_id, c->to_string()); // Increase the total size.
journal_desc->m_total_size += c->size(); @@ -121,8 +122,8 @@ void JournalVirtualDev::init() { *data = JournalChunkPrivate{}; update_chunk_private(chunk, data); - LOGDEBUGMOD(journalvdev, "Removing orphan chunk {} found for logdev {} next {}.", chunk_id, logdev_id, - next_chunk); + LOGINFOMOD(journalvdev, "Removing orphan chunk {} found for logdev {} next {}.", chunk_id, logdev_id, + next_chunk); m_dmgr.remove_chunk_locked(chunk); } @@ -149,7 +150,11 @@ shared< JournalVirtualDev::Descriptor > JournalVirtualDev::open(logdev_id_t logd return journal_desc; } - LOGDEBUGMOD(journalvdev, "Opened log device descriptor {}", logdev_id); + LOGINFOMOD(journalvdev, "Opened journal vdev descriptor log_dev={}", logdev_id); + for (auto& chunk : it->second->m_journal_chunks) { + LOGINFOMOD(journalvdev, " log_dev={} end_of_chunk={} chunk={}", logdev_id, get_end_of_chunk(chunk), + chunk->to_string()); + } return it->second; } @@ -181,18 +186,19 @@ void JournalVirtualDev::Descriptor::append_chunk() { last_chunk_private->end_of_chunk = offset_in_chunk; } m_vdev.update_chunk_private(last_chunk, last_chunk_private); - LOGDEBUGMOD(journalvdev, "Added chunk new {} last {} desc {}", new_chunk->chunk_id(), last_chunk->chunk_id(), - to_string()); + LOGINFOMOD(journalvdev, "Added chunk new {} last {} desc {}", new_chunk->to_string(), last_chunk->chunk_id(), + to_string()); } else { // If the list is empty, update the new chunk as the head. Only head chunk contains the logdev_id. auto* new_chunk_private = r_cast< JournalChunkPrivate* >(const_cast< uint8_t* >(new_chunk->user_private())); new_chunk_private->is_head = true; new_chunk_private->logdev_id = m_logdev_id; + new_chunk_private->end_of_chunk = m_vdev.info().chunk_size; // Append the new chunk m_journal_chunks.push_back(new_chunk); m_vdev.update_chunk_private(new_chunk, new_chunk_private); - LOGDEBUGMOD(journalvdev, "Added head chunk {} desc {}", new_chunk->chunk_id(), to_string()); + LOGINFOMOD(journalvdev, "Added head chunk={} desc {}", new_chunk->to_string(), to_string()); } } @@ -231,7 +237,8 @@ off_t JournalVirtualDev::Descriptor::alloc_next_append_blk(size_t sz) { // assert that returning logical offset is in good range HS_DBG_ASSERT_LE(tail_off, m_end_offset); - LOGDEBUGMOD(journalvdev, "returned tail_off 0x{} desc {}", to_hex(tail_off), to_string()); + LOGDEBUGMOD(journalvdev, "returned tail_off 0x{} tail_off {} size {} desc {}", to_hex(tail_off), tail_off, sz, + to_string()); return tail_off; } @@ -336,30 +343,45 @@ void JournalVirtualDev::Descriptor::sync_pwritev(const iovec* iov, int iovcnt, o size_t JournalVirtualDev::Descriptor::sync_next_read(uint8_t* buf, size_t size_rd) { if (m_journal_chunks.empty()) { return 0; } + HS_REL_ASSERT_LE(m_seek_cursor, m_end_offset, "seek_cursor {} exceeded end_offset {}", m_seek_cursor, m_end_offset); + if (m_seek_cursor >= m_end_offset) { + LOGTRACEMOD(journalvdev, "sync_next_read reached end of chunks"); + return 0; + } + auto [chunk, _, offset_in_chunk] = offset_to_chunk(m_seek_cursor); auto const end_of_chunk = m_vdev.get_end_of_chunk(chunk); auto const chunk_size = std::min< uint64_t >(end_of_chunk, chunk->size()); bool across_chunk{false}; + // LOGINFO("sync_next_read size_rd {} chunk {} seek_cursor {} end_of_chunk {} {}", size_rd, chunk->to_string(), + // m_seek_cursor, end_of_chunk, chunk_size); + HS_REL_ASSERT_LE((uint64_t)end_of_chunk, chunk->size(), "Invalid end of chunk: {} detected on chunk num: {}", end_of_chunk, chunk->chunk_id()); HS_REL_ASSERT_LE((uint64_t)offset_in_chunk, chunk->size(), "Invalid
m_seek_cursor: {} which falls beyond end of chunk: {}!", m_seek_cursor, end_of_chunk); // if read size is larger than what's left in this chunk - if (size_rd >= (chunk->size() - offset_in_chunk)) { // truncate size to what is left; - size_rd = chunk->size() - offset_in_chunk; + if (size_rd >= (end_of_chunk - offset_in_chunk)) { + size_rd = end_of_chunk - offset_in_chunk; across_chunk = true; } + if (buf == nullptr) { return size_rd; } + auto ec = sync_pread(buf, size_rd, m_seek_cursor); // TODO: Check if we can tolerate this error and somehow start homestore without replaying or in degraded mode? HS_REL_ASSERT(!ec, "Error in reading next stream of bytes, proceeding could cause some inconsistency, exiting"); // Update seek cursor after read; m_seek_cursor += size_rd; - if (across_chunk) { m_seek_cursor += (chunk->size() - end_of_chunk); } + if (across_chunk) { + m_seek_cursor += (chunk->size() - end_of_chunk); + LOGTRACEMOD(journalvdev, "Across size_rd {} chunk {} seek_cursor {} end_of_chunk {}", size_rd, + chunk->to_string(), m_seek_cursor, end_of_chunk); + } return size_rd; } @@ -412,6 +434,7 @@ off_t JournalVirtualDev::Descriptor::lseek(off_t offset, int whence) { break; } + LOGINFOMOD(journalvdev, "lseek desc {} offset 0x{} whence {} ", to_string(), to_hex(offset), whence); return m_seek_cursor; } @@ -419,30 +442,43 @@ * @brief :- it returns the vdev offset after nbytes from start offset */ off_t JournalVirtualDev::Descriptor::dev_offset(off_t nbytes) const { - if (m_journal_chunks.empty()) { return data_start_offset(); } + if (nbytes == 0 || m_journal_chunks.empty()) { + // If no chunks return start offset. + return data_start_offset(); + } off_t vdev_offset = data_start_offset(); - uint32_t dev_id{0}, chunk_id{0}; - off_t offset_in_chunk{0}; - off_t cur_read_cur{0}; - - while (cur_read_cur != nbytes) { - auto [chunk, _, offset_in_chunk] = offset_to_chunk(vdev_offset); - - auto const end_of_chunk = m_vdev.get_end_of_chunk(chunk); - auto const chunk_size = std::min< uint64_t >(end_of_chunk, chunk->size()); - auto const remaining = nbytes - cur_read_cur; - if (remaining >= (static_cast< off_t >(chunk->size()) - offset_in_chunk)) { - cur_read_cur += (chunk->size() - offset_in_chunk); - vdev_offset += (chunk->size() - offset_in_chunk); - } else { + auto chunk_size = m_vdev.info().chunk_size; + uint64_t remaining = nbytes; + auto start_offset = data_start_offset() % chunk_size; + + // data_start_offset could be anywhere in the first chunk, because when we + // truncate and data_start_offset lies in the first chunk, we don't delete that + // first chunk. Other chunks will have start_offset as 0.
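// Illustrative walk-through of the loop below (all numbers invented for this
// example, not taken from the patch): with chunk_size = 100 and
// data_start_offset() = 250 we get start_offset = 50 and vdev_offset = 250.
// If chunk 1 has end_of_chunk = 80, it holds 80 - 50 = 30 data bytes; for
// nbytes = 40 we consume those 30 (remaining = 10) and advance vdev_offset by
// chunk_size - start_offset = 50, to 300. Chunk 2 (end_of_chunk = 100) then
// satisfies remaining = 10 < 100, giving a result of 300 + 10 = 310. The bytes
// between end_of_chunk and chunk_size hold no log data, which is why the
// cursor jumps by (chunk_size - start_offset) rather than by num_data_bytes.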
+ for (auto chunk : m_journal_chunks) { + uint64_t end_of_chunk = std::min< uint64_t >(m_vdev.get_end_of_chunk(chunk), chunk_size); + + auto num_data_bytes = end_of_chunk - start_offset; + if (remaining < num_data_bytes) { vdev_offset += remaining; - cur_read_cur = nbytes; + break; } + + remaining -= num_data_bytes; + vdev_offset += (chunk_size - start_offset); + start_offset = 0; } return vdev_offset; } +void JournalVirtualDev::Descriptor::update_data_start_offset(off_t offset) { + m_data_start_offset = offset; + auto data_start_offset_aligned = sisl::round_down(m_data_start_offset, m_vdev.info().chunk_size); + m_end_offset = data_start_offset_aligned + m_journal_chunks.size() * m_vdev.info().chunk_size; + LOGINFOMOD(journalvdev, "Updated data start offset off 0x{} {}", to_hex(offset), to_string()); + RELEASE_ASSERT_EQ(m_end_offset - data_start_offset_aligned, m_total_size, "offset size mismatch {}", to_string()); +} + off_t JournalVirtualDev::Descriptor::tail_offset(bool reserve_space_include) const { off_t tail = static_cast< off_t >(data_start_offset() + m_write_sz_in_total.load(std::memory_order_relaxed)); if (reserve_space_include) { tail += m_reserved_sz; } @@ -456,13 +492,13 @@ void JournalVirtualDev::Descriptor::update_tail_offset(off_t tail) { if (tail >= start) { m_write_sz_in_total.store(tail - start, std::memory_order_relaxed); - } else { - RELEASE_ASSERT(false, "tail {} less than start offset {}", tail, start); + } else if (tail != 0) { + LOGERROR("tail {} less than start offset {} desc {}", tail, start, to_string()); + RELEASE_ASSERT(false, "Invalid tail offset"); } lseek(tail); - LOGDEBUGMOD(journalvdev, "tail arg 0x{} desc {} ", to_hex(tail), to_string()); - HS_REL_ASSERT(tail_offset() == tail, "tail offset mismatch after calculation 0x{} : {}", tail_offset(), tail); + LOGINFOMOD(journalvdev, "Updated tail offset arg 0x{} desc {} ", to_hex(tail), to_string()); } void JournalVirtualDev::Descriptor::truncate(off_t truncate_offset) { @@ -510,8 +546,11 @@ void JournalVirtualDev::Descriptor::truncate(off_t truncate_offset) { *data = JournalChunkPrivate{}; m_vdev.update_chunk_private(chunk, data); + // We ideally want to zero out chunks when they are freed, since chunks are + // reused across logdevs. But zeroing a chunk is very expensive, so we instead + // rely on crc mismatches to detect the end offset of the logdev during recovery. + // Format and add back to pool.
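// Sketch of the recovery contract implied by the comment above: a reused
// chunk may still hold log groups written by its previous owner. During
// replay, log_stream_reader::next_group() (changed later in this patch)
// detects such stale data via a prev_grp_crc mismatch and tolerates it only
// when the offset falls in the last chunk (is_offset_at_last_chunk());
// anywhere else it asserts as corruption.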
m_vdev.m_chunk_pool->enqueue(chunk); - HS_PERIODIC_LOG(TRACE, journalvdev, "adding chunk {} back to pool desc {}", chunk->chunk_id(), to_string()); } // Update our start offset, to keep track of actual size @@ -561,7 +600,8 @@ uint64_t JournalVirtualDev::Descriptor::logical_to_dev_offset(off_t log_offset, } #endif -std::tuple< shared< Chunk >, uint32_t, off_t > JournalVirtualDev::Descriptor::offset_to_chunk(off_t log_offset) const { +std::tuple< shared< Chunk >, uint32_t, off_t > JournalVirtualDev::Descriptor::offset_to_chunk(off_t log_offset, + bool check) const { uint64_t chunk_aligned_offset = sisl::round_down(m_data_start_offset, m_vdev.info().chunk_size); uint64_t off_l{static_cast< uint64_t >(log_offset) - chunk_aligned_offset}; uint32_t index = 0; @@ -574,10 +614,17 @@ std::tuple< shared< Chunk >, uint32_t, off_t > JournalVirtualDev::Descriptor::of } } - HS_DBG_ASSERT(false, "Input log_offset is invalid: {}", log_offset); + if (check) { HS_DBG_ASSERT(false, "Input log_offset is invalid: {} {}", log_offset, to_string()); } return {nullptr, 0L, 0L}; } +bool JournalVirtualDev::Descriptor::is_offset_at_last_chunk(off_t bytes_offset) { + auto [chunk, chunk_index, _] = offset_to_chunk(bytes_offset, false); + if (chunk == nullptr) return true; + if (chunk_index == m_journal_chunks.size() - 1) { return true; } + return false; +} + void JournalVirtualDev::Descriptor::high_watermark_check() { if (resource_mgr().check_journal_size(used_size(), size())) { COUNTER_INCREMENT(m_vdev.m_metrics, vdev_high_watermark_count, 1); @@ -598,7 +645,7 @@ bool JournalVirtualDev::Descriptor::is_alloc_accross_chunk(size_t size) const { nlohmann::json JournalVirtualDev::Descriptor::get_status(int log_level) const { nlohmann::json j; - j["logdev_id"] = m_logdev_id; + j["logdev"] = m_logdev_id; j["seek_cursor"] = m_seek_cursor; j["data_start_offset"] = m_data_start_offset; j["end_offset"] = m_end_offset; @@ -613,7 +660,7 @@ nlohmann::json JournalVirtualDev::Descriptor::get_status(int log_level) const { nlohmann::json c; auto* private_data = r_cast< JournalChunkPrivate* >(const_cast< uint8_t* >(chunk->user_private())); c["chunk_id"] = chunk->chunk_id(); - c["logdev_id"] = private_data->logdev_id; + c["logdev"] = private_data->logdev_id; c["is_head"] = private_data->is_head; c["end_of_chunk"] = private_data->end_of_chunk; c["next_chunk"] = private_data->next_chunk; @@ -627,12 +674,13 @@ nlohmann::json JournalVirtualDev::Descriptor::get_status(int log_level) const { } std::string JournalVirtualDev::Descriptor::to_string() const { - std::string str{fmt::format("id={};ds=0x{};end=0x{};writesz={};tail=0x{};" + off_t tail = + static_cast< off_t >(data_start_offset() + m_write_sz_in_total.load(std::memory_order_relaxed)) + m_reserved_sz; + std::string str{fmt::format("log_dev={};ds=0x{};end=0x{};writesz={};tail=0x{};" "rsvdsz={};chunks={};trunc={};total={};seek=0x{} ", m_logdev_id, to_hex(m_data_start_offset), to_hex(m_end_offset), - m_write_sz_in_total.load(std::memory_order_relaxed), to_hex(tail_offset()), - m_reserved_sz, m_journal_chunks.size(), m_truncate_done, m_total_size, - to_hex(m_seek_cursor))}; + m_write_sz_in_total.load(std::memory_order_relaxed), to_hex(tail), m_reserved_sz, + m_journal_chunks.size(), m_truncate_done, m_total_size, to_hex(m_seek_cursor))}; return str; } diff --git a/src/lib/device/journal_vdev.hpp b/src/lib/device/journal_vdev.hpp index 13ec18c65..18bc9608d 100644 --- a/src/lib/device/journal_vdev.hpp +++ b/src/lib/device/journal_vdev.hpp @@ -233,12 +233,7 @@ class JournalVirtualDev : public 
VirtualDev { * * @param offset : the start logical offset to be persisted */ - void update_data_start_offset(off_t offset) { - m_data_start_offset = offset; - auto data_start_offset_aligned = sisl::round_down(m_data_start_offset, m_vdev.info().chunk_size); - m_end_offset = data_start_offset_aligned + m_journal_chunks.size() * m_vdev.info().chunk_size; - RELEASE_ASSERT_EQ(m_end_offset - data_start_offset_aligned, m_total_size, "offset size mismatch"); - } + void update_data_start_offset(off_t offset); /** * @brief : get the logical tail offset; @@ -302,6 +297,13 @@ class JournalVirtualDev : public VirtualDev { */ uint64_t available_blks() const { return available_size() / m_vdev.block_size(); } + /** + * @brief : Check whether offset_bytes lies in the last chunk. + * + * @return : true if the offset falls in the last chunk, false otherwise. + */ + bool is_offset_at_last_chunk(off_t offset_bytes); + /** * @brief Get the status of the journal vdev and its internal structures * @param log_level: Log level to control verbosity. */ nlohmann::json get_status(int log_level) const; + logdev_id_t logdev_id() const { return m_logdev_id; } + std::string to_string() const; private: @@ -361,7 +365,7 @@ class JournalVirtualDev : public VirtualDev { // off_t& offset_in_chunk) const; // Return the chunk, its index and offset in the chunk list. - std::tuple< shared< Chunk >, uint32_t, off_t > offset_to_chunk(off_t log_offset) const; + std::tuple< shared< Chunk >, uint32_t, off_t > offset_to_chunk(off_t log_offset, bool check = true) const; bool validate_append_size(size_t count) const; @@ -409,7 +413,7 @@ class JournalVirtualDev : public VirtualDev { // Cache the chunks. Getting a chunk from the pool causes a single write of the // last chunk in the list to update its end_of_chunk and next_chunk.
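// A note on m_init_private_data (renamed below): it is the template blob the
// ChunkPool formatter stamps onto every pooled chunk. The constructor change
// earlier in this patch presets end_of_chunk = chunk_size, so a freshly
// appended chunk reads as fully writable until append_chunk() records a
// smaller end_of_chunk on it.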
std::unique_ptr< ChunkPool > m_chunk_pool; - std::shared_ptr< JournalChunkPrivate > init_private_data; + std::shared_ptr< JournalChunkPrivate > m_init_private_data; }; } // namespace homestore diff --git a/src/lib/device/physical_dev.cpp b/src/lib/device/physical_dev.cpp index 9517c4422..b431cc384 100644 --- a/src/lib/device/physical_dev.cpp +++ b/src/lib/device/physical_dev.cpp @@ -247,12 +247,13 @@ std::vector< shared< Chunk > > PhysicalDev::create_chunks(const std::vector< uin auto chunk = std::make_shared< Chunk >(this, *cinfo, cslot); ret_chunks.push_back(chunk); + HS_LOG(INFO, device, "Creating chunk {}", chunk->to_string()); get_stream(chunk).m_chunks_map.insert(std::pair{chunk_ids[cit], std::move(chunk)}); - cinfo->~chunk_info(); } m_chunk_info_slots->set_bits(b.start_bit, b.nbits); write_super_block(buf, chunk_info::size * b.nbits, chunk_info_offset_nth(b.start_bit)); + hs_utils::iobuf_free(buf, sisl::buftag::superblk); chunks_remaining -= b.nbits; @@ -296,6 +297,7 @@ shared< Chunk > PhysicalDev::create_chunk(uint32_t chunk_id, uint32_t vdev_id, u auto bitmap_mem = m_chunk_info_slots->serialize(m_pdev_info.dev_attr.align_size); write_super_block(bitmap_mem->cbytes(), bitmap_mem->size(), hs_super_blk::chunk_sb_offset()); + HS_LOG(INFO, device, "Created chunk {}", chunk->to_string()); cinfo->~chunk_info(); hs_utils::iobuf_free(buf, sisl::buftag::superblk); @@ -397,6 +399,7 @@ void PhysicalDev::do_remove_chunk(cshared< Chunk >& chunk) { get_stream(chunk).m_chunks_map.erase(chunk->chunk_id()); cinfo->~chunk_info(); hs_utils::iobuf_free(buf, sisl::buftag::superblk); + HS_LOG(DEBUG, device, "Removed chunk {}", chunk->to_string()); } uint64_t PhysicalDev::chunk_info_offset_nth(uint32_t slot) const { @@ -418,11 +421,14 @@ void PhysicalDev::populate_chunk_info(chunk_info* cinfo, uint32_t vdev_id, uint6 cinfo->set_allocated(); cinfo->set_user_private(private_data); cinfo->compute_checksum(); + auto [_, inserted] = m_chunk_start.insert(cinfo->chunk_start_offset); + RELEASE_ASSERT(inserted, "Duplicate start offset {} for chunk {}", cinfo->chunk_start_offset, cinfo->chunk_id); } void PhysicalDev::free_chunk_info(chunk_info* cinfo) { auto ival = ChunkInterval::right_open(cinfo->chunk_start_offset, cinfo->chunk_start_offset + cinfo->chunk_size); m_chunk_data_area.erase(ival); + m_chunk_start.erase(cinfo->chunk_start_offset); cinfo->set_free(); cinfo->checksum = 0; diff --git a/src/lib/device/physical_dev.hpp b/src/lib/device/physical_dev.hpp index 2987fb45e..41eb9221d 100644 --- a/src/lib/device/physical_dev.hpp +++ b/src/lib/device/physical_dev.hpp @@ -136,6 +136,7 @@ class PhysicalDev { ChunkIntervalSet m_chunk_data_area; // Range of chunks data area created std::unique_ptr< sisl::Bitset > m_chunk_info_slots; // Slots to write the chunk info uint32_t m_chunk_sb_size{0}; // Total size of the chunk sb at present + std::unordered_set< uint64_t > m_chunk_start; // Store and verify start offset of all chunks for debugging. public: PhysicalDev(const dev_info& dinfo, int oflags, const pdev_info_header& pinfo); diff --git a/src/lib/logstore/log_dev.cpp b/src/lib/logstore/log_dev.cpp index 90cc8f2e7..541c54768 100644 --- a/src/lib/logstore/log_dev.cpp +++ b/src/lib/logstore/log_dev.cpp @@ -35,9 +35,9 @@ namespace homestore { SISL_LOGGING_DECL(logstore) -#define THIS_LOGDEV_LOG(level, msg, ...) HS_SUBMOD_LOG(level, logstore, , "logdev", m_logdev_id, msg, __VA_ARGS__) +#define THIS_LOGDEV_LOG(level, msg, ...)
HS_SUBMOD_LOG(level, logstore, , "log_dev", m_logdev_id, msg, __VA_ARGS__) #define THIS_LOGDEV_PERIODIC_LOG(level, msg, ...) \ - HS_PERIODIC_DETAILED_LOG(level, logstore, "logdev", m_logdev_id, , , msg, __VA_ARGS__) + HS_PERIODIC_DETAILED_LOG(level, logstore, "log_dev", m_logdev_id, , , msg, __VA_ARGS__) static bool has_data_service() { return HomeStore::instance()->has_data_service(); } // static BlkDataService& data_service() { return HomeStore::instance()->data_service(); } @@ -87,12 +87,7 @@ void LogDev::start(bool format, JournalVirtualDev* vdev) { m_last_flush_idx = m_log_idx - 1; } - iomanager.run_on_wait(logstore_service().flush_thread(), [this]() { - m_flush_timer_hdl = iomanager.schedule_thread_timer(HS_DYNAMIC_CONFIG(logstore.flush_timer_frequency_us) * 1000, - true /* recurring */, nullptr /* cookie */, - [this](void*) { flush_if_needed(); }); - }); - + start_timer(); handle_unopened_log_stores(format); { @@ -130,9 +125,7 @@ void LogDev::stop() { m_block_flush_q_cv.wait(lk, [&] { return m_stopped; }); } - // cancel the timer - iomanager.run_on_wait(logstore_service().flush_thread(), - [this]() { iomanager.cancel_timer(m_flush_timer_hdl, true); }); + stop_timer(); { folly::SharedMutexWritePriority::WriteHolder holder(m_store_map_mtx); @@ -158,12 +151,31 @@ void LogDev::stop() { m_hs.reset(); } +void LogDev::start_timer() { + // Currently only tests set it to 0. + if (HS_DYNAMIC_CONFIG(logstore.flush_timer_frequency_us) == 0) { return; } + + iomanager.run_on_wait(logstore_service().flush_thread(), [this]() { + m_flush_timer_hdl = iomanager.schedule_thread_timer(HS_DYNAMIC_CONFIG(logstore.flush_timer_frequency_us) * 1000, + true /* recurring */, nullptr /* cookie */, + [this](void*) { flush_if_needed(); }); + }); +} + +void LogDev::stop_timer() { + if (m_flush_timer_hdl != iomgr::null_timer_handle) { + // cancel the timer + iomanager.run_on_wait(logstore_service().flush_thread(), + [this]() { iomanager.cancel_timer(m_flush_timer_hdl, true); }); + } +} + void LogDev::do_load(const off_t device_cursor) { log_stream_reader lstream{device_cursor, m_vdev, m_vdev_jd, m_flush_size_multiple}; logid_t loaded_from{-1}; off_t group_dev_offset = 0; - THIS_LOGDEV_LOG(TRACE, "LogDev::do_load start {} ", m_logdev_id); + THIS_LOGDEV_LOG(TRACE, "LogDev::do_load start log_dev={} ", m_logdev_id); do { const auto buf = lstream.next_group(&group_dev_offset); @@ -180,6 +192,7 @@ void LogDev::do_load(const off_t device_cursor) { break; } + THIS_LOGDEV_LOG(INFO, "Found log group header offset=0x{} header {}", to_hex(group_dev_offset), *header); HS_REL_ASSERT_EQ(header->start_idx(), m_log_idx.load(), "log indx is not the expected one"); if (loaded_from == -1) { loaded_from = header->start_idx(); } @@ -239,15 +252,16 @@ void LogDev::assert_next_pages(log_stream_reader& lstream) { } int64_t LogDev::append_async(const logstore_id_t store_id, const logstore_seq_num_t seq_num, const sisl::io_blob& data, - void* cb_context) { + void* cb_context, bool flush_wait) { auto prev_size = m_pending_flush_size.fetch_add(data.size(), std::memory_order_relaxed); const auto idx = m_log_idx.fetch_add(1, std::memory_order_acq_rel); auto threshold_size = LogDev::flush_data_threshold_size(); m_log_records->create(idx, store_id, seq_num, data, cb_context); - if (prev_size < threshold_size && ((prev_size + data.size()) >= threshold_size) && - !m_is_flushing.load(std::memory_order_relaxed)) { - flush_if_needed(); + if (flush_wait || + ((prev_size < threshold_size && ((prev_size + data.size()) >= threshold_size) && + 
!m_is_flushing.load(std::memory_order_relaxed)))) { + flush_if_needed(flush_wait ? 1 : -1); } return idx; } @@ -257,11 +271,17 @@ log_buffer LogDev::read(const logdev_key& key, serialized_log_record& return_rec m_vdev_jd->sync_pread(buf->bytes(), initial_read_size, key.dev_offset); auto* header = r_cast< const log_group_header* >(buf->cbytes()); - HS_REL_ASSERT_EQ(header->magic_word(), LOG_GROUP_HDR_MAGIC, "Log header corrupted with magic mismatch!"); - HS_REL_ASSERT_EQ(header->get_version(), log_group_header::header_version, "Log header version mismatch!"); - HS_REL_ASSERT_LE(header->start_idx(), key.idx, "log key offset does not match with log_idx"); - HS_REL_ASSERT_GT((header->start_idx() + header->nrecords()), key.idx, "log key offset does not match with log_idx"); - HS_LOG_ASSERT_GE(header->total_size(), header->_inline_data_offset(), "Inconsistent size data in log group"); + // THIS_LOGDEV_LOG(TRACE, "Logdev read log group header {}", *header); + HS_REL_ASSERT_EQ(header->magic_word(), LOG_GROUP_HDR_MAGIC, "Log header corrupted with magic mismatch! {} {}", + m_logdev_id, *header); + HS_REL_ASSERT_EQ(header->get_version(), log_group_header::header_version, "Log header version mismatch! {} {}", + m_logdev_id, *header); + HS_REL_ASSERT_LE(header->start_idx(), key.idx, "log key offset does not match with log_idx {} {}", m_logdev_id, + *header); + HS_REL_ASSERT_GT((header->start_idx() + header->nrecords()), key.idx, + "log key offset does not match with log_idx {} {}", m_logdev_id, *header); + HS_LOG_ASSERT_GE(header->total_size(), header->_inline_data_offset(), "Inconsistent size data in log group {} {}", + m_logdev_id, *header); // We can only do crc match in read if we have read all the blocks. We don't want to aggressively read more data // than we need to just to compare CRC for read operation. It can be done during recovery.
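The flush_wait plumbing above ultimately changes only the threshold passed to flush_if_needed(): append_async() passes 1 when flush_wait is set and -1 otherwise. A minimal sketch of the implied dispatch, assuming a negative threshold falls back to the configured flush_data_threshold_size(); the helper below is illustrative, not a HomeStore API:

#include <cstdint>

// Illustrative only: threshold_size = -1 selects the configured threshold,
// while threshold_size = 1 flushes as soon as any byte is pending (the
// flush_wait fast path).
inline bool should_flush(int64_t pending_bytes, int64_t threshold_size, int64_t configured_threshold) {
    int64_t const effective = (threshold_size < 0) ? configured_threshold : threshold_size;
    return pending_bytes >= effective;
}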
@@ -336,7 +356,7 @@ LogGroup* LogDev::prepare_flush(const int32_t estimated_records) { } }); - lg->finish(get_prev_crc()); + lg->finish(m_logdev_id, get_prev_crc()); if (sisl_unlikely(flushing_upto_idx == -1)) { return nullptr; } lg->m_flush_log_idx_from = m_last_flush_idx + 1; lg->m_flush_log_idx_upto = flushing_upto_idx; @@ -345,7 +365,6 @@ HS_DBG_ASSERT_GT(lg->header()->oob_data_offset, 0); THIS_LOGDEV_LOG(DEBUG, "Flushing upto log_idx={}", flushing_upto_idx); - THIS_LOGDEV_LOG(DEBUG, "Log Group: {}", *lg); return lg; } @@ -371,7 +390,8 @@ bool LogDev::flush_if_needed(int64_t threshold_size) { if (flush_by_size || flush_by_time) { // First off, check if we can flush in this thread itself, if not, schedule it into a different thread if (!can_flush_in_this_thread()) { - iomanager.run_on_forget(logstore_service().flush_thread(), [this]() { flush_if_needed(); }); + iomanager.run_on_forget(logstore_service().flush_thread(), + [this, threshold_size]() { flush_if_needed(threshold_size); }); return false; } @@ -406,8 +426,9 @@ off_t offset = m_vdev_jd->alloc_next_append_blk(lg->header()->total_size()); lg->m_log_dev_offset = offset; HS_REL_ASSERT_NE(lg->m_log_dev_offset, INVALID_OFFSET, "log dev is full"); - THIS_LOGDEV_LOG(TRACE, "Flush prepared, flushing data size={} at offset={}", lg->actual_data_size(), offset); - + THIS_LOGDEV_LOG(TRACE, "Flushing log group data size={} at offset=0x{} log_group={}", lg->actual_data_size(), + to_hex(offset), *lg); + // THIS_LOGDEV_LOG(DEBUG, "Log Group: {}", *lg); do_flush(lg); return true; } else { @@ -508,8 +529,8 @@ void LogDev::unlock_flush(bool do_flush) { } if (!cb()) { // NOTE: Under this if condition DO NOT ASSUME flush lock is still being held. This is because - // callee is saying, I will unlock the flush lock on my own and before returning from cb to here, the - // callee could have schedule a job in other thread and unlock the flush. + // callee is saying, I will unlock the flush lock on my own and before returning from cb to here, + // the callee could have scheduled a job in another thread and unlocked the flush.
std::unique_lock lk{m_block_flush_q_mutex}; THIS_LOGDEV_LOG(DEBUG, "flush cb wanted to hold onto the flush lock, so putting the {} remaining entries back " @@ -539,14 +560,14 @@ uint64_t LogDev::truncate(const logdev_key& key) { HS_DBG_ASSERT_GE(key.idx, m_last_truncate_idx); uint64_t const num_records_to_truncate = static_cast< uint64_t >(key.idx - m_last_truncate_idx); - LOGINFO("LogDev::truncate {}", num_records_to_truncate); + THIS_LOGDEV_LOG(DEBUG, "LogDev::truncate num {} idx {}", num_records_to_truncate, key.idx); if (num_records_to_truncate > 0) { HS_PERIODIC_LOG(INFO, logstore, - "Truncating log device upto logdev {} log_id={} vdev_offset={} truncated {} log records", + "Truncating log device upto log_dev={} log_id={} vdev_offset={} truncated {} log records", m_logdev_id, key.idx, key.dev_offset, num_records_to_truncate); m_log_records->truncate(key.idx); m_vdev_jd->truncate(key.dev_offset); - LOGINFO("LogDev::truncate {}", key.idx); + THIS_LOGDEV_LOG(DEBUG, "LogDev::truncate done {} ", key.idx); m_last_truncate_idx = key.idx; { @@ -562,8 +583,8 @@ for (auto it{std::cbegin(m_garbage_store_ids)}; it != std::cend(m_garbage_store_ids);) { if (it->first > key.idx) break; - HS_PERIODIC_LOG(INFO, logstore, "Garbage collecting the log store id {} log_idx={}", it->second, - it->first); + HS_PERIODIC_LOG(DEBUG, logstore, "Garbage collecting the log_dev={} log_store={} log_idx={}", + m_logdev_id, it->second, it->first); m_logdev_meta.unreserve_store(it->second, false /* persist_now */); it = m_garbage_store_ids.erase(it); #ifdef _PRERELEASE @@ -573,11 +594,11 @@ // We can remove the rollback records of those upto which logid is getting truncated m_logdev_meta.remove_rollback_record_upto(key.idx, false /* persist_now */); - LOGINFO("LogDev::truncate remove rollback {}", key.idx); + THIS_LOGDEV_LOG(DEBUG, "LogDev::truncate remove rollback {}", key.idx); m_logdev_meta.persist(); #ifdef _PRERELEASE if (garbage_collect && iomgr_flip::instance()->test_flip("logdev_abort_after_garbage")) { - LOGINFO("logdev aborting after unreserving garbage ids"); + THIS_LOGDEV_LOG(INFO, "logdev aborting after unreserving garbage ids"); raise(SIGKILL); } #endif @@ -604,9 +625,9 @@ void LogDev::handle_unopened_log_stores(bool format) { } m_unopened_store_io.clear(); - // If there are any unopened storeids found, loop and check again if they are indeed open later. Unopened log store - // could be possible if the ids are deleted, but it is delayed to remove from store id reserver. In that case, - // do the remove from store id reserver now. + // If any unopened store ids are found, loop and check again whether they are indeed opened later. Unopened log + // stores are possible if the ids were deleted but their removal from the store id reserver was delayed. In that + // case, do the removal from the store id reserver now. // TODO: At present we are assuming all unopened store ids could be removed. In future have a callback to this // start routine, which takes the list of unopened store ids and can return a new set, which can be removed.
{ @@ -632,7 +653,7 @@ std::shared_ptr< HomeLogStore > LogDev::create_new_log_store(bool append_mode) { HS_REL_ASSERT((it == m_id_logstore_map.end()), "store_id {}-{} already exists", m_logdev_id, store_id); m_id_logstore_map.insert(std::pair(store_id, logstore_info{.log_store = lstore, .append_mode = append_mode})); } - LOGINFO("Created log store id {}-{}", m_logdev_id, store_id); + LOGINFO("Created log store log_dev={} log_store={}", m_logdev_id, store_id); return lstore; } @@ -652,7 +673,7 @@ folly::Future< shared< HomeLogStore > > LogDev::open_log_store(logstore_id_t sto } void LogDev::remove_log_store(logstore_id_t store_id) { - LOGINFO("Removing log store id {}-{}", m_logdev_id, store_id); + LOGINFO("Removing log_dev={} log_store={}", m_logdev_id, store_id); { folly::SharedMutexWritePriority::WriteHolder holder(m_store_map_mtx); auto ret = m_id_logstore_map.erase(store_id); @@ -694,8 +715,8 @@ void LogDev::on_log_store_found(logstore_id_t store_id, const logstore_superblk& return; } - LOGINFO("Found a logstore store_id={}-{} with start seq_num={}, Creating a new HomeLogStore instance", m_logdev_id, - store_id, sb.m_first_seq_num); + LOGINFO("Found a logstore log_dev={} log_store={} with start seq_num={}; creating a new HomeLogStore instance", + m_logdev_id, store_id, sb.m_first_seq_num); logstore_info& info = it->second; info.log_store = std::make_shared< HomeLogStore >(shared_from_this(), store_id, info.append_mode, sb.m_first_seq_num); @@ -800,17 +821,17 @@ logdev_key LogDev::do_device_truncate(bool dry_run) { } if ((min_safe_ld_key == logdev_key::out_of_bound_ld_key()) || (min_safe_ld_key.idx < 0)) { - HS_PERIODIC_LOG( - INFO, logstore, - "[Logdev={}] No log store append on any log stores, skipping device truncation, all_logstore_info:<{}>", - m_logdev_id, dbg_str); + HS_PERIODIC_LOG(INFO, logstore, + "[log_dev={}] No log store append on any log stores, skipping device truncation, " + "all_logstore_info:<{}>", + m_logdev_id, dbg_str); return min_safe_ld_key; } // Got the safest log id to truncate and actually truncate upto the safe log idx to the log device if (!dry_run) { truncate(min_safe_ld_key); } HS_PERIODIC_LOG(INFO, logstore, - "[Logdev={}] LogDevice truncate, all_logstore_info:<{}> safe log dev key to truncate={}", + "[log_dev={}] LogDevice truncate, all_logstore_info:<{}> safe log dev key to truncate={}", m_logdev_id, dbg_str, min_safe_ld_key); // We call post device truncation only to the log stores whose prepared truncation points are fully @@ -1054,7 +1075,7 @@ void LogDevMetadata::remove_rollback_record_upto(logid_t upto_id, bool persist_n uint32_t n_removed{0}; for (auto i = m_rollback_sb->num_records; i > 0; --i) { auto& rec = m_rollback_sb->at(i - 1); - LOGINFO("Removing record sb {} {}", rec.idx_range.second, upto_id); + HS_LOG(TRACE, logstore, "Removing record sb {} {}", rec.idx_range.second, upto_id); if (rec.idx_range.second <= upto_id) { m_rollback_sb->remove_ith_record(i - 1); ++n_removed; @@ -1063,7 +1084,8 @@ if (n_removed) { for (auto it = m_rollback_info.begin(); it != m_rollback_info.end();) { - LOGINFO("Removing info {} {}", it->second.second, upto_id); + HS_LOG(TRACE, logstore, "Removing info {} {}", it->second.second, upto_id); if (it->second.second <= upto_id) { it = m_rollback_info.erase(it); } else { diff --git a/src/lib/logstore/log_dev.hpp b/src/lib/logstore/log_dev.hpp index bfb2afb1a..f356102a0 100644 --- a/src/lib/logstore/log_dev.hpp +++
b/src/lib/logstore/log_dev.hpp @@ -136,6 +136,7 @@ struct log_group_header { uint32_t footer_offset; // offset of where footer starts crc32_t prev_grp_crc; // Checksum of the previous group that was written crc32_t cur_grp_crc; // Checksum of the current group record + logdev_id_t logdev_id; // Logdev id log_group_header() : magic{LOG_GROUP_HDR_MAGIC}, version{header_version} {} log_group_header(const log_group_header&) = delete; @@ -195,9 +196,10 @@ struct fmt::formatter< homestore::log_group_header > { return fmt::format_to( ctx.out(), "magic = {} version={} n_log_records = {} start_log_idx = {} group_size = {} inline_data_offset = {} " - "oob_data_offset = {} prev_grp_crc = {} cur_grp_crc = {}", + "oob_data_offset = {} prev_grp_crc = {} cur_grp_crc = {} logdev = {}", header.magic, header.version, header.n_log_records, header.start_log_idx, header.group_size, - header.inline_data_offset, header.oob_data_offset, header.prev_grp_crc, header.cur_grp_crc); + header.inline_data_offset, header.oob_data_offset, header.prev_grp_crc, header.cur_grp_crc, + header.logdev_id); } }; @@ -253,7 +255,7 @@ class LogGroup { bool add_record(log_record& record, const int64_t log_idx); bool can_accomodate(const log_record& record) const { return (m_nrecords <= m_max_records); } - const iovec_array& finish(const crc32_t prev_crc); + const iovec_array& finish(logdev_id_t logdev_id, const crc32_t prev_crc); crc32_t compute_crc(); log_group_header* header() { return reinterpret_cast< log_group_header* >(m_cur_log_buf); } @@ -634,6 +636,18 @@ class LogDev : public std::enable_shared_from_this< LogDev > { */ void stop(); + /** + * @brief Start the flush timer. + * + */ + void start_timer(); + + /** + * @brief Stop the flush timer. + * + */ + void stop_timer(); + /** * @brief Append the data to the log device asynchronously. The buffer that is passed is expected to be valid, till * the append callback is done. @@ -648,7 +662,7 @@ class LogDev : public std::enable_shared_from_this< LogDev > { * @return logid_t : log_idx of the log of the data. 
*/ logid_t append_async(logstore_id_t store_id, logstore_seq_num_t seq_num, const sisl::io_blob& data, - void* cb_context); + void* cb_context, bool flush_wait = false); /** * @brief Read the log id from the device offset @@ -781,7 +795,6 @@ class LogDev : public std::enable_shared_from_this< LogDev > { void device_truncate_under_lock(const std::shared_ptr< truncate_req >& treq); logdev_key do_device_truncate(bool dry_run = false); void handle_unopened_log_stores(bool format); - logdev_id_t get_id() { return m_logdev_id; } private: @@ -858,7 +871,7 @@ class LogDev : public std::enable_shared_from_this< LogDev > { uint32_t m_log_group_idx{0}; std::atomic< bool > m_flush_status = false; // Timer handle - iomgr::timer_handle_t m_flush_timer_hdl; + iomgr::timer_handle_t m_flush_timer_hdl{iomgr::null_timer_handle}; }; // LogDev } // namespace homestore diff --git a/src/lib/logstore/log_group.cpp b/src/lib/logstore/log_group.cpp index 2f54da8d8..597d97849 100644 --- a/src/lib/logstore/log_group.cpp +++ b/src/lib/logstore/log_group.cpp @@ -117,13 +117,14 @@ bool LogGroup::new_iovec_for_footer() const { return ((m_inline_data_pos + sizeof(log_group_footer)) >= m_cur_buf_len || m_oob_data_pos != 0); } -const iovec_array& LogGroup::finish(const crc32_t prev_crc) { +const iovec_array& LogGroup::finish(logdev_id_t logdev_id, const crc32_t prev_crc) { // add footer auto footer = add_and_get_footer(); m_iovecs[0].iov_len = sisl::round_up(m_iovecs[0].iov_len, m_flush_multiple_size); log_group_header* hdr = new (header()) log_group_header{}; + hdr->logdev_id = logdev_id; hdr->n_log_records = m_nrecords; hdr->prev_grp_crc = prev_crc; hdr->inline_data_offset = sizeof(log_group_header) + (m_max_records * sizeof(serialized_log_record)); diff --git a/src/lib/logstore/log_store.cpp b/src/lib/logstore/log_store.cpp index 089bb3286..48023f0e5 100644 --- a/src/lib/logstore/log_store.cpp +++ b/src/lib/logstore/log_store.cpp @@ -28,9 +28,9 @@ namespace homestore { SISL_LOGGING_DECL(logstore) -#define THIS_LOGSTORE_LOG(level, msg, ...) HS_SUBMOD_LOG(level, logstore, , "store", m_fq_name, msg, __VA_ARGS__) +#define THIS_LOGSTORE_LOG(level, msg, ...) HS_SUBMOD_LOG(level, logstore, , "log_store", m_fq_name, msg, __VA_ARGS__) #define THIS_LOGSTORE_PERIODIC_LOG(level, msg, ...) 
\ - HS_PERIODIC_DETAILED_LOG(level, logstore, "store", m_fq_name, , , msg, __VA_ARGS__) + HS_PERIODIC_DETAILED_LOG(level, logstore, "log_store", m_fq_name, , , msg, __VA_ARGS__) HomeLogStore::HomeLogStore(std::shared_ptr< LogDev > logdev, logstore_id_t id, bool append_mode, logstore_seq_num_t start_lsn) : @@ -39,7 +39,7 @@ HomeLogStore::HomeLogStore(std::shared_ptr< LogDev > logdev, logstore_id_t id, b m_records{"HomeLogStoreRecords", start_lsn - 1}, m_append_mode{append_mode}, m_seq_num{start_lsn}, - m_fq_name{fmt::format("{}.{}", logdev->get_id(), id)}, + m_fq_name{fmt::format("{} log_dev={}", id, logdev->get_id())}, m_metrics{logstore_service().metrics()} { m_truncation_barriers.reserve(10000); m_safe_truncation_boundary.ld_key = m_logdev->get_last_flush_ld_key(); @@ -57,18 +57,19 @@ bool HomeLogStore::write_sync(logstore_seq_num_t seq_num, const sisl::io_blob& b bool ret{false}; }; auto ctx = std::make_shared< Context >(); - this->write_async(seq_num, b, nullptr, - [seq_num, this, ctx](homestore::logstore_seq_num_t seq_num_cb, - [[maybe_unused]] const sisl::io_blob& b, homestore::logdev_key ld_key, - [[maybe_unused]] void* cb_ctx) { - HS_DBG_ASSERT((ld_key && seq_num == seq_num_cb), "Write_Async failed or corrupted"); - { - std::unique_lock< std::mutex > lk{ctx->write_mutex}; - ctx->write_done = true; - ctx->ret = true; - } - ctx->write_cv.notify_one(); - }); + this->write_async( + seq_num, b, nullptr, + [seq_num, this, ctx](homestore::logstore_seq_num_t seq_num_cb, [[maybe_unused]] const sisl::io_blob& b, + homestore::logdev_key ld_key, [[maybe_unused]] void* cb_ctx) { + HS_DBG_ASSERT((ld_key && seq_num == seq_num_cb), "Write_Async failed or corrupted"); + { + std::unique_lock< std::mutex > lk{ctx->write_mutex}; + ctx->write_done = true; + ctx->ret = true; + } + ctx->write_cv.notify_one(); + }, + true /* flush_wait */); { std::unique_lock< std::mutex > lk{ctx->write_mutex}; @@ -95,14 +96,15 @@ void HomeLogStore::write_async(logstore_req* req, const log_req_comp_cb_t& cb) { m_records.create(req->seq_num); COUNTER_INCREMENT(m_metrics, logstore_append_count, 1); HISTOGRAM_OBSERVE(m_metrics, logstore_record_size, req->data.size()); - m_logdev->append_async(m_store_id, req->seq_num, req->data, static_cast< void* >(req)); + m_logdev->append_async(m_store_id, req->seq_num, req->data, static_cast< void* >(req), req->flush_wait); } void HomeLogStore::write_async(logstore_seq_num_t seq_num, const sisl::io_blob& b, void* cookie, - const log_write_comp_cb_t& cb) { + const log_write_comp_cb_t& cb, bool flush_wait) { // Form an internal request and issue the write auto* req = logstore_req::make(this, seq_num, b, true /* is_write_req */); req->cookie = cookie; + req->flush_wait = flush_wait; write_async(req, [cb](logstore_req* req, logdev_key written_lkey) { if (cb) { cb(req->seq_num, req->data, written_lkey, req->cookie); } @@ -121,7 +123,7 @@ log_buffer HomeLogStore::read_sync(logstore_seq_num_t seq_num) { // If seq_num has not been flushed yet, but issued, then we flush them before reading auto const s = m_records.status(seq_num); if (s.is_out_of_range || s.is_hole) { - // THIS_LOGSTORE_LOG(DEBUG, "ld_key not valid {}", seq_num); + // THIS_LOGSTORE_LOG(ERROR, "ld_key not valid {}", seq_num); throw std::out_of_range("key not valid"); } else if (!s.is_completed) { THIS_LOGSTORE_LOG(TRACE, "Reading lsn={}:{} before flushed, doing flush first", m_store_id, seq_num); diff --git a/src/lib/logstore/log_store_service.cpp b/src/lib/logstore/log_store_service.cpp index 90e35cf06..68f08d275 100644 --- 
a/src/lib/logstore/log_store_service.cpp +++ b/src/lib/logstore/log_store_service.cpp @@ -60,6 +60,14 @@ folly::Future< std::error_code > LogStoreService::create_vdev(uint64_t size, uin hs_vdev_context hs_ctx; hs_ctx.type = hs_vdev_type_t::LOGDEV_VDEV; +#ifdef _PRERELEASE + auto min_size = iomgr_flip::instance()->get_test_flip< long >("set_minimum_chunk_size"); + if (min_size) { + chunk_size = min_size.get(); + LOGINFO("Flip set_minimum_chunk_size is enabled, min_chunk_size now is {}", chunk_size); + } +#endif + // reason we set alloc_type/chunk_sel_type here instead of by homestore logstore service consumer is because // consumer doesn't care about or understand the underlying alloc/chunkSel for this service, if this changes in the // future, we can let consumer set it by then; @@ -115,7 +123,6 @@ logdev_id_t LogStoreService::create_new_logdev() { auto logdev = create_new_logdev_internal(logdev_id); logdev->start(true /* format */, m_logdev_vdev.get()); COUNTER_INCREMENT(m_metrics, logdevs_count, 1); - LOGINFO("Created log dev id {}", logdev_id); return logdev_id; } diff --git a/src/lib/logstore/log_stream.cpp b/src/lib/logstore/log_stream.cpp index afa7e67e9..105a7714e 100--- a/src/lib/logstore/log_stream.cpp +++ b/src/lib/logstore/log_stream.cpp @@ -29,6 +29,8 @@ log_stream_reader::log_stream_reader(off_t device_cursor, JournalVirtualDev* vde m_vdev_jd{std::move(vdev_jd)}, m_first_group_cursor{device_cursor}, m_read_size_multiple{read_size_multiple} { + // We set the journal descriptor seek_cursor here so that + // sync_next_read reads from the seek_cursor. m_vdev_jd->lseek(m_first_group_cursor); } @@ -44,7 +46,7 @@ sisl::byte_view log_stream_reader::next_group(off_t* out_dev_offset) { do { m_cur_log_buf = read_next_bytes(std::max(min_needed, bulk_read_size)); if (m_cur_log_buf.size() == 0) { - LOGINFOMOD(logstore, "Logdev data empty"); + LOGINFOMOD(logstore, "Logdev data empty log_dev={}", m_vdev_jd->logdev_id()); return {}; } } while (m_cur_log_buf.size() < sizeof(log_group_header)); @@ -54,56 +56,65 @@ HS_REL_ASSERT_GE(m_cur_log_buf.size(), m_read_size_multiple); const auto* header = r_cast< log_group_header const* >(m_cur_log_buf.bytes()); if (header->magic_word() != LOG_GROUP_HDR_MAGIC) { - LOGINFOMOD(logstore, "Logdev data not seeing magic at pos {}, must have come to end of logdev", - m_vdev_jd->dev_offset(m_cur_read_bytes)); + LOGINFOMOD(logstore, "Logdev data not seeing magic at pos {}, must have come to end of log_dev={}", + m_vdev_jd->dev_offset(m_cur_read_bytes), m_vdev_jd->logdev_id()); *out_dev_offset = m_vdev_jd->dev_offset(m_cur_read_bytes); - // move it by dma boundary if header is not valid m_prev_crc = 0; m_cur_read_bytes += m_read_size_multiple; return ret_buf; } - if (header->total_size() > m_cur_log_buf.size()) { - LOGINFOMOD(logstore, "Logstream group size {} is more than available buffer size {}, reading from store", - header->total_size(), m_cur_log_buf.size()); - // Bigger group size than needed bytes, read again - min_needed = sisl::round_up(header->total_size(), m_read_size_multiple); - goto read_again; - } - - LOGTRACEMOD(logstore, - "Logstream read log group of size={} nrecords={} m_cur_log_dev_offset {} buf size " - "remaining {} ", - header->total_size(), header->nrecords(), m_vdev_jd->dev_offset(m_cur_read_bytes), - m_cur_log_buf.size()); - + // Because chunks are reused without being cleaned up, we can run into chunks last used by another logdev, + // and it can happen that the log group headers don't
match. In that case we use is_offset_at_last_chunk to decide: a mismatch in + // the last chunk is treated as the end of the log, anywhere else raises an assert. // compare it with prev crc if (m_prev_crc != 0 && m_prev_crc != header->prev_grp_crc) { // we reached the end - LOGINFOMOD(logstore, "we have reached the end. crc doesn't match with the prev crc {}", - m_vdev_jd->dev_offset(m_cur_read_bytes)); + LOGINFOMOD(logstore, + "we have reached the end. crc doesn't match offset {} header prev crc {} expected prev crc {} log_dev={}", + m_vdev_jd->dev_offset(m_cur_read_bytes), header->prev_grp_crc, m_prev_crc, m_vdev_jd->logdev_id()); *out_dev_offset = m_vdev_jd->dev_offset(m_cur_read_bytes); - + if (!m_vdev_jd->is_offset_at_last_chunk(m_cur_read_bytes)) { + HS_REL_ASSERT(0, "data is corrupted {}", m_vdev_jd->logdev_id()); + } // move it by dma boundary if header is not valid m_prev_crc = 0; m_cur_read_bytes += m_read_size_multiple; return ret_buf; } + if (header->total_size() > m_cur_log_buf.size()) { + LOGTRACEMOD(logstore, "Logstream group size {} is more than available buffer size {}, reading from log_dev={}", + header->total_size(), m_cur_log_buf.size(), m_vdev_jd->logdev_id()); + // Bigger group size than needed bytes, read again + min_needed = sisl::round_up(header->total_size(), m_read_size_multiple); + goto read_again; + } + + LOGTRACEMOD(logstore, + "Logstream read log group of size={} nrecords={} journal_dev_offset {} cur_read_bytes {} buf size " + "remaining {} log_dev={}", + header->total_size(), header->nrecords(), m_vdev_jd->dev_offset(m_cur_read_bytes), m_cur_read_bytes, + m_cur_log_buf.size(), m_vdev_jd->logdev_id()); + // At this point the data seems to be valid. Let's see if the data was written completely by comparing the footer const auto* footer = r_cast< log_group_footer* >((uint64_t)m_cur_log_buf.bytes() + header->footer_offset); if (footer->magic != LOG_GROUP_FOOTER_MAGIC || footer->start_log_idx != header->start_log_idx) { LOGINFOMOD(logstore, - "last write is not completely written. footer magic {} footer start_log_idx {} header log indx {}", - footer->magic, footer->start_log_idx, header->start_log_idx); + "last write is not completely written. 
footer magic {} footer start_log_idx {} header log indx {} " + "log_dev={}", + footer->magic, footer->start_log_idx, header->start_log_idx, m_vdev_jd->logdev_id()); *out_dev_offset = m_vdev_jd->dev_offset(m_cur_read_bytes); - // move it by dma boundary if header is not valid m_prev_crc = 0; m_cur_read_bytes += m_read_size_multiple; + if (!m_vdev_jd->is_offset_at_last_chunk(m_cur_read_bytes)) { + HS_REL_ASSERT(0, "data is corrupted {}", m_vdev_jd->logdev_id()); + } return ret_buf; } + HS_DBG_ASSERT_EQ(footer->version, log_group_footer::footer_version, "Log footer version mismatch"); // verify crc with data @@ -112,8 +123,9 @@ sisl::byte_view log_stream_reader::next_group(off_t* out_dev_offset) { (header->total_size() - sizeof(log_group_header))); if (cur_crc != header->cur_grp_crc) { /* This is a valid entry so crc should match */ - HS_REL_ASSERT(0, "data is corrupted"); - LOGINFOMOD(logstore, "crc doesn't match {}", m_vdev_jd->dev_offset(m_cur_read_bytes)); + LOGERRORMOD(logstore, "crc doesn't match {} log_dev={}", m_vdev_jd->dev_offset(m_cur_read_bytes), + m_vdev_jd->logdev_id()); + HS_REL_ASSERT(0, "data is corrupted {}", m_vdev_jd->logdev_id()); *out_dev_offset = m_vdev_jd->dev_offset(m_cur_read_bytes); // move it by dma boundary if header is not valid @@ -141,16 +153,20 @@ sisl::byte_view log_stream_reader::group_in_next_page() { sisl::byte_view log_stream_reader::read_next_bytes(uint64_t nbytes) { // TO DO: Might need to address alignment based on data or fast type + const auto prev_pos = m_vdev_jd->seeked_pos(); + auto sz_to_read = m_vdev_jd->sync_next_read(nullptr, nbytes); + if (sz_to_read == 0) { return sisl::byte_view{m_cur_log_buf}; } + auto out_buf = - hs_utils::make_byte_array(nbytes + m_cur_log_buf.size(), true, sisl::buftag::logread, m_vdev->align_size()); + hs_utils::make_byte_array(sz_to_read + m_cur_log_buf.size(), true, sisl::buftag::logread, m_vdev->align_size()); if (m_cur_log_buf.size()) { memcpy(out_buf->bytes(), m_cur_log_buf.bytes(), m_cur_log_buf.size()); } - const auto prev_pos = m_vdev_jd->seeked_pos(); - auto sz_read = m_vdev_jd->sync_next_read(out_buf->bytes() + m_cur_log_buf.size(), nbytes); - if (sz_read == 0) { return {}; } + auto sz_read = m_vdev_jd->sync_next_read(out_buf->bytes() + m_cur_log_buf.size(), sz_to_read); + assert(sz_read == sz_to_read); - LOGINFOMOD(logstore, "LogStream read {} bytes from vdev offset {} and vdev cur offset {}", nbytes, prev_pos, - m_vdev_jd->seeked_pos()); + LOGTRACEMOD(logstore, + "LogStream read {} bytes req bytes {} from vdev prev offset {} and vdev cur offset {} log_dev={}", + sz_read, nbytes, prev_pos, m_vdev_jd->seeked_pos(), m_vdev_jd->logdev_id()); return sisl::byte_view{out_buf}; } } // namespace homestore diff --git a/src/lib/replication/log_store/home_raft_log_store.cpp b/src/lib/replication/log_store/home_raft_log_store.cpp index 7444169ee..41b2ba1c4 100644 --- a/src/lib/replication/log_store/home_raft_log_store.cpp +++ b/src/lib/replication/log_store/home_raft_log_store.cpp @@ -64,11 +64,11 @@ HomeRaftLogStore::HomeRaftLogStore(logdev_id_t logdev_id, logstore_id_t logstore m_log_store = logstore_service().create_new_log_store(m_logdev_id, true); if (!m_log_store) { throw std::runtime_error("Failed to create log store"); } m_logstore_id = m_log_store->get_store_id(); - LOGDEBUGMOD(replication, "Opened new home log dev = {} store id={}", m_logdev_id, m_logstore_id); + LOGDEBUGMOD(replication, "Opened new home log_dev={} log_store={}", m_logdev_id, m_logstore_id); } else { m_logdev_id = logdev_id; m_logstore_id = 
logstore_id; - LOGDEBUGMOD(replication, "Opening existing home log dev = {} store id={}", m_logdev_id, logstore_id); + LOGDEBUGMOD(replication, "Opening existing home log_dev={} log_store={}", m_logdev_id, logstore_id); logstore_service().open_logdev(m_logdev_id); logstore_service().open_log_store(m_logdev_id, logstore_id, true).thenValue([this](auto log_store) { m_log_store = std::move(log_store); diff --git a/src/lib/replication/repl_dev/raft_repl_dev.cpp b/src/lib/replication/repl_dev/raft_repl_dev.cpp index 327660270..73ebf9fd1 100644 --- a/src/lib/replication/repl_dev/raft_repl_dev.cpp +++ b/src/lib/replication/repl_dev/raft_repl_dev.cpp @@ -68,9 +68,11 @@ RaftReplDev::RaftReplDev(RaftReplService& svc, superblk< raft_repl_dev_superblk m_rd_sb.write(); } - RD_LOG(INFO, "Started {} RaftReplDev group_id={}, replica_id={}, raft_server_id={} commited_lsn={} next_dsn={}", + RD_LOG(INFO, + "Started {} RaftReplDev group_id={}, replica_id={}, raft_server_id={} committed_lsn={} next_dsn={} " + "log_dev={} log_store={}", (load_existing ? "Existing" : "New"), group_id_str(), my_replica_id_str(), m_raft_server_id, - m_commit_upto_lsn.load(), m_next_dsn.load()); + m_commit_upto_lsn.load(), m_next_dsn.load(), m_rd_sb->logdev_id, m_rd_sb->logstore_id); m_msg_mgr.bind_data_service_request(PUSH_DATA, m_group_id, bind_this(RaftReplDev::on_push_data_received, 1)); m_msg_mgr.bind_data_service_request(FETCH_DATA, m_group_id, bind_this(RaftReplDev::on_fetch_data_received, 1)); @@ -466,7 +468,7 @@ void RaftReplDev::check_and_fetch_remote_data(std::vector< repl_req_ptr_t >* rre void RaftReplDev::fetch_data_from_remote(std::vector< repl_req_ptr_t > rreqs) { if (rreqs.size() == 0) { return; } - std::vector<::flatbuffers::Offset< RequestEntry > > entries; + std::vector< ::flatbuffers::Offset< RequestEntry > > entries; entries.reserve(rreqs.size()); shared< flatbuffers::FlatBufferBuilder > builder = std::make_shared< flatbuffers::FlatBufferBuilder >(); diff --git a/src/tests/CMakeLists.txt b/src/tests/CMakeLists.txt index e5f4ea0b2..e3b40fc90 100644 --- a/src/tests/CMakeLists.txt +++ b/src/tests/CMakeLists.txt @@ -85,6 +85,10 @@ if (${io_tests}) target_sources(test_log_store PRIVATE test_log_store.cpp) target_link_libraries(test_log_store hs_logdev homestore ${COMMON_TEST_DEPS} ) + add_executable(test_log_dev) + target_sources(test_log_dev PRIVATE test_log_dev.cpp) + target_link_libraries(test_log_dev hs_logdev homestore ${COMMON_TEST_DEPS} ) + set(TEST_METABLK_SOURCE_FILES test_meta_blk_mgr.cpp) add_executable(test_meta_blk_mgr ${TEST_METABLK_SOURCE_FILES}) target_link_libraries(test_meta_blk_mgr homestore ${COMMON_TEST_DEPS} GTest::gmock) @@ -108,7 +112,8 @@ if (${io_tests}) can_build_epoll_io_tests(epoll_tests) if(${epoll_tests}) - # add_test(NAME LogStore-Epoll COMMAND ${CMAKE_BINARY_DIR}/bin/test_log_store) + add_test(NAME LogDev-Epoll COMMAND ${CMAKE_BINARY_DIR}/bin/test_log_dev) + add_test(NAME LogStore-Epoll COMMAND ${CMAKE_BINARY_DIR}/bin/test_log_store) add_test(NAME MetaBlkMgr-Epoll COMMAND ${CMAKE_BINARY_DIR}/bin/test_meta_blk_mgr) add_test(NAME DataService-Epoll COMMAND ${CMAKE_BINARY_DIR}/bin/test_data_service) @@ -120,6 +125,7 @@ if (${io_tests}) can_build_spdk_io_tests(spdk_tests) if(${spdk_tests}) add_test(NAME LogStore-Spdk COMMAND ${CMAKE_BINARY_DIR}/bin/test_log_store -- --spdk "true") + add_test(NAME LogDev-Spdk COMMAND ${CMAKE_BINARY_DIR}/bin/test_log_dev -- --spdk "true") add_test(NAME MetaBlkMgr-Spdk COMMAND ${CMAKE_BINARY_DIR}/bin/test_meta_blk_mgr -- --spdk "true") add_test(NAME
DataSerice-Spdk COMMAND ${CMAKE_BINARY_DIR}/bin/test_data_service -- --spdk "true") add_test(NAME SoloReplDev-Spdk COMMAND ${CMAKE_BINARY_DIR}/bin/test_solo_repl_dev -- --spdk "true") diff --git a/src/tests/log_store_benchmark.cpp b/src/tests/log_store_benchmark.cpp index d6fb50a4c..20b81f302 100644 --- a/src/tests/log_store_benchmark.cpp +++ b/src/tests/log_store_benchmark.cpp @@ -169,7 +169,7 @@ static void test_append(benchmark::State& state) { static void setup() { test_common::HSTestHelper::start_homestore( - "test_log_store", {{HS_SERVICE::META, {.size_pct = 5.0}}, {HS_SERVICE::LOG, {.size_pct = 87.0}}}); + "test_log_store_bench", {{HS_SERVICE::META, {.size_pct = 5.0}}, {HS_SERVICE::LOG, {.size_pct = 87.0}}}); } static void teardown() { test_common::HSTestHelper::shutdown_homestore(); } diff --git a/src/tests/test_common/homestore_test_common.hpp b/src/tests/test_common/homestore_test_common.hpp index 33b6d63a6..1890339bf 100644 --- a/src/tests/test_common/homestore_test_common.hpp +++ b/src/tests/test_common/homestore_test_common.hpp @@ -23,6 +23,7 @@ #include #include #include +#include #include #include #include @@ -170,6 +171,7 @@ class HSTestHelper { shared< ReplApplication > repl_app{nullptr}; chunk_num_t num_chunks{1}; uint64_t chunk_size{32 * 1024 * 1024}; // Chunk size in MB. + uint64_t min_chunk_size{0}; vdev_size_type_t vdev_size_type{vdev_size_type_t::VDEV_SIZE_STATIC}; }; @@ -246,6 +248,11 @@ class HSTestHelper { bool need_format = hsi->start(hs_input_params{.devices = device_info, .app_mem_size = app_mem_size}, std::move(cb)); + // We need to set the min chunk size before homestore format + if (svc_params[HS_SERVICE::LOG].min_chunk_size != 0) { + set_min_chunk_size(svc_params[HS_SERVICE::LOG].min_chunk_size); + } + if (need_format) { hsi->format_and_start({{HS_SERVICE::META, {.size_pct = svc_params[HS_SERVICE::META].size_pct}}, {HS_SERVICE::LOG, @@ -278,6 +285,21 @@ class HSTestHelper { s_dev_names.clear(); } + static void set_min_chunk_size(uint64_t chunk_size) { +#ifdef _PRERELEASE + LOGINFO("Set minimum chunk size {}", chunk_size); + flip::FlipClient* fc = iomgr_flip::client_instance(); + + flip::FlipFrequency freq; + freq.set_count(2000000); + freq.set_percent(100); + + flip::FlipCondition dont_care_cond; + fc->create_condition("", flip::Operator::DONT_CARE, (int)1, &dont_care_cond); + fc->inject_retval_flip< long >("set_minimum_chunk_size", {dont_care_cond}, freq, chunk_size); +#endif + } + static void fill_data_buf(uint8_t* buf, uint64_t size, uint64_t pattern = 0) { uint64_t* ptr = r_cast< uint64_t* >(buf); for (uint64_t i = 0ul; i < size / sizeof(uint64_t); ++i) { diff --git a/src/tests/test_device_manager.cpp b/src/tests/test_device_manager.cpp index 13bef9d6b..6e1cdcfcc 100644 --- a/src/tests/test_device_manager.cpp +++ b/src/tests/test_device_manager.cpp @@ -174,6 +174,62 @@ TEST_F(DeviceMgrTest, StripedVDevCreation) { this->validate_striped_vdevs(); } +TEST_F(DeviceMgrTest, CreateChunk) { + // Create dynamically chunks and verify no two chunks ahve same start offset. 
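+    // The test sizes a dynamic vdev to the combined data capacity of all pdevs, then
+    // allocates chunks both before and after a restart to verify that persisted chunk
+    // start offsets are recovered intact and remain unique.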
+    uint64_t avail_size{0};
+    for (auto& pdev : m_pdevs) {
+        avail_size += pdev->data_size();
+    }
+
+    LOGINFO("Step 1: Creating test_vdev with size={}", in_bytes(avail_size));
+    auto vdev =
+        m_dmgr->create_vdev(homestore::vdev_parameters{.vdev_name = "test_vdev",
+                                                       .size_type = vdev_size_type_t::VDEV_SIZE_DYNAMIC,
+                                                       .vdev_size = avail_size,
+                                                       .num_chunks = 0,
+                                                       .blk_size = 512,
+                                                       .chunk_size = 512 * 1024,
+                                                       .dev_type = HSDevType::Data,
+                                                       .alloc_type = blk_allocator_type_t::none,
+                                                       .chunk_sel_type = chunk_selector_type_t::NONE,
+                                                       .multi_pdev_opts = vdev_multi_pdev_opts_t::ALL_PDEV_STRIPED,
+                                                       .context_data = sisl::blob{}});
+
+    auto num_chunks = 10;
+    LOGINFO("Step 2: Creating {} chunks", num_chunks);
+    std::unordered_map< uint32_t, chunk_info > chunk_info_map;
+    std::unordered_set< uint64_t > chunk_start;
+
+    for (int i = 0; i < num_chunks; i++) {
+        auto chunk = m_dmgr->create_chunk(HSDevType::Data, vdev->info().vdev_id, 512 * 1024, {});
+        chunk_info_map[chunk->chunk_id()] = chunk->info();
+        auto [_, inserted] = chunk_start.insert(chunk->info().chunk_start_offset);
+        ASSERT_EQ(inserted, true) << "chunk start duplicate " << chunk->info().chunk_start_offset;
+    }
+
+    LOGINFO("Step 3: Restarting homestore");
+    this->restart();
+
+    LOGINFO("Step 4: Creating additional {} chunks", num_chunks);
+    for (int i = 0; i < num_chunks; i++) {
+        auto chunk = m_dmgr->create_chunk(HSDevType::Data, vdev->info().vdev_id, 512 * 1024, {});
+        chunk_info_map[chunk->chunk_id()] = chunk->info();
+        auto [_, inserted] = chunk_start.insert(chunk->info().chunk_start_offset);
+        ASSERT_EQ(inserted, true) << "chunk start duplicate " << chunk->info().chunk_start_offset;
+    }
+
+    chunk_start.clear();
+    auto chunk_vec = m_dmgr->get_chunks();
+    ASSERT_EQ(chunk_vec.size(), num_chunks * 2);
+    for (const auto& chunk : chunk_vec) {
+        ASSERT_EQ(chunk->info().chunk_start_offset, chunk_info_map[chunk->chunk_id()].chunk_start_offset)
+            << "Chunk offsets mismatch";
+        auto [_, inserted] = chunk_start.insert(chunk->info().chunk_start_offset);
+        ASSERT_EQ(inserted, true) << "chunk start duplicate " << chunk->info().chunk_start_offset;
+    }
+    vdev.reset();
+}
+
 int main(int argc, char* argv[]) {
     SISL_OPTIONS_LOAD(argc, argv, logging, test_device_manager, iomgr);
     ::testing::InitGoogleTest(&argc, argv);
diff --git a/src/tests/test_log_dev.cpp b/src/tests/test_log_dev.cpp
index b41a5f9a5..2a1bba56b 100644
--- a/src/tests/test_log_dev.cpp
+++ b/src/tests/test_log_dev.cpp
@@ -13,135 +13,339 @@
  * specific language governing permissions and limitations under the License.
 *
 *********************************************************************************/
-#include
+
+#include
 #include
 #include
-#include
-#include
 #include
 #include
-#include
+#include
+#include
+#include
+#include
 #include
+#include
+#include
 #include
 #include
+#include
+#include
+#include
+#include "common/homestore_utils.hpp"
+#include "common/homestore_assert.hpp"
 #include "logstore/log_dev.hpp"
 #include "test_common/homestore_test_common.hpp"

 using namespace homestore;
+
 RCU_REGISTER_INIT
 SISL_LOGGING_INIT(HOMESTORE_LOG_MODS)
+SISL_OPTIONS_ENABLE(logging, test_log_dev, iomgr, test_common_setup)
+SISL_LOGGING_DECL(test_log_dev)

-std::vector< logdev_key > s_logdev_keys;
-static uint64_t first_offset{~static_cast< uint64_t >(0)};
+std::vector< std::string > test_common::HSTestHelper::s_dev_names;

-static void on_append_completion(const logdev_key lkey, void* const ctx) {
-    s_logdev_keys.push_back(lkey);
-    LOGINFO("Append completed with log_idx = {} offset = {}", lkey.idx, lkey.dev_offset);
-    if (first_offset == ~static_cast< uint64_t >(0)) { first_offset = lkey.dev_offset; }
-}
+struct test_log_data {
+    test_log_data() = default;
+    test_log_data(const test_log_data&) = delete;
+    test_log_data(test_log_data&&) noexcept = delete;
+    test_log_data& operator=(const test_log_data&) = delete;
+    test_log_data& operator=(test_log_data&&) noexcept = delete;
+    ~test_log_data() = default;

-static void on_log_found(const logdev_key lkey, const log_buffer buf) {
-    s_logdev_keys.push_back(lkey);
-    LOGINFO("Found a log with log_idx = {} offset = {}", lkey.idx, lkey.dev_offset);
-}
+    uint32_t size;

-[[nodiscard]] static std::shared_ptr< iomgr::ioMgr > start_homestore(const uint32_t ndevices, const uint64_t dev_size,
-                                                                     const uint32_t nthreads) {
-    std::vector< dev_info > device_info;
-    // these should be static so that they stay in scope in the lambda in case function ends before lambda completes
-    static std::mutex start_mutex;
-    static std::condition_variable cv;
-    static bool inited;
-
-    inited = false;
-    LOGINFO("creating {} device files with each of size {} ", ndevices, dev_size);
-    for (uint32_t i{0}; i < ndevices; ++i) {
-        const std::filesystem::path fpath{"/tmp/" + std::to_string(i + 1)};
-        std::ofstream ofs{fpath.string(), std::ios::binary | std::ios::out};
-        std::filesystem::resize_file(fpath, dev_size);
-        device_info.emplace_back(std::filesystem::canonical(fpath).string(), HSDevType::Data);
+    uint8_t* get_data() { return uintptr_cast(this) + sizeof(test_log_data); };
+    uint8_t const* get_data_const() const { return r_cast< uint8_t const* >(this) + sizeof(test_log_data); }
+    const uint8_t* get_data() const { return r_cast< const uint8_t* >(this) + sizeof(test_log_data); }
+    uint32_t total_size() const { return sizeof(test_log_data) + size; }
+    std::string get_data_str() const {
+        return std::string(r_cast< const char* >(get_data_const()), static_cast< size_t >(size));
     }
+};

-    LOGINFO("Creating iomgr with {} threads", nthreads);
-    auto iomgr_obj{std::make_shared< iomgr::ioMgr >(2, nthreads)};
-
-    const uint64_t cache_size{((ndevices * dev_size) * 10) / 100};
-    LOGINFO("Initialize and start HomeBlks with cache_size = {}", cache_size);
-
-    boost::uuids::string_generator gen;
-    init_params params;
-    params.open_flags = homestore::io_flag::DIRECT_IO;
-    params.min_virtual_page_size = 4096;
-    params.cache_size = cache_size;
-    params.devices = device_info;
-    params.iomgr = iomgr_obj;
-    params.init_done_cb = [&iomgr_obj, &tl_start_mutex = start_mutex, &tl_cv = cv,
-                           &tl_inited = inited](std::error_condition err, const out_params& params) {
-        iomgr_obj->start();
-        LOGINFO("HomeBlks Init completed");
-        {
-            std::unique_lock< std::mutex > lk{tl_start_mutex};
-            tl_inited = true;
-        }
-        tl_cv.notify_one();
-    };
-    params.vol_mounted_cb = [](const VolumePtr& vol_obj, vol_state state) {};
-    params.vol_state_change_cb = [](const VolumePtr& vol, vol_state old_state, vol_state new_state) {};
-    params.vol_found_cb = [](boost::uuids::uuid uuid) -> bool { return true; };
+class LogDevTest : public ::testing::Test {
+public:
+    const std::map< uint32_t, test_common::HSTestHelper::test_params > svc_params = {};
+    static constexpr uint32_t max_data_size = 1024;
+    uint64_t s_max_flush_multiple = 0;

-    test_common::set_random_http_port();
-    VolInterface::init(params);
+    virtual void SetUp() override { start_homestore(); }

-    {
-        std::unique_lock< std::mutex > lk{start_mutex};
-        cv.wait(lk, [] { return inited; });
+    void start_homestore(bool restart = false, hs_before_services_starting_cb_t starting_cb = nullptr) {
+        auto const ndevices = SISL_OPTIONS["num_devs"].as< uint32_t >();
+        auto const dev_size = SISL_OPTIONS["dev_size_mb"].as< uint64_t >() * 1024 * 1024;
+        test_common::HSTestHelper::start_homestore("test_log_dev",
+                                                   {
+                                                       {HS_SERVICE::META, {.size_pct = 15.0}},
+                                                       {HS_SERVICE::LOG,
+                                                        {.size_pct = 50.0,
+                                                         .chunk_size = 8 * 1024 * 1024,
+                                                         .min_chunk_size = 8 * 1024 * 1024,
+                                                         .vdev_size_type = vdev_size_type_t::VDEV_SIZE_DYNAMIC}},
+                                                   },
+                                                   starting_cb, restart);
     }
-    return iomgr_obj;
-}

-SISL_OPTIONS_ENABLE(logging, test_log_dev)
-SISL_OPTION_GROUP(test_log_dev,
-                  (num_threads, "", "num_threads", "number of threads",
-                   ::cxxopts::value< uint32_t >()->default_value("2"), "number"),
-                  (num_devs, "", "num_devs", "number of devices to create",
-                   ::cxxopts::value< uint32_t >()->default_value("2"), "number"),
-                  (dev_size_mb, "", "dev_size_mb", "size of each device in MB",
-                   ::cxxopts::value< uint64_t >()->default_value("5120"), "number"));
+    virtual void TearDown() override { test_common::HSTestHelper::shutdown_homestore(); }

-int main(int argc, char* argv[]) {
-    SISL_OPTIONS_LOAD(argc, argv, logging, test_log_dev);
-    sisl::logging::SetLogger("test_log_dev");
-    spdlog::set_pattern("[%D %T%z] [%^%l%$] [%n] [%t] %v");
+    test_log_data* prepare_data(const logstore_seq_num_t lsn, bool& io_memory) {
+        static thread_local std::random_device rd{};
+        static thread_local std::default_random_engine re{rd()};
+        uint32_t sz{0};
+        uint8_t* raw_buf{nullptr};
+
+        // Generate buffer of random size and fill with specific data
+        std::uniform_int_distribution< uint8_t > gen_percentage{0, 99};
+        std::uniform_int_distribution< uint32_t > gen_data_size{0, max_data_size - 1};
+        if (gen_percentage(re) < static_cast< uint8_t >(10)) {
+            // 10% of data is dma'ble aligned boundary
+            const auto alloc_sz = sisl::round_up(gen_data_size(re) + sizeof(test_log_data), s_max_flush_multiple);
+            raw_buf = iomanager.iobuf_alloc(dma_address_boundary, alloc_sz);
+            sz = alloc_sz - sizeof(test_log_data);
+            io_memory = true;
+        } else {
+            sz = gen_data_size(re);
+            raw_buf = static_cast< uint8_t* >(std::malloc(sizeof(test_log_data) + sz));
+            io_memory = false;
+        }

-    auto iomgr_obj{start_homestore(SISL_OPTIONS["num_devs"].as< uint32_t >(),
-                                   SISL_OPTIONS["dev_size_mb"].as< uint64_t >() * 1024 * 1024,
-                                   SISL_OPTIONS["num_threads"].as< uint32_t >())};
+        test_log_data* d = new (raw_buf) test_log_data();
+        d->size = sz;

-    std::array< std::string, 1024 > s;
-    auto ld{LogDev::instance()};
-    ld->register_append_cb(on_append_completion);
-    ld->register_logfound_cb(on_log_found);
+        assert(uintptr_cast(d) == raw_buf);

-    for (size_t i{0}; i < std::min< size_t >(195, s.size()); ++i) {
-        s[i] = std::to_string(i);
-        ld->append_async(0, 0, {reinterpret_cast< const uint8_t* >(s[i].c_str()), s[i].size() + 1}, nullptr);
+        const char c = static_cast< char >((lsn % 94) + 33);
+        std::memset(voidptr_cast(d->get_data()), c, static_cast< size_t >(d->size));
+        return d;
     }

-    size_t i{0};
-    for (const auto& lk : s_logdev_keys) {
-        const auto b{ld->read(lk)};
-        const auto exp_val{std::to_string(i)};
-        const auto actual_val{reinterpret_cast< const char* >(b.data()), static_cast< size_t >(b.size())};
-        if (actual_val != exp_val) {
-            LOGERROR("Error in reading value for log_idx {} actual_val={} expected_val={}", i, actual_val, exp_val);
+    void validate_data(std::shared_ptr< HomeLogStore > log_store, const test_log_data* d,
+                       const logstore_seq_num_t lsn) {
+        const char c = static_cast< char >((lsn % 94) + 33);
+        const std::string actual = d->get_data_str();
+        const std::string expected(static_cast< size_t >(d->size),
+                                   c); // needs to be () because of same reason as vector
+        ASSERT_EQ(actual, expected) << "Data mismatch for LSN=" << log_store->get_store_id() << ":" << lsn
+                                    << " size=" << d->size;
+    }
+
+    void insert_sync(std::shared_ptr< HomeLogStore > log_store, logstore_seq_num_t lsn) {
+        bool io_memory{false};
+        auto* d = prepare_data(lsn, io_memory);
+        const bool succ = log_store->write_sync(lsn, {uintptr_cast(d), d->total_size(), false});
+        EXPECT_TRUE(succ);
+        LOGINFO("Written sync data for LSN -> {}:{}", log_store->get_store_id(), lsn);
+        if (io_memory) {
+            iomanager.iobuf_free(uintptr_cast(d));
         } else {
-            LOGINFO("Read value {} for log_idx {}", actual_val, i);
+            std::free(voidptr_cast(d));
+        }
+    }
+
+    void kickstart_inserts(std::shared_ptr< HomeLogStore > log_store, logstore_seq_num_t& cur_lsn, int64_t batch) {
+        auto last = cur_lsn + batch;
+        for (; cur_lsn < last; cur_lsn++) {
+            insert_sync(log_store, cur_lsn);
+        }
+    }
+
+    void read_verify(std::shared_ptr< HomeLogStore > log_store, logstore_seq_num_t lsn) {
+        auto b = log_store->read_sync(lsn);
+        auto* d = r_cast< test_log_data const* >(b.bytes());
+        ASSERT_EQ(d->total_size(), b.size()) << "Size Mismatch for lsn=" << log_store->get_store_id() << ":" << lsn;
+        validate_data(log_store, d, lsn);
+    }
+
+    void read_all_verify(std::shared_ptr< HomeLogStore > log_store) {
+        const auto trunc_upto = log_store->truncated_upto();
+        const auto upto = log_store->get_contiguous_completed_seq_num(-1);
+
+        for (std::remove_const_t< decltype(trunc_upto) > i{0}; i <= trunc_upto; ++i) {
+            ASSERT_THROW(log_store->read_sync(i), std::out_of_range)
+                << "Expected std::out_of_range exception for lsn=" << log_store->get_store_id() << ":" << i
+                << " but not thrown";
+        }
+
+        for (auto lsn = trunc_upto + 1; lsn < upto; lsn++) {
+            try {
+                read_verify(log_store, lsn);
+            } catch (const std::exception& ex) {
+                logstore_seq_num_t trunc_upto = 0;
+                std::mutex mtx;
+                std::condition_variable cv;
+                bool get_trunc_upto = false;
+                log_store->get_logdev()->run_under_flush_lock(
+                    [this, log_store, &trunc_upto, &get_trunc_upto, &mtx, &cv] {
+                        // In case we run truncation in parallel to read, it is possible truncate moved, so adjust
+                        // the truncated_upto accordingly.
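+                        // Reading under the logdev flush lock avoids racing with an in-flight
+                        // truncation; the caller below blocks on the condition variable until
+                        // this lambda has run.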
+                        trunc_upto = log_store->truncated_upto();
+                        std::unique_lock lock(mtx);
+                        get_trunc_upto = true;
+                        cv.notify_one();
+                        return true;
+                    });
+                std::unique_lock lock(mtx);
+                cv.wait(lock, [&get_trunc_upto] { return get_trunc_upto == true; });
+                if (lsn <= trunc_upto) {
+                    lsn = trunc_upto;
+                    continue;
+                }
+                LOGFATAL("Failed to read at upto {} lsn {} trunc_upto {}", upto, lsn, trunc_upto);
+            }
+        }
+    }
+
+    void rollback_validate(std::shared_ptr< HomeLogStore > log_store, logstore_seq_num_t& cur_lsn,
+                           uint32_t num_lsns_to_rollback) {
+        std::mutex mtx;
+        std::condition_variable cv;
+        bool rollback_done = false;
+        cur_lsn -= num_lsns_to_rollback;
+        auto const upto_lsn = cur_lsn - 1;
+        log_store->rollback_async(upto_lsn, [&](logstore_seq_num_t) {
+            ASSERT_EQ(log_store->get_contiguous_completed_seq_num(-1), upto_lsn)
+                << "Last completed seq num is not reset after rollback";
+            ASSERT_EQ(log_store->get_contiguous_issued_seq_num(-1), upto_lsn)
+                << "Last issued seq num is not reset after rollback";
+            read_all_verify(log_store);
+            {
+                std::unique_lock lock(mtx);
+                rollback_done = true;
+            }
+            cv.notify_one();
+        });
+
+        // We wait till the async rollback finishes, since we validate right after.
+        std::unique_lock lock(mtx);
+        cv.wait(lock, [&rollback_done] { return rollback_done == true; });
+    }
+
+    void truncate_validate(std::shared_ptr< HomeLogStore > log_store) {
+        auto upto = log_store->get_contiguous_completed_seq_num(-1);
+        LOGINFO("truncate_validate upto {}", upto);
+        log_store->truncate(upto);
+        read_all_verify(log_store);
+        logstore_service().device_truncate(nullptr /* cb */, true /* wait_till_done */);
+    }
+
+    void rollback_records_validate(std::shared_ptr< HomeLogStore > log_store, uint32_t expected_count) {
+        auto actual_count = log_store->get_logdev()->log_dev_meta().num_rollback_records(log_store->get_store_id());
+        ASSERT_EQ(actual_count, expected_count);
+    }
+};
+
+TEST_F(LogDevTest, WriteSyncThenRead) {
+    const auto iterations = SISL_OPTIONS["iterations"].as< uint32_t >();
+
+    for (uint32_t iteration{0}; iteration < iterations; ++iteration) {
+        LOGINFO("Iteration {}", iteration);
+        auto logdev_id = logstore_service().create_new_logdev();
+        s_max_flush_multiple = logstore_service().get_logdev(logdev_id)->get_flush_size_multiple();
+        auto log_store = logstore_service().create_new_log_store(logdev_id, false);
+        const auto store_id = log_store->get_store_id();
+        LOGINFO("Created new log store -> id {}", store_id);
+        const unsigned count{10};
+        for (unsigned i{0}; i < count; ++i) {
+            // Insert new entry.
+            insert_sync(log_store, i);
+            // Verify the entry.
+            read_verify(log_store, i);
+        }
+
+        logstore_service().remove_log_store(logdev_id, store_id);
+        LOGINFO("Removed logstore -> id {}", store_id);
+    }
+}
+
+TEST_F(LogDevTest, Rollback) {
+    LOGINFO("Step 1: Create a single logstore to start rollback test");
+    auto logdev_id = logstore_service().create_new_logdev();
+    s_max_flush_multiple = logstore_service().get_logdev(logdev_id)->get_flush_size_multiple();
+    auto log_store = logstore_service().create_new_log_store(logdev_id, false);
+    auto store_id = log_store->get_store_id();
+
+    auto restart = [&]() {
+        std::promise< bool > p;
+        auto starting_cb = [&]() {
+            HS_SETTINGS_FACTORY().modifiable_settings([](auto& s) {
+                // Disable flush timer in UT.
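+                // A frequency of 0 turns the periodic flush timer off, so entries are
+                // flushed only when the test triggers a flush explicitly, which keeps
+                // recovery deterministic.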
+                s.logstore.flush_timer_frequency_us = 0;
+            });
+            HS_SETTINGS_FACTORY().save();
+            logstore_service().open_logdev(logdev_id);
+            logstore_service().open_log_store(logdev_id, store_id, false /* append_mode */).thenValue([&](auto store) {
+                log_store = store;
+                p.set_value(true);
+            });
+        };
+        start_homestore(true /* restart */, starting_cb);
+        p.get_future().get();
+    };
+
+    LOGINFO("Step 2: Issue 500 sequential inserts");
+    logstore_seq_num_t cur_lsn = 0;
+    kickstart_inserts(log_store, cur_lsn, 500);
+
+    LOGINFO("Step 3: Rollback last 50 entries and validate if pre-rollback entries are intact");
+    rollback_validate(log_store, cur_lsn, 50); // Last entry = 450
+
+    LOGINFO("Step 4: Append 25 entries after rollback is completed");
+    kickstart_inserts(log_store, cur_lsn, 25); // Last entry = 475
+
+    LOGINFO("Step 5: Rollback again for 75 entries even before previous rollback entry");
+    rollback_validate(log_store, cur_lsn, 75); // Last entry = 400
+
+    LOGINFO("Step 6: Append 25 entries after second rollback is completed");
+    kickstart_inserts(log_store, cur_lsn, 25); // Last entry = 425
+
+    LOGINFO("Step 7: Restart homestore and ensure all rollbacks are effectively validated");
+    restart();
+
+    LOGINFO("Step 8: Post recovery, append another 25 entries");
+    kickstart_inserts(log_store, cur_lsn, 25); // Last entry = 450
+
+    LOGINFO("Step 9: Rollback again for 75 entries even before previous rollback entry");
+    rollback_validate(log_store, cur_lsn, 75); // Last entry = 375
+
+    LOGINFO("Step 10: After 3rd rollback, append another 25 entries");
+    kickstart_inserts(log_store, cur_lsn, 25); // Last entry = 400
+
+    LOGINFO("Step 11: Truncate all entries");
+    truncate_validate(log_store);
+
+    LOGINFO("Step 12: Restart homestore and ensure all truncations after rollbacks are effectively validated");
+    restart();
+
+    LOGINFO("Step 13: Append 25 entries after truncation is completed");
+    kickstart_inserts(log_store, cur_lsn, 25); // Last entry = 425
+
+    LOGINFO("Step 14: Do another truncation to effectively truncate previous records");
+    truncate_validate(log_store);
+
+    LOGINFO("Step 15: Validate if there are no rollback records");
+    rollback_records_validate(log_store, 0 /* expected_count */);
+}
+
+SISL_OPTION_GROUP(test_log_dev,
+                  (num_logdevs, "", "num_logdevs", "number of log devs",
+                   ::cxxopts::value< uint32_t >()->default_value("4"), "number"),
+                  (num_logstores, "", "num_logstores", "number of log stores",
+                   ::cxxopts::value< uint32_t >()->default_value("16"), "number"),
+                  (num_records, "", "num_records", "number of records to test",
+                   ::cxxopts::value< uint32_t >()->default_value("10000"), "number"),
+                  (iterations, "", "iterations", "Iterations", ::cxxopts::value< uint32_t >()->default_value("1"),
+                   "the number of iterations to run each test"));
+
+int main(int argc, char* argv[]) {
+    int parsed_argc = argc;
+    ::testing::InitGoogleTest(&parsed_argc, argv);
+    SISL_OPTIONS_LOAD(parsed_argc, argv, logging, test_log_dev, iomgr, test_common_setup);
+    sisl::logging::SetLogger("test_log_dev");
+    spdlog::set_pattern("[%D %T%z] [%^%l%$] [%t] %v");

-    ld->load(first_offset);
+    const int ret = RUN_ALL_TESTS();
+    return ret;
 }
diff --git a/src/tests/test_log_store.cpp b/src/tests/test_log_store.cpp
index 262270c88..586268eaf 100644
--- a/src/tests/test_log_store.cpp
+++ b/src/tests/test_log_store.cpp
@@ -164,6 +164,9 @@ class SampleLogStoreClient {
                 });
             }
         }
+
+        // Because of restart in tests, we have to force the flush of log entries.
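+        // Flushing with a threshold of 1 so that even a single pending log entry is
+        // persisted before the restart path runs.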
+        m_log_store->get_logdev()->flush_if_needed(1);
     }

     void iterate_validate(const bool expect_all_completed = false) {
@@ -254,22 +257,43 @@ class SampleLogStoreClient {
                     validate_data(tl, i);
                 }
             } catch (const std::exception& e) {
+                logstore_seq_num_t trunc_upto = get_truncated_upto();
                 if (!expect_all_completed) {
-                    // In case we run truncation in parallel to read, it is possible truncate moved, so adjust
-                    // the truncated_upto accordingly.
-                    const auto trunc_upto = m_log_store->truncated_upto();
+                    LOGINFO("got store {} trunc_upto {} {} {}", m_log_store->get_store_id(), trunc_upto, i,
+                            m_log_store->log_records().get_status(2).dump(' ', 2));
                     if (i <= trunc_upto) {
                         i = trunc_upto;
                         continue;
                     }
                 }
-                LOGFATAL("Unexpected out_of_range exception for lsn={}:{} upto {}", m_log_store->get_store_id(), i,
-                         upto);
+                LOGINFO("Unexpected out_of_range exception for lsn={}:{} upto {} trunc_upto {}",
+                        m_log_store->get_store_id(), i, upto, trunc_upto);
+                LOGFATAL("");
             }
         }
     }

+    logstore_seq_num_t get_truncated_upto() {
+        std::mutex mtx;
+        std::condition_variable cv;
+        bool get_trunc_upto = false;
+        logstore_seq_num_t trunc_upto = 0;
+        m_log_store->get_logdev()->run_under_flush_lock([this, &trunc_upto, &get_trunc_upto, &mtx, &cv]() {
+            // In case we run truncation in parallel to read, it is possible truncate moved, so adjust
+            // the truncated_upto accordingly.
+            trunc_upto = m_log_store->truncated_upto();
+            std::unique_lock lock(mtx);
+            get_trunc_upto = true;
+            cv.notify_one();
+            return true;
+        });
+
+        std::unique_lock lock(mtx);
+        cv.wait(lock, [&get_trunc_upto] { return get_trunc_upto == true; });
+        return trunc_upto;
+    }
+
     void fill_hole_and_validate() {
         const auto start = m_log_store->truncated_upto();
         m_hole_lsns.withWLock([&](auto& holes_list) {
@@ -295,6 +319,7 @@ class SampleLogStoreClient {
     }

     void recovery_validate() {
+        LOGINFO("Truncated upto {}", get_truncated_upto());
         LOGINFO("Totally recovered {} non-truncated lsns and {} truncated lsns for store {}", m_n_recovered_lsns,
                 m_n_recovered_truncated_lsns, m_log_store->get_store_id());
         if (m_n_recovered_lsns != (m_cur_lsn.load() - m_truncated_upto_lsn.load() - 1)) {
@@ -438,17 +463,31 @@ class SampleDB {
     }

     void start_homestore(bool restart = false) {
+        auto n_log_devs = SISL_OPTIONS["num_logdevs"].as< uint32_t >();
         auto n_log_stores = SISL_OPTIONS["num_logstores"].as< uint32_t >();
+
         if (n_log_stores < 4u) {
             LOGINFO("Log store test needs minimum 4 log stores for testing, setting them to 4");
             n_log_stores = 4u;
         }
+
+        if (restart) {
+            for (auto& lsc : m_log_store_clients) {
+                lsc->flush();
+            }
+        }
+
         test_common::HSTestHelper::start_homestore(
             "test_log_store",
             {{HS_SERVICE::META, {.size_pct = 5.0}},
-             {HS_SERVICE::LOG, {.size_pct = 84.0, .chunk_size = 32 * 1024 * 1024}}},
+             {HS_SERVICE::LOG, {.size_pct = 84.0, .chunk_size = 8 * 1024 * 1024, .min_chunk_size = 8 * 1024 * 1024}}},
             [this, restart, n_log_stores]() {
+                HS_SETTINGS_FACTORY().modifiable_settings([](auto& s) {
+                    // Disable flush timer in UT.
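+                    // With the timer disabled, log entries are persisted only by the explicit
+                    // flush() calls issued above before a restart, making recovery deterministic.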
+                    s.logstore.flush_timer_frequency_us = 0;
+                });
+                HS_SETTINGS_FACTORY().save();
+
                 if (restart) {
                     for (uint32_t i{0}; i < n_log_stores; ++i) {
                         SampleLogStoreClient* client = m_log_store_clients[i].get();
@@ -462,19 +501,27 @@ class SampleDB {
             restart);

         if (!restart) {
-            auto logdev_id = logstore_service().create_new_logdev();
+            std::vector< logdev_id_t > logdev_id_vec;
+            for (uint32_t i{0}; i < n_log_devs; ++i) {
+                logdev_id_vec.push_back(logstore_service().create_new_logdev());
+            }
+
             for (uint32_t i{0}; i < n_log_stores; ++i) {
+                auto logdev_id = logdev_id_vec[rand() % logdev_id_vec.size()];
                 m_log_store_clients.push_back(std::make_unique< SampleLogStoreClient >(
                     logdev_id, bind_this(SampleDB::on_log_insert_completion, 3)));
             }
             SampleLogStoreClient::s_max_flush_multiple =
-                logstore_service().get_logdev(logdev_id)->get_flush_size_multiple();
+                logstore_service().get_logdev(logdev_id_vec[0])->get_flush_size_multiple();
         }
     }

     void shutdown(bool cleanup = true) {
         test_common::HSTestHelper::shutdown_homestore(cleanup);
-        if (cleanup) { m_log_store_clients.clear(); }
+        if (cleanup) {
+            m_log_store_clients.clear();
+            m_highest_log_idx.clear();
+        }
     }

     void on_log_insert_completion(logdev_id_t fid, logstore_seq_num_t lsn, logdev_key ld_key) {
@@ -609,7 +656,7 @@ class LogStoreTest : public ::testing::Test {
             nlohmann::json json_dump = logdev->dump_log_store(dump_req);
             dump_sz += json_dump.size();
-            LOGINFO("Printing json dump of all logstores in logdev {}. \n {}", logdev->get_id(), json_dump.dump());
+            LOGDEBUG("Printing json dump of all logstores in logdev {}. \n {}", logdev->get_id(), json_dump.dump());
             for (const auto& logdump : json_dump) {
                 const auto itr = logdump.find("log_records");
                 if (itr != std::end(logdump)) { rec_count += static_cast< int64_t >(logdump["log_records"].size()); }
@@ -619,14 +666,13 @@ class LogStoreTest : public ::testing::Test {
         EXPECT_EQ(expected_num_records, rec_count);
     }

-    void dump_validate_filter(logstore_id_t id, logstore_seq_num_t start_seq, logstore_seq_num_t end_seq,
-                              bool print_content = false) {
+    void dump_validate_filter(bool print_content = false) {
         for (const auto& lsc : SampleDB::instance().m_log_store_clients) {
-            if (lsc->m_log_store->get_store_id() != id) { continue; }
-
-            log_dump_req dump_req;
+            logstore_id_t id = lsc->m_log_store->get_store_id();
             const auto fid = lsc->m_logdev_id;
-
+            logstore_seq_num_t start_seq = lsc->m_log_store->truncated_upto() + 1;
+            logstore_seq_num_t end_seq = lsc->m_log_store->get_contiguous_completed_seq_num(-1);
+            log_dump_req dump_req;
             if (print_content) dump_req.verbosity_level = log_dump_verbosity::CONTENT;
             dump_req.log_store = lsc->m_log_store;
             dump_req.start_seq_num = start_seq;
@@ -635,8 +681,8 @@ class LogStoreTest : public ::testing::Test {
             // must use operator= construction as copy construction results in error
             auto logdev = lsc->m_log_store->get_logdev();
             nlohmann::json json_dump = logdev->dump_log_store(dump_req);
-            LOGINFO("Printing json dump of logdev={} logstore id {}, start_seq {}, end_seq {}, \n\n {}", fid, id,
-                    start_seq, end_seq, json_dump.dump());
+            LOGDEBUG("Printing json dump of log_dev={} logstore id {}, start_seq {}, end_seq {}, \n\n {}", fid, id,
+                     start_seq, end_seq, json_dump.dump());
             const auto itr_id = json_dump.find(std::to_string(id));
             if (itr_id != std::end(json_dump)) {
                 const auto itr_records = itr_id->find("log_records");
@@ -648,9 +694,8 @@ class LogStoreTest : public ::testing::Test {
             } else {
                 EXPECT_FALSE(true);
             }
-
-            return;
         }
+        return;
     }

     int find_garbage_upto(logdev_id_t logdev_id, logid_t idx) {
@@ -791,7 +836,8 @@ class LogStoreTest : public ::testing::Test {
             }
         }
         ASSERT_EQ(actual_valid_ids, SampleDB::instance().m_log_store_clients.size());
-        ASSERT_EQ(actual_garbage_ids, exp_garbage_store_count);
+        // Because we randomly assign logstores to logdevs, some logdevs will be empty.
+        // ASSERT_EQ(actual_garbage_ids, exp_garbage_store_count);
     }

     void delete_validate(uint32_t idx) {
@@ -890,12 +936,26 @@ TEST_F(LogStoreTest, BurstRandInsertThenTruncate) {
             this->dump_validate(num_records);

             LOGINFO("Step 4.2: Read some specific interval/filter of seq number in one logstore and dump it into json");
-            this->dump_validate_filter(0, 10, 100, true);
+            this->dump_validate_filter(true);
         }

         LOGINFO("Step 5: Truncate all of the inserts one log store at a time and validate log dev truncation is marked "
                 "correctly and also validate if all data prior to truncation return exception");
         this->truncate_validate();
+
+        LOGINFO("Step 6: Restart homestore");
+        SampleDB::instance().start_homestore(true /* restart */);
+        this->recovery_validate();
+        this->init(num_records);
+
+        LOGINFO("Step 7: Issue more sequential inserts after restarts with q depth of 15");
+        this->kickstart_inserts(1, 15);
+
+        LOGINFO("Step 8: Wait for the previous Inserts to complete");
+        this->wait_for_inserts();
+
+        LOGINFO("Step 9: Read all the inserts one by one for each log store to validate if what is written is valid");
+        this->read_validate(true);
     }
 }
@@ -1057,6 +1117,9 @@ TEST_F(LogStoreTest, ThrottleSeqInsertThenRecover) {
     this->recovery_validate();
     this->init(num_records);

+    LOGINFO("Step 5a: Read all the inserts one by one for each log store to validate if what is written is valid");
+    this->read_validate(true);
+
     LOGINFO("Step 6: Restart homestore again to validate recovery on consecutive restarts");
     SampleDB::instance().start_homestore(true /* restart */);
     this->recovery_validate();
@@ -1083,15 +1146,15 @@ TEST_F(LogStoreTest, ThrottleSeqInsertThenRecover) {
         this->truncate_validate();
     }
 }
+
 TEST_F(LogStoreTest, FlushSync) {
-#ifdef _PRERELEASE
     LOGINFO("Step 1: Delay the flush threshold and flush timer to very high value to ensure flush works fine")
     HS_SETTINGS_FACTORY().modifiable_settings([](auto& s) {
         s.logstore.max_time_between_flush_us = 5000000ul; // 5 seconds
         s.logstore.flush_threshold_size = 1048576ul;      // 1MB
     });
     HS_SETTINGS_FACTORY().save();
-#endif
+
     LOGINFO("Step 2: Reinit the 10 records to start sequential write test");
     this->init(1000);
@@ -1101,14 +1164,12 @@ TEST_F(LogStoreTest, FlushSync) {
     LOGINFO("Step 4: Do a sync flush");
     this->flush();

-#ifdef _PRERELEASE
     LOGINFO("Step 5: Reset the settings back")
     HS_SETTINGS_FACTORY().modifiable_settings([](auto& s) {
         s.logstore.max_time_between_flush_us = 300ul;
         s.logstore.flush_threshold_size = 64ul;
     });
     HS_SETTINGS_FACTORY().save();
-#endif

     LOGINFO("Step 6: Wait for the Inserts to complete");
     this->wait_for_inserts();
@@ -1142,68 +1203,6 @@ TEST_F(LogStoreTest, FlushSync) {
 #endif
 }

-TEST_F(LogStoreTest, Rollback) {
-    LOGINFO("Step 1: Reinit the 500 records on a single logstore to start rollback test");
-    this->init(500, {std::make_pair(1ull, 100)}); // Last entry = 500
-
-    LOGINFO("Step 2: Issue sequential inserts with q depth of 10");
-    this->kickstart_inserts(1, 10);
-
-    LOGINFO("Step 3: Wait for the Inserts to complete");
-    this->wait_for_inserts();
-
-    LOGINFO("Step 4: Rollback last 50 entries and validate if pre-rollback entries are intact");
-    this->rollback_validate(50); // Last entry = 450
-
-    LOGINFO("Step 5: Append 25 entries after rollback is completed");
-    this->init(25, {std::make_pair(1ull, 100)});
-    this->kickstart_inserts(1, 10);
-    this->wait_for_inserts(); // Last entry = 475
-
-    LOGINFO("Step 7: Rollback again for 75 entries even before previous rollback entry");
-    this->rollback_validate(75); // Last entry = 400
-
-    LOGINFO("Step 8: Append 25 entries after second rollback is completed");
-    this->init(25, {std::make_pair(1ull, 100)});
-    this->kickstart_inserts(1, 10);
-    this->wait_for_inserts(); // Last entry = 425
-
-    LOGINFO("Step 9: Restart homestore and ensure all rollbacks are effectively validated");
-    SampleDB::instance().start_homestore(true /* restart */);
-    this->recovery_validate();
-
-    LOGINFO("Step 10: Post recovery, append another 25 entries");
-    this->init(25, {std::make_pair(1ull, 100)});
-    this->kickstart_inserts(1, 5);
-    this->wait_for_inserts(); // Last entry = 450
-
-    LOGINFO("Step 11: Rollback again for 75 entries even before previous rollback entry");
-    this->rollback_validate(75); // Last entry = 375
-
-    LOGINFO("Step 12: After 3rd rollback, append another 25 entries");
-    this->init(25, {std::make_pair(1ull, 100)});
-    this->kickstart_inserts(1, 5);
-    this->wait_for_inserts(); // Last entry = 400
-
-    LOGINFO("Step 13: Truncate all entries");
-    this->truncate_validate();
-
-    LOGINFO("Step 14: Restart homestore and ensure all truncations after rollbacks are effectively validated");
-    SampleDB::instance().start_homestore(true /* restart */);
-    this->recovery_validate();
-
-    LOGINFO("Step 15: Append 25 entries after truncation is completed");
-    this->init(25, {std::make_pair(1ull, 100)});
-    this->kickstart_inserts(1, 10);
-    this->wait_for_inserts(); // Last entry = 425
-
-    LOGINFO("Step 16: Do another truncation to effectively truncate previous records");
-    this->truncate_validate();
-
-    LOGINFO("Step 17: Validate if there are no rollback records");
-    this->post_truncate_rollback_validate();
-}
-
 TEST_F(LogStoreTest, DeleteMultipleLogStores) {
     const auto nrecords = (SISL_OPTIONS["num_records"].as< uint32_t >() * 5) / 100;
@@ -1282,8 +1281,10 @@ TEST_F(LogStoreTest, WriteSyncThenRead) {

 SISL_OPTIONS_ENABLE(logging, test_log_store, iomgr, test_common_setup)
 SISL_OPTION_GROUP(test_log_store,
-                  (num_logstores, "", "num_logstores", "number of log stores",
+                  (num_logdevs, "", "num_logdevs", "number of log devs",
                    ::cxxopts::value< uint32_t >()->default_value("4"), "number"),
+                  (num_logstores, "", "num_logstores", "number of log stores",
+                   ::cxxopts::value< uint32_t >()->default_value("16"), "number"),
                   (num_records, "", "num_records", "number of record to test",
                    ::cxxopts::value< uint32_t >()->default_value("10000"), "number"),
                   (iterations, "", "iterations", "Iterations", ::cxxopts::value< uint32_t >()->default_value("1"),