diff --git a/conanfile.py b/conanfile.py index c381ebe03..c7c6aef09 100644 --- a/conanfile.py +++ b/conanfile.py @@ -5,7 +5,8 @@ class HomestoreConan(ConanFile): name = "homestore" - version = "5.0.5" + version = "5.0.6" + homepage = "https://github.com/eBay/Homestore" description = "HomeStore Storage Engine" topics = ("ebay", "nublox") diff --git a/src/include/homestore/homestore.hpp b/src/include/homestore/homestore.hpp index 263986639..97bc2bd1c 100644 --- a/src/include/homestore/homestore.hpp +++ b/src/include/homestore/homestore.hpp @@ -56,8 +56,7 @@ class ReplApplication; using HomeStoreSafePtr = std::shared_ptr< HomeStore >; -VENUM(hs_vdev_type_t, uint32_t, DATA_VDEV = 1, INDEX_VDEV = 2, META_VDEV = 3, DATA_LOGDEV_VDEV = 4, - CTRL_LOGDEV_VDEV = 5); +VENUM(hs_vdev_type_t, uint32_t, DATA_VDEV = 1, INDEX_VDEV = 2, META_VDEV = 3, LOGDEV_VDEV = 4); #pragma pack(1) struct hs_vdev_context { @@ -76,11 +75,10 @@ struct hs_stats { struct HS_SERVICE { static constexpr uint32_t META = 1 << 0; - static constexpr uint32_t LOG_REPLICATED = 1 << 1; - static constexpr uint32_t LOG_LOCAL = 1 << 2; - static constexpr uint32_t DATA = 1 << 3; - static constexpr uint32_t INDEX = 1 << 4; - static constexpr uint32_t REPLICATION = 1 << 5; + static constexpr uint32_t LOG = 1 << 1; + static constexpr uint32_t DATA = 1 << 2; + static constexpr uint32_t INDEX = 1 << 3; + static constexpr uint32_t REPLICATION = 1 << 4; uint32_t svcs; @@ -91,8 +89,7 @@ struct HS_SERVICE { if (svcs & META) { str += "meta,"; } if (svcs & DATA) { str += "data,"; } if (svcs & INDEX) { str += "index,"; } - if (svcs & LOG_REPLICATED) { str += "log_replicated,"; } - if (svcs & LOG_LOCAL) { str += "log_local,"; } + if (svcs & LOG) { str += "log,"; } if (svcs & REPLICATION) { str += "replication,"; } return str; } diff --git a/src/include/homestore/logstore/log_store.hpp b/src/include/homestore/logstore/log_store.hpp index 6b46c69e6..dfb725bb5 100644 --- a/src/include/homestore/logstore/log_store.hpp +++ b/src/include/homestore/logstore/log_store.hpp @@ -35,7 +35,6 @@ namespace homestore { -class LogStoreFamily; class LogDev; class LogStoreServiceMetrics; @@ -44,7 +43,7 @@ typedef std::function< void(logstore_seq_num_t) > on_rollback_cb_t; class HomeLogStore : public std::enable_shared_from_this< HomeLogStore > { public: - HomeLogStore(LogStoreFamily& family, logstore_id_t id, bool append_mode, logstore_seq_num_t start_lsn); + HomeLogStore(std::shared_ptr< LogDev > logdev, logstore_id_t id, bool append_mode, logstore_seq_num_t start_lsn); HomeLogStore(const HomeLogStore&) = delete; HomeLogStore(HomeLogStore&&) noexcept = delete; HomeLogStore& operator=(const HomeLogStore&) = delete; @@ -266,7 +265,7 @@ class HomeLogStore : public std::enable_shared_from_this< HomeLogStore > { auto seq_num() const { return m_seq_num.load(std::memory_order_acquire); } - LogStoreFamily& get_family() { return m_logstore_family; } + std::shared_ptr< LogDev > get_logdev() { return m_logdev; } nlohmann::json dump_log_store(const log_dump_req& dump_req = log_dump_req()); @@ -285,8 +284,7 @@ class HomeLogStore : public std::enable_shared_from_this< HomeLogStore > { int search_max_le(logstore_seq_num_t input_sn); logstore_id_t m_store_id; - LogStoreFamily& m_logstore_family; - LogDev& m_logdev; + std::shared_ptr< LogDev > m_logdev; sisl::StreamTracker< logstore_record > m_records; bool m_append_mode{false}; log_req_comp_cb_t m_comp_cb; diff --git a/src/include/homestore/logstore/log_store_internal.hpp b/src/include/homestore/logstore/log_store_internal.hpp index 
7e442c48d..f8cd6123e 100644 --- a/src/include/homestore/logstore/log_store_internal.hpp +++ b/src/include/homestore/logstore/log_store_internal.hpp @@ -44,12 +44,14 @@ typedef std::function< void(logstore_req*, logdev_key) > log_req_comp_cb_t; typedef sisl::byte_view log_buffer; typedef uint32_t logstore_id_t; typedef uint8_t logstore_family_id_t; +typedef uint32_t logdev_id_t; typedef std::function< void(logstore_req*, logdev_key) > log_req_comp_cb_t; typedef std::function< void(logstore_seq_num_t, sisl::io_blob&, logdev_key, void*) > log_write_comp_cb_t; typedef std::function< void(logstore_seq_num_t, log_buffer, void*) > log_found_cb_t; typedef std::function< void(std::shared_ptr< HomeLogStore >) > log_store_opened_cb_t; typedef std::function< void(std::shared_ptr< HomeLogStore >, logstore_seq_num_t) > log_replay_done_cb_t; +typedef std::function< void(const std::unordered_map< logdev_id_t, logdev_key >&) > device_truncate_cb_t; typedef int64_t logid_t; diff --git a/src/include/homestore/logstore_service.hpp b/src/include/homestore/logstore_service.hpp index c10423e27..7e915c5cc 100644 --- a/src/include/homestore/logstore_service.hpp +++ b/src/include/homestore/logstore_service.hpp @@ -41,7 +41,6 @@ class LogStoreServiceMetrics : public sisl::MetricsGroup { LogStoreServiceMetrics& operator=(LogStoreServiceMetrics&&) noexcept = delete; }; -class LogStoreFamily; class HomeLogStore; class LogDev; struct logdev_key; @@ -49,18 +48,13 @@ class VirtualDev; class JournalVirtualDev; struct vdev_info; struct log_dump_req; +struct logdev_superblk; class LogStoreService { friend class HomeLogStore; - friend class LogStoreFamily; friend class LogDev; public: - static constexpr logstore_family_id_t DATA_LOG_FAMILY_IDX{0}; - static constexpr logstore_family_id_t CTRL_LOG_FAMILY_IDX{1}; - static constexpr size_t num_log_families = CTRL_LOG_FAMILY_IDX + 1; - typedef std::function< void(const std::array< logdev_key, num_log_families >&) > device_truncate_cb_t; - LogStoreService(); LogStoreService(const LogStoreService&) = delete; LogStoreService(LogStoreService&&) noexcept = delete; @@ -81,37 +75,52 @@ class LogStoreService { */ void stop(); + /** + * @brief Create a brand new log dev. + * + * @return Newly created log dev id. + */ + logdev_id_t create_new_logdev(); + + /** + * @brief Open an existing log dev. + * + * @param logdev_id: Logdev ID of the log dev to open. + * The opened log dev can be fetched later via get_logdev(). + */ + void open_logdev(logdev_id_t logdev_id); + /** * @brief Create a brand new log store (both in-memory and on device) and returns its instance. It also book * keeps the created log store and user can get this instance of log store by using logstore_id * - * @param family_id: Logstores can be created on different log_devs. As of now we only support data log_dev and - * ctrl log dev. The idx indicates which log device it is from. Its a mandatory parameter. + * @param logdev_id: Logdev on which this log store is to be created; logstores can be created on different log_devs. * @param append_mode: If the log store have to be in append mode, user can call append_async and do not need to * maintain the log_idx. Else user is expected to keep track of the log idx. Default to false * * @return std::shared_ptr< HomeLogStore > */ - std::shared_ptr< HomeLogStore > create_new_log_store(const logstore_family_id_t family_id, - const bool append_mode = false); + std::shared_ptr< HomeLogStore > create_new_log_store(const logdev_id_t logdev_id, const bool append_mode = false); /** * @brief Open an existing log store and does a recovery.
It then creates an instance of this logstore and * returns - + * @param logdev_id: Logdev ID of the log store to open * @param store_id: Store ID of the log store to open + * @param append_mode: Whether to open the log store in append mode. + * @param on_open_cb: Callback to be called once log store is opened. * @return std::shared_ptr< HomeLogStore > */ - void open_log_store(const logstore_family_id_t family_id, const logstore_id_t store_id, const bool append_mode, + void open_log_store(const logdev_id_t logdev_id, const logstore_id_t store_id, const bool append_mode, const log_store_opened_cb_t& on_open_cb); /** * @brief Close the log store instance and free-up the resources - + * @param logdev_id: Logdev ID of the log store to close * @param store_id: Store ID of the log store to close * @return true on success */ - bool close_log_store(const logstore_family_id_t family_id, const logstore_id_t store_id) { + bool close_log_store(const logdev_id_t logdev_id, const logstore_id_t store_id) { // TODO: Implement this method return true; } @@ -122,7 +131,7 @@ class LogStoreService { * * @param store_id */ - void remove_log_store(const logstore_family_id_t family_id, const logstore_id_t store_id); + void remove_log_store(const logdev_id_t logdev_id, const logstore_id_t store_id); /** * @brief Schedule a truncate all the log stores physically on the device. @@ -135,21 +144,16 @@ void device_truncate(const device_truncate_cb_t& cb = nullptr, const bool wait_till_done = false, const bool dry_run = false); - folly::Future< std::error_code > create_vdev(uint64_t size, logstore_family_id_t family, uint32_t chunk_size); - shared< VirtualDev > open_vdev(const vdev_info& vinfo, logstore_family_id_t family, bool load_existing); - shared< JournalVirtualDev > get_vdev(logstore_family_id_t family) const { - return (family == DATA_LOG_FAMILY_IDX) ?
m_data_logdev_vdev : m_ctrl_logdev_vdev; - } + folly::Future< std::error_code > create_vdev(uint64_t size, uint32_t chunk_size); + shared< VirtualDev > open_vdev(const vdev_info& vinfo, bool load_existing); + shared< JournalVirtualDev > get_vdev() const { return m_logdev_vdev; } + std::vector< std::shared_ptr< LogDev > > get_all_logdev(); + std::shared_ptr< LogDev > get_logdev(logdev_id_t id); nlohmann::json dump_log_store(const log_dump_req& dum_req); nlohmann::json get_status(const int verbosity) const; LogStoreServiceMetrics& metrics() { return m_metrics; } - LogStoreFamily* data_log_family() { return m_logstore_families[DATA_LOG_FAMILY_IDX].get(); } - LogStoreFamily* ctrl_log_family() { return m_logstore_families[CTRL_LOG_FAMILY_IDX].get(); } - - LogDev& data_logdev(); - LogDev& ctrl_logdev(); uint32_t used_size() const; uint32_t total_size() const; @@ -157,13 +161,18 @@ class LogStoreService { iomgr::io_fiber_t truncate_thread() { return m_truncate_fiber; } private: + std::shared_ptr< LogDev > create_new_logdev_internal(logdev_id_t logdev_id); + void logdev_super_blk_found(const sisl::byte_view& buf, void* meta_cookie); + void rollback_super_blk_found(const sisl::byte_view& buf, void* meta_cookie); void start_threads(); void flush_if_needed(); private: - std::array< std::unique_ptr< LogStoreFamily >, num_log_families > m_logstore_families; - std::shared_ptr< JournalVirtualDev > m_data_logdev_vdev; - std::shared_ptr< JournalVirtualDev > m_ctrl_logdev_vdev; + std::unordered_map< logdev_id_t, std::shared_ptr< LogDev > > m_id_logdev_map; + std::unique_ptr< sisl::IDReserver > m_id_reserver; + folly::SharedMutexWritePriority m_logdev_map_mtx; + + std::shared_ptr< JournalVirtualDev > m_logdev_vdev; iomgr::io_fiber_t m_truncate_fiber; iomgr::io_fiber_t m_flush_fiber; LogStoreServiceMetrics m_metrics; diff --git a/src/lib/common/homestore_config.fbs b/src/lib/common/homestore_config.fbs index 95f523d81..626870741 100644 --- a/src/lib/common/homestore_config.fbs +++ b/src/lib/common/homestore_config.fbs @@ -24,12 +24,12 @@ table BlkAllocator { /* Count of free blks cache in-terms of device size */ free_blk_cache_count_by_vdev_percent: double = 80.0; - /* Percentage of overall memory allocated for blkallocator free blks cache. The memory allocated effictively is the + /* Percentage of overall memory allocated for blkallocator free blks cache. 
The memory allocated effictively is the * min of memory occupied by (free_blk_cache_size_by_vdev_percent, max_free_blk_cache_memory_percent) */ max_free_blk_cache_memory_percent: double = 1.0; /* Free blk cache slab distribution percentage - * An example assuming blk_size=4K is [4K, 8K, 16K, 32K, 64K, 128K, 256K, 512K, 1M, 2M, 4M, 8M, 16M] + * An example assuming blk_size=4K is [4K, 8K, 16K, 32K, 64K, 128K, 256K, 512K, 1M, 2M, 4M, 8M, 16M] * free_blk_slab_distribution : [double] = [20.0, 10.0, 10.0, 10.0, 35.0, 3.0, 3.0, 3.0, 2.0, 1.0, 1.0, 1.0, 1.0] */ free_blk_slab_distribution : [double]; @@ -54,7 +54,7 @@ table BlkAllocator { } table Btree { - max_nodes_to_rebalance: uint32 = 3; + max_nodes_to_rebalance: uint32 = 3; mem_btree_page_size: uint32 = 8192; } @@ -78,9 +78,9 @@ table Device { // Max completions to process per event in a thread max_completions_process_per_event_per_thread: uint32 = 200; - + // DIRECT_IO mode, switch for HDD IO mode; - direct_io_mode: bool = false; + direct_io_mode: bool = false; } table LogStore { @@ -107,8 +107,7 @@ table LogStore { try_flush_iteration: uint64 = 10240(hotswap); // Logdev flushes in multiples of this size, setting to 0 will make it use default device optimal size - flush_size_multiple_data_logdev: uint64 = 0; - flush_size_multiple_ctrl_logdev: uint64 = 512; + flush_size_multiple_logdev: uint64 = 512; // Logdev will flush the logs only in a dedicated thread. Turn this on, if flush IO doesn't want to // intervene with data IO path. @@ -137,7 +136,7 @@ table Generic { // number of threads for btree writes; num_btree_write_threads : uint32 = 2; - // percentage of cache used to create indx mempool. It should be more than 100 to + // percentage of cache used to create indx mempool. It should be more than 100 to // take into account some floating buffers in writeback cache. 
indx_mempool_percent : uint32 = 110; @@ -148,13 +147,13 @@ table Generic { table ResourceLimits { /* it is going to use 2 times of this space because of two concurrent cps */ dirty_buf_percent: uint32 = 1 (hotswap); - + /* it is going to use 2 times of this space because of two concurrent cps */ free_blk_cnt: uint32 = 10000000 (hotswap); free_blk_size_percent: uint32 = 2 (hotswap); - + /* Percentage of memory allocated for homestore cache */ - cache_size_percent: uint32 = 65; + cache_size_percent: uint32 = 65; /* precentage of memory used during recovery */ memory_in_recovery_precent: uint32 = 40; @@ -167,18 +166,18 @@ table ResourceLimits { } table MetaBlkStore { - // turn on/off compression feature + // turn on/off compression feature compress_feature_on : bool = true (hotswap); // turn on/off skip header check skip_header_size_check : bool = false (hotswap); - // Compress buffer larger than this memory limit in MB will not trigger compress; + // Compress buffer larger than this memory limit in MB will not trigger compress; max_compress_memory_size_mb: uint32 = 512 (hotswap); - + // Inital memory allocated for compress buffer init_compress_memory_size_mb: uint32 = 10 (hotswap); - + // Try to do compress only when input buffer is larger than this size min_compress_size_mb: uint32 = 1 (hotswap); @@ -188,7 +187,7 @@ table MetaBlkStore { // percentage of *free* root fs while dump to file for get_status; percent_of_free_space: uint32 = 10 (hotswap); - // meta sanity check interval + // meta sanity check interval sanity_check_interval: uint32 = 10 (hotswap); } diff --git a/src/lib/device/journal_vdev.hpp b/src/lib/device/journal_vdev.hpp index 899253a2a..13ec18c65 100644 --- a/src/lib/device/journal_vdev.hpp +++ b/src/lib/device/journal_vdev.hpp @@ -25,12 +25,11 @@ #include "device.h" #include "physical_dev.hpp" #include "virtual_dev.hpp" +#include namespace homestore { typedef std::function< void(const off_t ret_off) > alloc_next_blk_cb_t; using journal_id_t = uint64_t; -// Each logstore family is associated to a single logdevice. -using logdev_id_t = uint64_t; // Chunks used for journal vdev has journal related info stored in chunk private data. // Each log device has a list of journal chunk data with next_chunk. 
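For context only (not part of this patch): the logstore_service.hpp changes above replace the two fixed log families (data/ctrl) with caller-created logdevs. A minimal C++ sketch of how client code might drive the reworked API is shown below, based on the declarations in this diff; the include paths, the function names format_path/recovery_path, and where the logdev/store IDs are persisted are illustrative assumptions.

#include <homestore/homestore.hpp>            // assumed header for logstore_service()
#include <homestore/logstore_service.hpp>
#include <homestore/logstore/log_store.hpp>

using namespace homestore;

// First boot: create a dedicated logdev, then a log store on top of it.
void format_path() {
    logdev_id_t ldev_id = logstore_service().create_new_logdev();
    std::shared_ptr< HomeLogStore > lstore =
        logstore_service().create_new_log_store(ldev_id, true /* append_mode */);
    // The caller is expected to persist ldev_id and lstore->get_store_id()
    // (e.g. in its own superblk) so the same pair can be reopened after restart.
}

// Recovery path: reopen the logdev first, then the log stores created on it.
void recovery_path(logdev_id_t ldev_id, logstore_id_t store_id) {
    logstore_service().open_logdev(ldev_id);
    logstore_service().open_log_store(ldev_id, store_id, true /* append_mode */,
                                      [](std::shared_ptr< HomeLogStore > lstore) {
                                          // Invoked once the store is opened; replay hooks go here.
                                      });
}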
diff --git a/src/lib/homestore.cpp b/src/lib/homestore.cpp index b22164b5d..0946dab68 100644 --- a/src/lib/homestore.cpp +++ b/src/lib/homestore.cpp @@ -40,7 +40,6 @@ #include "device/virtual_dev.hpp" #include "common/resource_mgr.hpp" #include "meta/meta_sb.hpp" -#include "logstore/log_store_family.hpp" #include "replication/service/generic_repl_svc.h" /* @@ -78,13 +77,13 @@ HomeStore& HomeStore::with_index_service(std::unique_ptr< IndexServiceCallbacks } HomeStore& HomeStore::with_log_service() { - m_services.svcs |= HS_SERVICE::LOG_REPLICATED | HS_SERVICE::LOG_LOCAL; + m_services.svcs |= HS_SERVICE::LOG; return *this; } HomeStore& HomeStore::with_repl_data_service(cshared< ReplApplication >& repl_app, cshared< ChunkSelector >& custom_chunk_selector) { - m_services.svcs |= HS_SERVICE::REPLICATION | HS_SERVICE::LOG_REPLICATED | HS_SERVICE::LOG_LOCAL; + m_services.svcs |= HS_SERVICE::REPLICATION | HS_SERVICE::LOG; m_services.svcs &= ~HS_SERVICE::DATA; // ReplicationDataSvc or DataSvc are mutually exclusive s_repl_app = repl_app; s_custom_chunk_selector = std::move(custom_chunk_selector); @@ -163,12 +162,10 @@ void HomeStore::format_and_start(std::map< uint32_t, hs_format_params >&& format if ((svc_type & HS_SERVICE::META) && has_meta_service()) { m_meta_service->create_vdev(pct_to_size(fparams.size_pct, HSDevType::Fast), fparams.num_chunks); - } else if ((svc_type & HS_SERVICE::LOG_REPLICATED) && has_log_service()) { - futs.emplace_back(m_log_service->create_vdev(pct_to_size(fparams.size_pct, HSDevType::Fast), - LogStoreService::DATA_LOG_FAMILY_IDX, fparams.chunk_size)); - } else if ((svc_type & HS_SERVICE::LOG_LOCAL) && has_log_service()) { - futs.emplace_back(m_log_service->create_vdev(pct_to_size(fparams.size_pct, HSDevType::Fast), - LogStoreService::CTRL_LOG_FAMILY_IDX, fparams.chunk_size)); + + } else if ((svc_type & HS_SERVICE::LOG) && has_log_service()) { + futs.emplace_back( + m_log_service->create_vdev(pct_to_size(fparams.size_pct, HSDevType::Fast), fparams.chunk_size)); } else if ((svc_type & HS_SERVICE::DATA) && has_data_service()) { m_data_service->create_vdev(pct_to_size(fparams.size_pct, HSDevType::Data), fparams.block_size, fparams.alloc_type, fparams.chunk_sel_type, fparams.num_chunks); @@ -298,7 +295,7 @@ bool HomeStore::has_repl_data_service() const { return m_services.svcs & HS_SERV bool HomeStore::has_meta_service() const { return m_services.svcs & HS_SERVICE::META; } bool HomeStore::has_log_service() const { auto const s = m_services.svcs; - return (s & (HS_SERVICE::LOG_REPLICATED | HS_SERVICE::LOG_LOCAL)); + return (s & HS_SERVICE::LOG); } #if 0 @@ -336,18 +333,9 @@ shared< VirtualDev > HomeStore::create_vdev_cb(const vdev_info& vinfo, bool load auto vdev_context = r_cast< const hs_vdev_context* >(vinfo.get_user_private()); switch (vdev_context->type) { - case hs_vdev_type_t::DATA_LOGDEV_VDEV: - if (has_log_service()) { - ret_vdev = m_log_service->open_vdev(vinfo, LogStoreService::DATA_LOG_FAMILY_IDX, load_existing); - } - break; - - case hs_vdev_type_t::CTRL_LOGDEV_VDEV: - if (has_log_service()) { - ret_vdev = m_log_service->open_vdev(vinfo, LogStoreService::CTRL_LOG_FAMILY_IDX, load_existing); - } + case hs_vdev_type_t::LOGDEV_VDEV: + if (has_log_service()) { ret_vdev = m_log_service->open_vdev(vinfo, load_existing); } break; - case hs_vdev_type_t::META_VDEV: if (has_meta_service()) { ret_vdev = m_meta_service->open_vdev(vinfo, load_existing); } break; diff --git a/src/lib/logstore/CMakeLists.txt b/src/lib/logstore/CMakeLists.txt index 7f2cd749c..9a93bde75 100644 
--- a/src/lib/logstore/CMakeLists.txt +++ b/src/lib/logstore/CMakeLists.txt @@ -10,7 +10,6 @@ target_sources(hs_logdev PRIVATE log_group.cpp log_stream.cpp log_store.cpp - log_store_family.cpp log_store_service.cpp ) target_link_libraries(hs_logdev ${COMMON_DEPS}) \ No newline at end of file diff --git a/src/lib/logstore/log_dev.cpp b/src/lib/logstore/log_dev.cpp index a51cf228e..ea758b838 100644 --- a/src/lib/logstore/log_dev.cpp +++ b/src/lib/logstore/log_dev.cpp @@ -35,21 +35,18 @@ namespace homestore { SISL_LOGGING_DECL(logstore) -#define THIS_LOGDEV_LOG(level, msg, ...) HS_SUBMOD_LOG(level, logstore, , "logdev", m_family_id, msg, __VA_ARGS__) +#define THIS_LOGDEV_LOG(level, msg, ...) HS_SUBMOD_LOG(level, logstore, , "logdev", m_logdev_id, msg, __VA_ARGS__) #define THIS_LOGDEV_PERIODIC_LOG(level, msg, ...) \ - HS_PERIODIC_DETAILED_LOG(level, logstore, "logdev", m_family_id, , , msg, __VA_ARGS__) + HS_PERIODIC_DETAILED_LOG(level, logstore, "logdev", m_logdev_id, , , msg, __VA_ARGS__) static bool has_data_service() { return HomeStore::instance()->has_data_service(); } // static BlkDataService& data_service() { return HomeStore::instance()->data_service(); } -LogDev::LogDev(const logstore_family_id_t f_id, const std::string& logdev_name) : - m_family_id{f_id}, m_logdev_meta{logdev_name} { - m_flush_size_multiple = 0; - if (f_id == LogStoreService::DATA_LOG_FAMILY_IDX) { - m_flush_size_multiple = HS_DYNAMIC_CONFIG(logstore->flush_size_multiple_data_logdev); - } else if (f_id == LogStoreService::CTRL_LOG_FAMILY_IDX) { - m_flush_size_multiple = HS_DYNAMIC_CONFIG(logstore->flush_size_multiple_ctrl_logdev); - } +LogDev::LogDev(const logdev_id_t id) : m_logdev_id{id} { + m_flush_size_multiple = HS_DYNAMIC_CONFIG(logstore->flush_size_multiple_logdev); + register_store_found_cb(bind_this(LogDev::on_log_store_found, 2)); + register_append_cb(bind_this(LogDev::on_io_completion, 5)); + register_logfound_cb(bind_this(LogDev::on_logfound, 6)); } LogDev::~LogDev() = default; @@ -61,7 +58,7 @@ void LogDev::start(bool format, JournalVirtualDev* vdev) { // Each family has one journal descriptor. 
m_vdev = vdev; - m_vdev_jd = m_vdev->open(m_family_id); + m_vdev_jd = m_vdev->open(m_logdev_id); RELEASE_ASSERT(m_vdev_jd, "Journal descriptor is null"); if (m_flush_size_multiple == 0) { m_flush_size_multiple = m_vdev->optimal_page_size(); } @@ -76,8 +73,7 @@ void LogDev::start(bool format, JournalVirtualDev* vdev) { // First read the info block if (format) { HS_LOG_ASSERT(m_logdev_meta.is_empty(), "Expected meta to be not present"); - m_logdev_meta.create(); - + m_logdev_meta.create(m_logdev_id); m_vdev_jd->update_data_start_offset(0); } else { HS_LOG_ASSERT(!m_logdev_meta.is_empty(), "Expected meta data to be read already before loading"); @@ -105,6 +101,8 @@ void LogDev::start(bool format, JournalVirtualDev* vdev) { if (m_pending_flush_size.load() && !m_is_flushing.load(std::memory_order_relaxed)) { flush_if_needed(); } }, true /* wait_to_schedule */); + + handle_unopened_log_stores(format); } void LogDev::stop() { @@ -129,6 +127,11 @@ void LogDev::stop() { // cancel the timer iomanager.cancel_timer(m_flush_timer_hdl, true); + { + folly::SharedMutexWritePriority::WriteHolder holder(m_store_map_mtx); + m_id_logstore_map.clear(); + } + m_log_records = nullptr; m_logdev_meta.reset(); m_log_idx.store(0); @@ -406,7 +409,7 @@ bool LogDev::flush_if_needed(int64_t threshold_size) { void LogDev::do_flush(LogGroup* lg) { #ifdef _PRERELEASE if (iomgr_flip::instance()->delay_flip< int >( - "simulate_log_flush_delay", [this, lg]() { do_flush_write(lg); }, m_family_id)) { + "simulate_log_flush_delay", [this, lg]() { do_flush_write(lg); }, m_logdev_id)) { THIS_LOGDEV_LOG(INFO, "Delaying flush by rescheduling the async write"); return; } @@ -580,7 +583,242 @@ void LogDev::rollback(logstore_id_t store_id, logid_range_t id_range) { m_logdev_meta.add_rollback_record(store_id, id_range, true); } -void LogDev::get_status(const int verbosity, nlohmann::json& js) const { +/////////////////////////////// LogStore Section /////////////////////////////////////// + +void LogDev::handle_unopened_log_stores(bool format) { + for (auto it{std::begin(m_unopened_store_io)}; it != std::end(m_unopened_store_io); ++it) { + LOGINFO("skip log entries for store id {}-{}, ios {}", m_logdev_id, it->first, it->second); + } + m_unopened_store_io.clear(); + + // If there are any unopened storeids found, loop and check again if they are indeed open later. Unopened log store + // could be possible if the ids are deleted, but it is delayed to remove from store id reserver. In that case, + // do the remove from store id reserver now. + // TODO: At present we are assuming all unopened store ids could be removed. In future have a callback to this + // start routine, which takes the list of unopened store ids and can return a new set, which can be removed. + { + folly::SharedMutexWritePriority::WriteHolder holder(m_store_map_mtx); + for (auto it{std::begin(m_unopened_store_id)}; it != std::end(m_unopened_store_id);) { + if (m_id_logstore_map.find(*it) == m_id_logstore_map.end()) { + // Not opened even on second time check, simply unreserve id + unreserve_store_id(*it); + } + it = m_unopened_store_id.erase(it); + } + + // Also call the logstore to inform that start/replay is completed. 
+ if (!format) { + for (auto& p : m_id_logstore_map) { + auto& lstore{p.second.m_log_store}; + if (lstore && lstore->get_log_replay_done_cb()) { + lstore->get_log_replay_done_cb()(lstore, lstore->seq_num() - 1); + lstore->truncate(lstore->truncated_upto()); + } + } + } + } +} + +std::shared_ptr< HomeLogStore > LogDev::create_new_log_store(bool append_mode) { + auto const store_id = reserve_store_id(); + std::shared_ptr< HomeLogStore > lstore; + lstore = std::make_shared< HomeLogStore >(shared_from_this(), store_id, append_mode, 0); + + { + folly::SharedMutexWritePriority::WriteHolder holder(m_store_map_mtx); + const auto it = m_id_logstore_map.find(store_id); + HS_REL_ASSERT((it == m_id_logstore_map.end()), "store_id {}-{} already exists", m_logdev_id, store_id); + m_id_logstore_map.insert(std::make_pair<>(store_id, logstore_info_t{lstore, nullptr, append_mode})); + } + LOGINFO("Created log store id {}-{}", m_logdev_id, store_id); + return lstore; +} + +void LogDev::open_log_store(logstore_id_t store_id, bool append_mode, const log_store_opened_cb_t& on_open_cb) { + folly::SharedMutexWritePriority::WriteHolder holder(m_store_map_mtx); + const auto it = m_id_logstore_map.find(store_id); + HS_REL_ASSERT((it == m_id_logstore_map.end()), "store_id {}-{} already exists", m_logdev_id, store_id); + LOGINFO("Opening log store id {}-{}", m_logdev_id, store_id); + m_id_logstore_map.insert(std::make_pair<>(store_id, logstore_info_t{nullptr, on_open_cb, append_mode})); +} + +void LogDev::remove_log_store(logstore_id_t store_id) { + LOGINFO("Removing log store id {}-{}", m_logdev_id, store_id); + + { + folly::SharedMutexWritePriority::WriteHolder holder(m_store_map_mtx); + auto ret = m_id_logstore_map.erase(store_id); + HS_REL_ASSERT((ret == 1), "try to remove invalid store_id {}-{}", m_logdev_id, store_id); + } + unreserve_store_id(store_id); +} + +void LogDev::device_truncate_under_lock(const std::shared_ptr< truncate_req >& treq) { + run_under_flush_lock([this, treq]() { + iomanager.run_on_forget(logstore_service().truncate_thread(), [this, treq]() { + const logdev_key trunc_upto = do_device_truncate(treq->dry_run); + bool done{false}; + if (treq->cb || treq->wait_till_done) { + { + std::lock_guard< std::mutex > lk{treq->mtx}; + done = (--treq->trunc_outstanding == 0); + treq->m_trunc_upto_result[m_logdev_id] = trunc_upto; + } + } + if (done) { + if (treq->cb) { treq->cb(treq->m_trunc_upto_result); } + if (treq->wait_till_done) { treq->cv.notify_one(); } + } + unlock_flush(); + }); + return false; // Do not release the flush lock yet, the scheduler will unlock it. 
+ }); +} + +void LogDev::on_log_store_found(logstore_id_t store_id, const logstore_superblk& sb) { + folly::SharedMutexWritePriority::ReadHolder holder(m_store_map_mtx); + const auto it = m_id_logstore_map.find(store_id); + if (it == m_id_logstore_map.end()) { + LOGERROR("Store Id {}-{} found but not opened yet.", m_logdev_id, store_id); + m_unopened_store_id.insert(store_id); + m_unopened_store_io.insert(std::make_pair<>(store_id, 0)); + return; + } + + LOGINFO("Found a logstore store_id={}-{} with start seq_num={}, Creating a new HomeLogStore instance", m_logdev_id, + store_id, sb.m_first_seq_num); + auto& l_info = const_cast< logstore_info_t& >(it->second); + l_info.m_log_store = + std::make_shared< HomeLogStore >(shared_from_this(), store_id, l_info.append_mode, sb.m_first_seq_num); + if (l_info.m_on_log_store_opened) l_info.m_on_log_store_opened(l_info.m_log_store); +} + +static thread_local std::vector< std::shared_ptr< HomeLogStore > > s_cur_flush_batch_stores; + +void LogDev::on_io_completion(logstore_id_t id, logdev_key ld_key, logdev_key flush_ld_key, + uint32_t nremaining_in_batch, void* ctx) { + auto* req = s_cast< logstore_req* >(ctx); + HomeLogStore* log_store = req->log_store; + + if (req->is_write) { + HS_LOG_ASSERT_EQ(log_store->get_store_id(), id, "Expecting store id in log store and io completion to match"); + log_store->on_write_completion(req, ld_key); + on_batch_completion(log_store, nremaining_in_batch, flush_ld_key); + } else { + log_store->on_read_completion(req, ld_key); + } +} + +void LogDev::on_logfound(logstore_id_t id, logstore_seq_num_t seq_num, logdev_key ld_key, logdev_key flush_ld_key, + log_buffer buf, uint32_t nremaining_in_batch) { + HomeLogStore* log_store{nullptr}; + + { + folly::SharedMutexWritePriority::ReadHolder holder(m_store_map_mtx); + auto const it = m_id_logstore_map.find(id); + if (it == m_id_logstore_map.end()) { + auto [unopened_it, inserted] = m_unopened_store_io.insert(std::make_pair<>(id, 0)); + if (inserted) { + // HS_REL_ASSERT(0, "log id {}-{} not found", m_logdev_id, id); + } + ++unopened_it->second; + return; + } + log_store = it->second.m_log_store.get(); + } + if (!log_store) { return; } + log_store->on_log_found(seq_num, ld_key, flush_ld_key, buf); + on_batch_completion(log_store, nremaining_in_batch, flush_ld_key); +} + +void LogDev::on_batch_completion(HomeLogStore* log_store, uint32_t nremaining_in_batch, logdev_key flush_ld_key) { + /* check if it is a first update on this log store */ + auto id = log_store->get_store_id(); + const auto it = m_last_flush_info.find(id); + if ((it == std::end(m_last_flush_info)) || (it->second != flush_ld_key.idx)) { + // first time completion in this batch for a given store_id + m_last_flush_info.insert_or_assign(id, flush_ld_key.idx); + if (it == std::end(m_last_flush_info)) { s_cur_flush_batch_stores.push_back(log_store->shared_from_this()); } + } + if (nremaining_in_batch == 0) { + // This batch is completed, call all log stores participated in this batch about the end of batch + HS_LOG_ASSERT_GT(s_cur_flush_batch_stores.size(), 0U, "Expecting one store to be flushed in batch"); + + for (auto& l : s_cur_flush_batch_stores) { + l->on_batch_completion(flush_ld_key); + } + s_cur_flush_batch_stores.clear(); + m_last_flush_info.clear(); + } +} + +logdev_key LogDev::do_device_truncate(bool dry_run) { + static thread_local std::vector< std::shared_ptr< HomeLogStore > > m_min_trunc_stores; + static thread_local std::vector< std::shared_ptr< HomeLogStore > > m_non_participating_stores; + + 
m_min_trunc_stores.clear(); + m_non_participating_stores.clear(); + logdev_key min_safe_ld_key = logdev_key::out_of_bound_ld_key(); + + std::string dbg_str{"Format [store_id:trunc_lsn:logidx:dev_trunc_pending?:active_writes_in_truncate?] "}; + + { + folly::SharedMutexWritePriority::ReadHolder holder(m_store_map_mtx); + for (auto& id_logstore : m_id_logstore_map) { + auto& store_ptr = id_logstore.second.m_log_store; + const auto& trunc_info = store_ptr->pre_device_truncation(); + + if (!trunc_info.pending_dev_truncation && !trunc_info.active_writes_not_part_of_truncation) { + // This log store neither has any pending device truncation nor active logstore io going on for now. + // Ignore this log store for min safe boundary calculation. + fmt::format_to(std::back_inserter(dbg_str), "[{}:None] ", store_ptr->get_store_id()); + m_non_participating_stores.push_back(store_ptr); + continue; + } + + fmt::format_to(std::back_inserter(dbg_str), "[{}:{}:{}:{}:{}] ", store_ptr->get_store_id(), + trunc_info.seq_num.load(), trunc_info.ld_key.idx, trunc_info.pending_dev_truncation, + trunc_info.active_writes_not_part_of_truncation); + if (trunc_info.ld_key.idx > min_safe_ld_key.idx) { continue; } + + if (trunc_info.ld_key.idx < min_safe_ld_key.idx) { + // New minimum safe ld key + min_safe_ld_key = trunc_info.ld_key; + m_min_trunc_stores.clear(); + } + m_min_trunc_stores.push_back(store_ptr); + } + } + + if ((min_safe_ld_key == logdev_key::out_of_bound_ld_key()) || (min_safe_ld_key.idx < 0)) { + HS_PERIODIC_LOG( + INFO, logstore, + "[Logdev={}] No log store append on any log stores, skipping device truncation, all_logstore_info:<{}>", + m_logdev_id, dbg_str); + return min_safe_ld_key; + } + + // Got the safest log id to truncate and actually truncate upto the safe log idx to the log device + if (!dry_run) { truncate(min_safe_ld_key); } + HS_PERIODIC_LOG(INFO, logstore, + "[Logdev={}] LogDevice truncate, all_logstore_info:<{}> safe log dev key to truncate={}", + m_logdev_id, dbg_str, min_safe_ld_key); + + // We call post device truncation only to the log stores whose prepared truncation points are fully + // truncated or to stores which didn't participate in this device truncation. + for (auto& store_ptr : m_min_trunc_stores) { + store_ptr->post_device_truncation(min_safe_ld_key); + } + for (auto& store_ptr : m_non_participating_stores) { + store_ptr->post_device_truncation(min_safe_ld_key); + } + m_min_trunc_stores.clear(); // Not clearing here would cause a shared_ptr ref to be held.
+ m_non_participating_stores.clear(); + + return min_safe_ld_key; +} + +void LogDev::get_logdev_status(const int verbosity, nlohmann::json& js) const { js["current_log_idx"] = m_log_idx.load(std::memory_order_relaxed); js["last_flush_log_idx"] = m_last_flush_idx; js["last_truncate_log_idx"] = m_last_truncate_idx; @@ -593,25 +831,51 @@ void LogDev::get_status(const int verbosity, nlohmann::json& js) const { } } -/////////////////////////////// LogDevMetadata Section /////////////////////////////////////// -LogDevMetadata::LogDevMetadata(const std::string& logdev_name) : - m_sb{logdev_name + "_logdev_sb"}, m_rollback_sb{logdev_name + "_rollback_sb"} { - meta_service().register_handler( - logdev_name + "_logdev_sb", - [this](meta_blk* mblk, sisl::byte_view buf, size_t size) { - logdev_super_blk_found(std::move(buf), voidptr_cast(mblk)); - }, - nullptr); +nlohmann::json LogDev::dump_log_store(const log_dump_req& dump_req) { + nlohmann::json json_dump{}; // create root object + if (dump_req.log_store == nullptr) { + folly::SharedMutexWritePriority::ReadHolder holder(m_store_map_mtx); + for (auto& id_logstore : m_id_logstore_map) { + auto store_ptr{id_logstore.second.m_log_store}; + const std::string id{std::to_string(store_ptr->get_store_id())}; + // must use operator= construction as copy construction results in error + nlohmann::json val = store_ptr->dump_log_store(dump_req); + json_dump[id] = std::move(val); + } + } else { + const std::string id{std::to_string(dump_req.log_store->get_store_id())}; + // must use operator= construction as copy construction results in error + nlohmann::json val = dump_req.log_store->dump_log_store(dump_req); + json_dump[id] = std::move(val); + } + return json_dump; +} - meta_service().register_handler( - logdev_name + "_rollback_sb", - [this](meta_blk* mblk, sisl::byte_view buf, size_t size) { - rollback_super_blk_found(std::move(buf), voidptr_cast(mblk)); - }, - nullptr); +nlohmann::json LogDev::get_status(int verbosity) const { + nlohmann::json js; + auto unopened = nlohmann::json::array(); + for (const auto& l : m_unopened_store_id) { + unopened.push_back(l); + } + js["logstores_unopened"] = std::move(unopened); + + // Logdev status + get_logdev_status(verbosity, js); + + // All logstores + { + folly::SharedMutexWritePriority::ReadHolder holder(m_store_map_mtx); + for (const auto& [id, lstore] : m_id_logstore_map) { + js["logstore_id_" + std::to_string(id)] = lstore.m_log_store->get_status(verbosity); + } + } + return js; } -logdev_superblk* LogDevMetadata::create() { +/////////////////////////////// LogDevMetadata Section /////////////////////////////////////// +LogDevMetadata::LogDevMetadata() : m_sb{logdev_sb_meta_name}, m_rollback_sb{logdev_rollback_sb_meta_name} {} + +logdev_superblk* LogDevMetadata::create(logdev_id_t id) { logdev_superblk* sb = m_sb.create(logdev_sb_size_needed(0)); rollback_superblk* rsb = m_rollback_sb.create(rollback_superblk::size_needed(1)); @@ -619,7 +883,10 @@ logdev_superblk* LogDevMetadata::create() { std::fill_n(sb_area, store_capacity(), logstore_superblk::default_value()); m_id_reserver = std::make_unique< sisl::IDReserver >(); + m_sb->logdev_id = id; m_sb.write(); + + m_rollback_sb->logdev_id = id; m_rollback_sb.write(); return sb; } diff --git a/src/lib/logstore/log_dev.hpp b/src/lib/logstore/log_dev.hpp index a417c1fc6..14f29b9bd 100644 --- a/src/lib/logstore/log_dev.hpp +++ b/src/lib/logstore/log_dev.hpp @@ -416,6 +416,7 @@ struct logdev_superblk { uint32_t magic{LOGDEV_SB_MAGIC}; uint32_t version{LOGDEV_SB_VERSION}; + 
logdev_id_t logdev_id{0}; uint32_t num_stores{0}; uint64_t start_dev_offset{0}; logid_t key_idx{0}; @@ -455,6 +456,7 @@ struct rollback_superblk { uint32_t magic{ROLLBACK_SB_MAGIC}; uint32_t version{ROLLBACK_SB_VERSION}; + logdev_id_t logdev_id{0}; uint32_t num_records{0}; uint32_t get_magic() const { return magic; } @@ -488,14 +490,14 @@ class LogDevMetadata { friend class LogDev; public: - LogDevMetadata(const std::string& logdev_name); + LogDevMetadata(); LogDevMetadata(const LogDevMetadata&) = delete; LogDevMetadata& operator=(const LogDevMetadata&) = delete; LogDevMetadata(LogDevMetadata&&) noexcept = delete; LogDevMetadata& operator=(LogDevMetadata&&) noexcept = delete; ~LogDevMetadata() = default; - logdev_superblk* create(); + logdev_superblk* create(logdev_id_t id); void reset(); std::vector< std::pair< logstore_id_t, logstore_superblk > > load(); void persist(); @@ -522,11 +524,12 @@ class LogDevMetadata { uint32_t num_rollback_records(logstore_id_t store_id) const; bool is_rolled_back(logstore_id_t store_id, logid_t logid) const; + void logdev_super_blk_found(const sisl::byte_view& buf, void* meta_cookie); + void rollback_super_blk_found(const sisl::byte_view& buf, void* meta_cookie); + private: bool resize_logdev_sb_if_needed(); bool resize_rollback_sb_if_needed(); - void logdev_super_blk_found(const sisl::byte_view& buf, void* meta_cookie); - void rollback_super_blk_found(const sisl::byte_view& buf, void* meta_cookie); uint32_t logdev_sb_size_needed(uint32_t nstores) const { return sizeof(logdev_superblk) + (nstores * sizeof(logstore_superblk)); @@ -574,9 +577,25 @@ class log_stream_reader { uint64_t m_read_size_multiple; }; -struct meta_blk; +struct logstore_info_t { + std::shared_ptr< HomeLogStore > m_log_store; + log_store_opened_cb_t m_on_log_store_opened; + bool append_mode; +}; +struct truncate_req { + std::mutex mtx; + std::condition_variable cv; + bool wait_till_done{false}; + bool dry_run{false}; + device_truncate_cb_t cb; + std::unordered_map< logdev_id_t, logdev_key > m_trunc_upto_result; + int trunc_outstanding{0}; +}; + +static std::string const logdev_sb_meta_name{"Logdev_sb"}; +static std::string const logdev_rollback_sb_meta_name{"Logdev_rollback_sb"}; -class LogDev { +class LogDev : public std::enable_shared_from_this< LogDev > { friend class HomeLogStore; public: @@ -592,7 +611,7 @@ class LogDev { return HS_DYNAMIC_CONFIG(logstore.flush_threshold_size) - sizeof(log_group_header); } - LogDev(logstore_family_id_t f_id, const std::string& metablk_name); + LogDev(logdev_id_t logdev_id); LogDev(const LogDev&) = delete; LogDev& operator=(const LogDev&) = delete; LogDev(LogDev&&) noexcept = delete; @@ -658,9 +677,8 @@ class LogDev { * NOTE: This method is not thread safe. * * @param cb Callback to call upon completion of append. It will call with 2 parameters - * a) logdev_key: The key to access the log dev data. It can be treated as opaque (internally has log_id and device - * offset) - * b) Context: The context which was passed to append method. + * a) logdev_key: The key to access the log dev data. It can be treated as opaque (internally has log_id and + * device offset) b) Context: The context which was passed to append method. 
*/ void register_append_cb(const log_append_comp_callback& cb) { m_append_comp_cb = cb; } @@ -760,7 +778,10 @@ class LogDev { void update_store_superblk(logstore_id_t idx, const logstore_superblk& meta, bool persist_now); - void get_status(int verbosity, nlohmann::json& out_json) const; + void get_logdev_status(int verbosity, nlohmann::json& out_json) const; + nlohmann::json dump_log_store(const log_dump_req& dum_req); + nlohmann::json get_status(int verbosity) const; + bool flush_if_needed(int64_t threshold_size = -1); bool is_aligned_buf_needed(size_t size) const { @@ -773,6 +794,26 @@ class LogDev { LogDevMetadata& log_dev_meta() { return m_logdev_meta; } static bool can_flush_in_this_thread(); + // Logstore management. + std::shared_ptr< HomeLogStore > create_new_log_store(bool append_mode = false); + void open_log_store(logstore_id_t store_id, bool append_mode, const log_store_opened_cb_t& on_open_cb); + bool close_log_store(logstore_id_t store_id) { + // TODO: Implement this method + return true; + } + void remove_log_store(logstore_id_t store_id); + void on_io_completion(logstore_id_t id, logdev_key ld_key, logdev_key flush_idx, uint32_t nremaining_in_batch, + void* ctx); + void on_log_store_found(logstore_id_t store_id, const logstore_superblk& sb); + void on_logfound(logstore_id_t id, logstore_seq_num_t seq_num, logdev_key ld_key, logdev_key flush_ld_key, + log_buffer buf, uint32_t nremaining_in_batch); + void on_batch_completion(HomeLogStore* log_store, uint32_t nremaining_in_batch, logdev_key flush_ld_key); + void device_truncate_under_lock(const std::shared_ptr< truncate_req >& treq); + logdev_key do_device_truncate(bool dry_run = false); + void handle_unopened_log_stores(bool format); + + logdev_id_t get_id() { return m_logdev_id; } + private: LogGroup* make_log_group(uint32_t estimated_records) { m_log_group_pool[m_log_group_idx].reset(estimated_records); @@ -806,11 +847,17 @@ class LogDev { std::atomic< int64_t > m_pending_flush_size{0}; // How much flushable logs are pending std::atomic< bool > m_is_flushing{false}; // Is LogDev currently flushing (so far supports one flusher at a time) bool m_stopped{false}; // Is Logdev stopped. We don't need lock here, because it is updated under flush lock - logstore_family_id_t m_family_id; // The family id this logdev is part of + logdev_id_t m_logdev_id; JournalVirtualDev* m_vdev{nullptr}; shared< JournalVirtualDev::Descriptor > m_vdev_jd; // Journal descriptor. HomeStoreSafePtr m_hs; // Back pointer to homestore + folly::SharedMutexWritePriority m_store_map_mtx; + std::unordered_map< logstore_id_t, logstore_info_t > m_id_logstore_map; + std::unordered_map< logstore_id_t, uint64_t > m_unopened_store_io; + std::unordered_set< logstore_id_t > m_unopened_store_id; + std::unordered_map< logstore_id_t, logid_t > m_last_flush_info; + std::multimap< logid_t, logstore_id_t > m_garbage_store_ids; Clock::time_point m_last_flush_time; diff --git a/src/lib/logstore/log_store.cpp b/src/lib/logstore/log_store.cpp index f51e29944..089bb3286 100644 --- a/src/lib/logstore/log_store.cpp +++ b/src/lib/logstore/log_store.cpp @@ -23,7 +23,6 @@ #include #include #include "common/homestore_assert.hpp" -#include "log_store_family.hpp" #include "log_dev.hpp" namespace homestore { @@ -33,17 +32,17 @@ SISL_LOGGING_DECL(logstore) #define THIS_LOGSTORE_PERIODIC_LOG(level, msg, ...) 
\ HS_PERIODIC_DETAILED_LOG(level, logstore, "store", m_fq_name, , , msg, __VA_ARGS__) -HomeLogStore::HomeLogStore(LogStoreFamily& family, logstore_id_t id, bool append_mode, logstore_seq_num_t start_lsn) : +HomeLogStore::HomeLogStore(std::shared_ptr< LogDev > logdev, logstore_id_t id, bool append_mode, + logstore_seq_num_t start_lsn) : m_store_id{id}, - m_logstore_family{family}, - m_logdev{family.logdev()}, + m_logdev{logdev}, m_records{"HomeLogStoreRecords", start_lsn - 1}, m_append_mode{append_mode}, m_seq_num{start_lsn}, - m_fq_name{fmt::format("{}.{}", family.get_family_id(), id)}, + m_fq_name{fmt::format("{}.{}", logdev->get_id(), id)}, m_metrics{logstore_service().metrics()} { m_truncation_barriers.reserve(10000); - m_safe_truncation_boundary.ld_key = m_logdev.get_last_flush_ld_key(); + m_safe_truncation_boundary.ld_key = m_logdev->get_last_flush_ld_key(); m_safe_truncation_boundary.seq_num.store(start_lsn - 1, std::memory_order_release); } @@ -83,7 +82,7 @@ void HomeLogStore::write_async(logstore_req* req, const log_req_comp_cb_t& cb) { HS_LOG_ASSERT((cb || m_comp_cb), "Expected either cb is not null or default cb registered"); req->cb = (cb ? cb : m_comp_cb); req->start_time = Clock::now(); - if (req->seq_num == 0) { m_safe_truncation_boundary.ld_key = m_logdev.get_last_flush_ld_key(); } + if (req->seq_num == 0) { m_safe_truncation_boundary.ld_key = m_logdev->get_last_flush_ld_key(); } #ifndef NDEBUG const auto trunc_upto_lsn = truncated_upto(); if (req->seq_num <= trunc_upto_lsn) { @@ -96,7 +95,7 @@ void HomeLogStore::write_async(logstore_req* req, const log_req_comp_cb_t& cb) { m_records.create(req->seq_num); COUNTER_INCREMENT(m_metrics, logstore_append_count, 1); HISTOGRAM_OBSERVE(m_metrics, logstore_record_size, req->data.size()); - m_logdev.append_async(m_store_id, req->seq_num, req->data, static_cast< void* >(req)); + m_logdev->append_async(m_store_id, req->seq_num, req->data, static_cast< void* >(req)); } void HomeLogStore::write_async(logstore_seq_num_t seq_num, const sisl::io_blob& b, void* cookie, @@ -141,7 +140,7 @@ log_buffer HomeLogStore::read_sync(logstore_seq_num_t seq_num) { // ld_key.idx, ld_key.dev_offset); COUNTER_INCREMENT(m_metrics, logstore_read_count, 1); serialized_log_record header; - const auto b = m_logdev.read(ld_key, header); + const auto b = m_logdev->read(ld_key, header); HISTOGRAM_OBSERVE(m_metrics, logstore_read_latency, get_elapsed_time_us(start_time)); return b; } @@ -152,7 +151,7 @@ void HomeLogStore::read_async(logstore_req* req, const log_found_cb_t& cb) { auto record = m_records.at(req->seq_num); logdev_key ld_key = record.m_dev_key; req->cb = cb; - m_logdev.read_async(ld_key, (void*)req); + m_logdev->read_async(ld_key, (void*)req); } void HomeLogStore::read_async(logstore_seq_num_t seq_num, void* cookie, const log_found_cb_t& cb) { @@ -243,7 +242,7 @@ void HomeLogStore::truncate(logstore_seq_num_t upto_seq_num, bool in_memory_trun // First try to block the flushing of logdevice and if we are successfully able to do, then auto shared_this = shared_from_this(); - m_logdev.run_under_flush_lock([shared_this, upto_seq_num]() { + m_logdev->run_under_flush_lock([shared_this, upto_seq_num]() { shared_this->do_truncate(upto_seq_num); return true; }); @@ -255,7 +254,7 @@ void HomeLogStore::do_truncate(logstore_seq_num_t upto_seq_num) { m_safe_truncation_boundary.seq_num.store(upto_seq_num, std::memory_order_release); // Need to update the superblock with meta, we don't persist yet, will be done as part of log dev truncation - 
m_logdev.update_store_superblk(m_store_id, logstore_superblk{upto_seq_num + 1}, false /* persist_now */); + m_logdev->update_store_superblk(m_store_id, logstore_superblk{upto_seq_num + 1}, false /* persist_now */); const int ind = search_max_le(upto_seq_num); if (ind < 0) { @@ -346,7 +345,7 @@ nlohmann::json HomeLogStore::dump_log_store(const log_dump_req& dump_req) { nlohmann::json json_val = nlohmann::json::object(); serialized_log_record record_header; - const auto log_buffer{m_logdev.read(record.m_dev_key, record_header)}; + const auto log_buffer{m_logdev->read(record.m_dev_key, record_header)}; try { json_val["size"] = static_cast< uint32_t >(record_header.size); @@ -376,7 +375,7 @@ void HomeLogStore::foreach (int64_t start_idx, const std::function< bool(logstor m_records.foreach_all_completed(start_idx, [&](int64_t cur_idx, homestore::logstore_record& record) -> bool { // do a sync read serialized_log_record header; - auto log_buf = m_logdev.read(record.m_dev_key, header); + auto log_buf = m_logdev->read(record.m_dev_key, header); return cb(cur_idx, log_buf); }); } @@ -412,7 +411,7 @@ void HomeLogStore::flush_sync(logstore_seq_num_t upto_seq_num) { if (!m_records.status(upto_seq_num).is_active) { return; } // Step 3: Force a flush (with least threshold) - m_logdev.flush_if_needed(1); + m_logdev->flush_if_needed(1); // Step 4: Wait for completion m_sync_flush_cv.wait(lk, [this, upto_seq_num] { return !m_records.status(upto_seq_num).is_active; }); @@ -445,10 +444,10 @@ uint64_t HomeLogStore::rollback_async(logstore_seq_num_t to_lsn, on_rollback_cb_ m_records.at(from_lsn).m_dev_key.idx); // Get the logid range to rollback m_records.rollback(to_lsn); // Rollback all bitset records and from here on, we can't access any lsns beyond to_lsn - m_logdev.run_under_flush_lock([logid_range, to_lsn, this, comp_cb = std::move(cb)]() { + m_logdev->run_under_flush_lock([logid_range, to_lsn, this, comp_cb = std::move(cb)]() { iomanager.run_on_forget(logstore_service().truncate_thread(), [logid_range, to_lsn, this, comp_cb]() { // Rollback the log_ids in the range, for this log store (which persists this info in its superblk) - m_logdev.rollback(m_store_id, logid_range); + m_logdev->rollback(m_store_id, logid_range); // Remove all truncation barriers on rolled back lsns for (auto it = std::rbegin(m_truncation_barriers); it != std::rend(m_truncation_barriers); ++it) { @@ -460,7 +459,7 @@ uint64_t HomeLogStore::rollback_async(logstore_seq_num_t to_lsn, on_rollback_cb_ } m_flush_batch_max_lsn = invalid_lsn(); // Reset the flush batch for next batch. 
if (comp_cb) { comp_cb(to_lsn); } - m_logdev.unlock_flush(); + m_logdev->unlock_flush(); }); return false; }); @@ -478,7 +477,7 @@ nlohmann::json HomeLogStore::get_status(int verbosity) const { js["truncation_pending_on_device?"] = m_safe_truncation_boundary.pending_dev_truncation; js["truncation_parallel_to_writes?"] = m_safe_truncation_boundary.active_writes_not_part_of_truncation; js["logstore_records"] = m_records.get_status(verbosity); - js["logstore_sb_first_lsn"] = m_logdev.log_dev_meta().store_superblk(m_store_id).m_first_seq_num; + js["logstore_sb_first_lsn"] = m_logdev->log_dev_meta().store_superblk(m_store_id).m_first_seq_num; return js; } diff --git a/src/lib/logstore/log_store_family.cpp b/src/lib/logstore/log_store_family.cpp deleted file mode 100644 index 0af47625d..000000000 --- a/src/lib/logstore/log_store_family.cpp +++ /dev/null @@ -1,328 +0,0 @@ -/********************************************************************************* - * Modifications Copyright 2017-2019 eBay Inc. - * - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed - * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR - * CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - * - *********************************************************************************/ -#include -#include - -#include -#include -#include - -#include -#include - -#include "common/homestore_assert.hpp" -#include "log_store_family.hpp" -#include "log_dev.hpp" - -namespace homestore { -SISL_LOGGING_DECL(logstore) - -LogStoreFamily::LogStoreFamily(logstore_family_id_t f_id) : - m_family_id{f_id}, m_name{std::string("LogDevFamily") + std::to_string(f_id)}, m_log_dev{f_id, m_name} {} - -void LogStoreFamily::start(bool format, JournalVirtualDev* blk_store) { - m_log_dev.register_store_found_cb(bind_this(LogStoreFamily::on_log_store_found, 2)); - m_log_dev.register_append_cb(bind_this(LogStoreFamily::on_io_completion, 5)); - m_log_dev.register_logfound_cb(bind_this(LogStoreFamily::on_logfound, 6)); - - // Start the logdev, which loads the device in case of recovery. - m_log_dev.start(format, blk_store); - for (auto it{std::begin(m_unopened_store_io)}; it != std::end(m_unopened_store_io); ++it) { - LOGINFO("skip log entries for store id {}-{}, ios {}", m_family_id, it->first, it->second); - } - m_unopened_store_io.clear(); - - // If there are any unopened storeids found, loop and check again if they are indeed open later. Unopened log store - // could be possible if the ids are deleted, but it is delayed to remove from store id reserver. In that case, - // do the remove from store id reserver now. - // TODO: At present we are assuming all unopened store ids could be removed. In future have a callback to this - // start routine, which takes the list of unopened store ids and can return a new set, which can be removed. 
- { - folly::SharedMutexWritePriority::WriteHolder holder(m_store_map_mtx); - for (auto it{std::begin(m_unopened_store_id)}; it != std::end(m_unopened_store_id);) { - if (m_id_logstore_map.find(*it) == m_id_logstore_map.end()) { - // Not opened even on second time check, simply unreserve id - m_log_dev.unreserve_store_id(*it); - } - it = m_unopened_store_id.erase(it); - } - - // Also call the logstore to inform that start/replay is completed. - if (!format) { - for (auto& p : m_id_logstore_map) { - auto& lstore{p.second.m_log_store}; - if (lstore && lstore->get_log_replay_done_cb()) { - lstore->get_log_replay_done_cb()(lstore, lstore->seq_num() - 1); - lstore->truncate(lstore->truncated_upto()); - } - } - } - } -} - -void LogStoreFamily::stop() { - { - folly::SharedMutexWritePriority::WriteHolder holder(m_store_map_mtx); - m_id_logstore_map.clear(); - } - m_log_dev.stop(); -} - -std::shared_ptr< HomeLogStore > LogStoreFamily::create_new_log_store(bool append_mode) { - auto const store_id = m_log_dev.reserve_store_id(); - std::shared_ptr< HomeLogStore > lstore; - lstore = std::make_shared< HomeLogStore >(*this, store_id, append_mode, 0); - - { - folly::SharedMutexWritePriority::WriteHolder holder(m_store_map_mtx); - const auto it = m_id_logstore_map.find(store_id); - HS_REL_ASSERT((it == m_id_logstore_map.end()), "store_id {}-{} already exists", m_family_id, store_id); - m_id_logstore_map.insert(std::make_pair<>(store_id, logstore_info_t{lstore, nullptr, append_mode})); - } - LOGINFO("Created log store id {}-{}", m_family_id, store_id); - return lstore; -} - -void LogStoreFamily::open_log_store(logstore_id_t store_id, bool append_mode, const log_store_opened_cb_t& on_open_cb) { - folly::SharedMutexWritePriority::WriteHolder holder(m_store_map_mtx); - const auto it = m_id_logstore_map.find(store_id); - HS_REL_ASSERT((it == m_id_logstore_map.end()), "store_id {}-{} already exists", m_family_id, store_id); - LOGINFO("Opening log store id {}-{}", m_family_id, store_id); - m_id_logstore_map.insert(std::make_pair<>(store_id, logstore_info_t{nullptr, on_open_cb, append_mode})); -} - -void LogStoreFamily::remove_log_store(logstore_id_t store_id) { - LOGINFO("Removing log store id {}-{}", m_family_id, store_id); - - { - folly::SharedMutexWritePriority::WriteHolder holder(m_store_map_mtx); - auto ret = m_id_logstore_map.erase(store_id); - HS_REL_ASSERT((ret == 1), "try to remove invalid store_id {}-{}", m_family_id, store_id); - } - m_log_dev.unreserve_store_id(store_id); -} - -void LogStoreFamily::device_truncate(const std::shared_ptr< truncate_req >& treq) { - m_log_dev.run_under_flush_lock([this, treq]() { - iomanager.run_on_forget(logstore_service().truncate_thread(), [this, treq]() { - const logdev_key trunc_upto = do_device_truncate(treq->dry_run); - bool done{false}; - - if (treq->cb || treq->wait_till_done) { - { - std::lock_guard< std::mutex > lk{treq->mtx}; - done = (--treq->trunc_outstanding == 0); - treq->m_trunc_upto_result[m_family_id] = trunc_upto; - } - } - if (done) { - if (treq->cb) { treq->cb(treq->m_trunc_upto_result); } - if (treq->wait_till_done) { - std::lock_guard< std::mutex > lk{treq->mtx}; - treq->cv.notify_one(); - } - } - m_log_dev.unlock_flush(); - }); - return false; // Do not release the flush lock yet, the scheduler will unlock it. 
- }); -} - -void LogStoreFamily::on_log_store_found(logstore_id_t store_id, const logstore_superblk& sb) { - folly::SharedMutexWritePriority::ReadHolder holder(m_store_map_mtx); - const auto it = m_id_logstore_map.find(store_id); - if (it == m_id_logstore_map.end()) { - LOGERROR("Store Id {}-{} found but not opened yet.", m_family_id, store_id); - m_unopened_store_id.insert(store_id); - m_unopened_store_io.insert(std::make_pair<>(store_id, 0)); - return; - } - - LOGINFO("Found a logstore store_id={}-{} with start seq_num={}, Creating a new HomeLogStore instance", m_family_id, - store_id, sb.m_first_seq_num); - auto& l_info = const_cast< logstore_info_t& >(it->second); - l_info.m_log_store = std::make_shared< HomeLogStore >(*this, store_id, l_info.append_mode, sb.m_first_seq_num); - if (l_info.m_on_log_store_opened) l_info.m_on_log_store_opened(l_info.m_log_store); -} - -static thread_local std::vector< std::shared_ptr< HomeLogStore > > s_cur_flush_batch_stores; - -void LogStoreFamily::on_io_completion(logstore_id_t id, logdev_key ld_key, logdev_key flush_ld_key, - uint32_t nremaining_in_batch, void* ctx) { - auto* req = s_cast< logstore_req* >(ctx); - HomeLogStore* log_store = req->log_store; - - if (req->is_write) { - HS_LOG_ASSERT_EQ(log_store->get_store_id(), id, "Expecting store id in log store and io completion to match"); - log_store->on_write_completion(req, ld_key); - on_batch_completion(log_store, nremaining_in_batch, flush_ld_key); - } else { - log_store->on_read_completion(req, ld_key); - } -} - -void LogStoreFamily::on_logfound(logstore_id_t id, logstore_seq_num_t seq_num, logdev_key ld_key, - logdev_key flush_ld_key, log_buffer buf, uint32_t nremaining_in_batch) { - HomeLogStore* log_store{nullptr}; - - { - folly::SharedMutexWritePriority::ReadHolder holder(m_store_map_mtx); - auto const it = m_id_logstore_map.find(id); - if (it == m_id_logstore_map.end()) { - auto [unopened_it, inserted] = m_unopened_store_io.insert(std::make_pair<>(id, 0)); - if (inserted) { - // HS_REL_ASSERT(0, "log id {}-{} not found", m_family_id, id); - } - ++unopened_it->second; - return; - } - log_store = it->second.m_log_store.get(); - } - if (!log_store) { return; } - log_store->on_log_found(seq_num, ld_key, flush_ld_key, buf); - on_batch_completion(log_store, nremaining_in_batch, flush_ld_key); -} - -void LogStoreFamily::on_batch_completion(HomeLogStore* log_store, uint32_t nremaining_in_batch, - logdev_key flush_ld_key) { - - /* check if it is a first update on this log store */ - auto id = log_store->get_store_id(); - const auto it = m_last_flush_info.find(id); - if ((it == std::end(m_last_flush_info)) || (it->second != flush_ld_key.idx)) { - // first time completion in this batch for a given store_id - m_last_flush_info.insert_or_assign(id, flush_ld_key.idx); - if (it == std::end(m_last_flush_info)) { s_cur_flush_batch_stores.push_back(log_store->shared_from_this()); } - } - if (nremaining_in_batch == 0) { - // This batch is completed, call all log stores participated in this batch about the end of batch - HS_LOG_ASSERT_GT(s_cur_flush_batch_stores.size(), 0U, "Expecting one store to be flushed in batch"); - - for (auto& l : s_cur_flush_batch_stores) { - l->on_batch_completion(flush_ld_key); - } - s_cur_flush_batch_stores.clear(); - m_last_flush_info.clear(); - } -} - -logdev_key LogStoreFamily::do_device_truncate(bool dry_run) { - static thread_local std::vector< std::shared_ptr< HomeLogStore > > m_min_trunc_stores; - static thread_local std::vector< std::shared_ptr< HomeLogStore > > 
m_non_participating_stores; - - m_min_trunc_stores.clear(); - m_non_participating_stores.clear(); - logdev_key min_safe_ld_key = logdev_key::out_of_bound_ld_key(); - - std::string dbg_str{"Format [store_id:trunc_lsn:logidx:dev_trunc_pending?:active_writes_in_trucate?] "}; - - { - folly::SharedMutexWritePriority::ReadHolder holder(m_store_map_mtx); - for (auto& id_logstore : m_id_logstore_map) { - auto& store_ptr = id_logstore.second.m_log_store; - const auto& trunc_info = store_ptr->pre_device_truncation(); - - if (!trunc_info.pending_dev_truncation && !trunc_info.active_writes_not_part_of_truncation) { - // This log store neither has any pending device truncation nor active logstore io going on for now. - // Ignore this log store for min safe boundary calculation. - fmt::format_to(std::back_inserter(dbg_str), "[{}:None] ", store_ptr->get_store_id()); - m_non_participating_stores.push_back(store_ptr); - continue; - } - - fmt::format_to(std::back_inserter(dbg_str), "[{}:{}:{}:{}:{}] ", store_ptr->get_store_id(), - trunc_info.seq_num.load(), trunc_info.ld_key.idx, trunc_info.pending_dev_truncation, - trunc_info.active_writes_not_part_of_truncation); - if (trunc_info.ld_key.idx > min_safe_ld_key.idx) { continue; } - - if (trunc_info.ld_key.idx < min_safe_ld_key.idx) { - // New minimum safe l entry - min_safe_ld_key = trunc_info.ld_key; - m_min_trunc_stores.clear(); - } - m_min_trunc_stores.push_back(store_ptr); - } - } - - if ((min_safe_ld_key == logdev_key::out_of_bound_ld_key()) || (min_safe_ld_key.idx < 0)) { - HS_PERIODIC_LOG( - INFO, logstore, - "[Family={}] No log store append on any log stores, skipping device truncation, all_logstore_info:<{}>", - m_family_id, dbg_str); - return min_safe_ld_key; - } - - // Got the safest log id to truncate and actually truncate upto the safe log idx to the log device - if (!dry_run) { m_log_dev.truncate(min_safe_ld_key); } - HS_PERIODIC_LOG(INFO, logstore, - "[Family={}] LogDevice truncate, all_logstore_info:<{}> safe log dev key to truncate={}", - m_family_id, dbg_str, min_safe_ld_key); - - // We call post device truncation only to the log stores whose prepared truncation points are fully - // truncated or to stores which didn't particpate in this device truncation. - for (auto& store_ptr : m_min_trunc_stores) { - store_ptr->post_device_truncation(min_safe_ld_key); - } - for (auto& store_ptr : m_non_participating_stores) { - store_ptr->post_device_truncation(min_safe_ld_key); - } - m_min_trunc_stores.clear(); // Not clearing here, would cause a shared_ptr ref holding. 
- m_non_participating_stores.clear(); - - return min_safe_ld_key; -} - -nlohmann::json LogStoreFamily::dump_log_store(const log_dump_req& dump_req) { - nlohmann::json json_dump{}; // create root object - if (dump_req.log_store == nullptr) { - folly::SharedMutexWritePriority::ReadHolder holder(m_store_map_mtx); - for (auto& id_logstore : m_id_logstore_map) { - auto store_ptr{id_logstore.second.m_log_store}; - const std::string id{std::to_string(store_ptr->get_store_id())}; - // must use operator= construction as copy construction results in error - nlohmann::json val = store_ptr->dump_log_store(dump_req); - json_dump[id] = std::move(val); - } - } else { - const std::string id{std::to_string(dump_req.log_store->get_store_id())}; - // must use operator= construction as copy construction results in error - nlohmann::json val = dump_req.log_store->dump_log_store(dump_req); - json_dump[id] = std::move(val); - } - return json_dump; -} - -nlohmann::json LogStoreFamily::get_status(int verbosity) const { - nlohmann::json js; - auto unopened = nlohmann::json::array(); - for (const auto& l : m_unopened_store_id) { - unopened.push_back(l); - } - js["logstores_unopened"] = std::move(unopened); - - // Logdev status - m_log_dev.get_status(verbosity, js); - - // All logstores - { - folly::SharedMutexWritePriority::ReadHolder holder(m_store_map_mtx); - for (const auto& [id, lstore] : m_id_logstore_map) { - js["logstore_id_" + std::to_string(id)] = lstore.m_log_store->get_status(verbosity); - } - } - return js; -} -} // namespace homestore diff --git a/src/lib/logstore/log_store_family.hpp b/src/lib/logstore/log_store_family.hpp deleted file mode 100644 index c92d05633..000000000 --- a/src/lib/logstore/log_store_family.hpp +++ /dev/null @@ -1,113 +0,0 @@ -/********************************************************************************* - * Modifications Copyright 2017-2019 eBay Inc. - * - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed - * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR - * CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. 
- * - *********************************************************************************/ -#pragma once - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include -#include - -#include -#include -#include "log_dev.hpp" - -namespace homestore { -struct log_dump_req; - -struct logstore_info_t { - std::shared_ptr< HomeLogStore > m_log_store; - log_store_opened_cb_t m_on_log_store_opened; - bool append_mode; -}; - -struct truncate_req { - std::mutex mtx; - std::condition_variable cv; - bool wait_till_done{false}; - bool dry_run{false}; - LogStoreService::device_truncate_cb_t cb; - std::array< logdev_key, LogStoreService::num_log_families > m_trunc_upto_result; - int trunc_outstanding{0}; -}; - -class JournalVirtualDev; -class HomeLogStore; -struct meta_blk; - -class LogStoreFamily { - friend class LogStoreService; - friend class LogDev; - -public: - LogStoreFamily(const logstore_family_id_t f_id); - LogStoreFamily(const LogStoreFamily&) = delete; - LogStoreFamily(LogStoreFamily&&) noexcept = delete; - LogStoreFamily& operator=(const LogStoreFamily&) = delete; - LogStoreFamily& operator=(LogStoreFamily&&) noexcept = delete; - - void start(const bool format, JournalVirtualDev* blk_store); - void stop(); - - std::shared_ptr< HomeLogStore > create_new_log_store(bool append_mode = false); - void open_log_store(logstore_id_t store_id, bool append_mode, const log_store_opened_cb_t& on_open_cb); - bool close_log_store(logstore_id_t store_id) { - // TODO: Implement this method - return true; - } - void remove_log_store(logstore_id_t store_id); - - void device_truncate(const std::shared_ptr< truncate_req >& treq); - - nlohmann::json dump_log_store(const log_dump_req& dum_req); - - LogDev& logdev() { return m_log_dev; } - - nlohmann::json get_status(int verbosity) const; - std::string get_name() const { return m_name; } - - logstore_family_id_t get_family_id() const { return m_family_id; } - - logdev_key do_device_truncate(bool dry_run = false); - -private: - void on_log_store_found(logstore_id_t store_id, const logstore_superblk& meta); - void on_io_completion(logstore_id_t id, logdev_key ld_key, logdev_key flush_idx, uint32_t nremaining_in_batch, - void* ctx); - void on_logfound(logstore_id_t id, logstore_seq_num_t seq_num, logdev_key ld_key, logdev_key flush_ld_key, - log_buffer buf, uint32_t nremaining_in_batch); - void on_batch_completion(HomeLogStore* log_store, uint32_t nremaining_in_batch, logdev_key flush_ld_key); - -private: - folly::SharedMutexWritePriority m_store_map_mtx; - std::unordered_map< logstore_id_t, logstore_info_t > m_id_logstore_map; - std::unordered_map< logstore_id_t, uint64_t > m_unopened_store_io; - std::unordered_set< logstore_id_t > m_unopened_store_id; - std::unordered_map< logstore_id_t, logid_t > m_last_flush_info; - logstore_family_id_t m_family_id; - std::string m_name; - LogDev m_log_dev; -}; -} // namespace homestore diff --git a/src/lib/logstore/log_store_service.cpp b/src/lib/logstore/log_store_service.cpp index 67f2dc1dd..d323f816b 100644 --- a/src/lib/logstore/log_store_service.cpp +++ b/src/lib/logstore/log_store_service.cpp @@ -29,7 +29,6 @@ #include "common/homestore_status_mgr.hpp" #include "device/journal_vdev.hpp" #include "device/physical_dev.hpp" -#include "log_store_family.hpp" #include "log_dev.hpp" namespace homestore { @@ -38,30 +37,34 @@ SISL_LOGGING_DECL(logstore) LogStoreService& logstore_service() { return hs()->logstore_service(); } /////////////////////////////////////// 
LogStoreService Section /////////////////////////////////////// -LogStoreService::LogStoreService() : - m_logstore_families{std::make_unique< LogStoreFamily >(DATA_LOG_FAMILY_IDX), - std::make_unique< LogStoreFamily >(CTRL_LOG_FAMILY_IDX)} {} +LogStoreService::LogStoreService() { + m_id_reserver = std::make_unique< sisl::IDReserver >(); + meta_service().register_handler( + logdev_sb_meta_name, + [this](meta_blk* mblk, sisl::byte_view buf, size_t size) { + logdev_super_blk_found(std::move(buf), voidptr_cast(mblk)); + }, + nullptr); + + meta_service().register_handler( + logdev_rollback_sb_meta_name, + [this](meta_blk* mblk, sisl::byte_view buf, size_t size) { + rollback_super_blk_found(std::move(buf), voidptr_cast(mblk)); + }, + nullptr); +} -folly::Future< std::error_code > LogStoreService::create_vdev(uint64_t size, logstore_family_id_t family, - uint32_t chunk_size) { +folly::Future< std::error_code > LogStoreService::create_vdev(uint64_t size, uint32_t chunk_size) { const auto atomic_page_size = hs()->device_mgr()->atomic_page_size(HSDevType::Fast); hs_vdev_context hs_ctx; - std::string name; - - if (family == DATA_LOG_FAMILY_IDX) { - name = "data_logdev"; - hs_ctx.type = hs_vdev_type_t::DATA_LOGDEV_VDEV; - } else { - name = "ctrl_logdev"; - hs_ctx.type = hs_vdev_type_t::CTRL_LOGDEV_VDEV; - } + hs_ctx.type = hs_vdev_type_t::LOGDEV_VDEV; // reason we set alloc_type/chunk_sel_type here instead of by homestore logstore service consumer is because // consumer doesn't care or understands the underlying alloc/chunkSel for this service, if this changes in the // future, we can let consumer set it by then; auto vdev = - hs()->device_mgr()->create_vdev(vdev_parameters{.vdev_name = name, + hs()->device_mgr()->create_vdev(vdev_parameters{.vdev_name = "LogDev", .size_type = vdev_size_type_t::VDEV_SIZE_DYNAMIC, .vdev_size = 0, .num_chunks = 0, @@ -76,14 +79,10 @@ folly::Future< std::error_code > LogStoreService::create_vdev(uint64_t size, log return vdev->async_format(); } -shared< VirtualDev > LogStoreService::open_vdev(const vdev_info& vinfo, logstore_family_id_t family, - bool load_existing) { +shared< VirtualDev > LogStoreService::open_vdev(const vdev_info& vinfo, bool load_existing) { + RELEASE_ASSERT(m_logdev_vdev == nullptr, "Duplicate journal vdev"); auto vdev = std::make_shared< JournalVirtualDev >(*(hs()->device_mgr()), vinfo, nullptr); - if (family == DATA_LOG_FAMILY_IDX) { - m_data_logdev_vdev = std::dynamic_pointer_cast< JournalVirtualDev >(vdev); - } else { - m_ctrl_logdev_vdev = std::dynamic_pointer_cast< JournalVirtualDev >(vdev); - } + m_logdev_vdev = std::dynamic_pointer_cast< JournalVirtualDev >(vdev); return vdev; } @@ -93,35 +92,132 @@ void LogStoreService::start(bool format) { // Create an truncate thread loop which handles truncation which does sync IO start_threads(); - // Start the logstore families - m_logstore_families[DATA_LOG_FAMILY_IDX]->start(format, m_data_logdev_vdev.get()); - m_logstore_families[CTRL_LOG_FAMILY_IDX]->start(format, m_ctrl_logdev_vdev.get()); + for (auto& [logdev_id, logdev] : m_id_logdev_map) { + logdev->start(format, m_logdev_vdev.get()); + } } void LogStoreService::stop() { device_truncate(nullptr, true, false); - for (auto& f : m_logstore_families) { - f->stop(); + for (auto& [id, logdev] : m_id_logdev_map) { + logdev->stop(); + } + { + folly::SharedMutexWritePriority::WriteHolder holder(m_logdev_map_mtx); + m_id_logdev_map.clear(); + } + m_id_reserver.reset(); +} + +logdev_id_t LogStoreService::create_new_logdev() { + 
folly::SharedMutexWritePriority::WriteHolder holder(m_logdev_map_mtx); + logdev_id_t logdev_id = m_id_reserver->reserve(); + auto logdev = create_new_logdev_internal(logdev_id); + logdev->start(true /* format */, m_logdev_vdev.get()); + COUNTER_INCREMENT(m_metrics, logdevs_count, 1); + LOGINFO("Created log dev id {}", logdev_id); + return logdev_id; +} + +std::shared_ptr< LogDev > LogStoreService::create_new_logdev_internal(logdev_id_t logdev_id) { + auto logdev = std::make_shared< LogDev >(logdev_id); + const auto it = m_id_logdev_map.find(logdev_id); + HS_REL_ASSERT((it == m_id_logdev_map.end()), "logdev id {} already exists", logdev_id); + m_id_logdev_map.insert(std::make_pair<>(logdev_id, logdev)); + return logdev; +} + +void LogStoreService::open_logdev(logdev_id_t logdev_id) { + folly::SharedMutexWritePriority::WriteHolder holder(m_logdev_map_mtx); + const auto it = m_id_logdev_map.find(logdev_id); + if (it == m_id_logdev_map.end()) { + m_id_reserver->reserve(logdev_id); + create_new_logdev_internal(logdev_id); + } +} + +std::vector< std::shared_ptr< LogDev > > LogStoreService::get_all_logdev() { + folly::SharedMutexWritePriority::ReadHolder holder(m_logdev_map_mtx); + std::vector< std::shared_ptr< LogDev > > res; + for (auto& [id, logdev] : m_id_logdev_map) { + res.push_back(logdev); } + return res; } -std::shared_ptr< HomeLogStore > LogStoreService::create_new_log_store(const logstore_family_id_t family_id, - const bool append_mode) { - HS_REL_ASSERT_LT(family_id, num_log_families); +std::shared_ptr< LogDev > LogStoreService::get_logdev(logdev_id_t id) { + folly::SharedMutexWritePriority::ReadHolder holder(m_logdev_map_mtx); + const auto it = m_id_logdev_map.find(id); + HS_REL_ASSERT((it != m_id_logdev_map.end()), "logdev id {} doesn't exist", id); + return it->second; +} + +void LogStoreService::logdev_super_blk_found(const sisl::byte_view& buf, void* meta_cookie) { + superblk< logdev_superblk > sb; + sb.load(buf, meta_cookie); + HS_REL_ASSERT_EQ(sb->get_magic(), logdev_superblk::LOGDEV_SB_MAGIC, "Invalid logdev metablk, magic mismatch"); + HS_REL_ASSERT_EQ(sb->get_version(), logdev_superblk::LOGDEV_SB_VERSION, "Invalid version of logdev metablk"); + { + folly::SharedMutexWritePriority::WriteHolder holder(m_logdev_map_mtx); + std::shared_ptr< LogDev > logdev; + auto id = sb->logdev_id; + const auto it = m_id_logdev_map.find(id); + if (it == m_id_logdev_map.end()) { + m_id_reserver->reserve(id); + logdev = create_new_logdev_internal(id); + } else { + logdev = it->second; + } + + logdev->log_dev_meta().logdev_super_blk_found(buf, meta_cookie); + } +} + +void LogStoreService::rollback_super_blk_found(const sisl::byte_view& buf, void* meta_cookie) { + superblk< rollback_superblk > rollback_sb; + rollback_sb.load(buf, meta_cookie); + HS_REL_ASSERT_EQ(rollback_sb->get_magic(), rollback_superblk::ROLLBACK_SB_MAGIC, "Rollback sb magic mismatch"); + HS_REL_ASSERT_EQ(rollback_sb->get_version(), rollback_superblk::ROLLBACK_SB_VERSION, + "Rollback sb version mismatch"); + { + folly::SharedMutexWritePriority::WriteHolder holder(m_logdev_map_mtx); + std::shared_ptr< LogDev > logdev; + auto id = rollback_sb->logdev_id; + const auto it = m_id_logdev_map.find(id); + if (it == m_id_logdev_map.end()) { + m_id_reserver->reserve(id); + logdev = create_new_logdev_internal(id); + } else { + logdev = it->second; + } + + logdev->log_dev_meta().rollback_super_blk_found(buf, meta_cookie); + } +} + +std::shared_ptr< HomeLogStore > LogStoreService::create_new_log_store(logdev_id_t logdev_id, const bool 
append_mode) { + folly::SharedMutexWritePriority::ReadHolder holder(m_logdev_map_mtx); COUNTER_INCREMENT(m_metrics, logstores_count, 1); - return m_logstore_families[family_id]->create_new_log_store(append_mode); + const auto it = m_id_logdev_map.find(logdev_id); + HS_REL_ASSERT((it != m_id_logdev_map.end()), "logdev id {} doesn't exist", logdev_id); + return it->second->create_new_log_store(append_mode); } -void LogStoreService::open_log_store(const logstore_family_id_t family_id, const logstore_id_t store_id, - const bool append_mode, const log_store_opened_cb_t& on_open_cb) { - HS_REL_ASSERT_LT(family_id, num_log_families); +void LogStoreService::open_log_store(const logdev_id_t logdev_id, const logstore_id_t store_id, const bool append_mode, + const log_store_opened_cb_t& on_open_cb) { + folly::SharedMutexWritePriority::ReadHolder holder(m_logdev_map_mtx); + const auto it = m_id_logdev_map.find(logdev_id); + HS_REL_ASSERT((it != m_id_logdev_map.end()), "logdev id {} doesn't exist", logdev_id); COUNTER_INCREMENT(m_metrics, logstores_count, 1); - return m_logstore_families[family_id]->open_log_store(store_id, append_mode, on_open_cb); + return it->second->open_log_store(store_id, append_mode, on_open_cb); } -void LogStoreService::remove_log_store(const logstore_family_id_t family_id, const logstore_id_t store_id) { - HS_REL_ASSERT_LT(family_id, num_log_families); - m_logstore_families[family_id]->remove_log_store(store_id); +void LogStoreService::remove_log_store(const logdev_id_t logdev_id, const logstore_id_t store_id) { + folly::SharedMutexWritePriority::ReadHolder holder(m_logdev_map_mtx); + const auto it = m_id_logdev_map.find(logdev_id); + HS_REL_ASSERT((it != m_id_logdev_map.end()), "logdev id {} doesn't exist", logdev_id); + it->second->remove_log_store(store_id); COUNTER_DECREMENT(m_metrics, logstores_count, 1); } @@ -130,12 +226,11 @@ void LogStoreService::device_truncate(const device_truncate_cb_t& cb, const bool treq->wait_till_done = wait_till_done; treq->dry_run = dry_run; treq->cb = cb; - if (treq->wait_till_done) { treq->trunc_outstanding = m_logstore_families.size(); } + if (treq->wait_till_done) { treq->trunc_outstanding = m_id_logdev_map.size(); } - for (auto& l : m_logstore_families) { - l->device_truncate(treq); + for (auto& [id, logdev] : m_id_logdev_map) { + logdev->device_truncate_under_lock(treq); } - if (treq->wait_till_done) { std::unique_lock< std::mutex > lk{treq->mtx}; treq->cv.wait(lk, [&] { return (treq->trunc_outstanding == 0); }); @@ -143,14 +238,11 @@ void LogStoreService::device_truncate(const device_truncate_cb_t& cb, const bool } void LogStoreService::flush_if_needed() { - for (auto& f : m_logstore_families) { - f->logdev().flush_if_needed(); + for (auto& [id, logdev] : m_id_logdev_map) { + logdev->flush_if_needed(); } } -LogDev& LogStoreService::data_logdev() { return data_log_family()->logdev(); } -LogDev& LogStoreService::ctrl_logdev() { return ctrl_log_family()->logdev(); } - void LogStoreService::start_threads() { struct Context { std::condition_variable cv; @@ -193,41 +285,32 @@ void LogStoreService::start_threads() { nlohmann::json LogStoreService::dump_log_store(const log_dump_req& dump_req) { nlohmann::json json_dump{}; // create root object if (dump_req.log_store == nullptr) { - for (auto& family : m_logstore_families) { - json_dump[family->get_name()] = family->dump_log_store(dump_req); + for (auto& [id, logdev] : m_id_logdev_map) { + json_dump[logdev->get_id()] = 
logdev->dump_log_store(dump_req); } } else { - auto& family = dump_req.log_store->get_family(); + auto logdev = dump_req.log_store->get_logdev(); // must use operator= construction as copy construction results in error - nlohmann::json val = family.dump_log_store(dump_req); - json_dump[family.get_name()] = std::move(val); + nlohmann::json val = logdev->dump_log_store(dump_req); + json_dump[logdev->get_id()] = std::move(val); } return json_dump; } nlohmann::json LogStoreService::get_status(const int verbosity) const { nlohmann::json js; - for (auto& l : m_logstore_families) { - js[l->get_name()] = l->get_status(verbosity); + for (auto& [id, logdev] : m_id_logdev_map) { + js[logdev->get_id()] = logdev->get_status(verbosity); } return js; } -uint32_t LogStoreService::used_size() const { - uint32_t sz{0}; - if (m_data_logdev_vdev) { sz += m_data_logdev_vdev->used_size(); } - if (m_ctrl_logdev_vdev) { sz += m_ctrl_logdev_vdev->used_size(); } - return sz; -} +uint32_t LogStoreService::used_size() const { return m_logdev_vdev->used_size(); } -uint32_t LogStoreService::total_size() const { - uint32_t sz{0}; - if (m_data_logdev_vdev) { sz += m_data_logdev_vdev->size(); } - if (m_ctrl_logdev_vdev) { sz += m_ctrl_logdev_vdev->size(); } - return sz; -} +uint32_t LogStoreService::total_size() const { return m_logdev_vdev->size(); } LogStoreServiceMetrics::LogStoreServiceMetrics() : sisl::MetricsGroup("LogStores", "AllLogStores") { + REGISTER_COUNTER(logdevs_count, "Total number of log devs", sisl::_publish_as::publish_as_gauge); REGISTER_COUNTER(logstores_count, "Total number of log stores", sisl::_publish_as::publish_as_gauge); REGISTER_COUNTER(logstore_append_count, "Total number of append requests to log stores", "logstore_op_count", {"op", "write"}); diff --git a/src/lib/replication/log_store/home_raft_log_store.cpp b/src/lib/replication/log_store/home_raft_log_store.cpp index bb3b7d2fb..56992d43c 100644 --- a/src/lib/replication/log_store/home_raft_log_store.cpp +++ b/src/lib/replication/log_store/home_raft_log_store.cpp @@ -56,30 +56,31 @@ static uint64_t extract_term(const log_buffer& log_bytes) { return (*r_cast< uint64_t const* >(raw_ptr)); } -HomeRaftLogStore::HomeRaftLogStore(logstore_id_t logstore_id) { +HomeRaftLogStore::HomeRaftLogStore(logdev_id_t logdev_id, logstore_id_t logstore_id) { m_dummy_log_entry = nuraft::cs_new< nuraft::log_entry >(0, nuraft::buffer::alloc(0), nuraft::log_val_type::app_log); if (logstore_id == UINT32_MAX) { - m_log_store = logstore_service().create_new_log_store(LogStoreService::DATA_LOG_FAMILY_IDX, true); + m_logdev_id = logstore_service().create_new_logdev(); + m_log_store = logstore_service().create_new_log_store(m_logdev_id, true); if (!m_log_store) { throw std::runtime_error("Failed to create log store"); } m_logstore_id = m_log_store->get_store_id(); - LOGDEBUGMOD(replication, "Opened new home log store id={}", m_logstore_id); + LOGDEBUGMOD(replication, "Opened new home log dev = {} store id={}", m_logdev_id, m_logstore_id); } else { + m_logdev_id = logdev_id; m_logstore_id = logstore_id; - LOGDEBUGMOD(replication, "Opening existing home log store id={}", logstore_id); - logstore_service().open_log_store(LogStoreService::DATA_LOG_FAMILY_IDX, logstore_id, true, - [this](shared< HomeLogStore > log_store) { - m_log_store = std::move(log_store); - DEBUG_ASSERT_EQ(m_logstore_id, m_log_store->get_store_id(), - "Mismatch in passed and create logstore id"); - REPL_STORE_LOG(DEBUG, "Home Log store created/opened successfully"); - }); + LOGDEBUGMOD(replication, 
"Opening existing home log dev = {} store id={}", m_logdev_id, logstore_id); + logstore_service().open_logdev(m_logdev_id); + logstore_service().open_log_store(m_logdev_id, logstore_id, true, [this](shared< HomeLogStore > log_store) { + m_log_store = std::move(log_store); + DEBUG_ASSERT_EQ(m_logstore_id, m_log_store->get_store_id(), "Mismatch in passed and create logstore id"); + REPL_STORE_LOG(DEBUG, "Home Log store created/opened successfully"); + }); } } void HomeRaftLogStore::remove_store() { REPL_STORE_LOG(DEBUG, "Logstore is being physically removed"); - logstore_service().remove_log_store(LogStoreService::DATA_LOG_FAMILY_IDX, m_logstore_id); + logstore_service().remove_log_store(m_logdev_id, m_logstore_id); m_log_store.reset(); } diff --git a/src/lib/replication/log_store/home_raft_log_store.h b/src/lib/replication/log_store/home_raft_log_store.h index c49cef310..4e6288d1a 100644 --- a/src/lib/replication/log_store/home_raft_log_store.h +++ b/src/lib/replication/log_store/home_raft_log_store.h @@ -35,7 +35,7 @@ using raft_buf_ptr_t = nuraft::ptr< nuraft::buffer >; class HomeRaftLogStore : public nuraft::log_store { public: - HomeRaftLogStore(homestore::logstore_id_t logstore_id = UINT32_MAX); + HomeRaftLogStore(logdev_id_t logdev_id = UINT32_MAX, homestore::logstore_id_t logstore_id = UINT32_MAX); virtual ~HomeRaftLogStore() = default; void remove_store(); @@ -170,9 +170,11 @@ class HomeRaftLogStore : public nuraft::log_store { virtual ulong last_durable_index() override; logstore_id_t logstore_id() const { return m_logstore_id; } + logdev_id_t logdev_id() const { return m_logdev_id; } private: logstore_id_t m_logstore_id; + logdev_id_t m_logdev_id; shared< HomeLogStore > m_log_store; nuraft::ptr< nuraft::log_entry > m_dummy_log_entry; store_lsn_t m_last_durable_lsn{-1}; diff --git a/src/lib/replication/repl_dev/common.h b/src/lib/replication/repl_dev/common.h index aa6935581..a39e44c12 100644 --- a/src/lib/replication/repl_dev/common.h +++ b/src/lib/replication/repl_dev/common.h @@ -60,11 +60,12 @@ struct repl_dev_superblk { uint64_t magic{REPL_DEV_SB_MAGIC}; uint32_t version{REPL_DEV_SB_VERSION}; - uuid_t group_id; // group_id of this replica set - logstore_id_t data_journal_id; // Logstore id for the data journal - int64_t commit_lsn; // LSN upto which this replica has committed - int64_t checkpoint_lsn; // LSN upto which this replica have checkpointed the data - uint64_t group_ordinal; // Ordinal number which will be used to indicate the rdevXYZ for debugging + uuid_t group_id; // group_id of this replica set + logdev_id_t logdev_id; + logstore_id_t logstore_id; // Logstore id for the data journal + int64_t commit_lsn; // LSN upto which this replica has committed + int64_t checkpoint_lsn; // LSN upto which this replica have checkpointed the data + uint64_t group_ordinal; // Ordinal number which will be used to indicate the rdevXYZ for debugging uint64_t get_magic() const { return magic; } uint32_t get_version() const { return version; } diff --git a/src/lib/replication/repl_dev/raft_repl_dev.cpp b/src/lib/replication/repl_dev/raft_repl_dev.cpp index 0ee5885e8..7e1f3fdd8 100644 --- a/src/lib/replication/repl_dev/raft_repl_dev.cpp +++ b/src/lib/replication/repl_dev/raft_repl_dev.cpp @@ -26,7 +26,8 @@ RaftReplDev::RaftReplDev(RaftReplService& svc, superblk< raft_repl_dev_superblk m_state_machine = std::make_shared< RaftStateMachine >(*this); if (load_existing) { - m_data_journal = std::make_shared< ReplLogStore >(*this, *m_state_machine, m_rd_sb->data_journal_id); + m_data_journal 
= + std::make_shared< ReplLogStore >(*this, *m_state_machine, m_rd_sb->logdev_id, m_rd_sb->logstore_id); m_next_dsn = m_rd_sb->last_applied_dsn + 1; m_commit_upto_lsn = m_rd_sb->commit_lsn; m_last_flushed_commit_lsn = m_commit_upto_lsn; @@ -38,22 +39,22 @@ RaftReplDev::RaftReplDev(RaftReplService& svc, superblk< raft_repl_dev_superblk } if (m_rd_sb->is_timeline_consistent) { - logstore_service().open_log_store(LogStoreService::CTRL_LOG_FAMILY_IDX, m_rd_sb->free_blks_journal_id, - false, [this](shared< HomeLogStore > log_store) { + logstore_service().open_log_store(m_rd_sb->logdev_id, m_rd_sb->free_blks_journal_id, false, + [this](shared< HomeLogStore > log_store) { m_free_blks_journal = std::move(log_store); m_rd_sb->free_blks_journal_id = m_free_blks_journal->get_store_id(); }); } } else { m_data_journal = std::make_shared< ReplLogStore >(*this, *m_state_machine); - m_rd_sb->data_journal_id = m_data_journal->logstore_id(); + m_rd_sb->logdev_id = m_data_journal->logdev_id(); + m_rd_sb->logstore_id = m_data_journal->logstore_id(); m_rd_sb->last_applied_dsn = 0; m_rd_sb->group_ordinal = s_next_group_ordinal.fetch_add(1); m_rdev_name = fmt::format("rdev{}", m_rd_sb->group_ordinal); if (m_rd_sb->is_timeline_consistent) { - m_free_blks_journal = - logstore_service().create_new_log_store(LogStoreService::CTRL_LOG_FAMILY_IDX, false /* append_mode */); + m_free_blks_journal = logstore_service().create_new_log_store(m_rd_sb->logdev_id, false /* append_mode */); m_rd_sb->free_blks_journal_id = m_free_blks_journal->get_store_id(); } m_rd_sb.write(); diff --git a/src/lib/replication/repl_dev/solo_repl_dev.cpp b/src/lib/replication/repl_dev/solo_repl_dev.cpp index 3d62182ea..0db9cdfd8 100644 --- a/src/lib/replication/repl_dev/solo_repl_dev.cpp +++ b/src/lib/replication/repl_dev/solo_repl_dev.cpp @@ -10,19 +10,21 @@ namespace homestore { SoloReplDev::SoloReplDev(superblk< repl_dev_superblk >&& rd_sb, bool load_existing) : m_rd_sb{std::move(rd_sb)}, m_group_id{m_rd_sb->group_id} { if (load_existing) { - logstore_service().open_log_store(LogStoreService::DATA_LOG_FAMILY_IDX, m_rd_sb->data_journal_id, true, + logstore_service().open_logdev(m_rd_sb->logdev_id); + logstore_service().open_log_store(m_rd_sb->logdev_id, m_rd_sb->logstore_id, true, bind_this(SoloReplDev::on_data_journal_created, 1)); } else { - m_data_journal = - logstore_service().create_new_log_store(LogStoreService::DATA_LOG_FAMILY_IDX, true /* append_mode */); - m_rd_sb->data_journal_id = m_data_journal->get_store_id(); + m_logdev_id = logstore_service().create_new_logdev(); + m_data_journal = logstore_service().create_new_log_store(m_logdev_id, true /* append_mode */); + m_rd_sb->logstore_id = m_data_journal->get_store_id(); + m_rd_sb->logdev_id = m_logdev_id; m_rd_sb.write(); } } void SoloReplDev::on_data_journal_created(shared< HomeLogStore > log_store) { m_data_journal = std::move(log_store); - m_rd_sb->data_journal_id = m_data_journal->get_store_id(); + m_rd_sb->logstore_id = m_data_journal->get_store_id(); m_data_journal->register_log_found_cb(bind_this(SoloReplDev::on_log_found, 3)); } diff --git a/src/lib/replication/repl_dev/solo_repl_dev.h b/src/lib/replication/repl_dev/solo_repl_dev.h index 331003f4a..e1318e1f8 100644 --- a/src/lib/replication/repl_dev/solo_repl_dev.h +++ b/src/lib/replication/repl_dev/solo_repl_dev.h @@ -29,6 +29,7 @@ class CP; class SoloReplDev : public ReplDev { private: + logdev_id_t m_logdev_id; std::shared_ptr< HomeLogStore > m_data_journal; superblk< repl_dev_superblk > m_rd_sb; uuid_t m_group_id; diff 
--git a/src/tests/CMakeLists.txt b/src/tests/CMakeLists.txt index a9752a3ef..d3af99a40 100644 --- a/src/tests/CMakeLists.txt +++ b/src/tests/CMakeLists.txt @@ -111,8 +111,8 @@ if (${io_tests}) add_test(NAME MetaBlkMgr-Epoll COMMAND ${CMAKE_BINARY_DIR}/bin/test_meta_blk_mgr) add_test(NAME DataService-Epoll COMMAND ${CMAKE_BINARY_DIR}/bin/test_data_service) add_test(NAME SoloReplDev-Epoll COMMAND ${CMAKE_BINARY_DIR}/bin/test_solo_repl_dev) - # add_test(NAME HomeRaftLogStore-Epoll COMMAND ${CMAKE_BINARY_DIR}/bin/test_home_raft_logstore) - # add_test(NAME RaftReplDev-Epoll COMMAND ${CMAKE_BINARY_DIR}/bin/test_raft_repl_dev) + add_test(NAME HomeRaftLogStore-Epoll COMMAND ${CMAKE_BINARY_DIR}/bin/test_home_raft_logstore) + add_test(NAME RaftReplDev-Epoll COMMAND ${CMAKE_BINARY_DIR}/bin/test_raft_repl_dev) endif() can_build_spdk_io_tests(spdk_tests) diff --git a/src/tests/log_store_benchmark.cpp b/src/tests/log_store_benchmark.cpp index b8d43e6ec..d6fb50a4c 100644 --- a/src/tests/log_store_benchmark.cpp +++ b/src/tests/log_store_benchmark.cpp @@ -55,8 +55,8 @@ class BenchLogStore { public: friend class SampleDB; BenchLogStore() { - m_log_store = - logstore_service().create_new_log_store(LogStoreService::DATA_LOG_FAMILY_IDX, true /* append_mode */); + m_logdev_id = logstore_service().create_new_logdev(); + m_log_store = logstore_service().create_new_log_store(m_logdev_id, true /* append_mode */); m_log_store->register_log_found_cb(bind_this(BenchLogStore::on_log_found, 3)); m_nth_entry.store(0); generate_rand_data(); @@ -141,6 +141,7 @@ class BenchLogStore { } private: + logdev_id_t m_logdev_id; std::shared_ptr< HomeLogStore > m_log_store; std::atomic< int32_t > m_outstanding{0}; std::atomic< int64_t > m_nth_entry{0}; @@ -155,7 +156,6 @@ class BenchLogStore { uint32_t m_iteration{1}; std::vector< std::string > m_data; - logstore_family_id_t m_family; logstore_id_t m_store_id; }; @@ -168,10 +168,8 @@ static void test_append(benchmark::State& state) { } static void setup() { - test_common::HSTestHelper::start_homestore("test_log_store", - {{HS_SERVICE::META, {.size_pct = 5.0}}, - {HS_SERVICE::LOG_REPLICATED, {.size_pct = 85.0}}, - {HS_SERVICE::LOG_LOCAL, {.size_pct = 2.0}}}); + test_common::HSTestHelper::start_homestore( + "test_log_store", {{HS_SERVICE::META, {.size_pct = 5.0}}, {HS_SERVICE::LOG, {.size_pct = 87.0}}}); } static void teardown() { test_common::HSTestHelper::shutdown_homestore(); } diff --git a/src/tests/test_common/homestore_test_common.hpp b/src/tests/test_common/homestore_test_common.hpp index 758becb58..00561d49f 100644 --- a/src/tests/test_common/homestore_test_common.hpp +++ b/src/tests/test_common/homestore_test_common.hpp @@ -241,7 +241,7 @@ class HSTestHelper { hsi->with_data_service(tp.custom_chunk_selector); } else if (svc == HS_SERVICE::INDEX) { hsi->with_index_service(std::unique_ptr< IndexServiceCallbacks >(tp.index_svc_cbs)); - } else if ((svc == HS_SERVICE::LOG_REPLICATED) || (svc == HS_SERVICE::LOG_LOCAL)) { + } else if ((svc == HS_SERVICE::LOG)) { hsi->with_log_service(); } else if (svc == HS_SERVICE::REPLICATION) { hsi->with_repl_data_service(tp.repl_app, tp.custom_chunk_selector); @@ -252,14 +252,10 @@ class HSTestHelper { if (need_format) { hsi->format_and_start({{HS_SERVICE::META, {.size_pct = svc_params[HS_SERVICE::META].size_pct}}, - {HS_SERVICE::LOG_REPLICATED, - {.size_pct = 1, - .chunk_size = svc_params[HS_SERVICE::LOG_REPLICATED].chunk_size, - .vdev_size_type = svc_params[HS_SERVICE::LOG_REPLICATED].vdev_size_type}}, - {HS_SERVICE::LOG_LOCAL, - {.size_pct = 
1, - .chunk_size = svc_params[HS_SERVICE::LOG_LOCAL].chunk_size, - .vdev_size_type = svc_params[HS_SERVICE::LOG_LOCAL].vdev_size_type}}, + {HS_SERVICE::LOG, + {.size_pct = svc_params[HS_SERVICE::LOG].size_pct, + .chunk_size = svc_params[HS_SERVICE::LOG].chunk_size, + .vdev_size_type = svc_params[HS_SERVICE::LOG].vdev_size_type}}, {HS_SERVICE::DATA, {.size_pct = svc_params[HS_SERVICE::DATA].size_pct, .num_chunks = svc_params[HS_SERVICE::DATA].num_chunks, diff --git a/src/tests/test_common/hs_repl_test_common.hpp b/src/tests/test_common/hs_repl_test_common.hpp index 19dec9959..d2d794995 100644 --- a/src/tests/test_common/hs_repl_test_common.hpp +++ b/src/tests/test_common/hs_repl_test_common.hpp @@ -167,8 +167,7 @@ class HSReplTestHelper { name_ + std::to_string(replica_num_), {{HS_SERVICE::META, {.size_pct = 5.0}}, {HS_SERVICE::REPLICATION, {.size_pct = 60.0, .repl_app = std::make_unique< TestReplApplication >(*this)}}, - {HS_SERVICE::LOG_REPLICATED, {.size_pct = 20.0}}, - {HS_SERVICE::LOG_LOCAL, {.size_pct = 2.0}}}); + {HS_SERVICE::LOG, {.size_pct = 22.0}}}); } void teardown() { diff --git a/src/tests/test_home_raft_logstore.cpp b/src/tests/test_home_raft_logstore.cpp index d9c9df4c5..6c6ceb07c 100644 --- a/src/tests/test_home_raft_logstore.cpp +++ b/src/tests/test_home_raft_logstore.cpp @@ -162,6 +162,7 @@ class RaftLogStoreClient { } private: + homestore::logdev_id_t m_logdev_id{UINT32_MAX}; homestore::logstore_id_t m_store_id{UINT32_MAX}; std::unique_ptr< HomeRaftLogStore > m_rls; sisl::sparse_vector< std::string > m_shadow_log; @@ -173,15 +174,15 @@ class RaftLogStoreClient { class TestRaftLogStore : public ::testing::Test { public: void SetUp() { - test_common::HSTestHelper::start_homestore("test_home_raft_log_store", - {{HS_SERVICE::META, {.size_pct = 5.0}}, - {HS_SERVICE::LOG_REPLICATED, {.size_pct = 70.0}}, - {HS_SERVICE::LOG_LOCAL, {.size_pct = 2.0}}}); + test_common::HSTestHelper::start_homestore( + "test_home_raft_log_store", {{HS_SERVICE::META, {.size_pct = 5.0}}, {HS_SERVICE::LOG, {.size_pct = 72.0}}}); m_leader_store.m_rls = std::make_unique< HomeRaftLogStore >(); m_leader_store.m_store_id = m_leader_store.m_rls->logstore_id(); + m_leader_store.m_logdev_id = m_leader_store.m_rls->logdev_id(); m_follower_store.m_rls = std::make_unique< HomeRaftLogStore >(); m_follower_store.m_store_id = m_follower_store.m_rls->logstore_id(); + m_follower_store.m_logdev_id = m_follower_store.m_rls->logdev_id(); } void restart() { @@ -189,11 +190,12 @@ class TestRaftLogStore : public ::testing::Test { m_follower_store.m_rls.reset(); test_common::HSTestHelper::start_homestore( - "test_home_raft_log_store", - {{HS_SERVICE::META, {}}, {HS_SERVICE::LOG_REPLICATED, {}}, {HS_SERVICE::LOG_LOCAL, {}}}, + "test_home_raft_log_store", {{HS_SERVICE::META, {.size_pct = 5.0}}, {HS_SERVICE::LOG, {.size_pct = 72.0}}}, [this]() { - m_leader_store.m_rls = std::make_unique< HomeRaftLogStore >(m_leader_store.m_store_id); - m_follower_store.m_rls = std::make_unique< HomeRaftLogStore >(m_follower_store.m_store_id); + m_leader_store.m_rls = + std::make_unique< HomeRaftLogStore >(m_leader_store.m_logdev_id, m_leader_store.m_store_id); + m_follower_store.m_rls = + std::make_unique< HomeRaftLogStore >(m_follower_store.m_logdev_id, m_follower_store.m_store_id); }, true /* restart */); } diff --git a/src/tests/test_journal_vdev.cpp b/src/tests/test_journal_vdev.cpp index a34eae090..0b8ddc34f 100644 --- a/src/tests/test_journal_vdev.cpp +++ b/src/tests/test_journal_vdev.cpp @@ -71,27 +71,29 @@ class VDevJournalIOTest : 
public ::testing::Test { virtual void SetUp() override { auto const ndevices = SISL_OPTIONS["num_devs"].as< uint32_t >(); auto const dev_size = SISL_OPTIONS["dev_size_mb"].as< uint64_t >() * 1024 * 1024; - test_common::HSTestHelper::start_homestore( - "test_journal_vdev", - {{HS_SERVICE::META, {.size_pct = 15.0}}, - {HS_SERVICE::LOG_REPLICATED, - {.chunk_size = 32 * 1024 * 1024, .vdev_size_type = vdev_size_type_t::VDEV_SIZE_DYNAMIC}}, - {HS_SERVICE::LOG_LOCAL, - {.chunk_size = 32 * 1024 * 1024, .vdev_size_type = vdev_size_type_t::VDEV_SIZE_DYNAMIC}}}, - nullptr /* starting_cb */, false /* restart */); + test_common::HSTestHelper::start_homestore("test_journal_vdev", + { + {HS_SERVICE::META, {.size_pct = 15.0}}, + {HS_SERVICE::LOG, + {.size_pct = 50.0, + .chunk_size = 32 * 1024 * 1024, + .vdev_size_type = vdev_size_type_t::VDEV_SIZE_DYNAMIC}}, + }, + nullptr /* starting_cb */, false /* restart */); } virtual void TearDown() override { test_common::HSTestHelper::shutdown_homestore(); } void restart_homestore() { - test_common::HSTestHelper::start_homestore( - "test_journal_vdev", - {{HS_SERVICE::META, {.size_pct = 15.0}}, - {HS_SERVICE::LOG_REPLICATED, - {.chunk_size = 32 * 1024 * 1024, .vdev_size_type = vdev_size_type_t::VDEV_SIZE_DYNAMIC}}, - {HS_SERVICE::LOG_LOCAL, - {.chunk_size = 32 * 1024 * 1024, .vdev_size_type = vdev_size_type_t::VDEV_SIZE_DYNAMIC}}}, - nullptr /* starting_cb */, true /* restart */); + test_common::HSTestHelper::start_homestore("test_journal_vdev", + { + {HS_SERVICE::META, {.size_pct = 15.0}}, + {HS_SERVICE::LOG, + {.size_pct = 50.0, + .chunk_size = 32 * 1024 * 1024, + .vdev_size_type = vdev_size_type_t::VDEV_SIZE_DYNAMIC}}, + }, + nullptr /* starting_cb */, true /* restart */); } }; @@ -107,7 +109,7 @@ class JournalDescriptorTest { std::shared_ptr< JournalVirtualDev::Descriptor > vdev_jd() { return m_vdev_jd; } void reinit() { - auto vdev = hs()->logstore_service().get_vdev(homestore::LogStoreService::DATA_LOG_FAMILY_IDX); + auto vdev = hs()->logstore_service().get_vdev(); m_vdev_jd = vdev->open(m_logdev_id); } diff --git a/src/tests/test_log_store.cpp b/src/tests/test_log_store.cpp index 2cfb94b74..cb9741a30 100644 --- a/src/tests/test_log_store.cpp +++ b/src/tests/test_log_store.cpp @@ -49,7 +49,6 @@ #include #include "logstore/log_dev.hpp" -#include "logstore/log_store_family.hpp" #include "test_common/homestore_test_common.hpp" using namespace homestore; @@ -76,7 +75,7 @@ struct test_log_data { } }; -typedef std::function< void(logstore_family_id_t, logstore_seq_num_t, logdev_key) > test_log_store_comp_cb_t; +typedef std::function< void(logdev_id_t, logstore_seq_num_t, logdev_key) > test_log_store_comp_cb_t; class Timer { public: Timer() : beg_{clock_::now()} {} @@ -99,15 +98,15 @@ class SampleLogStoreClient { public: friend class SampleDB; - SampleLogStoreClient(std::shared_ptr< HomeLogStore > store, const logstore_family_id_t family_idx, + SampleLogStoreClient(std::shared_ptr< HomeLogStore > store, const logdev_id_t logdev_id, const test_log_store_comp_cb_t& cb) : - m_store_id{store->get_store_id()}, m_comp_cb{cb}, m_family{family_idx} { + m_store_id{store->get_store_id()}, m_comp_cb{cb}, m_logdev_id{logdev_id} { set_log_store(store); } - explicit SampleLogStoreClient(const logstore_family_id_t family_idx, const test_log_store_comp_cb_t& cb) : - SampleLogStoreClient(logstore_service().create_new_log_store(family_idx, false /* append_mode */), - family_idx, cb) {} + explicit SampleLogStoreClient(const logdev_id_t logdev_id, const test_log_store_comp_cb_t& cb) : 
+ SampleLogStoreClient(logstore_service().create_new_log_store(logdev_id, false /* append_mode */), logdev_id, + cb) {} SampleLogStoreClient(const SampleLogStoreClient&) = delete; SampleLogStoreClient(SampleLogStoreClient&&) noexcept = delete; @@ -161,7 +160,7 @@ class SampleLogStoreClient { } else { std::free(voidptr_cast(d)); } - m_comp_cb(m_family, seq_num, ld_key); + m_comp_cb(m_logdev_id, seq_num, ld_key); }); } } @@ -323,8 +322,7 @@ class SampleLogStoreClient { } void rollback_record_count(uint32_t expected_count) { - auto actual_count = - m_log_store->get_family().logdev().log_dev_meta().num_rollback_records(m_log_store->get_store_id()); + auto actual_count = m_log_store->get_logdev()->log_dev_meta().num_rollback_records(m_log_store->get_store_id()); ASSERT_EQ(actual_count, expected_count); } @@ -403,7 +401,7 @@ class SampleLogStoreClient { folly::Synchronized< std::map< logstore_seq_num_t, bool > > m_hole_lsns; int64_t m_n_recovered_lsns = 0; int64_t m_n_recovered_truncated_lsns = 0; - logstore_family_id_t m_family; + logdev_id_t m_logdev_id; }; uint64_t SampleLogStoreClient::s_max_flush_multiple = 0; @@ -436,13 +434,14 @@ class SampleDB { test_common::HSTestHelper::start_homestore( "test_log_store", {{HS_SERVICE::META, {.size_pct = 5.0}}, - {HS_SERVICE::LOG_REPLICATED, {.size_pct = 5.0, .chunk_size = 32 * 1024 * 1024}}, - {HS_SERVICE::LOG_LOCAL, {.size_pct = 5.0, .chunk_size = 32 * 1024 * 1024}}}, + {HS_SERVICE::LOG, {.size_pct = 84.0, .chunk_size = 32 * 1024 * 1024}}}, [this, restart, n_log_stores]() { if (restart) { for (uint32_t i{0}; i < n_log_stores; ++i) { SampleLogStoreClient* client = m_log_store_clients[i].get(); - logstore_service().open_log_store(client->m_family, client->m_store_id, false /* append_mode */, + logstore_service().open_logdev(client->m_logdev_id); + logstore_service().open_log_store(client->m_logdev_id, client->m_store_id, + false /* append_mode */, [i, this, client](std::shared_ptr< HomeLogStore > log_store) { client->set_log_store(log_store); }); @@ -452,15 +451,13 @@ class SampleDB { restart); if (!restart) { + auto logdev_id = logstore_service().create_new_logdev(); for (uint32_t i{0}; i < n_log_stores; ++i) { - auto family_idx = - ((i % 2) == 0) ? 
LogStoreService::DATA_LOG_FAMILY_IDX : LogStoreService::CTRL_LOG_FAMILY_IDX; m_log_store_clients.push_back(std::make_unique< SampleLogStoreClient >( - family_idx, bind_this(SampleDB::on_log_insert_completion, 3))); + logdev_id, bind_this(SampleDB::on_log_insert_completion, 3))); } SampleLogStoreClient::s_max_flush_multiple = - std::max(logstore_service().data_logdev().get_flush_size_multiple(), - logstore_service().ctrl_logdev().get_flush_size_multiple()); + logstore_service().get_logdev(logdev_id)->get_flush_size_multiple(); } } @@ -469,7 +466,8 @@ class SampleDB { if (cleanup) { m_log_store_clients.clear(); } } - void on_log_insert_completion(logstore_family_id_t fid, logstore_seq_num_t lsn, logdev_key ld_key) { + void on_log_insert_completion(logdev_id_t fid, logstore_seq_num_t lsn, logdev_key ld_key) { + if (m_highest_log_idx.count(fid) == 0) { m_highest_log_idx[fid] = std::atomic{-1}; } atomic_update_max(m_highest_log_idx[fid], ld_key.idx); if (m_io_closure) m_io_closure(fid, lsn, ld_key); } @@ -478,7 +476,7 @@ class SampleDB { bool removed{false}; for (auto it = std::begin(m_log_store_clients); it != std::end(m_log_store_clients); ++it) { if ((*it)->m_log_store->get_store_id() == store_id) { - logstore_service().remove_log_store((*it)->m_family, store_id); + logstore_service().remove_log_store((*it)->m_logdev_id, store_id); m_log_store_clients.erase(it); removed = true; break; @@ -487,7 +485,9 @@ class SampleDB { return removed; } - logid_t highest_log_idx(logstore_family_id_t fid) const { return m_highest_log_idx[fid].load(); } + logid_t highest_log_idx(logdev_id_t fid) { + return m_highest_log_idx.count(fid) ? m_highest_log_idx[fid].load() : -1; + } private: const static std::string s_fpath_root; @@ -495,7 +495,7 @@ class SampleDB { std::function< void() > m_on_schedule_io_cb; test_log_store_comp_cb_t m_io_closure; std::vector< std::unique_ptr< SampleLogStoreClient > > m_log_store_clients; - std::array< std::atomic< logid_t >, LogStoreService::num_log_families > m_highest_log_idx = {-1, -1}; + std::map< logdev_id_t, std::atomic< logid_t > > m_highest_log_idx; }; const std::string SampleDB::s_fpath_root{"/tmp/log_store_dev_"}; @@ -554,7 +554,7 @@ class LogStoreTest : public ::testing::Test { } } - void on_insert_completion([[maybe_unused]] logstore_family_id_t fid, logstore_seq_num_t lsn, logdev_key ld_key) { + void on_insert_completion([[maybe_unused]] logdev_id_t fid, logstore_seq_num_t lsn, logdev_key ld_key) { bool notify{false}; uint64_t waiting_to_issue{0}; { @@ -593,12 +593,12 @@ class LogStoreTest : public ::testing::Test { size_t dump_sz{0}; int64_t rec_count{0}; - for (logstore_family_id_t fid{0}; fid < LogStoreService::num_log_families; ++fid) { - auto* family = (fid == 0) ? logstore_service().data_log_family() : logstore_service().ctrl_log_family(); - nlohmann::json json_dump = family->dump_log_store(dump_req); + auto logdev_vec = logstore_service().get_all_logdev(); + for (auto& logdev : logdev_vec) { + nlohmann::json json_dump = logdev->dump_log_store(dump_req); dump_sz += json_dump.size(); - LOGINFO("Printing json dump of all logstores in family_id{}. \n {}", fid, json_dump.dump()); + LOGINFO("Printing json dump of all logstores in logdev {}. 
\n {}", logdev->get_id(), json_dump.dump()); for (const auto& logdump : json_dump) { const auto itr = logdump.find("log_records"); if (itr != std::end(logdump)) { rec_count += static_cast< int64_t >(logdump["log_records"].size()); } @@ -614,7 +614,7 @@ class LogStoreTest : public ::testing::Test { if (lsc->m_log_store->get_store_id() != id) { continue; } log_dump_req dump_req; - const auto fid = lsc->m_family; + const auto fid = lsc->m_logdev_id; if (print_content) dump_req.verbosity_level = log_dump_verbosity::CONTENT; dump_req.log_store = lsc->m_log_store; @@ -622,9 +622,9 @@ class LogStoreTest : public ::testing::Test { dump_req.end_seq_num = end_seq; // must use operator= construction as copy construction results in error - auto* family = (fid == 0) ? logstore_service().data_log_family() : logstore_service().ctrl_log_family(); - nlohmann::json json_dump = family->dump_log_store(dump_req); - LOGINFO("Printing json dump of family_id={} logstore id {}, start_seq {}, end_seq {}, \n\n {}", fid, id, + auto logdev = lsc->m_log_store->get_logdev(); + nlohmann::json json_dump = logdev->dump_log_store(dump_req); + LOGINFO("Printing json dump of logdev={} logstore id {}, start_seq {}, end_seq {}, \n\n {}", fid, id, start_seq, end_seq, json_dump.dump()); const auto itr_id = json_dump.find(std::to_string(id)); if (itr_id != std::end(json_dump)) { @@ -642,11 +642,11 @@ class LogStoreTest : public ::testing::Test { } } - int find_garbage_upto(logstore_family_id_t family_idx, logid_t idx) { + int find_garbage_upto(logdev_id_t logdev_id, logid_t idx) { int count{0}; - auto it = std::begin(m_garbage_stores_upto[family_idx]); + auto it = std::begin(m_garbage_stores_upto[logdev_id]); - while (it != std::end(m_garbage_stores_upto[family_idx])) { + while (it != std::end(m_garbage_stores_upto[logdev_id])) { if (it->first > idx) { return count; } ++it; ++count; @@ -684,7 +684,7 @@ class LogStoreTest : public ::testing::Test { bool failed{false}; logstore_service().device_truncate( - [this, is_parallel_to_write, &failed](const auto& trunc_lds) { + [this, is_parallel_to_write, &failed](auto& trunc_lds) { bool expect_forward_progress{true}; uint32_t n_fully_truncated{0}; if (is_parallel_to_write) { @@ -701,14 +701,14 @@ class LogStoreTest : public ::testing::Test { } if (expect_forward_progress) { - for (logstore_family_id_t fid{0}; fid < trunc_lds.size(); ++fid) { - const auto trunc_loc = trunc_lds[fid]; + for (auto [fid, trunc_loc] : trunc_lds) { if (trunc_loc == logdev_key::out_of_bound_ld_key()) { LOGINFO("No forward progress for device truncation yet."); } else { // Validate the truncation is actually moving forward - if (trunc_loc.idx <= m_truncate_log_idx[fid].load()) { failed = true; } - ASSERT_GT(trunc_loc.idx, m_truncate_log_idx[fid].load()); + auto idx = m_truncate_log_idx.count(fid) ? m_truncate_log_idx[fid].load() : -1; + if (trunc_loc.idx <= idx) { failed = true; } + ASSERT_GT(trunc_loc.idx, idx); m_truncate_log_idx[fid].store(trunc_loc.idx); } } @@ -719,8 +719,10 @@ class LogStoreTest : public ::testing::Test { true /* wait_till_done */); ASSERT_FALSE(failed); - for (logstore_family_id_t fid{0}; fid < LogStoreService::num_log_families; ++fid) { - const auto upto_count = find_garbage_upto(fid, m_truncate_log_idx[fid].load()); + for (auto& logdev : logstore_service().get_all_logdev()) { + auto fid = logdev->get_id(); + auto idx = m_truncate_log_idx.count(fid) ? 
m_truncate_log_idx[fid].load() : -1; + const auto upto_count = find_garbage_upto(fid, idx); std::remove_const_t< decltype(upto_count) > count{0}; for (auto it = std::begin(m_garbage_stores_upto[fid]); count < upto_count; ++count) { it = m_garbage_stores_upto[fid].erase(it); @@ -763,11 +765,11 @@ class LogStoreTest : public ::testing::Test { size_t actual_garbage_ids{0}; size_t exp_garbage_store_count{0}; - for (logstore_family_id_t fid{0}; fid < LogStoreService::num_log_families; ++fid) { + for (auto& logdev : logstore_service().get_all_logdev()) { + auto fid = logdev->get_id(); std::vector< logstore_id_t > reg_ids, garbage_ids; - LogDev& ld = (fid == LogStoreService::DATA_LOG_FAMILY_IDX) ? logstore_service().data_logdev() - : logstore_service().ctrl_logdev(); - ld.get_registered_store_ids(reg_ids, garbage_ids); + + logdev->get_registered_store_ids(reg_ids, garbage_ids); actual_valid_ids += reg_ids.size() - garbage_ids.size(); actual_garbage_ids += garbage_ids.size(); @@ -783,7 +785,7 @@ class LogStoreTest : public ::testing::Test { void delete_validate(uint32_t idx) { auto& db = SampleDB::instance(); - auto fid = db.m_log_store_clients[idx]->m_family; + auto fid = db.m_log_store_clients[idx]->m_logdev_id; validate_num_stores(); const auto l_idx = db.highest_log_idx(fid); @@ -841,8 +843,8 @@ class LogStoreTest : public ::testing::Test { uint64_t m_nrecords_waiting_to_complete{0}; std::mutex m_pending_mtx; std::condition_variable m_pending_cv; - std::array< std::atomic< logid_t >, LogStoreService::num_log_families > m_truncate_log_idx = {-1, -1}; - std::array< std::map< logid_t, uint32_t >, LogStoreService::num_log_families > m_garbage_stores_upto; + std::map< logdev_id_t, std::atomic< logid_t > > m_truncate_log_idx; + std::map< logdev_id_t, std::map< logid_t, uint32_t > > m_garbage_stores_upto; std::array< uint32_t, 100 > m_store_distribution; uint32_t m_q_depth{64}; @@ -1233,8 +1235,8 @@ TEST_F(LogStoreTest, WriteSyncThenRead) { for (uint32_t iteration{0}; iteration < iterations; ++iteration) { LOGINFO("Iteration {}", iteration); - std::shared_ptr< HomeLogStore > tmp_log_store = - logstore_service().create_new_log_store(LogStoreService::DATA_LOG_FAMILY_IDX, false); + auto logdev_id = logstore_service().create_new_logdev(); + auto tmp_log_store = logstore_service().create_new_log_store(logdev_id, false); const auto store_id = tmp_log_store->get_store_id(); LOGINFO("Created new log store -> id {}", store_id); const unsigned count{10}; @@ -1262,7 +1264,7 @@ TEST_F(LogStoreTest, WriteSyncThenRead) { ASSERT_EQ(actual, expected) << "Data mismatch for LSN=" << store_id << ":" << i << " size=" << tl->size; } - logstore_service().remove_log_store(LogStoreService::DATA_LOG_FAMILY_IDX, store_id); + logstore_service().remove_log_store(logdev_id, store_id); LOGINFO("Remove logstore -> i {}", store_id); } } diff --git a/src/tests/test_solo_repl_dev.cpp b/src/tests/test_solo_repl_dev.cpp index 492a8006a..1feca0d1e 100644 --- a/src/tests/test_solo_repl_dev.cpp +++ b/src/tests/test_solo_repl_dev.cpp @@ -155,8 +155,10 @@ class SoloReplDevTest : public testing::Test { "test_solo_repl_dev", {{HS_SERVICE::META, {.size_pct = 5.0}}, {HS_SERVICE::REPLICATION, {.size_pct = 60.0, .repl_app = std::make_unique< Application >(*this)}}, - {HS_SERVICE::LOG_REPLICATED, {.size_pct = 20.0}}, - {HS_SERVICE::LOG_LOCAL, {.size_pct = 2.0}}}); + {HS_SERVICE::LOG, + {.size_pct = 22.0, + .chunk_size = 32 * 1024 * 1024, + .vdev_size_type = vdev_size_type_t::VDEV_SIZE_DYNAMIC}}}); m_uuid1 = hs_utils::gen_random_uuid(); m_uuid2 = 
hs_utils::gen_random_uuid(); m_repl_dev1 = hs()->repl_service().create_repl_dev(m_uuid1, {}).get().value(); @@ -175,9 +177,12 @@ class SoloReplDevTest : public testing::Test { test_common::HSTestHelper::start_homestore( "test_solo_repl_dev", - {{HS_SERVICE::REPLICATION, {.repl_app = std::make_unique< Application >(*this)}}, - {HS_SERVICE::LOG_REPLICATED, {}}, - {HS_SERVICE::LOG_LOCAL, {}}}, + {{HS_SERVICE::META, {.size_pct = 5.0}}, + {HS_SERVICE::REPLICATION, {.size_pct = 60.0, .repl_app = std::make_unique< Application >(*this)}}, + {HS_SERVICE::LOG, + {.size_pct = 22.0, + .chunk_size = 32 * 1024 * 1024, + .vdev_size_type = vdev_size_type_t::VDEV_SIZE_DYNAMIC}}}, nullptr, true /* restart */); m_repl_dev1 = hs()->repl_service().get_repl_dev(m_uuid1).value();
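
For reference, a minimal consumer-side sketch of the logdev-scoped API that the changes above converge on (create_new_logdev, create_new_log_store, open_logdev, open_log_store). This is illustrative only: it assumes HomeStore has already been started with the LOG service enabled, and the first_boot/reopen helper names are hypothetical, not part of this patch.

#include <memory>
#include <utility>

#include <homestore/homestore.hpp>
#include <homestore/logstore_service.hpp>
#include <homestore/logstore/log_store.hpp>

using namespace homestore;

// First boot: create a dedicated logdev, then a log store on it.
// The caller is expected to persist both ids (as RaftReplDev/SoloReplDev do in their superblk).
std::pair< logdev_id_t, logstore_id_t > first_boot() {
    logdev_id_t logdev_id = logstore_service().create_new_logdev();
    auto store = logstore_service().create_new_log_store(logdev_id, true /* append_mode */);
    return {logdev_id, store->get_store_id()};
}

// Recovery: re-open the logdev before the store; the callback hands back the
// HomeLogStore instance once replay reaches it.
void reopen(logdev_id_t logdev_id, logstore_id_t store_id) {
    logstore_service().open_logdev(logdev_id);
    logstore_service().open_log_store(logdev_id, store_id, true /* append_mode */,
                                      [](std::shared_ptr< HomeLogStore > ls) { /* keep ls for later appends */ });
}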