diff --git a/conanfile.py b/conanfile.py
index 5b65203b4..c381ebe03 100644
--- a/conanfile.py
+++ b/conanfile.py
@@ -5,7 +5,7 @@
 class HomestoreConan(ConanFile):
     name = "homestore"
-    version = "5.0.4"
+    version = "5.0.5"

     homepage = "https://github.com/eBay/Homestore"
     description = "HomeStore Storage Engine"
     topics = ("ebay", "nublox")
diff --git a/src/include/homestore/chunk_selector.h b/src/include/homestore/chunk_selector.h
index 5c3a21a20..1d66f63cc 100644
--- a/src/include/homestore/chunk_selector.h
+++ b/src/include/homestore/chunk_selector.h
@@ -22,6 +22,7 @@ class ChunkSelector {
 public:
     ChunkSelector() = default;
     virtual void add_chunk(cshared< Chunk >&) = 0;
+    virtual void remove_chunk(cshared< Chunk >&) {}
     virtual void foreach_chunks(std::function< void(cshared< Chunk >&) >&& cb) = 0;
     virtual cshared< Chunk > select_chunk(blk_count_t nblks, const blk_alloc_hints& hints) = 0;
diff --git a/src/include/homestore/homestore_decl.hpp b/src/include/homestore/homestore_decl.hpp
index 1935ccd35..353c39c52 100644
--- a/src/include/homestore/homestore_decl.hpp
+++ b/src/include/homestore/homestore_decl.hpp
@@ -106,6 +106,8 @@ ENUM(chunk_selector_type_t, uint8_t, // What are the options to select chunk to
     ALWAYS_CALLER_CONTROLLED // Expect the caller to always provide the specific chunkid
 );

+ENUM(vdev_size_type_t, uint8_t, VDEV_SIZE_STATIC, VDEV_SIZE_DYNAMIC);
+
 ////////////// All structs ///////////////////
 struct dev_info {
     explicit dev_info(std::string name, HSDevType type = HSDevType::Data) : dev_name{std::move(name)}, dev_type{type} {}
@@ -148,7 +150,9 @@ static std::string in_bytes(uint64_t sz) {
 struct hs_format_params {
     float size_pct;
     uint32_t num_chunks{1};
+    uint64_t chunk_size{0};
     uint32_t block_size{0};
+    vdev_size_type_t vdev_size_type{vdev_size_type_t::VDEV_SIZE_STATIC};
     blk_allocator_type_t alloc_type{blk_allocator_type_t::varsize};
     chunk_selector_type_t chunk_sel_type{chunk_selector_type_t::ROUND_ROBIN};
 };
@@ -199,5 +203,4 @@ struct cap_attrs {
 } // namespace homestore

 ////////////// Misc ///////////////////
-#define HOMESTORE_LOG_MODS \
-    btree, device, blkalloc, cp, metablk, wbcache, logstore, transient, replication
+#define HOMESTORE_LOG_MODS btree, device, blkalloc, cp, metablk, wbcache, logstore, transient, replication, journalvdev
diff --git a/src/include/homestore/logstore_service.hpp b/src/include/homestore/logstore_service.hpp
index 506eff5ba..c10423e27 100644
--- a/src/include/homestore/logstore_service.hpp
+++ b/src/include/homestore/logstore_service.hpp
@@ -135,7 +135,7 @@ class LogStoreService {
     void device_truncate(const device_truncate_cb_t& cb = nullptr, const bool wait_till_done = false,
                          const bool dry_run = false);

-    folly::Future< std::error_code > create_vdev(uint64_t size, logstore_family_id_t family, uint32_t num_chunks);
+    folly::Future< std::error_code > create_vdev(uint64_t size, logstore_family_id_t family, uint32_t chunk_size);
     shared< VirtualDev > open_vdev(const vdev_info& vinfo, logstore_family_id_t family, bool load_existing);
     shared< JournalVirtualDev > get_vdev(logstore_family_id_t family) const {
         return (family == DATA_LOG_FAMILY_IDX) ? m_data_logdev_vdev : m_ctrl_logdev_vdev;
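The new `hs_format_params` fields and the `create_vdev` signature change go together: a log-device journal is now sized by `chunk_size` rather than by `num_chunks`. A minimal sketch of how a caller might fill these params for a dynamically sized journal vdev; only the field and enum names come from this diff, the values and surrounding call site are illustrative:

```cpp
// Illustrative only: field names are from this diff, values are made up.
homestore::hs_format_params journal_params{
    .size_pct = 2.0f,               // fraction of the device given to this service
    .num_chunks = 0,                // unused for dynamic vdevs; chunks are added on demand
    .chunk_size = 32 * 1024 * 1024, // each on-demand chunk is 32 MiB
    .block_size = 0,                // 0 = use the device default
    .vdev_size_type = homestore::vdev_size_type_t::VDEV_SIZE_DYNAMIC,
};
```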
diff --git a/src/lib/common/homestore_config.fbs b/src/lib/common/homestore_config.fbs
index 8ea23aba6..95f523d81 100644
--- a/src/lib/common/homestore_config.fbs
+++ b/src/lib/common/homestore_config.fbs
@@ -140,6 +140,9 @@ table Generic {
     // percentage of cache used to create indx mempool. It should be more than 100 to
     // take into account some floating buffers in writeback cache.
     indx_mempool_percent : uint32 = 110;
+
+    // Number of chunks in the journal chunk pool.
+    journal_chunk_pool_capacity: uint32 = 5;
 }

 table ResourceLimits {
diff --git a/src/lib/device/chunk.cpp b/src/lib/device/chunk.cpp
index 0853571a1..9eb8563de 100644
--- a/src/lib/device/chunk.cpp
+++ b/src/lib/device/chunk.cpp
@@ -23,10 +23,10 @@ Chunk::Chunk(PhysicalDev* pdev, const chunk_info& cinfo, uint32_t chunk_slot) :
     m_chunk_info{cinfo}, m_pdev{pdev}, m_chunk_slot{chunk_slot}, m_stream_id{pdev->chunk_to_stream_id(cinfo)} {}

 std::string Chunk::to_string() const {
-    return fmt::format("chunk_id={}, vdev_id={}, start_offset={}, size={}, end_of_chunk={}, slot_num_in_pdev={} "
+    return fmt::format("chunk_id={}, vdev_id={}, start_offset={}, size={}, slot_num_in_pdev={} "
                        "pdev_ordinal={} vdev_ordinal={} stream_id={}",
-                       chunk_id(), vdev_id(), start_offset(), in_bytes(size()), end_of_chunk(), slot_number(),
-                       pdev_ordinal(), vdev_ordinal(), stream_id());
+                       chunk_id(), vdev_id(), start_offset(), in_bytes(size()), slot_number(), pdev_ordinal(),
+                       vdev_ordinal(), stream_id());
 }

 void Chunk::set_user_private(const sisl::blob& data) {
@@ -36,13 +36,6 @@ void Chunk::set_user_private(const sisl::blob& data) {
     write_chunk_info();
 }

-void Chunk::update_end_of_chunk(uint64_t end_offset) {
-    std::unique_lock lg{m_mgmt_mutex};
-    m_chunk_info.end_of_chunk_size = end_offset;
-    m_chunk_info.compute_checksum();
-    write_chunk_info();
-}
-
 void Chunk::write_chunk_info() {
     auto buf = hs_utils::iobuf_alloc(chunk_info::size, sisl::buftag::superblk, physical_dev()->align_size());
     auto cinfo = new (buf) chunk_info();
@@ -59,7 +52,6 @@ nlohmann::json Chunk::get_status([[maybe_unused]] int log_level) const {
     j["vdev_id"] = vdev_id();
     j["start_offset"] = start_offset();
     j["size"] = size();
-    j["end_of_chunk_size"] = end_of_chunk();
     j["slot_alloced?"] = is_busy();
     return j;
 }
diff --git a/src/lib/device/chunk.h b/src/lib/device/chunk.h
index bb36b8449..777291292 100644
--- a/src/lib/device/chunk.h
+++ b/src/lib/device/chunk.h
@@ -49,7 +49,6 @@ class Chunk {
     bool is_busy() const { return m_chunk_info.is_allocated(); }
     uint32_t vdev_id() const { return m_chunk_info.vdev_id; }
     uint16_t chunk_id() const { return static_cast< uint16_t >(m_chunk_info.chunk_id); }
-    uint64_t end_of_chunk() const { return m_chunk_info.end_of_chunk_size; }
     uint32_t pdev_ordinal() const { return m_chunk_info.chunk_ordinal; }
     const uint8_t* user_private() { return &m_chunk_info.user_private[0]; }
     uint32_t stream_id() const { return m_stream_id; }
@@ -62,7 +61,6 @@ class Chunk {
     BlkAllocator* blk_allocator_mutable() { return m_blk_allocator.get(); }

     ////////////// Setters /////////////////////
-    void update_end_of_chunk(uint64_t end_offset);
     void set_user_private(const sisl::blob& data);
     void set_block_allocator(cshared< BlkAllocator >& blkalloc) { m_blk_allocator = blkalloc; }
     void set_vdev_ordinal(uint32_t vdev_ordinal) { m_vdev_ordinal = vdev_ordinal; }
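Note the direction of this change: `end_of_chunk` no longer lives in the on-disk `chunk_info` (both `update_end_of_chunk` and the `end_of_chunk()` accessor are deleted above); it moves into the journal's per-chunk private blob. A minimal sketch of the replacement access path, mirroring the `get_end_of_chunk` helper added later in this diff:

```cpp
// Sketch mirroring JournalVirtualDev::get_end_of_chunk later in this diff:
// the journal keeps end_of_chunk inside JournalChunkPrivate, which is
// serialized into the chunk's user_private area.
uint64_t end_of_chunk_of(const shared< Chunk >& chunk) {
    auto* priv = r_cast< JournalChunkPrivate* >(const_cast< uint8_t* >(chunk->user_private()));
    return priv->end_of_chunk;
}
```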
diff --git a/src/lib/device/device.h b/src/lib/device/device.h
index aec65638e..cbcdde8ea 100644
--- a/src/lib/device/device.h
+++ b/src/lib/device/device.h
@@ -42,15 +42,17 @@ struct vdev_info {
     uint32_t num_mirrors{0};        // 12: Total number of mirrors
     uint32_t blk_size{0};           // 16: IO block size for this vdev
     uint32_t num_primary_chunks{0}; // 20: number of primary chunks
-    uint8_t slot_allocated{0};      // 24: Is this current slot allocated
-    uint8_t failed{0};              // 25: set to true if disk is replaced
-    uint8_t hs_dev_type{0};         // 26: PDev dev type (as in fast or data)
-    uint8_t multi_pdev_choice{0};   // 27: Choice when multiple pdevs are present (vdev_multi_pdev_opts_t)
-    char name[64];                  // 28: Name of the vdev
-    uint16_t checksum{0};           // 92: Checksum of this entire Block
-    uint8_t alloc_type;             // 94: Allocator type of this vdev
-    uint8_t chunk_sel_type;         // 95: Chunk Selector type of this vdev_id
-    uint8_t padding[160]{};         // 96: Pad to make it 256 bytes total
+    uint32_t chunk_size{0};         // 24: chunk size used in this vdev
+    vdev_size_type_t size_type{};   // 28: Whether it's a static or dynamic size vdev
+    uint8_t slot_allocated{0};      // 29: Is this current slot allocated
+    uint8_t failed{0};              // 30: set to true if disk is replaced
+    uint8_t hs_dev_type{0};         // 31: PDev dev type (as in fast or data)
+    uint8_t multi_pdev_choice{0};   // 32: Choice when multiple pdevs are present (vdev_multi_pdev_opts_t)
+    char name[64];                  // 33: Name of the vdev
+    uint16_t checksum{0};           // 97: Checksum of this entire Block
+    uint8_t alloc_type;             // 99: Allocator type of this vdev
+    uint8_t chunk_sel_type;         // 100: Chunk Selector type of this vdev_id
+    uint8_t padding[155]{};         // 101: Pad to make it 256 bytes total
     uint8_t user_private[user_private_size]{}; // 256: User specific information

     uint32_t get_vdev_id() const { return vdev_id; }
@@ -94,11 +96,13 @@ ENUM(chunk_selector_t, uint8_t, // What are the options to select chunk to alloc
 struct vdev_parameters {
     std::string vdev_name;                // Name of the vdev
+    vdev_size_type_t size_type{};         // Whether size is static or dynamic
     uint64_t vdev_size;                   // Current Vdev size.
-    uint32_t num_chunks;                  // Total number of primary chunks.
+    uint32_t num_chunks{};                // Total number of primary chunks.
                                           // NOTE: If pdev opts is ALL_PDEV_STRIPED, then num_chunks would round off
                                           // to number of pdevs evenly
     uint32_t blk_size;                    // Block size vdev operates on
+    uint32_t chunk_size{};                // Chunk size provided for a dynamic vdev
     HSDevType dev_type;                   // Which physical device type this vdev belongs to (FAST or DATA)
     blk_allocator_type_t alloc_type;      // which allocator type this vdev wants to be with;
     chunk_selector_type_t chunk_sel_type; // which chunk selector type this vdev wants to be with;
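The two `size_type` modes are duals: a static vdev derives `chunk_size` from `num_chunks` (or vice versa) in `create_vdev`, while a dynamic vdev starts empty and only needs `chunk_size`. A hedged sketch of the two parameter shapes; field names come from `vdev_parameters` above, the values are illustrative:

```cpp
// Illustrative values; field names come from vdev_parameters above.
vdev_parameters static_param;
static_param.size_type = vdev_size_type_t::VDEV_SIZE_STATIC;
static_param.vdev_size = 8ull << 30; // 8 GiB, rounded by create_vdev
static_param.num_chunks = 64;        // chunk_size becomes vdev_size / num_chunks

vdev_parameters dynamic_param;
dynamic_param.size_type = vdev_size_type_t::VDEV_SIZE_DYNAMIC;
dynamic_param.chunk_size = 32ull << 20; // required; chunks created on demand
dynamic_param.num_chunks = 0;           // starts with zero chunks
```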
diff --git a/src/lib/device/device.h b/src/lib/device/device.h (continued)
@@ -154,15 +158,18 @@ class DeviceManager {
     shared< VirtualDev > create_vdev(vdev_parameters&& vdev_param);

     const Chunk* get_chunk(uint32_t chunk_id) const {
+        std::unique_lock lg{m_vdev_mutex};
         return (chunk_id == INVALID_CHUNK_ID) ? nullptr : m_chunks[chunk_id].get();
     }

     Chunk* get_chunk_mutable(uint32_t chunk_id) {
+        std::unique_lock lg{m_vdev_mutex};
         return (chunk_id == INVALID_CHUNK_ID) ? nullptr : m_chunks[chunk_id].get();
     }

     uint32_t atomic_page_size(HSDevType dtype) const;
     uint32_t optimal_page_size(HSDevType dtype) const;
+    uint32_t align_size(HSDevType dtype) const;

     std::vector< PhysicalDev* > get_pdevs_by_dev_type(HSDevType dtype) const;
     std::vector< shared< VirtualDev > > get_vdevs() const;
@@ -170,6 +177,10 @@ class DeviceManager {
     uint64_t total_capacity() const;
     uint64_t total_capacity(HSDevType dtype) const;

+    shared< Chunk > create_chunk(HSDevType dev_type, uint32_t vdev_id, uint64_t chunk_size, const sisl::blob& data);
+    void remove_chunk(shared< Chunk > chunk);
+    void remove_chunk_locked(shared< Chunk > chunk);
+
 private:
     void load_vdevs();
     int device_open_flags(const std::string& devname) const;
@@ -181,4 +192,52 @@ class DeviceManager {
     const std::vector< PhysicalDev* >& pdevs_by_type_internal(HSDevType dtype) const;
 }; // class DeviceManager

+// The chunk pool is used to hand out chunks when a vdev runs out of space; it is
+// cheaper than creating a chunk on the fly, which incurs a sync write.
+class ChunkPool {
+public:
+    struct Params {
+        uint64_t pool_capacity;
+        // Private data used when creating chunks.
+        std::function< sisl::blob() > init_private_data_cb;
+        uint8_t hs_dev_type;
+        uint32_t vdev_id;
+        uint64_t chunk_size;
+    };
+
+    ChunkPool(DeviceManager& dmgr, Params&& param);
+    ~ChunkPool();
+
+    // Start the chunk pool.
+    void start();
+
+    // Add a chunk to the pool. If the queue is full, the chunk is removed
+    // from the system. Returns whether the chunk could be reused by adding
+    // it back to the pool.
+    bool enqueue(shared< Chunk >& chunk);
+
+    // Get a chunk from the pool.
+    shared< Chunk > dequeue();
+
+    // Returns the capacity of the chunk pool.
+    uint64_t capacity() { return m_params.pool_capacity; }
+    uint64_t size() { return m_pool.size(); }
+
+private:
+    // Producer thread.
+    void producer();
+
+private:
+    DeviceManager& m_dmgr;
+    Params m_params;
+    std::list< shared< Chunk > > m_pool;
+    uint32_t m_pool_capacity;
+    std::condition_variable m_pool_cv;
+    std::mutex m_pool_mutex;
+    std::thread m_producer_thread;
+    bool m_run_pool{false};
+    folly::Promise< folly::Unit > m_pool_halt;
+};
+
 } // namespace homestore
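How the pool gets wired up in practice: the vdev supplies a callback that stamps fresh private data onto every chunk the producer creates. This sketch mirrors the `JournalVirtualDev` constructor later in this diff; `dmgr` and `vdev_id` are assumed to be in scope, and the values are illustrative:

```cpp
// Sketch (names from this diff; dmgr and vdev_id assumed in scope).
auto priv = std::make_shared< JournalChunkPrivate >();
ChunkPool pool{dmgr,
               ChunkPool::Params{5 /* pool_capacity */,
                                 [priv]() {
                                     priv->created_at = get_time_since_epoch_ms();
                                     return sisl::blob{r_cast< uint8_t* >(priv.get()),
                                                       sizeof(JournalChunkPrivate)};
                                 },
                                 static_cast< uint8_t >(HSDevType::Data), vdev_id,
                                 32ull << 20 /* chunk_size */}};
pool.start(); // producer thread begins pre-creating chunks in the background
```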
diff --git a/src/lib/device/device_manager.cpp b/src/lib/device/device_manager.cpp
index 0ae9d7d6f..ad450910e 100644
--- a/src/lib/device/device_manager.cpp
+++ b/src/lib/device/device_manager.cpp
@@ -209,28 +209,60 @@ shared< VirtualDev > DeviceManager::create_vdev(vdev_parameters&& vparam) {
     for (const auto& d : m_dev_infos) {
         max_num_chunks += hs_super_blk::max_chunks_in_pdev(d);
     }
-    auto input_num_chunks = vparam.num_chunks;
-    vparam.num_chunks = std::min(vparam.num_chunks, max_num_chunks);
-    if (input_num_chunks != vparam.num_chunks) {
-        LOGINFO("{} Virtual device is attempted to be created with num_chunks={}, it needs to be adjust to "
-                "new_num_chunks={}",
-                vparam.vdev_name, in_bytes(input_num_chunks), in_bytes(vparam.num_chunks));
-    }
     auto input_vdev_size = vparam.vdev_size;
-    vparam.vdev_size = sisl::round_up(vparam.vdev_size, vparam.num_chunks * vparam.blk_size);
-    if (input_vdev_size != vparam.vdev_size) {
-        LOGINFO("{} Virtual device is attempted to be created with size={}, it needs to be rounded to new_size={}",
-                vparam.vdev_name, in_bytes(input_vdev_size), in_bytes(vparam.vdev_size));
-    }
+    if (vparam.size_type == vdev_size_type_t::VDEV_SIZE_STATIC) {
+        // For a static size vdev, vdev_size must be provided.
+        RELEASE_ASSERT_GT(vparam.vdev_size, 0, "Vdev size can't be 0");
+
+        // Either num_chunks or chunk_size can be provided; we calculate the other.
+        if (vparam.num_chunks != 0) {
+            vparam.vdev_size = sisl::round_up(vparam.vdev_size, vparam.num_chunks * vparam.blk_size);
+            if (input_vdev_size != vparam.vdev_size) {
+                LOGINFO(
+                    "{} Virtual device is attempted to be created with size={}, it needs to be rounded to new_size={}",
+                    vparam.vdev_name, in_bytes(input_vdev_size), in_bytes(vparam.vdev_size));
+            }
+
+            auto input_num_chunks = vparam.num_chunks;
+            vparam.num_chunks = std::min(vparam.num_chunks, max_num_chunks);
+            if (input_num_chunks != vparam.num_chunks) {
+                LOGINFO("{} Virtual device is attempted to be created with num_chunks={}, it needs to be adjusted to "
+                        "new_num_chunks={}",
+                        vparam.vdev_name, in_bytes(input_num_chunks), in_bytes(vparam.num_chunks));
+            }
+            vparam.chunk_size = vparam.vdev_size / vparam.num_chunks;
+        } else if (vparam.chunk_size != 0) {
+            auto input_chunk_size = vparam.chunk_size;
+            vparam.chunk_size = std::min(vparam.chunk_size, hs_super_blk::MAX_CHUNKS_IN_SYSTEM);
+            if (input_chunk_size != vparam.chunk_size) {
+                LOGINFO("{} Virtual device is attempted to be created with chunk_size={}, it needs to be adjusted to "
+                        "new_chunk_size={}",
+                        vparam.vdev_name, in_bytes(input_chunk_size), in_bytes(vparam.chunk_size));
+            }
+
+            vparam.vdev_size = sisl::round_up(vparam.vdev_size, vparam.chunk_size);
+            if (input_vdev_size != vparam.vdev_size) {
+                LOGINFO(
+                    "{} Virtual device is attempted to be created with size={}, it needs to be rounded to new_size={}",
+                    vparam.vdev_name, in_bytes(input_vdev_size), in_bytes(vparam.vdev_size));
+            }
+
+            vparam.num_chunks = vparam.vdev_size / vparam.chunk_size;
+        } else {
+            RELEASE_ASSERT(false, "num_chunks and chunk_size can't both be zero for a vdev");
+        }

-    uint32_t chunk_size = vparam.vdev_size / vparam.num_chunks;
+    } else {
+        // For a dynamic size vdev, chunk_size must be provided; we start with zero chunks.
+        RELEASE_ASSERT_GT(vparam.chunk_size, 0, "Chunk size should be provided");
+    }
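A worked example of the static-size arithmetic above, with illustrative numbers:

```cpp
// vdev_size = 1000 MiB requested, num_chunks = 3, blk_size = 4 KiB.
uint64_t vdev_size = 1000ull << 20; // 1048576000
uint32_t num_chunks = 3, blk_size = 4096;
// Round the size up to a multiple of (num_chunks * blk_size) so every chunk
// is block-aligned: 1048576000 -> 1048584192 (next multiple of 12288).
vdev_size = sisl::round_up(vdev_size, uint64_t{num_chunks} * blk_size);
uint64_t chunk_size = vdev_size / num_chunks; // 349528064 bytes, 4 KiB aligned
```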
     LOGINFO(
         "New Virtal Dev={} of size={} with id={} is attempted to be created with multi_pdev_opts={}. The params are "
         "adjusted as follows: VDev_Size={} Num_pdevs={} Total_chunks_across_all_pdevs={} Each_Chunk_Size={}",
         vparam.vdev_name, in_bytes(input_vdev_size), vdev_id, vparam.multi_pdev_opts, in_bytes(vparam.vdev_size),
-        pdevs.size(), vparam.num_chunks, in_bytes(chunk_size));
+        pdevs.size(), vparam.num_chunks, in_bytes(vparam.chunk_size));

     // Convert the vparameters to the vdev_info
     auto buf = hs_utils::iobuf_alloc(vdev_info::size, sisl::buftag::superblk, pdevs[0]->align_size());
@@ -242,25 +274,32 @@ shared< VirtualDev > DeviceManager::create_vdev(vdev_parameters&& vparam) {
     m_vdevs[vdev_id] = vdev;

     // Create initial chunks based on current size
-    for (auto& pdev : pdevs) {
-        std::vector< uint32_t > chunk_ids;
-
-        // Create chunk ids for all chunks in each of these pdevs
-        for (uint32_t c{0}; c < vparam.num_chunks / pdevs.size(); ++c) {
-            auto chunk_id = m_chunk_id_bm.get_next_reset_bit(0u);
-            if (chunk_id == sisl::Bitset::npos) { throw std::out_of_range("System has no room for additional chunks"); }
-            m_chunk_id_bm.set_bit(chunk_id);
-            chunk_ids.push_back(chunk_id);
-        }
-
-        // Create all chunks at one shot and add each one to the vdev
-        auto chunks = pdev->create_chunks(chunk_ids, vdev_id, chunk_size);
-        for (auto& chunk : chunks) {
-            vdev->add_chunk(chunk, true /* fresh_chunk */);
-            m_chunks[chunk->chunk_id()] = chunk;
+    if (vparam.num_chunks != 0) {
+        for (auto& pdev : pdevs) {
+            std::vector< uint32_t > chunk_ids;
+
+            // Create chunk ids for all chunks in each of these pdevs
+            for (uint32_t c{0}; c < vparam.num_chunks / pdevs.size(); ++c) {
+                auto chunk_id = m_chunk_id_bm.get_next_reset_bit(0u);
+                if (chunk_id == sisl::Bitset::npos) {
+                    throw std::out_of_range("System has no room for additional chunks");
+                }
+                m_chunk_id_bm.set_bit(chunk_id);
+                chunk_ids.push_back(chunk_id);
+            }
+
+            // Create all chunks in one shot and add each one to the vdev
+            auto chunks = pdev->create_chunks(chunk_ids, vdev_id, vparam.chunk_size);
+            for (auto& chunk : chunks) {
+                vdev->add_chunk(chunk, true /* fresh_chunk */);
+                m_chunks[chunk->chunk_id()] = chunk;
+            }
         }
     }

+    // Handle any initialization needed.
+    vdev->init();
+
     // Locate and write the vdev info in the super blk area of all pdevs this vdev will be created on
     for (auto& pdev : pdevs) {
         uint64_t offset = hs_super_blk::vdev_sb_offset() + (vdev_id * vdev_info::size);
@@ -299,11 +338,99 @@ void DeviceManager::load_vdevs() {
             }
             m_chunk_id_bm.set_bit(chunk->chunk_id());
             m_chunks[chunk->chunk_id()] = chunk;
+            HS_LOG(TRACE, device, "loaded chunk {}", chunk->to_string());
             m_vdevs[chunk->vdev_id()]->add_chunk(chunk, false /* fresh_chunk */);
             return true;
         });
     }
+
+    // Run initialization on all vdevs.
+    for (auto& vdev : m_vdevs) {
+        vdev->init();
+    }
+}
+
+shared< Chunk > DeviceManager::create_chunk(HSDevType dev_type, uint32_t vdev_id, uint64_t chunk_size,
+                                            const sisl::blob& data) {
+    std::unique_lock lg{m_vdev_mutex};
+    auto pdevs = pdevs_by_type_internal(dev_type);
+    auto chunk_id = m_chunk_id_bm.get_next_reset_bit(0u);
+    if (chunk_id == sisl::Bitset::npos) { throw std::out_of_range("System has no room for additional chunk"); }
+    m_chunk_id_bm.set_bit(chunk_id);
+
+    shared< Chunk > chunk;
+    PhysicalDev* pdev = nullptr;
+    // Create a chunk on any pdev of this device type.
+    for (const auto& dev : pdevs) {
+        // Ordinal is added in add_chunk.
+        chunk = dev->create_chunk(chunk_id, vdev_id, chunk_size, 0 /* ordinal */, data);
+        if (chunk != nullptr) {
+            pdev = dev;
+            break;
+        }
+    }
+
+    if (!chunk) { throw std::out_of_range("Unable to create chunk on physical devices"); }
+
+    auto vdev = m_vdevs[vdev_id];
+    vdev->add_chunk(chunk, true /* fresh_chunk */);
+    m_chunks[chunk->chunk_id()] = chunk;
+
+    auto buf = hs_utils::iobuf_alloc(vdev_info::size, sisl::buftag::superblk, pdev->align_size());
+    auto vdev_info = vdev->info();
+    vdev_info.vdev_size += chunk_size;
+    vdev_info.num_primary_chunks++;
+    vdev_info.compute_checksum();
+
+    // Update the vdev info.
+    vdev->update_info(vdev_info);
+    std::memcpy(buf, &vdev_info, sizeof(vdev_info));
+    uint64_t offset = hs_super_blk::vdev_sb_offset() + (vdev_id * vdev_info::size);
+    pdev->write_super_block(buf, vdev_info::size, offset);
+    hs_utils::iobuf_free(buf, sisl::buftag::superblk);
+
+    HS_LOG(TRACE, device, "Created chunk id={} dev_type={} vdev_id={} size={}", chunk_id, (uint8_t)dev_type, vdev_id,
+           chunk_size);
+    return chunk;
+}
+
+void DeviceManager::remove_chunk(shared< Chunk > chunk) {
+    std::unique_lock lg{m_vdev_mutex};
+    remove_chunk_locked(chunk);
+}
+
+void DeviceManager::remove_chunk_locked(shared< Chunk > chunk) {
+    auto chunk_id = chunk->chunk_id();
+    auto vdev_id = chunk->vdev_id();
+
+    // Reset the chunk id bitmap.
+    m_chunk_id_bm.reset_bit(chunk_id);
+
+    // Delete from the physical dev.
+    auto pdev = chunk->physical_dev_mutable();
+    pdev->remove_chunk(chunk);
+
+    // Remove from the vdev.
+    auto vdev = m_vdevs[vdev_id];
+    vdev->remove_chunk(chunk);
+
+    m_chunks[chunk_id].reset();
+
+    // Update the vdev info.
+    auto buf = hs_utils::iobuf_alloc(vdev_info::size, sisl::buftag::superblk, pdev->align_size());
+    auto vdev_info = vdev->info();
+    vdev_info.vdev_size -= vdev_info.chunk_size;
+    vdev_info.num_primary_chunks--;
+    vdev_info.compute_checksum();
+
+    vdev->update_info(vdev_info);
+    std::memcpy(buf, &vdev_info, sizeof(vdev_info));
+    uint64_t offset = hs_super_blk::vdev_sb_offset() + (vdev_id * vdev_info::size);
+    pdev->write_super_block(buf, vdev_info::size, offset);
+    hs_utils::iobuf_free(buf, sisl::buftag::superblk);
+
+    HS_LOG(TRACE, device, "Removed chunk id={} vdev_id={}", chunk_id, vdev_id);
 }

 uint32_t DeviceManager::populate_pdev_info(const dev_info& dinfo, const iomgr::drive_attributes& attr,
@@ -348,6 +475,7 @@ static void populate_vdev_info(const vdev_parameters& vparam, uint32_t vdev_id,
     out_info->blk_size = vparam.blk_size;
     out_info->num_primary_chunks =
         (vparam.multi_pdev_opts == vdev_multi_pdev_opts_t::ALL_PDEV_STRIPED) ? pdevs.size() : 1u;
+    out_info->chunk_size = vparam.chunk_size;
     out_info->set_allocated();
     out_info->set_dev_type(vparam.dev_type);
     out_info->set_pdev_choice(vparam.multi_pdev_opts);
@@ -405,6 +533,7 @@ uint32_t DeviceManager::atomic_page_size(HSDevType dtype) const {
 uint32_t DeviceManager::optimal_page_size(HSDevType dtype) const {
     return pdevs_by_type_internal(dtype)[0]->optimal_page_size();
 }
+uint32_t DeviceManager::align_size(HSDevType dtype) const { return pdevs_by_type_internal(dtype)[0]->align_size(); }

 std::vector< shared< VirtualDev > > DeviceManager::get_vdevs() const {
     std::vector< shared< VirtualDev > > ret_v;
@@ -421,4 +550,93 @@ uint64_t hs_super_blk::chunk_super_block_size(const dev_info& dinfo) {
     return chunk_info_bitmap_size(dinfo) + (max_chunks_in_pdev(dinfo) * chunk_info::size);
 }

+ChunkPool::ChunkPool(DeviceManager& dmgr, Params&& params) : m_dmgr(dmgr), m_params(std::move(params)) {}
+
+ChunkPool::~ChunkPool() {
+    {
+        std::unique_lock< std::mutex > lk{m_pool_mutex};
+        m_run_pool = false;
+        m_pool_cv.notify_one();
+    }
+    // Wait for the chunk pool producer to finish.
+    m_pool_halt.getFuture().get();
+    m_producer_thread.join();
+}
+
+void ChunkPool::start() {
+    RELEASE_ASSERT(!m_run_pool, "Pool already started");
+    {
+        std::unique_lock< std::mutex > lk{m_pool_mutex};
+        m_run_pool = true;
+    }
+    m_producer_thread = std::thread(&ChunkPool::producer, this);
+    HS_LOG(INFO, device, "Starting chunk pool for vdev {}", m_params.vdev_id);
+}
+
+void ChunkPool::producer() {
+    // Fill the chunk pool.
+    while (true) {
+        // Wait until run is false or the pool is below half its capacity,
+        // so that consumers have room to release unused chunks back to the pool.
+        std::unique_lock< std::mutex > lk{m_pool_mutex};
+        m_pool_cv.wait(lk, [this] {
+            if (m_run_pool == false) return true;
+            if (m_pool.size() < (m_params.pool_capacity / 2)) return true;
+            return false;
+        });
+
+        if (!m_run_pool) {
+            m_pool_halt.setValue();
+            return;
+        }
+
+        auto private_data = m_params.init_private_data_cb();
+        auto chunk = m_dmgr.create_chunk(static_cast< HSDevType >(m_params.hs_dev_type), m_params.vdev_id,
+                                         m_params.chunk_size, std::move(private_data));
+        RELEASE_ASSERT(chunk, "Cannot create chunk");
+        m_pool.push_back(chunk);
+        HS_LOG(TRACE, device, "Produced chunk to pool id {} type {} vdev {} size {}", chunk->chunk_id(),
+               m_params.hs_dev_type, m_params.vdev_id, m_params.chunk_size);
+        m_pool_cv.notify_one();
+    }
+}
+
+shared< Chunk > ChunkPool::dequeue() {
+    RELEASE_ASSERT(m_run_pool, "Pool not started");
+    shared< Chunk > chunk;
+    {
+        std::unique_lock< std::mutex > lk{m_pool_mutex};
+        m_pool_cv.wait(lk, [this] { return !m_pool.empty(); });
+        chunk = m_pool.back();
+        m_pool.pop_back();
+    }
+    RELEASE_ASSERT(chunk, "Chunk invalid");
+    HS_LOG(TRACE, device, "Dequeued chunk {} from pool", chunk->chunk_id());
+    m_pool_cv.notify_one();
+    return chunk;
+}
+
+bool ChunkPool::enqueue(shared< Chunk >& chunk) {
+    RELEASE_ASSERT(chunk, "Chunk invalid");
+    bool reuse = false;
+    {
+        std::unique_lock< std::mutex > lk{m_pool_mutex};
+        if (m_pool.size() < m_params.pool_capacity) {
+            chunk->set_user_private(m_params.init_private_data_cb());
+            m_pool.push_back(chunk);
+            reuse = true;
+            HS_LOG(TRACE, device, "Enqueued chunk {} to pool", chunk->chunk_id());
+        }
+    }
+
+    if (!reuse) {
+        // If the pool is full, remove the chunk.
+        HS_LOG(TRACE, device, "Pool is full, removing chunk {}", chunk->chunk_id());
+        m_dmgr.remove_chunk(chunk);
+    } else {
+        m_pool_cv.notify_one();
+    }
+    return reuse;
+}
+
 } // namespace homestore
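End to end, the pool's contract as implemented above, in a short usage sketch (error handling elided; `pool` is a ChunkPool constructed as shown earlier):

```cpp
// dequeue() blocks until the producer has a pre-created chunk ready;
// enqueue() recycles a chunk, or removes it when the pool is already full.
pool.start();
shared< Chunk > c = pool.dequeue(); // fast path for the journal's append_chunk()
// ... chunk is linked into a journal descriptor, written to, then truncated away ...
if (!pool.enqueue(c)) {
    // Pool was at capacity: the chunk has been removed from the system and
    // the vdev superblock accounting was updated by remove_chunk().
}
// ~ChunkPool() stops the producer thread and joins it.
```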
+ HS_LOG(TRACE, device, "Cache is full removing chunk {}", chunk->chunk_id()); + m_dmgr.remove_chunk(chunk); + } else { + m_pool_cv.notify_one(); + } + return reuse; +} + } // namespace homestore diff --git a/src/lib/device/journal_vdev.cpp b/src/lib/device/journal_vdev.cpp index 1db2c55dc..5556da1e3 100644 --- a/src/lib/device/journal_vdev.cpp +++ b/src/lib/device/journal_vdev.cpp @@ -32,115 +32,247 @@ #include "common/homestore_utils.hpp" #include "common/resource_mgr.hpp" +SISL_LOGGING_DECL(journalvdev) + namespace homestore { JournalVirtualDev::JournalVirtualDev(DeviceManager& dmgr, const vdev_info& vinfo, vdev_event_cb_t event_cb) : - VirtualDev{dmgr, vinfo, std::move(event_cb), false /* is_auto_recovery */} {} + VirtualDev{dmgr, vinfo, std::move(event_cb), false /* is_auto_recovery */} { + + // Private data stored when chunks are created. + init_private_data = std::make_shared< JournalChunkPrivate >(); + m_chunk_pool = std::make_unique< ChunkPool >( + dmgr, + ChunkPool::Params{ + HS_DYNAMIC_CONFIG(generic.journal_chunk_pool_capacity), + [this]() { + init_private_data->created_at = get_time_since_epoch_ms(); + sisl::blob private_blob{r_cast< uint8_t* >(init_private_data.get()), sizeof(JournalChunkPrivate)}; + return private_blob; + }, + m_vdev_info.hs_dev_type, m_vdev_info.vdev_id, m_vdev_info.chunk_size}); +} -off_t JournalVirtualDev::alloc_next_append_blk(size_t sz) { - if (used_size() + sz > size()) { - // not enough space left; - HS_LOG(ERROR, device, "No space left! m_write_sz_in_total: {}, m_reserved_sz: {}", m_write_sz_in_total.load(), - m_reserved_sz); - return INVALID_OFFSET; +JournalVirtualDev::~JournalVirtualDev() {} + +void JournalVirtualDev::init() { + struct HeadChunk { + chunk_num_t chunk_num{}; + uint64_t created_at{}; + }; + + // Create a mapp of logdev_id to the head chunk and chunk_id to chunk. + std::unordered_map< logdev_id_t, HeadChunk > logdev_head_map; + std::unordered_map< chunk_num_t, shared< Chunk > > chunk_map; + std::unordered_set< chunk_num_t > visited_chunks; + + // Traverse the chunks and find the heads of the logdev_id's. + for (auto& chunk : m_all_chunks) { + auto* data = r_cast< JournalChunkPrivate* >(const_cast< uint8_t* >(chunk->user_private())); + auto chunk_id = chunk->chunk_id(); + auto logdev_id = data->logdev_id; + // Create index for chunks. + chunk_map[chunk_id] = chunk; + if (data->is_head) { + // Store the head which has the latest creation timestamp. + if (data->created_at > logdev_head_map[logdev_id].created_at) { + logdev_head_map[logdev_id] = HeadChunk{chunk_id, data->created_at}; + } + } } -#ifdef _PRERELEASE - iomgr_flip::test_and_abort("abort_before_update_eof_cur_chunk"); -#endif + for (auto& [logdev_id, head] : logdev_head_map) { + // Create descriptor for each logdev_id + auto journal_desc = std::make_shared< JournalVirtualDev::Descriptor >(*this, logdev_id); + m_journal_descriptors.emplace(logdev_id, journal_desc); + LOGDEBUGMOD(journalvdev, "Loading descriptor {}", logdev_id); + // Traverse the list starting from the head and add those chunks + // in order to the journal descriptor. next_chunk is stored in private_data. + // Last chunk will have next_chunk as 0. 
+ auto chunk_num = head.chunk_num; + while (chunk_num != 0) { + auto& c = chunk_map[chunk_num]; + RELEASE_ASSERT(c, "Invalid chunk found logdev {} chunk {}", logdev_id, chunk_num); + journal_desc->m_journal_chunks.push_back(c); + visited_chunks.insert(chunk_num); + LOGDEBUGMOD(journalvdev, "Loading chunk {} descriptor {}", chunk_num, logdev_id); + + // Increase the the total size. + journal_desc->m_total_size += c->size(); + + auto data = r_cast< JournalChunkPrivate* >(const_cast< uint8_t* >(c->user_private())); + chunk_num = data->next_chunk; + } + } - const off_t ds_off = data_start_offset(); - const off_t end_offset = tail_offset(); + // Chunks which are not in visited set are orphans and needs to be cleaned up. + // Remove chunk will affect the m_all_chunks so keep a separate list. + std::vector< shared< Chunk > > orphan_chunks; + for (auto& chunk : m_all_chunks) { + if (!visited_chunks.count(chunk->chunk_id())) { orphan_chunks.push_back(chunk); } + } - auto const [chunk, offset_in_chunk] = offset_to_chunk(end_offset); + for (auto& chunk : orphan_chunks) { + auto* data = r_cast< JournalChunkPrivate* >(const_cast< uint8_t* >(chunk->user_private())); + auto chunk_id = chunk->chunk_id(); + auto logdev_id = data->logdev_id; + auto next_chunk = data->next_chunk; -#ifndef NDEBUG - if (end_offset < ds_off) { HS_DBG_ASSERT_EQ(size() - used_size(), static_cast< uint64_t >(ds_off - end_offset)); } -#endif - // works for both "end_offset >= ds_off" and "end_offset < ds_off"; - if (offset_in_chunk + sz <= chunk->size()) { - // not acrossing boundary, nothing to do; - } else if ((used_size() + (chunk->size() - offset_in_chunk) + sz) <= size()) { - // across chunk boundary, still enough space; + // Clear the private chunk data. + *data = JournalChunkPrivate{}; + update_chunk_private(chunk, data); - // Update the overhead to total write size; - m_write_sz_in_total.fetch_add(chunk->size() - offset_in_chunk, std::memory_order_relaxed); + LOGDEBUGMOD(journalvdev, "Removing orphan chunk {} found for logdev {} next {}.", chunk_id, logdev_id, + next_chunk); + m_dmgr.remove_chunk_locked(chunk); + } - // If across chunk boundary, update the chunk super-block of the chunk size - chunk->update_end_of_chunk(offset_in_chunk); + // Start the chunk pool. 
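To make the recovery rule above concrete, here is an illustrative set of per-chunk private records and the order `init()` derives from them; the chunk ids and timestamps are made up:

```cpp
// JournalChunkPrivate is {logdev_id, is_head, created_at, end_of_chunk, next_chunk}.
JournalChunkPrivate c2{1, true, 100, 0, 7};  // older head, e.g. left behind by a crash
JournalChunkPrivate c7{1, true, 200, 0, 9};  // newer head: wins on created_at
JournalChunkPrivate c9{1, false, 0, 0, 4};
JournalChunkPrivate c4{1, false, 0, 0, 0};   // next_chunk == 0 terminates the list
// init() selects chunk 7 as head (200 > 100) and loads the list [7, 9, 4];
// chunk 2 is never visited, so it is cleaned up as an orphan.
```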
-#ifdef _PRERELEASE
-    iomgr_flip::test_and_abort("abort_after_update_eof_cur_chunk");
-#endif
-        // get next chunk handle
-        auto next_chunk = get_next_chunk(chunk);
-        if (next_chunk != chunk) {
-            // Since we are re-using a new chunk, update this chunk's end as its original size;
-            next_chunk->update_end_of_chunk(chunk->size());
+void JournalVirtualDev::update_chunk_private(shared< Chunk >& chunk, JournalChunkPrivate* private_data) {
+    sisl::blob private_blob{r_cast< uint8_t* >(private_data), sizeof(JournalChunkPrivate)};
+    chunk->set_user_private(private_blob);
+}
+
+uint64_t JournalVirtualDev::get_end_of_chunk(shared< Chunk >& chunk) const {
+    auto* private_data = r_cast< JournalChunkPrivate* >(const_cast< uint8_t* >(chunk->user_private()));
+    return private_data->end_of_chunk;
+}
+
+shared< JournalVirtualDev::Descriptor > JournalVirtualDev::open(logdev_id_t logdev_id) {
+    auto it = m_journal_descriptors.find(logdev_id);
+    if (it == m_journal_descriptors.end()) {
+        auto journal_desc = std::make_shared< JournalVirtualDev::Descriptor >(*this, logdev_id);
+        m_journal_descriptors.emplace(logdev_id, journal_desc);
+        return journal_desc;
+    }
+
+    LOGDEBUGMOD(journalvdev, "Opened log device descriptor {}", logdev_id);
+    return it->second;
+}
+
+void JournalVirtualDev::Descriptor::append_chunk() {
+    // Get a new chunk from the pool.
+    auto new_chunk = m_vdev.m_chunk_pool->dequeue();
+
+    // Grow the right side of the window and the total size.
+    m_total_size += new_chunk->size();
+    m_end_offset += new_chunk->size();
+
+    if (!m_journal_chunks.empty()) {
+        // If there are already chunks in the m_journal_chunks list, append this new chunk to the end of the
+        // list, and set next_chunk of the last chunk in the list to point to this new chunk.
+        auto last_chunk = m_journal_chunks.back();
+        auto* last_chunk_private = r_cast< JournalChunkPrivate* >(const_cast< uint8_t* >(last_chunk->user_private()));
+
+        // Set the next chunk to the newly created chunk id.
+        last_chunk_private->next_chunk = new_chunk->chunk_id();
+
+        // Append the new chunk.
+        m_journal_chunks.push_back(new_chunk);

+        auto chunk_size = m_vdev.info().chunk_size;
+        auto offset_in_chunk = (tail_offset() % chunk_size);
+        if (offset_in_chunk != 0) {
+            // Update the overhead to the total write size.
+            m_write_sz_in_total.fetch_add(last_chunk->size() - offset_in_chunk, std::memory_order_relaxed);
+            last_chunk_private->end_of_chunk = offset_in_chunk;
         }
+        m_vdev.update_chunk_private(last_chunk, last_chunk_private);
+        LOGDEBUGMOD(journalvdev, "Added chunk new {} last {} desc {}", new_chunk->chunk_id(), last_chunk->chunk_id(),
+                    to_string());
     } else {
-        // across chunk boundary and no space left;
-        HS_LOG(ERROR, device, "No space left! m_write_sz_in_total: {}, m_reserved_sz: {}", m_write_sz_in_total.load(),
-               m_reserved_sz);
-        return INVALID_OFFSET;
-        // m_reserved_sz stays sthe same;
+        // If the list is empty, make the new chunk the head. Only the head chunk stores the logdev_id.
+        auto* new_chunk_private = r_cast< JournalChunkPrivate* >(const_cast< uint8_t* >(new_chunk->user_private()));
+        new_chunk_private->is_head = true;
+        new_chunk_private->logdev_id = m_logdev_id;
+
+        // Append the new chunk.
+        m_journal_chunks.push_back(new_chunk);
+        m_vdev.update_chunk_private(new_chunk, new_chunk_private);
+        LOGDEBUGMOD(journalvdev, "Added head chunk {} desc {}", new_chunk->chunk_id(), to_string());
+    }
+}
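A worked example of the end_of_chunk bookkeeping in the first branch above, with illustrative numbers:

```cpp
// chunk_size = 64 MiB and tail_offset() lands 40 MiB into the last chunk.
uint64_t chunk_size = 64ull << 20;
uint64_t offset_in_chunk = 40ull << 20;           // tail_offset() % chunk_size
uint64_t overhead = chunk_size - offset_in_chunk; // 24 MiB that will never hold data
// m_write_sz_in_total grows by 24 MiB and end_of_chunk is persisted as 40 MiB,
// so readers know to hop to the next chunk once they reach that offset.
```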
+off_t JournalVirtualDev::Descriptor::alloc_next_append_blk(size_t sz) {
+    // We currently assume the size requested is less than chunk_size.
+    auto chunk_size = m_vdev.info().chunk_size;
+    RELEASE_ASSERT_LT(sz, chunk_size, "Size requested greater than chunk size");
+
+    if ((tail_offset() + static_cast< off_t >(sz)) >= m_end_offset) {
+        // Not enough space left; add a new chunk.
+        LOGDEBUGMOD(journalvdev, "No space left for size {}. Creating chunk desc {}", sz, to_string());
+
+#ifdef _PRERELEASE
+        iomgr_flip::test_and_abort("abort_before_update_eof_cur_chunk");
+#endif
+
+        // Append a chunk to the m_journal_chunks list. This will increase m_end_offset.
+        append_chunk();
+
+#ifdef _PRERELEASE
+        iomgr_flip::test_and_abort("abort_after_update_eof_next_chunk");
+#endif
+
+        RELEASE_ASSERT((tail_offset() + static_cast< off_t >(sz)) < m_end_offset, "No space for append blk");
     }

     // if we made a successful reserve, return the tail offset;
-    const off_t offset = tail_offset();
+    const off_t start_offset = data_start_offset();
+    const off_t tail_off = tail_offset();
+    RELEASE_ASSERT(tail_off >= start_offset, "Invalid start and tail offset");

     // update reserved size;
     m_reserved_sz += sz;

     high_watermark_check();

-#ifdef _PRERELEASE
-    iomgr_flip::test_and_abort("abort_after_update_eof_next_chunk");
-#endif
-    // assert that returnning logical offset is in good range;
-    HS_DBG_ASSERT_LE(static_cast< uint64_t >(offset), size());
-    return offset;
+    // assert that the returned logical offset is in a good range
+    HS_DBG_ASSERT_LE(tail_off, m_end_offset);
+    LOGDEBUGMOD(journalvdev, "returned tail_off 0x{} desc {}", to_hex(tail_off), to_string());
+    return tail_off;
 }
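The calling pattern this function supports, spelled out in the header's doc comment, is reserve-then-write, where writes may complete out of order. A sketch, with `desc` a Descriptor obtained from `JournalVirtualDev::open` and the buffers/lengths assumed:

```cpp
off_t off1 = desc->alloc_next_append_blk(len1); // reserves len1, bumps m_reserved_sz
off_t off2 = desc->alloc_next_append_blk(len2); // reservations never straddle a chunk
desc->async_pwrite(buf2, len2, off2);           // each pwrite consumes its reservation
desc->async_pwrite(buf1, len1, off1);           // write order doesn't matter
```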
-bool JournalVirtualDev::validate_append_size(size_t count) const {
-    if (used_size() + count > size()) {
+bool JournalVirtualDev::Descriptor::validate_append_size(size_t req_sz) const {
+    if (used_size() + req_sz > size()) {
         // not enough space left;
-        HS_LOG(ERROR, device, "No space left! m_write_sz_in_total: {}, m_reserved_sz: {}", m_write_sz_in_total.load(),
-               m_reserved_sz);
+        HS_LOG(ERROR, device, "No space left! req_sz {} desc {}", req_sz, to_string());
         return false;
     }

     if (m_reserved_sz != 0) {
-        HS_LOG(ERROR, device, "write can't be served when m_reserved_sz:{} is not comsumed by pwrite yet.",
-               m_reserved_sz);
+        HS_LOG(ERROR, device, "write can't be served when m_reserved_sz is not consumed by pwrite yet {}", to_string());
         return false;
     }
     return true;
 }

-auto JournalVirtualDev::process_pwrite_offset(size_t len, off_t offset) {
+auto JournalVirtualDev::Descriptor::process_pwrite_offset(size_t len, off_t offset) {
     // convert logical offset to chunk and its offset
     auto const chunk_details = offset_to_chunk(offset);
-    auto const [chunk, offset_in_chunk] = chunk_details;
+    auto const [chunk, _, offset_in_chunk] = chunk_details;
+
+    LOGTRACEMOD(journalvdev, "writing in chunk: {}, offset: 0x{} len: {} offset_in_chunk: 0x{} chunk_sz: {} desc {}",
+                chunk->chunk_id(), to_hex(offset), len, to_hex(offset_in_chunk), chunk->size(), to_string());

     // this assert only valid for pwrite/pwritev, which calls alloc_next_append_blk to get the offset to do the
     // write, which guarantees write will with the returned offset will not accross chunk boundary.
     HS_REL_ASSERT_GE(chunk->size() - offset_in_chunk, len, "Writing size: {} crossing chunk is not allowed!", len);
     m_write_sz_in_total.fetch_add(len, std::memory_order_relaxed);

-    HS_LOG(TRACE, device, "Writing in chunk: {}, offset: {}, m_write_sz_in_total: {}, start off: {}", chunk->chunk_id(),
-           to_hex(offset_in_chunk), to_hex(m_write_sz_in_total.load()), to_hex(data_start_offset()));
-
     return chunk_details;
 }

 /////////////////////////////// Write Section //////////////////////////////////
-folly::Future< std::error_code > JournalVirtualDev::async_append(const uint8_t* buf, size_t size) {
+folly::Future< std::error_code > JournalVirtualDev::Descriptor::async_append(const uint8_t* buf, size_t size) {
     if (!validate_append_size(size)) {
         return folly::makeFuture< std::error_code >(std::make_error_code(std::errc::no_space_on_device));
     } else {
-        auto const [chunk, offset_in_chunk] = process_pwrite_offset(size, m_seek_cursor);
+        auto const [chunk, _, offset_in_chunk] = process_pwrite_offset(size, m_seek_cursor);
         m_seek_cursor += size;
-        return async_write(r_cast< const char* >(buf), size, chunk, offset_in_chunk);
+        return m_vdev.async_write(r_cast< const char* >(buf), size, chunk, offset_in_chunk);
     }
 }

@@ -157,15 +289,17 @@ folly::Future< std::error_code > JournalVirtualDev::async_append(const uint8_t*
  * @param cb : callback after write is completed, can be null
  *
  */
-folly::Future< std::error_code > JournalVirtualDev::async_pwrite(const uint8_t* buf, size_t size, off_t offset) {
+folly::Future< std::error_code > JournalVirtualDev::Descriptor::async_pwrite(const uint8_t* buf, size_t size,
+                                                                             off_t offset) {
     HS_REL_ASSERT_LE(size, m_reserved_sz, "Write size: larger then reserved size is not allowed!");
     m_reserved_sz -= size; // update reserved size

-    auto const [chunk, offset_in_chunk] = process_pwrite_offset(size, offset);
-    return async_write(r_cast< const char* >(buf), size, chunk, offset_in_chunk);
+    auto const [chunk, _, offset_in_chunk] = process_pwrite_offset(size, offset);
+    return m_vdev.async_write(r_cast< const char* >(buf), size, chunk, offset_in_chunk);
 }

-folly::Future< std::error_code > JournalVirtualDev::async_pwritev(const iovec* iov, int iovcnt, off_t offset) {
+folly::Future< std::error_code > JournalVirtualDev::Descriptor::async_pwritev(const iovec* iov, int iovcnt,
+                                                                              off_t offset) {
     auto const size = VirtualDev::get_len(iov, iovcnt);

     // if size is smaller than reserved size, it means write will never be overlapping start offset;
@@ -173,19 +307,20 @@ folly::Future< std::error_code > JournalVirtualDev::async_pwritev(const iovec* i
     HS_REL_ASSERT_LE(size, m_reserved_sz, "Write size: larger then reserved size: is not allowed!");
     m_reserved_sz -= size;

-    auto const [chunk, offset_in_chunk] = process_pwrite_offset(size, offset);
-    return async_writev(iov, iovcnt, chunk, offset_in_chunk);
+    auto const [chunk, _, offset_in_chunk] = process_pwrite_offset(size, offset);
+    return m_vdev.async_writev(iov, iovcnt, chunk, offset_in_chunk);
 }

-void JournalVirtualDev::sync_pwrite(const uint8_t* buf, size_t size, off_t offset) {
+void JournalVirtualDev::Descriptor::sync_pwrite(const uint8_t* buf, size_t size, off_t offset) {
+    HS_REL_ASSERT_LE(size, m_reserved_sz, "Write size: larger then reserved size is not allowed!");
     m_reserved_sz -= size; // update reserved size

-    auto const [chunk, offset_in_chunk] = process_pwrite_offset(size, offset);
-    sync_write(r_cast< const char* >(buf), size, chunk, offset_in_chunk);
+    auto const [chunk, index, offset_in_chunk] = process_pwrite_offset(size, offset);
+    m_vdev.sync_write(r_cast< const char* >(buf), size, chunk, offset_in_chunk);
 }

-void JournalVirtualDev::sync_pwritev(const iovec* iov, int iovcnt, off_t offset) {
+void JournalVirtualDev::Descriptor::sync_pwritev(const iovec* iov, int iovcnt, off_t offset) {
     auto const size = VirtualDev::get_len(iov, iovcnt);

     // if size is smaller than reserved size, it means write will never be overlapping start offset;
@@ -193,14 +328,16 @@ void JournalVirtualDev::sync_pwritev(const iovec* iov, int iovcnt, off_t offset)
     HS_REL_ASSERT_LE(size, m_reserved_sz, "Write size: larger then reserved size: is not allowed!");
     m_reserved_sz -= size;

-    auto const [chunk, offset_in_chunk] = process_pwrite_offset(size, offset);
-    sync_writev(iov, iovcnt, chunk, offset_in_chunk);
+    auto const [chunk, _, offset_in_chunk] = process_pwrite_offset(size, offset);
+    m_vdev.sync_writev(iov, iovcnt, chunk, offset_in_chunk);
 }

 /////////////////////////////// Read Section //////////////////////////////////
-void JournalVirtualDev::sync_next_read(uint8_t* buf, size_t size_rd) {
-    auto const [chunk, offset_in_chunk] = offset_to_chunk(m_seek_cursor);
-    auto const end_of_chunk = chunk->end_of_chunk();
+size_t JournalVirtualDev::Descriptor::sync_next_read(uint8_t* buf, size_t size_rd) {
+    if (m_journal_chunks.empty()) { return 0; }
+
+    auto [chunk, _, offset_in_chunk] = offset_to_chunk(m_seek_cursor);
+    auto const end_of_chunk = m_vdev.get_end_of_chunk(chunk);
     auto const chunk_size = std::min< uint64_t >(end_of_chunk, chunk->size());
     bool across_chunk{false};

@@ -223,11 +360,11 @@
     // Update seek cursor after read;
     m_seek_cursor += size_rd;
     if (across_chunk) { m_seek_cursor += (chunk->size() - end_of_chunk); }
-    m_seek_cursor = m_seek_cursor % size();
+    return size_rd;
 }

-std::error_code JournalVirtualDev::sync_pread(uint8_t* buf, size_t size, off_t offset) {
-    auto const [chunk, offset_in_chunk] = offset_to_chunk(offset);
+std::error_code JournalVirtualDev::Descriptor::sync_pread(uint8_t* buf, size_t size, off_t offset) {
+    auto [chunk, index, offset_in_chunk] = offset_to_chunk(offset);

     // if the read count is acrossing chunk, only return what's left in this chunk
     if (chunk->size() - offset_in_chunk < size) {
@@ -235,12 +372,14 @@ std::error_code JournalVirtualDev::sync_pread(uint8_t* buf, size_t size, off_t o
         size = chunk->size() - offset_in_chunk;
     }

-    return sync_read(r_cast< char* >(buf), size, chunk, offset_in_chunk);
+    LOGTRACEMOD(journalvdev, "offset: 0x{} size: {} chunk: {} index: {} offset_in_chunk: 0x{} desc {}", to_hex(offset),
+                size, chunk->chunk_id(), index, to_hex(offset_in_chunk), to_string());
+    return m_vdev.sync_read(r_cast< char* >(buf), size, chunk, offset_in_chunk);
 }

-std::error_code JournalVirtualDev::sync_preadv(iovec* iov, int iovcnt, off_t offset) {
+std::error_code JournalVirtualDev::Descriptor::sync_preadv(iovec* iov, int iovcnt, off_t offset) {
     uint64_t len = VirtualDev::get_len(iov, iovcnt);
-    auto const [chunk, offset_in_chunk] = offset_to_chunk(offset);
+    auto [chunk, index, offset_in_chunk] = offset_to_chunk(offset);

     if (chunk->size() - offset_in_chunk < len) {
         if (iovcnt > 1) {
@@ -253,10 +392,13 @@
         iov[0].iov_len = len; // is this needed?
     }

-    return sync_readv(iov, iovcnt, chunk, offset_in_chunk);
+    LOGTRACEMOD(journalvdev, "offset: 0x{} iov: {} len: {} chunk: {} index: {} offset_in_chunk: 0x{} desc {}",
+                to_hex(offset), iovcnt, len, chunk->chunk_id(), index, to_hex(offset_in_chunk), to_string());
+
+    return m_vdev.sync_readv(iov, iovcnt, chunk, offset_in_chunk);
 }

-off_t JournalVirtualDev::lseek(off_t offset, int whence) {
+off_t JournalVirtualDev::Descriptor::lseek(off_t offset, int whence) {
     switch (whence) {
     case SEEK_SET:
         m_seek_cursor = offset;
@@ -276,22 +418,23 @@ off_t JournalVirtualDev::lseek(off_t offset, int whence) {
 /**
  * @brief :- it returns the vdev offset after nbytes from start offset
  */
-off_t JournalVirtualDev::dev_offset(off_t nbytes) const {
+off_t JournalVirtualDev::Descriptor::dev_offset(off_t nbytes) const {
+    if (m_journal_chunks.empty()) { return 0; }
+
     off_t vdev_offset = data_start_offset();
     uint32_t dev_id{0}, chunk_id{0};
     off_t offset_in_chunk{0};
     off_t cur_read_cur{0};

     while (cur_read_cur != nbytes) {
-        auto const [chunk, offset_in_chunk] = offset_to_chunk(vdev_offset);
+        auto [chunk, _, offset_in_chunk] = offset_to_chunk(vdev_offset);

-        auto const end_of_chunk = chunk->end_of_chunk();
+        auto const end_of_chunk = m_vdev.get_end_of_chunk(chunk);
         auto const chunk_size = std::min< uint64_t >(end_of_chunk, chunk->size());
         auto const remaining = nbytes - cur_read_cur;
         if (remaining >= (static_cast< off_t >(chunk->size()) - offset_in_chunk)) {
             cur_read_cur += (chunk->size() - offset_in_chunk);
             vdev_offset += (chunk->size() - offset_in_chunk);
-            vdev_offset = vdev_offset % size();
         } else {
             vdev_offset += remaining;
             cur_read_cur = nbytes;
@@ -300,75 +443,98 @@
     return vdev_offset;
 }

-off_t JournalVirtualDev::tail_offset(bool reserve_space_include) const {
+off_t JournalVirtualDev::Descriptor::tail_offset(bool reserve_space_include) const {
     off_t tail = static_cast< off_t >(data_start_offset() + m_write_sz_in_total.load(std::memory_order_relaxed));
     if (reserve_space_include) { tail += m_reserved_sz; }
-    if (static_cast< uint64_t >(tail) >= size()) { tail -= size(); }
-
+    HS_REL_ASSERT(static_cast< int64_t >(tail) <= m_end_offset, "tail {} is beyond the end offset {}", tail,
+                  m_end_offset);
     return tail;
 }

-void JournalVirtualDev::update_tail_offset(off_t tail) {
+void JournalVirtualDev::Descriptor::update_tail_offset(off_t tail) {
     const off_t start = data_start_offset();
-    HS_LOG(INFO, device, "total_size: {}, tail is being updated to: {}, start: {}", to_hex(size()), to_hex(tail),
-           to_hex(start));

     if (tail >= start) {
         m_write_sz_in_total.store(tail - start, std::memory_order_relaxed);
     } else {
-        m_write_sz_in_total.store(size() - start + tail, std::memory_order_relaxed);
+        RELEASE_ASSERT(false, "tail {} less than start offset {}", tail, start);
     }
     lseek(tail);

-    HS_LOG(INFO, device, "m_write_sz_in_total updated to: {}", to_hex(m_write_sz_in_total.load()));
-
-    HS_REL_ASSERT(tail_offset() == tail, "tail offset mismatch after calculation {} : {}", tail_offset(), tail);
+    LOGDEBUGMOD(journalvdev, "tail arg 0x{} desc {} ", to_hex(tail), to_string());
+    HS_REL_ASSERT(tail_offset() == tail, "tail offset mismatch after calculation 0x{} : {}", tail_offset(), tail);
 }

-void JournalVirtualDev::truncate(off_t offset) {
+void JournalVirtualDev::Descriptor::truncate(off_t truncate_offset) {
     const off_t ds_off = data_start_offset();
-    COUNTER_INCREMENT(m_metrics, vdev_truncate_count, 1);
+    COUNTER_INCREMENT(m_vdev.m_metrics, vdev_truncate_count, 1);

-    HS_PERIODIC_LOG(INFO, device, "truncating to logical offset: {}, start: {}, m_write_sz_in_total: {} ",
-                    to_hex(offset), to_hex(ds_off), to_hex(m_write_sz_in_total.load()));
+    HS_PERIODIC_LOG(DEBUG, journalvdev, "truncating to logical offset: 0x{} desc {}", to_hex(truncate_offset),
+                    to_string());

     uint64_t size_to_truncate{0};
-    if (offset >= ds_off) {
+    if (truncate_offset >= ds_off) {
         // the truncate offset is larger than current start offset
-        size_to_truncate = offset - ds_off;
+        size_to_truncate = truncate_offset - ds_off;
     } else {
-        // the truncate offset is smaller than current start offset, meaning we are looping back to previous chunks;
-        HS_PERIODIC_LOG(INFO, device,
-                        "Loop-back truncating to logical offset: {} which is smaller than current data start "
-                        "offset: {}, m_write_sz_in_total: {}",
-                        to_hex(offset), to_hex(ds_off), to_hex(m_write_sz_in_total.load()));
-        size_to_truncate = size() - (ds_off - offset);
-        HS_REL_ASSERT_GE(m_write_sz_in_total.load(), size_to_truncate, "invalid truncate offset");
-        HS_REL_ASSERT_GE(tail_offset(), offset);
+        RELEASE_ASSERT(false, "Loop-back not supported");
     }

-    // update in-memory total write size counter;
-    m_write_sz_in_total.fetch_sub(size_to_truncate, std::memory_order_relaxed);
+    // Find the chunk which contains the truncation offset. This will be the new
+    // head chunk in the list. We first set is_head to true on this chunk, so if a
+    // crash happens after this we could have two chunks with is_head true in the
+    // list; during recovery we select the head with the highest creation
+    // timestamp and reuse or clean up the other.
+    auto [new_head_chunk, _, offset_in_chunk] = offset_to_chunk(truncate_offset);
+    auto* private_data = r_cast< JournalChunkPrivate* >(const_cast< uint8_t* >(new_head_chunk->user_private()));
+    private_data->is_head = true;
+    private_data->logdev_id = m_logdev_id;
+    m_vdev.update_chunk_private(new_head_chunk, private_data);
+
+    // Find all chunks which need to be removed from the start of m_journal_chunks.
+    // We stop at the truncation offset, starting from the old data_start_offset.
+    // Align the data_start_offset down to chunk_size, since we delete whole chunks
+    // and all chunks are the same size in a journal vdev.
+    uint32_t start = sisl::round_down(ds_off, m_vdev.info().chunk_size);
+    for (auto it = m_journal_chunks.begin(); it != m_journal_chunks.end();) {
+        auto chunk = *it;
+        start += chunk->size();
+        if (start >= truncate_offset) { break; }
+
+        m_total_size -= chunk->size();
+        it = m_journal_chunks.erase(it);
+
+        // Clear the private chunk data before adding it to the pool.
+        auto* data = r_cast< JournalChunkPrivate* >(const_cast< uint8_t* >(chunk->user_private()));
+        *data = JournalChunkPrivate{};
+        m_vdev.update_chunk_private(chunk, data);
+
+        m_vdev.m_chunk_pool->enqueue(chunk);
+        HS_PERIODIC_LOG(TRACE, journalvdev, "adding chunk {} back to pool desc {}", chunk->chunk_id(), to_string());
+    }

     // Update our start offset, to keep track of actual size
-    update_data_start_offset(offset);
+    HS_REL_ASSERT_LE(truncate_offset, m_end_offset, "truncate offset must not exceed end offset");
+    update_data_start_offset(truncate_offset);

-    HS_PERIODIC_LOG(INFO, device, "after truncate: m_write_sz_in_total: {}, start: {} ",
-                    to_hex(m_write_sz_in_total.load()), to_hex(data_start_offset()));
+    // update in-memory total write size counter;
+    m_write_sz_in_total.fetch_sub(size_to_truncate, std::memory_order_relaxed);
     m_truncate_done = true;
+
+    HS_PERIODIC_LOG(DEBUG, journalvdev, "After truncate desc {}", to_string());
 }
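A numeric walkthrough of the chunk-release loop above, with illustrative values:

```cpp
// chunk_size = 64 MiB; chunks A,B,C cover [0,64), [64,128), [128,192) MiB;
// data_start_offset = 10 MiB; truncate_offset = 150 MiB.
uint64_t start = sisl::round_down(10ull << 20, 64ull << 20); // = 0
// A: start += 64 MiB -> 64  < 150 -> A is erased and enqueued to the pool
// B: start += 64 MiB -> 128 < 150 -> B is erased and enqueued to the pool
// C: start += 64 MiB -> 192 >= 150 -> stop; C keeps its data and becomes head
// data_start_offset moves to 150 MiB, and size_to_truncate = 140 MiB is
// subtracted from m_write_sz_in_total.
```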
 #if 0
-uint64_t JournalVirtualDev::get_offset_in_dev(uint32_t dev_id, uint32_t chunk_id, uint64_t offset_in_chunk) const {
+uint64_t JournalVirtualDev::Descriptor::get_offset_in_dev(uint32_t dev_id, uint32_t chunk_id, uint64_t offset_in_chunk) const {
     return get_chunk_start_offset(dev_id, chunk_id) + offset_in_chunk;
 }

-uint64_t JournalVirtualDev::get_chunk_start_offset(uint32_t dev_id, uint32_t chunk_id) const {
+uint64_t JournalVirtualDev::Descriptor::get_chunk_start_offset(uint32_t dev_id, uint32_t chunk_id) const {
     return m_primary_pdev_chunks_list[dev_id].chunks_in_pdev[chunk_id]->start_offset();
 }

-uint64_t JournalVirtualDev::logical_to_dev_offset(off_t log_offset, uint32_t& dev_id, uint32_t& chunk_id,
+uint64_t JournalVirtualDev::Descriptor::logical_to_dev_offset(off_t log_offset, uint32_t& dev_id, uint32_t& chunk_id,
                                                   off_t& offset_in_chunk) const {
     dev_id = 0;
     chunk_id = 0;
@@ -395,46 +561,100 @@ uint64_t JournalVirtualDev::logical_to_dev_offset(off_t log_offset, uint32_t& de
 }
 #endif

-std::pair< cshared< Chunk >&, off_t > JournalVirtualDev::offset_to_chunk(off_t log_offset) const {
-    uint64_t off_l{static_cast< uint64_t >(log_offset)};
-    for (const auto& chunk : m_all_chunks) {
+std::tuple< shared< Chunk >, uint32_t, off_t > JournalVirtualDev::Descriptor::offset_to_chunk(off_t log_offset) const {
+    uint64_t chunk_aligned_offset = sisl::round_down(m_data_start_offset, m_vdev.info().chunk_size);
+    uint64_t off_l{static_cast< uint64_t >(log_offset) - chunk_aligned_offset};
+    uint32_t index = 0;
+    for (auto& chunk : m_journal_chunks) {
         if (off_l >= chunk->size()) {
             off_l -= chunk->size();
+            index++;
         } else {
-            return std::pair< cshared< Chunk >&, off_t >(chunk, off_l);
+            return {chunk, index, off_l};
         }
     }

     HS_DBG_ASSERT(false, "Input log_offset is invalid: {}", log_offset);
-    return std::pair(nullptr, 0);
+    return {nullptr, 0L, 0L};
 }
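A worked example of the translation above, continuing the numbers from the truncate walkthrough:

```cpp
// After that truncate: m_journal_chunks = [C, D], chunk_size = 64 MiB,
// m_data_start_offset = 150 MiB (C still covers logical [128, 192) MiB).
uint64_t aligned = sisl::round_down(150ull << 20, 64ull << 20); // 128 MiB
// offset_to_chunk(170 MiB): off_l = 170 - 128 = 42 MiB < 64 MiB -> {C, 0, 42 MiB}
// offset_to_chunk(200 MiB): off_l = 72 MiB; skip C (72 - 64 = 8) -> {D, 1, 8 MiB}
```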
-void JournalVirtualDev::high_watermark_check() {
+void JournalVirtualDev::Descriptor::high_watermark_check() {
     if (resource_mgr().check_journal_size(used_size(), size())) {
-        COUNTER_INCREMENT(m_metrics, vdev_high_watermark_count, 1);
+        COUNTER_INCREMENT(m_vdev.m_metrics, vdev_high_watermark_count, 1);

-        if (m_event_cb && m_truncate_done) {
+        if (m_vdev.m_event_cb && m_truncate_done) {
             // don't send high watermark callback repeated until at least one truncate has been received;
             HS_LOG(INFO, device, "Callback to client for high watermark warning.");
-            m_event_cb(*this, vdev_event_t::SIZE_THRESHOLD_REACHED, "High watermark reached");
+            m_vdev.m_event_cb(m_vdev, vdev_event_t::SIZE_THRESHOLD_REACHED, "High watermark reached");
             m_truncate_done = false;
         }
     }
 }

-bool JournalVirtualDev::is_alloc_accross_chunk(size_t size) const {
-    auto const [chunk, offset_in_chunk] = offset_to_chunk(tail_offset());
+bool JournalVirtualDev::Descriptor::is_alloc_accross_chunk(size_t size) const {
+    auto [chunk, _, offset_in_chunk] = offset_to_chunk(tail_offset());
     return (offset_in_chunk + size > chunk->size());
 }

+nlohmann::json JournalVirtualDev::Descriptor::get_status(int log_level) const {
+    nlohmann::json j;
+    j["logdev_id"] = m_logdev_id;
+    j["seek_cursor"] = m_seek_cursor;
+    j["data_start_offset"] = m_data_start_offset;
+    j["end_offset"] = m_end_offset;
+    j["write_size"] = m_write_sz_in_total.load(std::memory_order_relaxed);
+    j["truncate_done"] = m_truncate_done;
+    j["reserved_size"] = m_reserved_sz;
+    j["num_chunks"] = m_journal_chunks.size();
+    j["total_size"] = m_total_size;
+    if (log_level >= 3) {
+        nlohmann::json chunk_js = nlohmann::json::array();
+        for (const auto& chunk : m_journal_chunks) {
+            nlohmann::json c;
+            auto* private_data = r_cast< JournalChunkPrivate* >(const_cast< uint8_t* >(chunk->user_private()));
+            c["chunk_id"] = chunk->chunk_id();
+            c["logdev_id"] = private_data->logdev_id;
+            c["is_head"] = private_data->is_head;
+            c["end_of_chunk"] = private_data->end_of_chunk;
+            c["next_chunk"] = private_data->next_chunk;
+            chunk_js.push_back(std::move(c));
+        }
+        j["chunks"] = std::move(chunk_js);
+    }
+
+    LOGINFO("{}", j.dump(2, ' '));
+    return j;
+}
+
+std::string JournalVirtualDev::Descriptor::to_string() const {
+    std::string str{fmt::format("id={};ds=0x{};end=0x{};writesz={};tail=0x{};"
+                                "rsvdsz={};chunks={};trunc={};total={};seek=0x{} ",
+                                m_logdev_id, to_hex(m_data_start_offset), to_hex(m_end_offset),
+                                m_write_sz_in_total.load(std::memory_order_relaxed), to_hex(tail_offset()),
+                                m_reserved_sz, m_journal_chunks.size(), m_truncate_done, m_total_size,
+                                to_hex(m_seek_cursor))};
+    return str;
+}
+
+uint64_t JournalVirtualDev::used_size() const {
+    std::lock_guard lock{m_mutex};
+    uint64_t total_size = 0;
+    for (const auto& [id, jd] : m_journal_descriptors) {
+        total_size += jd->used_size();
+    }
+    return total_size;
+}
+
+uint64_t JournalVirtualDev::available_blks() const { return (size() - used_size()) / block_size(); }
+
 nlohmann::json JournalVirtualDev::get_status(int log_level) const {
+    std::lock_guard lock{m_mutex};
     nlohmann::json j;
-    j["VirtualDev"] = VirtualDev::get_status(log_level);
-    j["JournalVirtualDev"]["m_seek_cursor"] = m_seek_cursor;
-    j["JournalVirtualDev"]["data_start_offset"] = m_data_start_offset;
-    j["JournalVirtualDev"]["write_size"] = m_write_sz_in_total.load(std::memory_order_relaxed);
-    j["JournalVirtualDev"]["truncate_done"] = m_truncate_done;
-    j["JournalVirtualDev"]["reserved_size"] = m_reserved_sz;
+    j["num_descriptors"] = std::to_string(m_journal_descriptors.size());
+    for (const auto& [logdev_id, descriptor] : m_journal_descriptors) {
+        j["journalvdev_logdev_id_" + std::to_string(logdev_id)] = descriptor->get_status(log_level);
+    }
     return j;
 }
+
 } // namespace homestore
diff --git a/src/lib/device/journal_vdev.hpp b/src/lib/device/journal_vdev.hpp
index 9ecac4342..899253a2a 100644
--- a/src/lib/device/journal_vdev.hpp
+++ b/src/lib/device/journal_vdev.hpp
@@ -19,250 +19,375 @@
 #include
 #include
 #include
+#include
+#include
 #include "device.h"
+#include "physical_dev.hpp"
 #include "virtual_dev.hpp"

 namespace homestore {
 typedef std::function< void(const off_t ret_off) > alloc_next_blk_cb_t;
+using journal_id_t = uint64_t;
+
+// Each logstore family is associated with a single logdev.
+using logdev_id_t = uint64_t;
+
+// Chunks used for a journal vdev have journal-related info stored in the chunk private data.
+// Each log device has a list of journal chunks linked by next_chunk.
+// The journal vdev arranges the chunks in order during recovery.
+struct JournalChunkPrivate {
+    logdev_id_t logdev_id{0};
+    bool is_head{false};       // Is it the head element.
+    uint64_t created_at{0};    // Creation timestamp
+    uint64_t end_of_chunk{0};  // The offset indicating the end of the chunk.
+    chunk_num_t next_chunk{0}; // Next chunk in the list.
+};
+
+static_assert(sizeof(JournalChunkPrivate) <= chunk_info::user_private_size, "Journal private data is too big");
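The struct above is stored verbatim in the chunk's user-private area (the static_assert guarantees it fits). A minimal round-trip sketch, mirroring `update_chunk_private` and `get_end_of_chunk` from the .cpp above; the values are illustrative:

```cpp
JournalChunkPrivate priv;
priv.logdev_id = 1;
priv.end_of_chunk = 40ull << 20;
sisl::blob blob{r_cast< uint8_t* >(&priv), sizeof(JournalChunkPrivate)};
chunk->set_user_private(blob); // persists via Chunk::write_chunk_info()
auto* rt = r_cast< JournalChunkPrivate* >(const_cast< uint8_t* >(chunk->user_private()));
assert(rt->end_of_chunk == priv.end_of_chunk);
```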
+using logdev_id_t = uint64_t; + +// Chunks used for journal vdev has journal related info stored in chunk private data. +// Each log device has a list of journal chunk data with next_chunk. +// Journal vdev will arrange the chunks in order during recovery. +struct JournalChunkPrivate { + logdev_id_t logdev_id{0}; + bool is_head{false}; // Is it the head element. + uint64_t created_at{0}; // Creation timestamp + uint64_t end_of_chunk{0}; // The offset indicates end of chunk. + chunk_num_t next_chunk{0}; // Next chunk in the list. +}; + +static_assert(sizeof(JournalChunkPrivate) <= chunk_info::user_private_size, "Journal private area bigger"); class JournalVirtualDev : public VirtualDev { - struct Chunk_EOF_t { - uint64_t e; +public: + // Client use journal vdev open to create a descriptor to append log entries. + // Each descriptor is independent list of chunks in order and like sliding window + // maintains size, offsets like the left side (m_data_start_offset) and + // right side (m_end_offset). Truncate increases the left m_data_start_offset + // and pop chunks from the front of the list. alloc_next_append_blk adds more chunk to + // the back of list if no space and adjusts the right m_end_offset. All offsets + // only increase and never wraps around. Each chunk in the descriptor has private + // data about the logdev_id its part of, next chunk to maintain the list. + struct Descriptor { + private: + JournalVirtualDev& m_vdev; + logdev_id_t m_logdev_id; // Unique id identifying the journal descriptor. + // off_t is long. make it uint64_t ? + off_t m_seek_cursor{0}; // the seek cursor + + off_t m_data_start_offset{0}; // Start offset of where actual data begin for this vdev + std::atomic< uint64_t > m_write_sz_in_total{0}; // Size will be decreased by truncate and increased by append; + bool m_truncate_done{true}; + uint64_t m_reserved_sz{0}; // write size within chunk, used to check chunk boundary; + std::vector< shared< Chunk > > m_journal_chunks; // Chunks part of this journal in order. + uint64_t m_total_size{0}; // Total size of all chunks. + off_t m_end_offset{0}; // Offset right to window. Never reduced. Increased in multiple of chunk size. + bool m_end_offset_set{false}; // Adjust the m_end_offset only once during init. + friend class JournalVirtualDev; + + public: + // Descriptor is created via JournalVirtualDev::open similar to file descriptor. + Descriptor(JournalVirtualDev& vdev, logdev_id_t id) : m_vdev(vdev), m_logdev_id(id) {} + + // Create and append the chunk to m_journal_chunks. + void append_chunk(); + + /** + * @brief : allocate space specified by input size. + * + * @param size : size to be allocated + * + * @return : the start unique offset of the allocated space + * + * Possible calling sequence: + * offset_1 = reserve(size1); + * offset_2 = reserve(size2); + * write_at_offset(offset_2); + * write_at_offset(offset_1); + */ + off_t alloc_next_append_blk(const size_t size); + + /** + * @brief : writes up to count bytes from the buffer starting at buf. append advances seek cursor; + * + * @param buf : buffer to be written + * @param count : size of buffer in bytes + * @param req : async req; + * + * @return : On success, the number of bytes written is returned. On error, -1 is returned. + */ + folly::Future< std::error_code > async_append(const uint8_t* buf, size_t count); + + /** + * @brief : writes up to count bytes from the buffer starting at buf at offset offset. + * The cursor is not changed. 
+
+        /**
+         * @brief : writes up to count bytes from the buffer starting at buf. append advances the seek cursor;
+         *
+         * @param buf : buffer to be written
+         * @param count : size of buffer in bytes
+         *
+         * @return : the error code of the write, delivered via the returned future.
+         */
+        folly::Future< std::error_code > async_append(const uint8_t* buf, size_t count);
+
+        /**
+         * @brief : writes up to count bytes from the buffer starting at buf at offset offset.
+         * The cursor is not changed.
+         * pwrite always uses an offset returned from alloc_next_append_blk to do the write;
+         * pwrite never crosses chunk boundaries because alloc_next_append_blk guarantees that the
+         * returned offset never crosses a chunk boundary;
+         *
+         * @param buf : buffer pointing to the data being written
+         * @param size : size of buffer to be written
+         * @param offset : offset to be written
+         *
+         * @return : the error code of the write, delivered via the returned future.
+         */
+        folly::Future< std::error_code > async_pwrite(const uint8_t* buf, size_t size, off_t offset);
+
+        /**
+         * @brief : writes iovcnt buffers of data described by iov to the offset.
+         * pwritev doesn't advance the cursor;
+         *
+         * @param iov : the iovec that holds the vector of data buffers
+         * @param iovcnt : size of iov
+         * @param offset : offset to be written
+         *
+         * @return : the error code of the write, delivered via the returned future.
+         */
+        folly::Future< std::error_code > async_pwritev(const iovec* iov, int iovcnt, off_t offset);
+
+        /// @brief writes up to count bytes from the buffer starting at buf at offset offset. The cursor is not
+        /// changed. pwrite always uses an offset returned from alloc_next_append_blk to do the write; pwrite never
+        /// crosses chunk boundaries because alloc_next_append_blk guarantees that the returned offset never crosses
+        /// a chunk boundary;
+        ///
+        /// @param buf : buffer pointing to the data being written
+        /// @param size : size of buffer to be written
+        /// @param offset : offset to be written
+        void sync_pwrite(const uint8_t* buf, size_t size, off_t offset);
+
+        void sync_pwritev(const iovec* iov, int iovcnt, off_t offset);
+
+        /**
+         * @brief : read up to count bytes into the buffer starting at buf.
+         * Only reads up to the end of the chunk and updates m_seek_cursor to the next chunk;
+         *
+         * @param buf : the buffer that the read data is copied into
+         * @param count : the size of the buffer;
+         *
+         * @return : On success, the number of bytes read is returned (zero indicates end of file), and the cursor
+         * is advanced by this number. It is not an error if this number is smaller than the number requested,
+         * because it can be the end of a chunk, since reads do not cross chunks.
+         */
+        size_t sync_next_read(uint8_t* buf, size_t count_in);
+
+        /**
+         * @brief : reads up to count bytes at offset into the buffer starting at buf.
+         * The cursor is not updated.
+         *
+         * @param buf : the buffer that the read data is copied into.
+         * @param count : size of buffer
+         * @param offset : the start offset of the read
+         *
+         * @return : the error code of the read
+         */
+        std::error_code sync_pread(uint8_t* buf, size_t count_in, off_t offset);
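// A sketch of how recovery consumes the read APIs above (log_stream_reader, later in this
// patch, does a variant of this): seek to the persisted start offset, then read forward.
// sync_next_read() returning less than requested just means a chunk boundary was reached,
// and zero means the journal is exhausted. `jd`, `start`, `buf` and `buf_size` are assumed
// to exist; process_bytes() is a hypothetical consumer.
jd->lseek(start, SEEK_SET);
while (true) {
    size_t const nread = jd->sync_next_read(buf, buf_size);
    if (nread == 0) break;     // no more chunks left to read
    process_bytes(buf, nread); // hand the log groups to the upper layer
}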
+
+        /**
+         * @brief : read at offset and save the output to iov.
+         * There is no use case for an external caller of preadv yet, meaning iov will always have
+         * only one element; if the length crosses a chunk boundary, we only read from one chunk
+         * and return the number of bytes read from that chunk;
+         *
+         * @param iov : the iovec to store the read data
+         * @param iovcnt : size of iov
+         * @param offset : the start offset of the read
+         *
+         * @return : the error code of the read
+         */
+        std::error_code sync_preadv(iovec* iov, int iovcnt, off_t offset);
+
+        /**
+         * @brief : repositions the cursor of the device to the argument offset
+         * according to the directive whence as follows:
+         * SEEK_SET
+         *     The cursor is set to offset bytes.
+         * SEEK_CUR
+         *     The cursor is set to its current location plus offset bytes.
+         * SEEK_END
+         *     Not supported yet. No use case for now.
+         *
+         * @param offset : the logical offset
+         * @param whence : see above
+         *
+         * @return : Upon successful completion, lseek() returns the resulting offset
+         * location as measured in bytes from the beginning of the file. On
+         * error, the value (off_t) -1 is returned
+         */
+        off_t lseek(off_t offset, int whence = SEEK_SET);
+
+        /**
+         * @brief : this API can be replaced by lseek(0, SEEK_CUR);
+         *
+         * @return : current cursor offset
+         */
+        off_t seeked_pos() const { return m_seek_cursor; }
+
+        /**
+         * @brief : returns the vdev offset nbytes after the start offset
+         */
+        off_t dev_offset(off_t nbytes) const;
+
+        /**
+         * @brief : get the start logical offset where data starts;
+         *
+         * @return : the start logical offset where data starts;
+         */
+        off_t data_start_offset() const { return m_data_start_offset; }
+
+        off_t end_offset() const { return m_end_offset; }
+
+        /**
+         * @brief : persist the start logical offset to the vdev's super block.
+         * Supposed to be called when truncate happens;
+         *
+         * @param offset : the start logical offset to be persisted
+         */
+        void update_data_start_offset(off_t offset) {
+            m_data_start_offset = offset;
+            auto data_start_offset_aligned = sisl::round_down(m_data_start_offset, m_vdev.info().chunk_size);
+            m_end_offset = data_start_offset_aligned + m_journal_chunks.size() * m_vdev.info().chunk_size;
+            RELEASE_ASSERT_EQ(m_end_offset - data_start_offset_aligned, m_total_size, "offset size mismatch");
+        }
+
+        /**
+         * @brief : get the logical tail offset;
+         *
+         * @param reserve_space_include : include reserved space or not;
+         *
+         * @return : the logical tail offset;
+         */
+        off_t tail_offset(bool reserve_space_include = true) const;
+
+        /**
+         * @brief : update the tail of the vdev; this API is called during reboot, after the
+         * upper layer (logdev) has completed scanning all the valid records in the vdev, to
+         * update the tail in the vdev.
+         *
+         * @param tail : logical tail offset
+         */
+        void update_tail_offset(off_t tail);
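// Worked example of the window arithmetic in update_data_start_offset() above, assuming a
// 32 MiB chunk_size and three chunks currently in m_journal_chunks; the values are
// illustrative only.
uint64_t const chunk_size = 32 * 1024 * 1024;              // 0x2000000
off_t const start = 0x2100000;                             // new data start (33 MiB)
off_t const aligned = sisl::round_down(start, chunk_size); // 0x2000000 (32 MiB)
off_t const end = aligned + 3 * chunk_size;                // 0x8000000 (128 MiB)
// end - aligned == 3 * chunk_size == m_total_size, which the RELEASE_ASSERT_EQ enforces.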
+
+        /**
+         * @brief : truncate the vdev to the provided logical offset
+         *
+         * @param truncate_offset : logical offset that the vdev needs to truncate to.
+         *
+         * Concurrency:
+         * 1. truncate and write can be received concurrently.
+         * 2. multiple truncate calls can be received concurrently.
+         *
+         * The following things happen on truncate:
+         * 1. update the in-memory counter of the total write size.
+         * 2. update the vdev superblock with the new start logical offset that is being truncated to;
+         */
+        void truncate(off_t truncate_offset);
+
+        /**
+         * @brief : get the total size of the journal
+         *
+         * @return : the total space in the journal
+         */
+        uint64_t size() const { return m_total_size; }
+
+        /**
+         * @brief : get the used size of the journal
+         *
+         * @return : the used space in the journal
+         */
+        uint64_t used_size() const { return m_write_sz_in_total.load(std::memory_order_relaxed) + m_reserved_sz; }
+
+        /**
+         * @brief : get the free space left in the journal
+         *
+         * @return : free space left in the journal
+         */
+        uint64_t available_size() const { return size() - used_size(); }
+
+        /**
+         * @brief : get the free blks available in the journal, using page_size as the measure of a blk
+         *
+         * @return : number of free pages/blks available.
+         */
+        uint64_t available_blks() const { return available_size() / m_vdev.block_size(); }
+
+        /**
+         * @brief Get the status of the journal vdev and its internal structures
+         * @param log_level : verbosity level.
+         * @return Json containing internal details
+         */
+        nlohmann::json get_status(int log_level) const;
+
+        std::string to_string() const;
+
+    private:
+        /**
+         * @brief : convert a logical offset to a physical offset for pwrite/pwritev;
+         *
+         * @param len : length of the data that is going to be written
+         * @param offset : logical offset to be written at
+         *
+         * @return : the unique offset
+         */
+        auto process_pwrite_offset(size_t len, off_t offset);
+
+        /**
+         * @brief : convert a logical offset in a chunk to the physical device offset
+         *
+         * @param dev_id : the device id
+         * @param chunk_id : the chunk id;
+         * @param offset_in_chunk : the logical offset in the chunk;
+         *
+         * @return : the physical device offset;
+         */
+        uint64_t get_offset_in_dev(uint32_t dev_id, uint32_t chunk_id, uint64_t offset_in_chunk) const;
+
+        /**
+         * @brief : get the physical start offset of the chunk;
+         *
+         * @param dev_id : the device id;
+         * @param chunk_id : the chunk id;
+         *
+         * @return : the physical start offset of the chunk;
+         */
+        uint64_t get_chunk_start_offset(uint32_t dev_id, uint32_t chunk_id) const;
+
+        /**
+         * @brief : Convert from a logical offset to a device offset.
+         * It handles device wrap-around, e.g. reaching the end of the device and continuing from
+         * the beginning of the device
+         *
+         * @param log_offset : the logical offset
+         * @param dev_id : the device id after conversion
+         * @param chunk_id : the chunk id after conversion
+         * @param offset_in_chunk : the relative offset in the chunk
+         *
+         * @return : the unique offset after conversion;
+         */
+        // uint64_t logical_to_dev_offset(off_t log_offset, uint32_t& dev_id, uint32_t& chunk_id,
+        //                                off_t& offset_in_chunk) const;
+
+        // Return the chunk, its index in the chunk list, and the offset within the chunk.
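// Sketch of the mapping this helper performs, under the window layout described above: the
// chunks in m_journal_chunks are all chunk_size long and logically contiguous starting at the
// chunk-aligned base of the window, so the list index and the in-chunk offset fall out of
// plain division. This illustrates the contract, not the verbatim implementation.
auto const chunk_size = m_vdev.info().chunk_size;
off_t const base = sisl::round_down(m_data_start_offset, chunk_size);
uint32_t const index = (log_offset - base) / chunk_size;        // position in m_journal_chunks
off_t const offset_in_chunk = (log_offset - base) % chunk_size; // offset inside that chunk
return std::make_tuple(m_journal_chunks[index], index, offset_in_chunk);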
+ std::tuple< shared< Chunk >, uint32_t, off_t > offset_to_chunk(off_t log_offset) const; + + bool validate_append_size(size_t count) const; + + void high_watermark_check(); + + bool is_alloc_accross_chunk(size_t size) const; + + auto get_dev_details(size_t len, off_t offset); }; - // NOTE: Usage of this needs to avoid punning which is now illegal in C++ 11 and up - typedef union { - struct Chunk_EOF_t eof; - std::array< unsigned char, VIRDEV_BLKSIZE > padding; - } Chunk_EOF; - - static_assert(sizeof(Chunk_EOF) == VIRDEV_BLKSIZE, "LogDevRecordHeader must be VIRDEV_SIZE bytes"); - -private: - off_t m_seek_cursor{0}; // the seek cursor - off_t m_data_start_offset{0}; // Start offset of where actual data begin for this vdev - std::atomic< uint64_t > m_write_sz_in_total{0}; // this size will be decreased by truncate and increased by append; - bool m_truncate_done{true}; - uint64_t m_reserved_sz{0}; // write size within chunk, used to check chunk boundary; - -public: /* Create a new virtual dev for these parameters */ JournalVirtualDev(DeviceManager& dmgr, const vdev_info& vinfo, vdev_event_cb_t event_cb); JournalVirtualDev(const JournalVirtualDev& other) = delete; JournalVirtualDev& operator=(const JournalVirtualDev& other) = delete; JournalVirtualDev(JournalVirtualDev&&) noexcept = delete; JournalVirtualDev& operator=(JournalVirtualDev&&) noexcept = delete; - virtual ~JournalVirtualDev() override = default; - - /** - * @brief : allocate space specified by input size. - * - * @param size : size to be allocated - * - * @return : the start unique offset of the allocated space - * - * Possible calling sequence: - * offset_1 = reserve(size1); - * offset_2 = reserve(size2); - * write_at_offset(offset_2); - * write_at_offset(offset_1); - */ - off_t alloc_next_append_blk(const size_t size); - - /** - * @brief : writes up to count bytes from the buffer starting at buf. append advances seek cursor; - * - * @param buf : buffer to be written - * @param count : size of buffer in bytes - * @param req : async req; - * - * @return : On success, the number of bytes written is returned. On error, -1 is returned. - */ - folly::Future< std::error_code > async_append(const uint8_t* buf, size_t count); - - /** - * @brief : writes up to count bytes from the buffer starting at buf at offset offset. - * The cursor is not changed. - * pwrite always use offset returned from alloc_next_append_blk to do the write; - * pwrite should not across chunk boundaries because alloc_next_append_blk guarantees offset returned always doesn't - * across chunk boundary; - * - * @param buf : buffer pointing to the data being written - * @param size : size of buffer to be written - * @param offset : offset to be written - * @param req : async req - * - * @return : On success, the number of bytes read or written is returned, or -1 on error. - */ - folly::Future< std::error_code > async_pwrite(const uint8_t* buf, size_t size, off_t offset); - - /** - * @brief : writes iovcnt buffers of data described by iov to the offset. - * pwritev doesn't advance curosr; - * - * @param iov : the iovec that holds vector of data buffers - * @param iovcnt : size of iov - * @param offset : offset to be written - * @param req : aync req. - * if req is not nullptr, it will be an async call. - * if req is nullptr, it will be a sync call. - * - * @return : On success, number of bytes written. 
On error, -1 is returned
-     */
-    folly::Future< std::error_code > async_pwritev(const iovec* iov, int iovcnt, off_t offset);
+    virtual ~JournalVirtualDev();

-    /// @brief writes up to count bytes from the buffer starting at buf at offset offset. The cursor is not
-    /// changed. pwrite always use offset returned from alloc_next_append_blk to do the write;pwrite should not across
-    /// chunk boundaries because alloc_next_append_blk guarantees offset returned always doesn't across chunk boundary;
-    ///
-    /// @param buf : buffer pointing to the data being written
-    /// @param size : size of buffer to be written
-    /// @param offset : offset to be written
-    /// @return : On success, the number of bytes written is returned, or -1 on error.
-    void sync_pwrite(const uint8_t* buf, size_t size, off_t offset);
+    // Initialize the journal vdev during recovery. Traverse all chunks and group them
+    // by logdev_id into their ordered lists.
+    virtual void init() override;

-    void sync_pwritev(const iovec* iov, int iovcnt, off_t offset);
-
-    /**
-     * @brief : read up to count bytes into the buffer starting at buf.
-     * Only read the size before end of chunk and update m_seek_cursor to next chunk;
-     *
-     * @param buf : the buffer that points to read out data
-     * @param count : the size of buffer;
-     *
-     * @return : On success, the number of bytes read is returned (zero indicates end of file), and the cursor is
-     * advanced by this number. it is not an error if this number is smaller than the number requested, because it can
-     * be end of chunk, since read won't across chunk.
-     */
-    void sync_next_read(uint8_t* buf, size_t count_in);
-
-    /**
-     * @brief : reads up to count bytes at offset into the buffer starting at buf.
-     * The curosr is not updated.
-     *
-     * @param buf : the buffer that points to the read out data.
-     * @param count : size of buffer
-     * @param offset : the start offset to do read
-     *
-     * @return : return the error code of the read
-     */
-    std::error_code sync_pread(uint8_t* buf, size_t count_in, off_t offset);
-
-    /**
-     * @brief : read at offset and save output to iov.
-     * We don't have a use case for external caller of preadv now, meaning iov will always have only 1 element;
-     * if the len is acrossing chunk boundary,
-     * we only do read on one chunk and return the num of bytes read on this chunk;
-     *
-     * @param iov : the iovect to store the read out data
-     * @param iovcnt : size of iovev
-     * @param offset : the start offset to read
-     *
-     * @return : return the error code of the read
-     */
-    std::error_code sync_preadv(iovec* iov, int iovcnt, off_t offset);
-
-    /**
-     * @brief : repositions the cusor of the device to the argument offset
-     * according to the directive whence as follows:
-     * SEEK_SET
-     *     The curosr is set to offset bytes.
-     * SEEK_CUR
-     *     The cursor is set to its current location plus offset bytes.
-     * SEEK_END
-     *     Not supported yet. No use case for now.
-     *
-     * @param offset : the logical offset
-     * @param whence : see above
-     *
-     * @return : Upon successful completion, lseek() returns the resulting offset
-     * location as measured in bytes from the beginning of the file. On
-     * error, the value (off_t) -1 is returned
-     */
-    off_t lseek(off_t offset, int whence = SEEK_SET);
-
-    /**
-     * @brief : this API can be replaced by lseek(0, SEEK_CUR);
-     *
-     * @return : current curosr offset
-     */
-    off_t seeked_pos() const { return m_seek_cursor; }
-
-    /**
-     * @brief :- it returns the vdev offset after nbytes from start offset
-     */
-    off_t dev_offset(off_t nbytes) const;
-
-    /**
-     * @brief : get the start logical offset where data starts;
-     *
-     * @return : the start logical offset where data starts;
-     */
-    off_t data_start_offset() const { return m_data_start_offset; }
-
-    /**
-     * @brief : persist start logical offset to vdev's super block
-     * Supposed to be called when truncate happens;
-     *
-     * @param offset : the start logical offset to be persisted
-     */
-    void update_data_start_offset(off_t offset) { m_data_start_offset = offset; }
-
-    /**
-     * @brief : get the logical tail offset;
-     *
-     * @param reserve_space_include : include reserved space or not;
-     *
-     * @return : the logical tail offset;
-     */
-    off_t tail_offset(bool reserve_space_include = true) const;
-
-    /**
-     * @brief : update the tail to vdev, this API will be called during reboot and
-     * upper layer(logdev) has completed scanning all the valid records in vdev and then
-     * update the tail in vdev.
-     *
-     * @param tail : logical tail offset
-     */
-    void update_tail_offset(off_t tail);
-
-    /**
-     * @brief : truncate vdev to the provided logcial offset
-     *
-     * @param offset: logical offset that vdev needs to truncate to.
-     *
-     * Concurrency:
-     * 1. truncate and write can be received concurrently.
-     * 2. multiple truncate calls can be received concurently.
-     *
-     * Following things should happen for truncate:
-     * 1. update in-memory counter of total write size.
-     * 2. update vdev superblock of the new start logical offset that is being truncate to;
-     *
-     */
-    void truncate(off_t offset);
-
-    /**
-     * @brief : get the used size in vdev
-     *
-     * @return : the used space in vdev
-     */
-    uint64_t used_size() const override { return m_write_sz_in_total.load(std::memory_order_relaxed) + m_reserved_sz; }
-
-    /**
-     * @brief : get the free space left in vdev
-     *
-     * @return : free space left in vdev
-     */
-    uint64_t available_size() const { return size() - used_size(); }
-
-    /**
-     * @brief : get the free blks available in vdev, assuming page_size as a measure of blks
-     *
-     * @return : free number of pages/blks available.
-     */
-    uint64_t available_blks() const override { return available_size() / block_size(); }
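// Minimal usage sketch for the descriptor API declared below, mirroring what LogDev::start()
// does later in this patch. `vdev` is assumed to be a JournalVirtualDev* and `buf` an aligned
// 4 KiB buffer; DATA_LOG_FAMILY_IDX is used as the logdev id.
auto jd = vdev->open(LogStoreService::DATA_LOG_FAMILY_IDX); // one descriptor per logstore family
jd->update_data_start_offset(0);                            // a freshly formatted journal starts at 0
off_t const off = jd->alloc_next_append_blk(4096);
jd->sync_pwrite(buf, 4096, off);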
+    // Create and return a journal descriptor. A journal descriptor has the list of chunks
+    // where log entries are stored. It also maintains the offsets, size, etc.
+    shared< Descriptor > open(logdev_id_t id);

     /**
      * @brief Get the status of the journal vdev and its internal structures
@@ -271,66 +396,21 @@ class JournalVirtualDev : public VirtualDev {
      */
     nlohmann::json get_status(int log_level) const override;

-private:
-    /**
-     * @brief : convert logical offset to physical offset for pwrite/pwritev;
-     *
-     * @param len : len of data that is going to be written
-     * @param offset : logical offset to be written
-     * @param dev_id : the return value of device id
-     * @param chunk_id : the return value of chunk id
-     * @param req : async req
-     *
-     * @return : the unique offset
-     */
-    auto process_pwrite_offset(size_t len, off_t offset);
-
-    /**
-     * @brief : convert logical offset in chunk to the physical device offset
-     *
-     * @param dev_id : the device id
-     * @param chunk_id : the chunk id;
-     * @param offset_in_chunk : the logical offset in chunk;
-     *
-     * @return : the physical device offset;
-     */
-    uint64_t get_offset_in_dev(uint32_t dev_id, uint32_t chunk_id, uint64_t offset_in_chunk) const;
+    uint64_t used_size() const override;
+    uint64_t available_blks() const override;

-    /**
-     * @brief : get the physical start offset of the chunk;
-     *
-     * @param dev_id : the deivce id;
-     * @param chunk_id : the chunk id;
-     *
-     * @return : the physical start offset of the chunk;
-     */
-    uint64_t get_chunk_start_offset(uint32_t dev_id, uint32_t chunk_id) const;
+    void update_chunk_private(shared< Chunk >& chunk, JournalChunkPrivate* chunk_private);
+    uint64_t get_end_of_chunk(shared< Chunk >& chunk) const;

-    /**
-     * @brief : Convert from logical offset to device offset.
-     * It handles device overloop, e.g. reach to end of the device then start from the beginning device
-     *
-     * @param log_offset : the logical offset
-     * @param dev_id : the device id after convertion
-     * @param chunk_id : the chunk id after convertion
-     * @param offset_in_chunk : the relative offset in chunk
-     *
-     * @return : the unique offset after converion;
-     */
-    // uint64_t logical_to_dev_offset(off_t log_offset, uint32_t& dev_id, uint32_t& chunk_id,
-    //                                off_t& offset_in_chunk) const;
-
-    std::pair< cshared< Chunk >&, off_t > offset_to_chunk(off_t log_offset) const;
-
-    bool validate_append_size(size_t count) const;
-
-    void high_watermark_check();
-
-    off_t alloc_next_append_blk_internal(size_t size);
-
-    bool is_alloc_accross_chunk(size_t size) const;
-
-    auto get_dev_details(size_t len, off_t offset);
+private:
+    // Mapping of logdev id to its journal descriptor.
+    std::unordered_map< logdev_id_t, shared< Descriptor > > m_journal_descriptors;
+    mutable std::mutex m_mutex;
+
+    // Cache the chunks. Getting a chunk from the pool causes a single write of the
+    // last chunk in the list to update its end_of_chunk and next_chunk.
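// Sketch of that "single write", under the JournalChunkPrivate layout from this patch: before
// a fresh chunk from the pool is linked in, the current tail chunk's private area is rewritten
// so the on-disk list stays walkable across restarts. link_tail_to() is a hypothetical helper
// for illustration; the real path goes through update_chunk_private().
void link_tail_to(shared< Chunk >& tail, chunk_num_t fresh_id, uint64_t end_of_chunk) {
    // Copy out the tail's private data, point it at the new chunk and record where its
    // valid data ends.
    JournalChunkPrivate priv = *r_cast< const JournalChunkPrivate* >(tail->user_private());
    priv.next_chunk = fresh_id;
    priv.end_of_chunk = end_of_chunk;
    // Chunk::set_user_private() recomputes the chunk_info checksum and rewrites the
    // chunk info entry in the super block.
    tail->set_user_private(sisl::blob{r_cast< uint8_t* >(&priv), sizeof(JournalChunkPrivate)});
}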
+ std::unique_ptr< ChunkPool > m_chunk_pool; + std::shared_ptr< JournalChunkPrivate > init_private_data; }; } // namespace homestore diff --git a/src/lib/device/physical_dev.cpp b/src/lib/device/physical_dev.cpp index 4c4b3da2a..fe44059de 100644 --- a/src/lib/device/physical_dev.cpp +++ b/src/lib/device/physical_dev.cpp @@ -242,7 +242,7 @@ std::vector< shared< Chunk > > PhysicalDev::create_chunks(const std::vector< uin auto ptr = buf; for (auto cslot = b.start_bit; cslot < b.start_bit + b.nbits; ++cslot, ++cit, ptr += chunk_info::size) { chunk_info* cinfo = new (ptr) chunk_info(); - populate_chunk_info(cinfo, vdev_id, size, chunk_ids[cit], cit); + populate_chunk_info(cinfo, vdev_id, size, chunk_ids[cit], cit, {}); auto chunk = std::make_shared< Chunk >(this, *cinfo, cslot); ret_chunks.push_back(chunk); @@ -271,7 +271,8 @@ std::vector< shared< Chunk > > PhysicalDev::create_chunks(const std::vector< uin return ret_chunks; } -shared< Chunk > PhysicalDev::create_chunk(uint32_t chunk_id, uint32_t vdev_id, uint64_t size, uint32_t ordinal) { +shared< Chunk > PhysicalDev::create_chunk(uint32_t chunk_id, uint32_t vdev_id, uint64_t size, uint32_t ordinal, + const sisl::blob& user_private) { std::unique_lock lg{m_chunk_op_mtx}; // We need to alloc a slot to store the chunk_info in the super blk @@ -285,7 +286,7 @@ shared< Chunk > PhysicalDev::create_chunk(uint32_t chunk_id, uint32_t vdev_id, u shared< Chunk > chunk; try { - populate_chunk_info(cinfo, vdev_id, size, chunk_id, ordinal); + populate_chunk_info(cinfo, vdev_id, size, chunk_id, ordinal, user_private); // Locate and write the chunk info in the super blk area write_super_block(buf, chunk_info::size, chunk_info_offset_nth(cslot)); @@ -402,7 +403,7 @@ uint64_t PhysicalDev::chunk_info_offset_nth(uint32_t slot) const { } void PhysicalDev::populate_chunk_info(chunk_info* cinfo, uint32_t vdev_id, uint64_t size, uint32_t chunk_id, - uint32_t ordinal) { + uint32_t ordinal, const sisl::blob& private_data) { // Find the free area for chunk data within between data_start_offset() and data_end_offset() auto ival = find_next_chunk_area(size); m_chunk_data_area.insert(ival); @@ -413,6 +414,7 @@ void PhysicalDev::populate_chunk_info(chunk_info* cinfo, uint32_t vdev_id, uint6 cinfo->chunk_id = chunk_id; cinfo->chunk_ordinal = ordinal; cinfo->set_allocated(); + cinfo->set_user_private(private_data); cinfo->compute_checksum(); } diff --git a/src/lib/device/physical_dev.hpp b/src/lib/device/physical_dev.hpp index ade1be9a2..2987fb45e 100644 --- a/src/lib/device/physical_dev.hpp +++ b/src/lib/device/physical_dev.hpp @@ -79,13 +79,13 @@ struct chunk_info { uint64_t chunk_start_offset{0}; // 0: Start offset of the chunk within a pdev uint64_t chunk_size{0}; // 8: Chunk size - uint64_t end_of_chunk_size{0}; // 16: The offset indicates end of chunk. - uint32_t vdev_id{0}; // 24: Virtual device id this chunk hosts. UINT32_MAX if chunk is free - uint32_t chunk_id{0}; // 28: ID for this chunk - unique for entire homestore across devices - uint32_t chunk_ordinal{0}; // 32: Chunk ordinal within the vdev on this pdev - uint8_t chunk_allocated{0x00}; // 36: Is chunk allocated or free - uint16_t checksum{0}; // 37: checksum of this chunk info - uint8_t padding[25]{}; // 39: pad to make it 128 bytes total + + uint32_t vdev_id{0}; // 24: Virtual device id this chunk hosts. 
UINT32_MAX if chunk is free + uint32_t chunk_id{0}; // 28: ID for this chunk - unique for entire homestore across devices + uint32_t chunk_ordinal{0}; // 32: Chunk ordinal within the vdev on this pdev + uint8_t chunk_allocated{0x00}; // 36: Is chunk allocated or free + uint16_t checksum{0}; // 37: checksum of this chunk info + uint8_t padding[25]{}; // 39: pad to make it 128 bytes total uint8_t chunk_selector_private[selector_private_size]{}; // 64: Chunk selector private area uint8_t user_private[user_private_size]{}; // 128: Opaque user of the chunk information @@ -99,7 +99,9 @@ struct chunk_info { std::memcpy(&chunk_selector_private, data.cbytes(), std::min(data.size(), uint32_cast(selector_private_size))); } void set_user_private(const sisl::blob& data) { - std::memcpy(&user_private, data.cbytes(), std::min(data.size(), uint32_cast(user_private_size))); + if (data.size() != 0) { + std::memcpy(&user_private, data.cbytes(), std::min(data.size(), uint32_cast(user_private_size))); + } } void compute_checksum() { @@ -171,8 +173,10 @@ class PhysicalDev { /// @param size: Size of each chunk /// @param ordinal: Ordinal for a pdev within the vdev. This is useful to match similar vdevs from different pdevs /// for mirroring + /// @param private_data: data to be stored in chunk private space. /// @return Shared instance of chunk class created - shared< Chunk > create_chunk(uint32_t chunk_id, uint32_t vdev_id, uint64_t size, uint32_t ordinal); + shared< Chunk > create_chunk(uint32_t chunk_id, uint32_t vdev_id, uint64_t size, uint32_t ordinal, + const sisl::blob& private_data = {}); void load_chunks(std::function< bool(cshared< Chunk >&) >&& chunk_found_cb); void remove_chunks(std::vector< shared< Chunk > >& chunks); @@ -227,7 +231,8 @@ class PhysicalDev { private: void do_remove_chunk(cshared< Chunk >& chunk); - void populate_chunk_info(chunk_info* cinfo, uint32_t vdev_id, uint64_t size, uint32_t chunk_id, uint32_t ordinal); + void populate_chunk_info(chunk_info* cinfo, uint32_t vdev_id, uint64_t size, uint32_t chunk_id, uint32_t ordinal, + const sisl::blob& private_data); void free_chunk_info(chunk_info* cinfo); ChunkInterval find_next_chunk_area(uint64_t size) const; }; diff --git a/src/lib/device/virtual_dev.cpp b/src/lib/device/virtual_dev.cpp index a63bee2ba..c79abd73c 100644 --- a/src/lib/device/virtual_dev.cpp +++ b/src/lib/device/virtual_dev.cpp @@ -123,6 +123,13 @@ void VirtualDev::add_chunk(cshared< Chunk >& chunk, bool is_fresh_chunk) { m_chunk_selector->add_chunk(chunk); } +void VirtualDev::remove_chunk(cshared< Chunk >& chunk) { + std::unique_lock lg{m_mgmt_mutex}; + auto iter = std::remove_if(m_all_chunks.begin(), m_all_chunks.end(), [chunk](auto c) { return c == chunk; }); + m_all_chunks.erase(iter, m_all_chunks.end()); + m_chunk_selector->remove_chunk(chunk); +} + folly::Future< std::error_code > VirtualDev::async_format() { static thread_local std::vector< folly::Future< std::error_code > > s_futs; s_futs.clear(); @@ -533,17 +540,12 @@ nlohmann::json VirtualDev::get_status(int log_level) const { return j; } -uint32_t VirtualDev::align_size() const { - auto* pdev = *(m_pdevs.begin()); - return pdev->align_size(); -} +uint32_t VirtualDev::align_size() const { return m_dmgr.align_size(static_cast< HSDevType >(m_vdev_info.hs_dev_type)); } uint32_t VirtualDev::optimal_page_size() const { - auto* pdev = *(m_pdevs.begin()); - return pdev->optimal_page_size(); + return m_dmgr.optimal_page_size(static_cast< HSDevType >(m_vdev_info.hs_dev_type)); } uint32_t VirtualDev::atomic_page_size() 
const {
-    auto* pdev = *(m_pdevs.begin());
-    return pdev->atomic_page_size();
+    return m_dmgr.atomic_page_size(static_cast< HSDevType >(m_vdev_info.hs_dev_type));
 }

 std::string VirtualDev::to_string() const { return ""; }
@@ -598,6 +600,7 @@ int VirtualDev::cp_progress_percent() { return 100; }
 ///////////////////////// VirtualDev Private Methods /////////////////////////////
 uint64_t VirtualDev::to_dev_offset(BlkId const& b, Chunk** chunk) const {
     *chunk = m_dmgr.get_chunk_mutable(b.chunk_num());
+    RELEASE_ASSERT(*chunk, "Got null chunk for chunk_num={}", b.chunk_num());
     return uint64_cast(b.blk_num()) * block_size() + uint64_cast((*chunk)->start_offset());
 }

diff --git a/src/lib/device/virtual_dev.hpp b/src/lib/device/virtual_dev.hpp
index d28a67f35..6cc629cfc 100644
--- a/src/lib/device/virtual_dev.hpp
+++ b/src/lib/device/virtual_dev.hpp
@@ -79,6 +79,7 @@ struct blkalloc_cp;
 class VirtualDev;

 ENUM(vdev_event_t, uint8_t, SIZE_THRESHOLD_REACHED, VDEV_ERRORED_OUT);
+
 using vdev_event_cb_t = std::function< void(VirtualDev&, vdev_event_t, const std::string&) >;

 class VDevCPContext;
@@ -108,12 +109,20 @@ class VirtualDev {
     VirtualDev& operator=(VirtualDev&&) noexcept = delete;
     virtual ~VirtualDev() = default;

+    /// @brief Run any initialization of the vdev after recovery or on first-time creation.
+    virtual void init() {}
+
     /// @brief Adds chunk to the vdev. It is expected that this will happen at startup time and hence it only
     /// takes lock for writing and not reading
     ///
     /// @param chunk Chunk to be added
     virtual void add_chunk(cshared< Chunk >& chunk, bool is_fresh_chunk);

+    /// @brief Remove chunk from the vdev.
+    ///
+    /// @param chunk Chunk to be removed.
+    virtual void remove_chunk(cshared< Chunk >& chunk);
+
     /// @brief Formats the vdev asynchronously by zeroing the entire vdev. It will use underlying physical device
     /// capabilities to zero them if fast zero is possible, otherwise will zero block by block
     /// @param cb Callback after formatting is completed.
@@ -270,6 +279,8 @@ class VirtualDev { virtual uint64_t used_size() const; virtual uint64_t num_chunks() const { return m_vdev_info.num_primary_chunks; } virtual uint32_t block_size() const { return m_vdev_info.blk_size; } + virtual vdev_info info() const { return m_vdev_info; } + virtual void update_info(const vdev_info& info) { m_vdev_info = info; } virtual uint32_t num_mirrors() const { return 0; } virtual std::string to_string() const; virtual nlohmann::json get_status(int log_level) const; diff --git a/src/lib/homestore.cpp b/src/lib/homestore.cpp index 8486fc44d..b22164b5d 100644 --- a/src/lib/homestore.cpp +++ b/src/lib/homestore.cpp @@ -165,10 +165,10 @@ void HomeStore::format_and_start(std::map< uint32_t, hs_format_params >&& format m_meta_service->create_vdev(pct_to_size(fparams.size_pct, HSDevType::Fast), fparams.num_chunks); } else if ((svc_type & HS_SERVICE::LOG_REPLICATED) && has_log_service()) { futs.emplace_back(m_log_service->create_vdev(pct_to_size(fparams.size_pct, HSDevType::Fast), - LogStoreService::DATA_LOG_FAMILY_IDX, fparams.num_chunks)); + LogStoreService::DATA_LOG_FAMILY_IDX, fparams.chunk_size)); } else if ((svc_type & HS_SERVICE::LOG_LOCAL) && has_log_service()) { futs.emplace_back(m_log_service->create_vdev(pct_to_size(fparams.size_pct, HSDevType::Fast), - LogStoreService::CTRL_LOG_FAMILY_IDX, fparams.num_chunks)); + LogStoreService::CTRL_LOG_FAMILY_IDX, fparams.chunk_size)); } else if ((svc_type & HS_SERVICE::DATA) && has_data_service()) { m_data_service->create_vdev(pct_to_size(fparams.size_pct, HSDevType::Data), fparams.block_size, fparams.alloc_type, fparams.chunk_sel_type, fparams.num_chunks); diff --git a/src/lib/logstore/log_dev.cpp b/src/lib/logstore/log_dev.cpp index c4c4ad579..a51cf228e 100644 --- a/src/lib/logstore/log_dev.cpp +++ b/src/lib/logstore/log_dev.cpp @@ -59,7 +59,11 @@ void LogDev::start(bool format, JournalVirtualDev* vdev) { HS_LOG_ASSERT((m_store_found_cb != nullptr), "Expected Log store found callback to be registered"); HS_LOG_ASSERT((m_logfound_cb != nullptr), "Expected Logs found callback to be registered"); + // Each family has one journal descriptor. 
m_vdev = vdev; + m_vdev_jd = m_vdev->open(m_family_id); + RELEASE_ASSERT(m_vdev_jd, "Journal descriptor is null"); + if (m_flush_size_multiple == 0) { m_flush_size_multiple = m_vdev->optimal_page_size(); } THIS_LOGDEV_LOG(INFO, "Initializing logdev with flush size multiple={}", m_flush_size_multiple); @@ -73,7 +77,8 @@ void LogDev::start(bool format, JournalVirtualDev* vdev) { if (format) { HS_LOG_ASSERT(m_logdev_meta.is_empty(), "Expected meta to be not present"); m_logdev_meta.create(); - m_vdev->update_data_start_offset(0); + + m_vdev_jd->update_data_start_offset(0); } else { HS_LOG_ASSERT(!m_logdev_meta.is_empty(), "Expected meta data to be read already before loading"); auto const store_list = m_logdev_meta.load(); @@ -86,12 +91,13 @@ void LogDev::start(bool format, JournalVirtualDev* vdev) { THIS_LOGDEV_LOG(INFO, "get start vdev offset during recovery {} log indx {} ", m_logdev_meta.get_start_dev_offset(), m_logdev_meta.get_start_log_idx()); - m_vdev->update_data_start_offset(m_logdev_meta.get_start_dev_offset()); + m_vdev_jd->update_data_start_offset(m_logdev_meta.get_start_dev_offset()); m_log_idx = m_logdev_meta.get_start_log_idx(); do_load(m_logdev_meta.get_start_dev_offset()); m_log_records->reinit(m_log_idx); m_last_flush_idx = m_log_idx - 1; } + m_flush_timer_hdl = iomanager.schedule_global_timer( HS_DYNAMIC_CONFIG(logstore.flush_timer_frequency_us) * 1000, true, nullptr /* cookie */, iomgr::reactor_regex::all_worker, @@ -143,7 +149,7 @@ void LogDev::stop() { } void LogDev::do_load(const off_t device_cursor) { - log_stream_reader lstream{device_cursor, m_vdev, m_flush_size_multiple}; + log_stream_reader lstream{device_cursor, m_vdev, m_vdev_jd, m_flush_size_multiple}; logid_t loaded_from{-1}; off_t group_dev_offset; @@ -201,7 +207,7 @@ void LogDev::do_load(const off_t device_cursor) { // Update the tail offset with where we finally end up loading, so that new append entries can be written from // here. 
- m_vdev->update_tail_offset(group_dev_offset); + m_vdev_jd->update_tail_offset(group_dev_offset); } void LogDev::assert_next_pages(log_stream_reader& lstream) { @@ -236,7 +242,7 @@ int64_t LogDev::append_async(const logstore_id_t store_id, const logstore_seq_nu log_buffer LogDev::read(const logdev_key& key, serialized_log_record& return_record_header) { auto buf = sisl::make_byte_array(initial_read_size, m_flush_size_multiple, sisl::buftag::logread); - m_vdev->sync_pread(buf->bytes(), initial_read_size, key.dev_offset); + m_vdev_jd->sync_pread(buf->bytes(), initial_read_size, key.dev_offset); auto* header = r_cast< const log_group_header* >(buf->cbytes()); HS_REL_ASSERT_EQ(header->magic_word(), LOG_GROUP_HDR_MAGIC, "Log header corrupted with magic mismatch!"); @@ -263,7 +269,7 @@ log_buffer LogDev::read(const logdev_key& key, serialized_log_record& return_rec auto const rounded_size = sisl::round_up(record_header->size + data_offset - rounded_data_offset, m_vdev->align_size()); auto new_buf = sisl::make_byte_array(rounded_size, m_vdev->align_size(), sisl::buftag::logread); - m_vdev->sync_pread(new_buf->bytes(), rounded_size, key.dev_offset + rounded_data_offset); + m_vdev_jd->sync_pread(new_buf->bytes(), rounded_size, key.dev_offset + rounded_data_offset); ret_view = sisl::byte_view{new_buf, s_cast< uint32_t >(data_offset - rounded_data_offset), record_header->size}; } @@ -385,10 +391,11 @@ bool LogDev::flush_if_needed(int64_t threshold_size) { auto sz = m_pending_flush_size.fetch_sub(lg->actual_data_size(), std::memory_order_relaxed); HS_REL_ASSERT_GE((sz - lg->actual_data_size()), 0, "size {} lg size{}", sz, lg->actual_data_size()); - off_t offset = m_vdev->alloc_next_append_blk(lg->header()->total_size()); + off_t offset = m_vdev_jd->alloc_next_append_blk(lg->header()->total_size()); lg->m_log_dev_offset = offset; HS_REL_ASSERT_NE(lg->m_log_dev_offset, INVALID_OFFSET, "log dev is full"); THIS_LOGDEV_LOG(TRACE, "Flush prepared, flushing data size={} at offset={}", lg->actual_data_size(), offset); + do_flush(lg); return true; } else { @@ -419,7 +426,7 @@ void LogDev::do_flush_write(LogGroup* lg) { THIS_LOGDEV_LOG(TRACE, "vdev offset={} log group total size={}", lg->m_log_dev_offset, lg->header()->total_size()); // write log - m_vdev->async_pwritev(lg->iovecs().data(), int_cast(lg->iovecs().size()), lg->m_log_dev_offset) + m_vdev_jd->async_pwritev(lg->iovecs().data(), int_cast(lg->iovecs().size()), lg->m_log_dev_offset) .thenValue([this, lg](auto) { on_flush_completion(lg); }); } @@ -524,7 +531,7 @@ uint64_t LogDev::truncate(const logdev_key& key) { HS_PERIODIC_LOG(INFO, logstore, "Truncating log device upto log_id={} vdev_offset={} truncated {} log records", key.idx, key.dev_offset, num_records_to_truncate); m_log_records->truncate(key.idx); - m_vdev->truncate(key.dev_offset); + m_vdev_jd->truncate(key.dev_offset); m_last_truncate_idx = key.idx; { @@ -551,7 +558,6 @@ uint64_t LogDev::truncate(const logdev_key& key) { // We can remove the rollback records of those upto which logid is getting truncated m_logdev_meta.remove_rollback_record_upto(key.idx, false /* persist_now */); - m_logdev_meta.persist(); #ifdef _PRERELEASE if (garbage_collect && iomgr_flip::instance()->test_flip("logdev_abort_after_garbage")) { diff --git a/src/lib/logstore/log_dev.hpp b/src/lib/logstore/log_dev.hpp index f6d4fb606..a417c1fc6 100644 --- a/src/lib/logstore/log_dev.hpp +++ b/src/lib/logstore/log_dev.hpp @@ -37,6 +37,7 @@ #include #include "common/homestore_config.hpp" #include "device/chunk.h" +#include 
"device/journal_vdev.hpp" namespace homestore { @@ -51,15 +52,15 @@ static constexpr uint32_t max_log_group{2}; /* * LogGroup Layout: * - * <---- Log Group Header ---> <-- Record 1 --> <-- Record 2 --> <-- - Inline data area --> + * <---- Log Group Header ---> <-- Record 1 --> <-- Record 2 --> <-- - Inline data area --> * |----------------------------------------- |--------------------|--------------------| |----------------|-----------|----------------| * |#records|...| oob area | inline area | Size | data offset | Size | data offset | ... | Record #1 data | ... | OOB Record 1 | * |----------------------------------------- |--------------------|--------------------| |----------------|-----------|----------------| - * | | | ^ ^ + * | | | ^ ^ * | | | | | * | | -------------------------------------| | * | ------------------------------------------------------------| | - * |------------------------------------------------------------------------------------------------------| + * |------------------------------------------------------------------------------------------------------| */ // clang-format on @@ -549,7 +550,8 @@ class JournalVirtualDev; class log_stream_reader { public: - log_stream_reader(off_t device_cursor, JournalVirtualDev* vdev, uint64_t min_read_size); + log_stream_reader(off_t device_cursor, JournalVirtualDev* vdev, shared< JournalVirtualDev::Descriptor > vdev_jd, + uint64_t min_read_size); log_stream_reader(const log_stream_reader&) = delete; log_stream_reader& operator=(const log_stream_reader&) = delete; log_stream_reader(log_stream_reader&&) noexcept = delete; @@ -564,6 +566,7 @@ class log_stream_reader { private: JournalVirtualDev* m_vdev; + shared< JournalVirtualDev::Descriptor > m_vdev_jd; // Journal descriptor. sisl::byte_view m_cur_log_buf; off_t m_first_group_cursor; off_t m_cur_read_bytes{0}; @@ -805,7 +808,8 @@ class LogDev { bool m_stopped{false}; // Is Logdev stopped. We don't need lock here, because it is updated under flush lock logstore_family_id_t m_family_id; // The family id this logdev is part of JournalVirtualDev* m_vdev{nullptr}; - HomeStoreSafePtr m_hs; // Back pointer to homestore + shared< JournalVirtualDev::Descriptor > m_vdev_jd; // Journal descriptor. 
+ HomeStoreSafePtr m_hs; // Back pointer to homestore std::multimap< logid_t, logstore_id_t > m_garbage_store_ids; Clock::time_point m_last_flush_time; diff --git a/src/lib/logstore/log_store_family.cpp b/src/lib/logstore/log_store_family.cpp index d96eaf070..0af47625d 100644 --- a/src/lib/logstore/log_store_family.cpp +++ b/src/lib/logstore/log_store_family.cpp @@ -120,6 +120,7 @@ void LogStoreFamily::device_truncate(const std::shared_ptr< truncate_req >& treq iomanager.run_on_forget(logstore_service().truncate_thread(), [this, treq]() { const logdev_key trunc_upto = do_device_truncate(treq->dry_run); bool done{false}; + if (treq->cb || treq->wait_till_done) { { std::lock_guard< std::mutex > lk{treq->mtx}; @@ -129,7 +130,10 @@ void LogStoreFamily::device_truncate(const std::shared_ptr< truncate_req >& treq } if (done) { if (treq->cb) { treq->cb(treq->m_trunc_upto_result); } - if (treq->wait_till_done) { treq->cv.notify_one(); } + if (treq->wait_till_done) { + std::lock_guard< std::mutex > lk{treq->mtx}; + treq->cv.notify_one(); + } } m_log_dev.unlock_flush(); }); diff --git a/src/lib/logstore/log_store_service.cpp b/src/lib/logstore/log_store_service.cpp index 931e11b44..67f2dc1dd 100644 --- a/src/lib/logstore/log_store_service.cpp +++ b/src/lib/logstore/log_store_service.cpp @@ -43,7 +43,7 @@ LogStoreService::LogStoreService() : std::make_unique< LogStoreFamily >(CTRL_LOG_FAMILY_IDX)} {} folly::Future< std::error_code > LogStoreService::create_vdev(uint64_t size, logstore_family_id_t family, - uint32_t num_chunks) { + uint32_t chunk_size) { const auto atomic_page_size = hs()->device_mgr()->atomic_page_size(HSDevType::Fast); hs_vdev_context hs_ctx; @@ -62,9 +62,11 @@ folly::Future< std::error_code > LogStoreService::create_vdev(uint64_t size, log // future, we can let consumer set it by then; auto vdev = hs()->device_mgr()->create_vdev(vdev_parameters{.vdev_name = name, - .vdev_size = size, - .num_chunks = num_chunks, + .size_type = vdev_size_type_t::VDEV_SIZE_DYNAMIC, + .vdev_size = 0, + .num_chunks = 0, .blk_size = atomic_page_size, + .chunk_size = chunk_size, .dev_type = HSDevType::Fast, .alloc_type = blk_allocator_type_t::none, .chunk_sel_type = chunk_selector_type_t::ROUND_ROBIN, diff --git a/src/lib/logstore/log_stream.cpp b/src/lib/logstore/log_stream.cpp index 1d4b2e82c..67c8fe284 100644 --- a/src/lib/logstore/log_stream.cpp +++ b/src/lib/logstore/log_stream.cpp @@ -23,9 +23,13 @@ namespace homestore { SISL_LOGGING_DECL(logstore) -log_stream_reader::log_stream_reader(off_t device_cursor, JournalVirtualDev* store, uint64_t read_size_multiple) : - m_vdev{store}, m_first_group_cursor{device_cursor}, m_read_size_multiple{read_size_multiple} { - m_vdev->lseek(m_first_group_cursor); +log_stream_reader::log_stream_reader(off_t device_cursor, JournalVirtualDev* vdev, + shared< JournalVirtualDev::Descriptor > vdev_jd, uint64_t read_size_multiple) : + m_vdev{vdev}, + m_vdev_jd{std::move(vdev_jd)}, + m_first_group_cursor{device_cursor}, + m_read_size_multiple{read_size_multiple} { + m_vdev_jd->lseek(m_first_group_cursor); } sisl::byte_view log_stream_reader::next_group(off_t* out_dev_offset) { @@ -38,6 +42,10 @@ sisl::byte_view log_stream_reader::next_group(off_t* out_dev_offset) { if (m_cur_log_buf.size() < min_needed) { do { m_cur_log_buf = read_next_bytes(std::max(min_needed, bulk_read_size)); + if (m_cur_log_buf.size() == 0) { + LOGINFOMOD(logstore, "Logdev data empty"); + return {}; + } } while (m_cur_log_buf.size() < sizeof(log_group_header)); min_needed = 0; } @@ -46,8 +54,8 @@ 
sisl::byte_view log_stream_reader::next_group(off_t* out_dev_offset) { const auto* header = r_cast< log_group_header const* >(m_cur_log_buf.bytes()); if (header->magic_word() != LOG_GROUP_HDR_MAGIC) { LOGINFOMOD(logstore, "Logdev data not seeing magic at pos {}, must have come to end of logdev", - m_vdev->dev_offset(m_cur_read_bytes)); - *out_dev_offset = m_vdev->dev_offset(m_cur_read_bytes); + m_vdev_jd->dev_offset(m_cur_read_bytes)); + *out_dev_offset = m_vdev_jd->dev_offset(m_cur_read_bytes); // move it by dma boundary if header is not valid m_prev_crc = 0; @@ -66,14 +74,15 @@ sisl::byte_view log_stream_reader::next_group(off_t* out_dev_offset) { LOGTRACEMOD(logstore, "Logstream read log group of size={} nrecords={} m_cur_log_dev_offset {} buf size " "remaining {} ", - header->total_size(), header->nrecords(), m_vdev->dev_offset(m_cur_read_bytes), m_cur_log_buf.size()); + header->total_size(), header->nrecords(), m_vdev_jd->dev_offset(m_cur_read_bytes), + m_cur_log_buf.size()); // compare it with prev crc if (m_prev_crc != 0 && m_prev_crc != header->prev_grp_crc) { // we reached at the end LOGINFOMOD(logstore, "we have reached the end. crc doesn't match with the prev crc {}", - m_vdev->dev_offset(m_cur_read_bytes)); - *out_dev_offset = m_vdev->dev_offset(m_cur_read_bytes); + m_vdev_jd->dev_offset(m_cur_read_bytes)); + *out_dev_offset = m_vdev_jd->dev_offset(m_cur_read_bytes); // move it by dma boundary if header is not valid m_prev_crc = 0; @@ -87,7 +96,7 @@ sisl::byte_view log_stream_reader::next_group(off_t* out_dev_offset) { LOGINFOMOD(logstore, "last write is not completely written. footer magic {} footer start_log_idx {} header log indx {}", footer->magic, footer->start_log_idx, header->start_log_idx); - *out_dev_offset = m_vdev->dev_offset(m_cur_read_bytes); + *out_dev_offset = m_vdev_jd->dev_offset(m_cur_read_bytes); // move it by dma boundary if header is not valid m_prev_crc = 0; @@ -103,8 +112,8 @@ sisl::byte_view log_stream_reader::next_group(off_t* out_dev_offset) { if (cur_crc != header->cur_grp_crc) { /* This is a valid entry so crc should match */ HS_REL_ASSERT(0, "data is corrupted"); - LOGINFOMOD(logstore, "crc doesn't match {}", m_vdev->dev_offset(m_cur_read_bytes)); - *out_dev_offset = m_vdev->dev_offset(m_cur_read_bytes); + LOGINFOMOD(logstore, "crc doesn't match {}", m_vdev_jd->dev_offset(m_cur_read_bytes)); + *out_dev_offset = m_vdev_jd->dev_offset(m_cur_read_bytes); // move it by dma boundary if header is not valid m_prev_crc = 0; @@ -116,7 +125,7 @@ sisl::byte_view log_stream_reader::next_group(off_t* out_dev_offset) { m_prev_crc = cur_crc; ret_buf = m_cur_log_buf; - *out_dev_offset = m_vdev->dev_offset(m_cur_read_bytes); + *out_dev_offset = m_vdev_jd->dev_offset(m_cur_read_bytes); m_cur_read_bytes += header->total_size(); m_cur_log_buf.move_forward(header->total_size()); @@ -135,10 +144,10 @@ sisl::byte_view log_stream_reader::read_next_bytes(uint64_t nbytes) { hs_utils::make_byte_array(nbytes + m_cur_log_buf.size(), true, sisl::buftag::logread, m_vdev->align_size()); if (m_cur_log_buf.size()) { memcpy(out_buf->bytes(), m_cur_log_buf.bytes(), m_cur_log_buf.size()); } - const auto prev_pos = m_vdev->seeked_pos(); - m_vdev->sync_next_read(out_buf->bytes() + m_cur_log_buf.size(), nbytes); + const auto prev_pos = m_vdev_jd->seeked_pos(); + m_vdev_jd->sync_next_read(out_buf->bytes() + m_cur_log_buf.size(), nbytes); LOGINFOMOD(logstore, "LogStream read {} bytes from vdev offset {} and vdev cur offset {}", nbytes, prev_pos, - m_vdev->seeked_pos()); + 
m_vdev_jd->seeked_pos());
     return sisl::byte_view{out_buf};
 }
 } // namespace homestore

diff --git a/src/lib/replication/repl_dev/solo_repl_dev.cpp b/src/lib/replication/repl_dev/solo_repl_dev.cpp
index 08d9e094e..3d62182ea 100644
--- a/src/lib/replication/repl_dev/solo_repl_dev.cpp
+++ b/src/lib/replication/repl_dev/solo_repl_dev.cpp
@@ -138,6 +138,7 @@ void SoloReplDev::cp_flush(CP*) {
     m_rd_sb.write();
 }

-void SoloReplDev::cp_cleanup(CP*) { /* m_data_journal->truncate(m_rd_sb->checkpoint_lsn); */ }
+void SoloReplDev::cp_cleanup(CP*) { /* m_data_journal->truncate(m_rd_sb->checkpoint_lsn); */
+}

 } // namespace homestore

diff --git a/src/tests/test_common/homestore_test_common.hpp b/src/tests/test_common/homestore_test_common.hpp
index 3589fca43..758becb58 100644
--- a/src/tests/test_common/homestore_test_common.hpp
+++ b/src/tests/test_common/homestore_test_common.hpp
@@ -169,6 +169,8 @@ class HSTestHelper {
         IndexServiceCallbacks* index_svc_cbs{nullptr};
         shared< ReplApplication > repl_app{nullptr};
         chunk_num_t num_chunks{1};
+        uint64_t chunk_size{32 * 1024 * 1024}; // Chunk size in bytes (32 MB default).
+        vdev_size_type_t vdev_size_type{vdev_size_type_t::VDEV_SIZE_STATIC};
     };

 #if 0
@@ -249,24 +251,29 @@ class HSTestHelper {
         hsi->start(hs_input_params{.devices = device_info, .app_mem_size = app_mem_size}, std::move(cb));

         if (need_format) {
-            hsi->format_and_start(
-                {{HS_SERVICE::META, {.size_pct = svc_params[HS_SERVICE::META].size_pct}},
-                 {HS_SERVICE::LOG_REPLICATED, {.size_pct = svc_params[HS_SERVICE::LOG_REPLICATED].size_pct}},
-                 {HS_SERVICE::LOG_LOCAL, {.size_pct = svc_params[HS_SERVICE::LOG_LOCAL].size_pct}},
-                 {HS_SERVICE::DATA,
-                  {.size_pct = svc_params[HS_SERVICE::DATA].size_pct,
-                   .num_chunks = svc_params[HS_SERVICE::DATA].num_chunks,
-                   .alloc_type = svc_params[HS_SERVICE::DATA].blkalloc_type,
-                   .chunk_sel_type = svc_params[HS_SERVICE::DATA].custom_chunk_selector
-                       ? chunk_selector_type_t::CUSTOM
-                       : chunk_selector_type_t::ROUND_ROBIN}},
-                 {HS_SERVICE::INDEX, {.size_pct = svc_params[HS_SERVICE::INDEX].size_pct}},
-                 {HS_SERVICE::REPLICATION,
-                  {.size_pct = svc_params[HS_SERVICE::REPLICATION].size_pct,
-                   .alloc_type = svc_params[HS_SERVICE::REPLICATION].blkalloc_type,
-                   .chunk_sel_type = svc_params[HS_SERVICE::REPLICATION].custom_chunk_selector
-                       ? chunk_selector_type_t::CUSTOM
-                       : chunk_selector_type_t::ROUND_ROBIN}}});
+            hsi->format_and_start({{HS_SERVICE::META, {.size_pct = svc_params[HS_SERVICE::META].size_pct}},
+                                   {HS_SERVICE::LOG_REPLICATED,
+                                    {.size_pct = 1,
+                                     .chunk_size = svc_params[HS_SERVICE::LOG_REPLICATED].chunk_size,
+                                     .vdev_size_type = svc_params[HS_SERVICE::LOG_REPLICATED].vdev_size_type}},
+                                   {HS_SERVICE::LOG_LOCAL,
+                                    {.size_pct = 1,
+                                     .chunk_size = svc_params[HS_SERVICE::LOG_LOCAL].chunk_size,
+                                     .vdev_size_type = svc_params[HS_SERVICE::LOG_LOCAL].vdev_size_type}},
+                                   {HS_SERVICE::DATA,
+                                    {.size_pct = svc_params[HS_SERVICE::DATA].size_pct,
+                                     .num_chunks = svc_params[HS_SERVICE::DATA].num_chunks,
+                                     .alloc_type = svc_params[HS_SERVICE::DATA].blkalloc_type,
+                                     .chunk_sel_type = svc_params[HS_SERVICE::DATA].custom_chunk_selector
+                                         ? chunk_selector_type_t::CUSTOM
+                                         : chunk_selector_type_t::ROUND_ROBIN}},
+                                   {HS_SERVICE::INDEX, {.size_pct = svc_params[HS_SERVICE::INDEX].size_pct}},
+                                   {HS_SERVICE::REPLICATION,
+                                    {.size_pct = svc_params[HS_SERVICE::REPLICATION].size_pct,
+                                     .alloc_type = svc_params[HS_SERVICE::REPLICATION].blkalloc_type,
+                                     .chunk_sel_type = svc_params[HS_SERVICE::REPLICATION].custom_chunk_selector
+                                         ?
chunk_selector_type_t::CUSTOM + : chunk_selector_type_t::ROUND_ROBIN}}}); } } diff --git a/src/tests/test_journal_vdev.cpp b/src/tests/test_journal_vdev.cpp index 693eb0925..a34eae090 100644 --- a/src/tests/test_journal_vdev.cpp +++ b/src/tests/test_journal_vdev.cpp @@ -22,6 +22,7 @@ #include #include #include +#include #include #include @@ -42,7 +43,7 @@ using namespace homestore; RCU_REGISTER_INIT SISL_LOGGING_INIT(HOMESTORE_LOG_MODS) -SISL_OPTIONS_ENABLE(logging, test_vdev, iomgr, test_common_setup) +SISL_OPTIONS_ENABLE(logging, test_journal_vdev, iomgr, test_common_setup) std::vector< std::string > test_common::HSTestHelper::s_dev_names; struct Param { @@ -57,33 +58,59 @@ struct Param { uint32_t max_wrt_sz; uint32_t truncate_watermark_percentage; }; -SISL_LOGGING_DECL(test_vdev) +SISL_LOGGING_DECL(test_journal_vdev) static Param gp; // trigger truncate when used space ratio reaches more than 80% constexpr uint32_t dma_alignment = 512; -class VDevIOTest : public ::testing::Test { +class VDevJournalIOTest : public ::testing::Test { +public: + const std::map< uint32_t, test_common::HSTestHelper::test_params > svc_params = {}; + + virtual void SetUp() override { + auto const ndevices = SISL_OPTIONS["num_devs"].as< uint32_t >(); + auto const dev_size = SISL_OPTIONS["dev_size_mb"].as< uint64_t >() * 1024 * 1024; + test_common::HSTestHelper::start_homestore( + "test_journal_vdev", + {{HS_SERVICE::META, {.size_pct = 15.0}}, + {HS_SERVICE::LOG_REPLICATED, + {.chunk_size = 32 * 1024 * 1024, .vdev_size_type = vdev_size_type_t::VDEV_SIZE_DYNAMIC}}, + {HS_SERVICE::LOG_LOCAL, + {.chunk_size = 32 * 1024 * 1024, .vdev_size_type = vdev_size_type_t::VDEV_SIZE_DYNAMIC}}}, + nullptr /* starting_cb */, false /* restart */); + } + + virtual void TearDown() override { test_common::HSTestHelper::shutdown_homestore(); } + + void restart_homestore() { + test_common::HSTestHelper::start_homestore( + "test_journal_vdev", + {{HS_SERVICE::META, {.size_pct = 15.0}}, + {HS_SERVICE::LOG_REPLICATED, + {.chunk_size = 32 * 1024 * 1024, .vdev_size_type = vdev_size_type_t::VDEV_SIZE_DYNAMIC}}, + {HS_SERVICE::LOG_LOCAL, + {.chunk_size = 32 * 1024 * 1024, .vdev_size_type = vdev_size_type_t::VDEV_SIZE_DYNAMIC}}}, + nullptr /* starting_cb */, true /* restart */); + } +}; + +class JournalDescriptorTest { struct write_info { uint64_t size; uint64_t crc; }; public: - virtual void SetUp() override { - auto const ndevices = SISL_OPTIONS["num_devs"].as< uint32_t >(); - auto const dev_size = SISL_OPTIONS["dev_size_mb"].as< uint64_t >() * 1024 * 1024; + JournalDescriptorTest(logdev_id_t id) : m_logdev_id(id) { reinit(); } - test_common::HSTestHelper::start_homestore("test_journal_vdev", - {{HS_SERVICE::META, {.size_pct = 15.0}}, - {HS_SERVICE::LOG_REPLICATED, {.size_pct = 75.0}}, - {HS_SERVICE::LOG_LOCAL, {.size_pct = 5.0}}}); + std::shared_ptr< JournalVirtualDev::Descriptor > vdev_jd() { return m_vdev_jd; } - m_vdev = hs()->logstore_service().get_vdev(homestore::LogStoreService::DATA_LOG_FAMILY_IDX); + void reinit() { + auto vdev = hs()->logstore_service().get_vdev(homestore::LogStoreService::DATA_LOG_FAMILY_IDX); + m_vdev_jd = vdev->open(m_logdev_id); } - virtual void TearDown() override { test_common::HSTestHelper::shutdown_homestore(); } - uint64_t get_elapsed_time(Clock::time_point start) { std::chrono::seconds sec = std::chrono::duration_cast< std::chrono::seconds >(Clock::now() - start); return sec.count(); @@ -108,10 +135,9 @@ class VDevIOTest : public ::testing::Test { return false; } - void execute() { + void longrunning() { 
         m_start_time = Clock::now();
         // m_store = HomeBlks::instance()->get_data_logdev_blkstore();
-        m_total_size = m_vdev->size();
 
         while (keep_running()) {
             if (do_write()) {
@@ -129,7 +155,7 @@ class VDevIOTest : public ::testing::Test {
     // get random truncate offset between [m_start_off + used_space*20%, m_start_off + used_space*80%], rounded up to
     // 512 bytes;
     off_t get_rand_truncate_offset() {
-        auto used_space = m_vdev->used_size();
+        auto used_space = m_vdev_jd->used_size();
 
         static thread_local std::random_device rd{};
         static thread_local std::default_random_engine generator{rd()};
@@ -137,24 +163,25 @@ class VDevIOTest : public ::testing::Test {
                                                      m_start_off + (used_space * 4) / 5};
         auto rand_off = dist(generator);
 
-        auto off = sisl::round_up(rand_off, dma_alignment);
+        off_t off = sisl::round_up(rand_off, dma_alignment);
         // if it is larger than total size, ring back to valid offset;
-        return off >= m_total_size ? (off - m_total_size) : off;
+        assert(off <= m_vdev_jd->end_offset());
+        return off >= m_vdev_jd->end_offset() ? (off - m_vdev_jd->end_offset()) : off;
     }
 
     void truncate(off_t off_to_truncate) {
-        LOGINFO("truncating to offset: 0x{}, start: 0x{}, tail: 0x{}", to_hex(off_to_truncate),
-                to_hex(m_vdev->data_start_offset()), to_hex(m_vdev->tail_offset()));
+        LOGDEBUG("truncating to offset: 0x{}, start: 0x{}, tail: 0x{}", to_hex(off_to_truncate),
+                 to_hex(m_vdev_jd->data_start_offset()), to_hex(m_vdev_jd->tail_offset()));
 
         validate_truncate_offset(off_to_truncate);
 
-        auto tail_before = m_vdev->tail_offset();
-        m_vdev->truncate(off_to_truncate);
-        auto tail_after = m_vdev->tail_offset();
+        auto tail_before = m_vdev_jd->tail_offset();
+        m_vdev_jd->truncate(off_to_truncate);
+        auto tail_after = m_vdev_jd->tail_offset();
 
         HS_DBG_ASSERT_EQ(tail_before, tail_after);
-        HS_DBG_ASSERT_EQ(off_to_truncate, m_vdev->data_start_offset());
+        HS_DBG_ASSERT_EQ(off_to_truncate, m_vdev_jd->data_start_offset());
 
         if (off_to_truncate > m_start_off) {
             // remove the offsets before truncate offset, since they are not valid for read anymore;
@@ -167,8 +194,8 @@ class VDevIOTest : public ::testing::Test {
                 }
             }
         } else { // truncate offset is before m_start_off;
-            LOGINFO("truncating before start offset, looping back to beginning devices at offset: 0x{}",
-                    to_hex(off_to_truncate));
+            LOGDEBUG("truncating before start offset, looping back to beginning devices at offset: 0x{}",
+                     to_hex(off_to_truncate));
             m_truncate_loop_back_cnt++;
             // remove the offsets before truncate offset and after m_start_off;
             for (auto it = m_off_to_info_map.begin(); it != m_off_to_info_map.end();) {
@@ -192,50 +219,51 @@ class VDevIOTest : public ::testing::Test {
         auto elapsed_time = get_elapsed_time(pt_start);
         if (elapsed_time > print_every_n_secs) {
-            LOGINFO("write: {}, read: {}, truncate: {}, truncate_loop_back: {}", m_wrt_cnt, m_read_cnt, m_truncate_cnt,
-                    m_truncate_loop_back_cnt);
+            LOGDEBUG("write: {}, read: {}, truncate: {}, truncate_loop_back: {}", m_wrt_cnt, m_read_cnt, m_truncate_cnt,
+                     m_truncate_loop_back_cnt);
             pt_start = Clock::now();
         }
     }
 
     void space_usage_asserts() {
-        auto used_space = m_vdev->used_size();
-        auto start_off = m_vdev->data_start_offset();
+        auto used_space = m_vdev_jd->used_size();
+        auto start_off = m_vdev_jd->data_start_offset();
 
-        HS_DBG_ASSERT_GT(m_total_size, 0);
-        HS_DBG_ASSERT_LT(used_space, m_total_size);
+        HS_DBG_ASSERT_GT(m_vdev_jd->size(), 0);
+        HS_DBG_ASSERT_LT(used_space, m_vdev_jd->size());
         HS_DBG_ASSERT_EQ(start_off, m_start_off);
     }
 
     bool time_to_truncate() {
-        auto used_space = m_vdev->used_size();
+        auto used_space = m_vdev_jd->used_size();
 
-        if (gp.truncate_watermark_percentage <= (100 * used_space / m_total_size)) { return true; }
+        if (gp.truncate_watermark_percentage <= (100 * used_space / m_vdev_jd->size())) { return true; }
         return false;
     }
 
     void validate_truncate_offset(off_t off) {
-        HS_DBG_ASSERT_LE((uint64_t)off, m_total_size);
+        HS_DBG_ASSERT_LE(off, m_vdev_jd->end_offset());
         validate_read_offset(off);
     }
 
     void validate_write_offset(off_t off, uint64_t sz) {
-        auto tail_offset = m_vdev->tail_offset();
-        auto start_offset = m_vdev->data_start_offset();
+        auto tail_offset = m_vdev_jd->tail_offset();
+        auto start_offset = m_vdev_jd->data_start_offset();
 
-        HS_DBG_ASSERT_LE(off + sz, m_vdev->size());
+        HS_DBG_ASSERT_LE(off + static_cast< off_t >(sz), m_vdev_jd->end_offset());
 
-        if ((off + sz) == m_vdev->size()) {
-            HS_DBG_ASSERT_EQ((uint64_t)0, (uint64_t)tail_offset);
+        if ((off + sz) == m_vdev_jd->size()) {
+            // TODO: confirm whether this is needed when there is no loop back.
+            // HS_DBG_ASSERT_EQ((uint64_t)0, (uint64_t)tail_offset);
         } else {
             HS_DBG_ASSERT_EQ((uint64_t)(off + sz), (uint64_t)tail_offset);
         }
     }
 
     void validate_read_offset(off_t off) {
-        auto tail_offset = m_vdev->tail_offset();
-        auto start_offset = m_vdev->data_start_offset();
+        auto tail_offset = m_vdev_jd->tail_offset();
+        auto start_offset = m_vdev_jd->data_start_offset();
 
         HS_DBG_ASSERT_EQ(m_start_off, start_offset);
 
         if (start_offset < tail_offset) {
@@ -251,24 +279,39 @@ class VDevIOTest : public ::testing::Test {
         auto it = m_off_to_info_map.begin();
         std::advance(it, rand() % m_off_to_info_map.size());
         auto off_to_read = it->first;
+        read_and_validate(off_to_read);
+    }
+
+    void read_all() {
+        // Validate all the offsets.
+        for (const auto& iter : m_off_to_info_map) {
+            read_and_validate(iter.first);
+        }
+    }
 
-        LOGDEBUG("reading on offset: 0x{}, size: {}, start: 0x{}, tail: 0x{}", to_hex(off_to_read), it->second.size,
-                 to_hex(m_start_off), to_hex(m_vdev->tail_offset()));
+    void read_and_validate(off_t off_to_read) {
+        auto& write_info = m_off_to_info_map[off_to_read];
+        LOGDEBUG("reading on offset: 0x{}, size: {}, start: 0x{}, tail: 0x{}", to_hex(off_to_read), write_info.size,
+                 to_hex(m_start_off), to_hex(m_vdev_jd->tail_offset()));
 
-        // validate_read_offset(off_to_read);
+        validate_read_offset(off_to_read);
 
-        auto buf = iomanager.iobuf_alloc(512, it->second.size);
-        m_vdev->sync_pread(buf, (size_t)it->second.size, (off_t)off_to_read);
+        auto buf = iomanager.iobuf_alloc(512, write_info.size);
+        auto ec = m_vdev_jd->sync_pread(buf, (size_t)write_info.size, (off_t)off_to_read);
+        HS_REL_ASSERT(!ec, "Error in reading");
+        auto count = *(uint64_t*)buf;
 
-        auto crc = util::Hash64((const char*)buf, (size_t)it->second.size);
-        HS_DBG_ASSERT_EQ(crc, it->second.crc, "CRC Mismatch: read out crc: {}, saved write: {}", crc, it->second.crc);
+        auto crc = util::Hash64((const char*)buf, (size_t)write_info.size);
+        HS_DBG_ASSERT_EQ(crc, write_info.crc,
+                         "CRC Mismatch: offset: 0x{} size: {} count: {} read out crc: {}, saved write: {}",
+                         to_hex(off_to_read), write_info.size, count, crc, write_info.crc);
 
         iomanager.iobuf_free(buf);
         m_read_cnt++;
     }
 
     void random_write() {
         auto sz_to_wrt = rand_size();
-        auto off_to_wrt = m_vdev->alloc_next_append_blk(sz_to_wrt);
+        auto off_to_wrt = m_vdev_jd->alloc_next_append_blk(sz_to_wrt);
 
         auto it = m_off_to_info_map.find(off_to_wrt);
         if (it != m_off_to_info_map.end()) {
@@ -280,19 +323,21 @@ class VDevIOTest : public ::testing::Test {
 
         validate_write_offset(off_to_wrt, sz_to_wrt);
 
-        LOGDEBUG("writing to offset: 0x{}, size: {}, start: 0x{}, tail: 0x{}", to_hex(off_to_wrt), sz_to_wrt,
-                 to_hex(m_start_off), to_hex(m_vdev->tail_offset()));
-
+        m_wrt_cnt++;
         auto buf = iomanager.iobuf_alloc(512, sz_to_wrt);
         gen_rand_buf(buf, sz_to_wrt);
+        *(uint64_t*)buf = m_wrt_cnt;
 
-        m_vdev->sync_pwrite(buf, sz_to_wrt, off_to_wrt);
-        HS_DBG_ASSERT_LT((size_t)off_to_wrt, (size_t)m_total_size);
+        m_vdev_jd->sync_pwrite(buf, sz_to_wrt, off_to_wrt);
+        HS_DBG_ASSERT_LT((size_t)off_to_wrt, (size_t)m_vdev_jd->end_offset());
 
-        m_wrt_cnt++;
         m_off_to_info_map[off_to_wrt].size = sz_to_wrt;
         m_off_to_info_map[off_to_wrt].crc = util::Hash64((const char*)buf, (size_t)sz_to_wrt);
 
+        LOGDEBUG("writing to bytes offset: 0x{}, size: {}, write_count: {} start: 0x{}, tail: 0x{} crc: 0x{}",
+                 to_hex(off_to_wrt), sz_to_wrt, m_wrt_cnt, to_hex(m_start_off), to_hex(m_vdev_jd->tail_offset()),
+                 m_off_to_info_map[off_to_wrt].crc);
+
         iomanager.iobuf_free(buf);
     }
 
@@ -315,20 +360,115 @@ class VDevIOTest : public ::testing::Test {
     }
 
 private:
+    logdev_id_t m_logdev_id = 0;
     off_t m_start_off = 0;
     uint64_t m_wrt_cnt = 0;
     uint64_t m_read_cnt = 0;
    uint64_t m_truncate_cnt = 0;
     uint64_t m_truncate_loop_back_cnt = 0;
-    uint64_t m_total_size = 0;
     std::map< off_t, write_info > m_off_to_info_map;
     Clock::time_point m_start_time;
-    std::shared_ptr< JournalVirtualDev > m_vdev;
+    std::shared_ptr< JournalVirtualDev::Descriptor > m_vdev_jd;
+    friend class VDevJournalIOTest;
 };
 
-TEST_F(VDevIOTest, VDevIOTest) { this->execute(); }
+// TODO: add more tests covering unclean shutdown and more corner cases.
+
+TEST_F(VDevJournalIOTest, LongRunning) {
+    // Create multiple journal descriptors and run a long-running test
+    // of random reads, writes and truncates in parallel.
+    auto num_vdev_jd = SISL_OPTIONS["num_vdev_jd"].as< uint32_t >();
+    std::vector< std::thread > threads;
+    for (uint32_t i = 0; i < num_vdev_jd; i++) {
+        logdev_id_t id = i + 2; // 0 and 1 are used for the data and control logdev families.
+        threads.emplace_back(std::thread([id] { JournalDescriptorTest(id).longrunning(); }));
+    }
+    for (auto& t : threads)
+        t.join();
+}
+
+TEST_F(VDevJournalIOTest, Recovery) {
+    // Create multiple journal descriptors, add log entries, truncate,
+    // restart, add more entries and verify the log entries.
+    gp.fixed_wrt_sz_enabled = true;
+    gp.fixed_wrt_sz = 1024;
+    int num_entries = 1024;
+
+    auto num_vdev_jd = SISL_OPTIONS["num_vdev_jd"].as< uint32_t >();
+    std::vector< JournalDescriptorTest > tests;
+    for (uint32_t i = 0; i < num_vdev_jd; i++) {
+        logdev_id_t id = i + 2; // 0 and 1 are used for the data and control logdev families.
+        tests.emplace_back(JournalDescriptorTest(id));
+    }
+
+    LOGINFO("Add log entries");
+    // Write enough log entries that more chunks have to be created dynamically.
+    for (uint32_t i = 0; i < tests.size(); i++) {
+        for (int j = 0; j < num_entries; j++) {
+            tests[i].random_write();
+        }
+    }
+
+    // Validate all logs.
+    LOGINFO("Validate log entries");
+    for (uint32_t i = 0; i < tests.size(); i++) {
+        tests[i].read_all();
+    }
+
+    // Truncate and validate logs.
+    for (uint32_t i = 0; i < tests.size(); i++) {
+        tests[i].truncate(tests[i].get_rand_truncate_offset());
+        tests[i].read_all();
+    }
+
+    LOGINFO("Restart homestore");
+
+    // Record the offsets of the journal descriptors.
+    std::vector< uint64_t > last_tail_offset, last_start_offset;
+    for (uint32_t i = 0; i < tests.size(); i++) {
+        auto vdev_jd = tests[i].vdev_jd();
+        last_tail_offset.push_back(vdev_jd->tail_offset());
+        last_start_offset.push_back(vdev_jd->data_start_offset());
+    }
+
+    // Restart homestore.
+    restart_homestore();
+
+    // Restore the offsets after restart.
+    for (uint32_t i = 0; i < tests.size(); i++) {
+        tests[i].reinit();
+        auto vdev_jd = tests[i].vdev_jd();
+        vdev_jd->update_data_start_offset(last_start_offset[i]);
+        vdev_jd->update_tail_offset(last_tail_offset[i]);
+    }
+
+    // Validate all logs.
+    for (uint32_t i = 0; i < tests.size(); i++) {
+        tests[i].read_all();
+    }
+
+    LOGINFO("Add log entries");
+    // Write enough log entries that more chunks have to be created dynamically.
+    for (uint32_t i = 0; i < tests.size(); i++) {
+        for (int j = 0; j < num_entries; j++) {
+            tests[i].random_write();
+        }
+    }
+
+    // Validate all logs.
+    for (uint32_t i = 0; i < tests.size(); i++) {
+        tests[i].read_all();
+    }
+
+    // Truncate and validate all logs.
+    LOGINFO("Validate log entries");
+    for (uint32_t i = 0; i < tests.size(); i++) {
+        tests[i].truncate(tests[i].get_rand_truncate_offset());
+        tests[i].read_all();
+    }
+}
 
-SISL_OPTION_GROUP(test_vdev,
+SISL_OPTION_GROUP(test_journal_vdev,
                   (truncate_watermark_percentage, "", "truncate_watermark_percentage",
                    "percentage of space usage to trigger truncate",
                    ::cxxopts::value< uint32_t >()->default_value("80"), "number"),
@@ -345,12 +485,15 @@ SISL_OPTION_GROUP(test_vdev,
                   (per_read, "", "per_read", "read percentage of io that are reads",
                    ::cxxopts::value< uint32_t >()->default_value("20"), "number"),
                   (per_write, "", "per_write", "write percentage of io that are writes",
-                   ::cxxopts::value< uint32_t >()->default_value("80"), "number"));
+                   ::cxxopts::value< uint32_t >()->default_value("80"), "number"),
+                  (num_vdev_jd, "", "num_vdev_jd", "number of descriptors for journal vdev",
+                   ::cxxopts::value< uint32_t >()->default_value("4"), "number"));
 
 int main(int argc, char* argv[]) {
-    SISL_OPTIONS_LOAD(argc, argv, logging, test_vdev, iomgr, test_common_setup);
-    ::testing::InitGoogleTest(&argc, argv);
-    sisl::logging::SetLogger("test_vdev");
+    int parsed_argc = argc;
+    ::testing::InitGoogleTest(&parsed_argc, argv);
+    SISL_OPTIONS_LOAD(parsed_argc, argv, logging, test_journal_vdev, iomgr, test_common_setup);
+    sisl::logging::SetLogger("test_journal_vdev");
     spdlog::set_pattern("[%D %T%z] [%^%l%$] [%n] [%t] %v");
 
     gp.num_io = SISL_OPTIONS["num_io"].as< uint64_t >();
diff --git a/src/tests/test_log_store.cpp b/src/tests/test_log_store.cpp
index 656092105..2cfb94b74 100644
--- a/src/tests/test_log_store.cpp
+++ b/src/tests/test_log_store.cpp
@@ -436,8 +436,8 @@ class SampleDB {
         test_common::HSTestHelper::start_homestore(
             "test_log_store",
             {{HS_SERVICE::META, {.size_pct = 5.0}},
-             {HS_SERVICE::LOG_REPLICATED, {.size_pct = 42.0}},
-             {HS_SERVICE::LOG_LOCAL, {.size_pct = 42.0}}},
+             {HS_SERVICE::LOG_REPLICATED, {.size_pct = 5.0, .chunk_size = 32 * 1024 * 1024}},
+             {HS_SERVICE::LOG_LOCAL, {.size_pct = 5.0, .chunk_size = 32 * 1024 * 1024}}},
             [this, restart, n_log_stores]() {
                 if (restart) {
                     for (uint32_t i{0}; i < n_log_stores; ++i) {
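
Usage note (not part of the patch): the test_log_store.cpp hunk above is the caller-facing summary of this
change. Instead of reserving 42% of the device per log service up front, callers now pass a small size_pct
together with an explicit chunk_size, and journal chunks are carved out of that slice on demand (the Recovery
test above relies on exactly this to grow new chunks as entries are written). A minimal sketch of the call
shape, assuming the HSTestHelper::start_homestore() helper used in the hunk above; the "my_app" name and the
empty post-start callback are placeholders:

    // Sketch: format the journal services with dynamically created 32 MiB chunks.
    test_common::HSTestHelper::start_homestore(
        "my_app",
        {{HS_SERVICE::META, {.size_pct = 5.0}},
         {HS_SERVICE::LOG_REPLICATED, {.size_pct = 5.0, .chunk_size = 32 * 1024 * 1024}},
         {HS_SERVICE::LOG_LOCAL, {.size_pct = 5.0, .chunk_size = 32 * 1024 * 1024}}},
        []() { /* post-start hook, e.g. open or recover log stores on restart */ });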