diff --git a/conanfile.py b/conanfile.py index 1522a15ef..3eab992ad 100644 --- a/conanfile.py +++ b/conanfile.py @@ -5,7 +5,7 @@ class HomestoreConan(ConanFile): name = "homestore" - version = "5.1.2" + version = "5.1.3" homepage = "https://github.com/eBay/Homestore" description = "HomeStore Storage Engine" diff --git a/src/include/homestore/homestore.hpp b/src/include/homestore/homestore.hpp index c9eafbc21..aa7688ab2 100644 --- a/src/include/homestore/homestore.hpp +++ b/src/include/homestore/homestore.hpp @@ -56,8 +56,7 @@ class ReplApplication; using HomeStoreSafePtr = std::shared_ptr< HomeStore >; -VENUM(hs_vdev_type_t, uint32_t, DATA_VDEV = 1, INDEX_VDEV = 2, META_VDEV = 3, DATA_LOGDEV_VDEV = 4, - CTRL_LOGDEV_VDEV = 5); +VENUM(hs_vdev_type_t, uint32_t, DATA_VDEV = 1, INDEX_VDEV = 2, META_VDEV = 3, LOGDEV_VDEV = 4); #pragma pack(1) struct hs_vdev_context { @@ -76,11 +75,10 @@ struct hs_stats { struct HS_SERVICE { static constexpr uint32_t META = 1 << 0; - static constexpr uint32_t LOG_REPLICATED = 1 << 1; - static constexpr uint32_t LOG_LOCAL = 1 << 2; - static constexpr uint32_t DATA = 1 << 3; - static constexpr uint32_t INDEX = 1 << 4; - static constexpr uint32_t REPLICATION = 1 << 5; + static constexpr uint32_t LOG = 1 << 1; + static constexpr uint32_t DATA = 1 << 2; + static constexpr uint32_t INDEX = 1 << 3; + static constexpr uint32_t REPLICATION = 1 << 4; uint32_t svcs; @@ -91,8 +89,7 @@ struct HS_SERVICE { if (svcs & META) { str += "meta,"; } if (svcs & DATA) { str += "data,"; } if (svcs & INDEX) { str += "index,"; } - if (svcs & LOG_REPLICATED) { str += "log_replicated,"; } - if (svcs & LOG_LOCAL) { str += "log_local,"; } + if (svcs & LOG) { str += "log,"; } if (svcs & REPLICATION) { str += "replication,"; } return str; } diff --git a/src/include/homestore/logstore/log_store.hpp b/src/include/homestore/logstore/log_store.hpp index 6b46c69e6..dfb725bb5 100644 --- a/src/include/homestore/logstore/log_store.hpp +++ 
b/src/include/homestore/logstore/log_store.hpp @@ -35,7 +35,6 @@ namespace homestore { -class LogStoreFamily; class LogDev; class LogStoreServiceMetrics; @@ -44,7 +43,7 @@ typedef std::function< void(logstore_seq_num_t) > on_rollback_cb_t; class HomeLogStore : public std::enable_shared_from_this< HomeLogStore > { public: - HomeLogStore(LogStoreFamily& family, logstore_id_t id, bool append_mode, logstore_seq_num_t start_lsn); + HomeLogStore(std::shared_ptr< LogDev > logdev, logstore_id_t id, bool append_mode, logstore_seq_num_t start_lsn); HomeLogStore(const HomeLogStore&) = delete; HomeLogStore(HomeLogStore&&) noexcept = delete; HomeLogStore& operator=(const HomeLogStore&) = delete; @@ -266,7 +265,7 @@ class HomeLogStore : public std::enable_shared_from_this< HomeLogStore > { auto seq_num() const { return m_seq_num.load(std::memory_order_acquire); } - LogStoreFamily& get_family() { return m_logstore_family; } + std::shared_ptr< LogDev > get_logdev() { return m_logdev; } nlohmann::json dump_log_store(const log_dump_req& dump_req = log_dump_req()); @@ -285,8 +284,7 @@ class HomeLogStore : public std::enable_shared_from_this< HomeLogStore > { int search_max_le(logstore_seq_num_t input_sn); logstore_id_t m_store_id; - LogStoreFamily& m_logstore_family; - LogDev& m_logdev; + std::shared_ptr< LogDev > m_logdev; sisl::StreamTracker< logstore_record > m_records; bool m_append_mode{false}; log_req_comp_cb_t m_comp_cb; diff --git a/src/include/homestore/logstore/log_store_internal.hpp b/src/include/homestore/logstore/log_store_internal.hpp index 7e442c48d..6a2103d01 100644 --- a/src/include/homestore/logstore/log_store_internal.hpp +++ b/src/include/homestore/logstore/log_store_internal.hpp @@ -43,13 +43,14 @@ typedef int64_t logstore_seq_num_t; typedef std::function< void(logstore_req*, logdev_key) > log_req_comp_cb_t; typedef sisl::byte_view log_buffer; typedef uint32_t logstore_id_t; -typedef uint8_t logstore_family_id_t; +typedef uint32_t logdev_id_t; typedef 
std::function< void(logstore_req*, logdev_key) > log_req_comp_cb_t; typedef std::function< void(logstore_seq_num_t, sisl::io_blob&, logdev_key, void*) > log_write_comp_cb_t; typedef std::function< void(logstore_seq_num_t, log_buffer, void*) > log_found_cb_t; typedef std::function< void(std::shared_ptr< HomeLogStore >) > log_store_opened_cb_t; typedef std::function< void(std::shared_ptr< HomeLogStore >, logstore_seq_num_t) > log_replay_done_cb_t; +typedef std::function< void(const std::unordered_map< logdev_id_t, logdev_key >&) > device_truncate_cb_t; typedef int64_t logid_t; diff --git a/src/include/homestore/logstore_service.hpp b/src/include/homestore/logstore_service.hpp index 6d8fc09f9..9eb971eea 100644 --- a/src/include/homestore/logstore_service.hpp +++ b/src/include/homestore/logstore_service.hpp @@ -41,7 +41,6 @@ class LogStoreServiceMetrics : public sisl::MetricsGroup { LogStoreServiceMetrics& operator=(LogStoreServiceMetrics&&) noexcept = delete; }; -class LogStoreFamily; class HomeLogStore; class LogDev; struct logdev_key; @@ -49,18 +48,13 @@ class VirtualDev; class JournalVirtualDev; struct vdev_info; struct log_dump_req; +struct logdev_superblk; class LogStoreService { friend class HomeLogStore; - friend class LogStoreFamily; friend class LogDev; public: - static constexpr logstore_family_id_t DATA_LOG_FAMILY_IDX{0}; - static constexpr logstore_family_id_t CTRL_LOG_FAMILY_IDX{1}; - static constexpr size_t num_log_families = CTRL_LOG_FAMILY_IDX + 1; - typedef std::function< void(const std::array< logdev_key, num_log_families >&) > device_truncate_cb_t; - LogStoreService(); LogStoreService(const LogStoreService&) = delete; LogStoreService(LogStoreService&&) noexcept = delete; @@ -81,36 +75,53 @@ class LogStoreService { */ void stop(); + /** + * @brief Create a brand new log dev. A logdev manages a list of chunks and state about the log offsets. 
+ * Internally each logdev has a journal descriptor which maintains the data start and tail offsets and list of + * chunks. Logdev can start with zero chunks and dynamically add chunks based on write request. + * @return Newly created log dev id. + */ + logdev_id_t create_new_logdev(); + + /** + * @brief Open a log dev. + * + * @param logdev_id: Logdev ID + * @return Newly created log dev id. + */ + void open_logdev(logdev_id_t logdev_id); + /** * @brief Create a brand new log store (both in-memory and on device) and returns its instance. It also book * keeps the created log store and user can get this instance of log store by using logstore_id * - * @param family_id: Logstores can be created on different log_devs. As of now we only support data log_dev and - * ctrl log dev. The idx indicates which log device it is from. Its a mandatory parameter. + * @param logdev_id: Logstores can be created on different log_devs. * @param append_mode: If the log store have to be in append mode, user can call append_async and do not need to * maintain the log_idx. Else user is expected to keep track of the log idx. Default to false * * @return std::shared_ptr< HomeLogStore > */ - shared< HomeLogStore > create_new_log_store(logstore_family_id_t family_id, bool append_mode = false); + std::shared_ptr< HomeLogStore > create_new_log_store(logdev_id_t logdev_id, bool append_mode = false); /** * @brief Open an existing log store and does a recovery. It then creates an instance of this logstore and * returns - * + * @param logdev_id: Logdev ID of the log store to close * @param store_id: Store ID of the log store to open + * @param append_mode: Append or not. + * @param on_open_cb: Callback to be called once log store is opened. 
* @return std::shared_ptr< HomeLogStore > */ - folly::Future< shared< HomeLogStore > > open_log_store(logstore_family_id_t family_id, logstore_id_t store_id, + folly::Future< shared< HomeLogStore > > open_log_store(logdev_id_t logdev_id, logstore_id_t store_id, bool append_mode); /** * @brief Close the log store instance and free-up the resources - * + * @param logdev_id: Logdev ID of the log store to close * @param store_id: Store ID of the log store to close * @return true on success */ - bool close_log_store(const logstore_family_id_t family_id, const logstore_id_t store_id) { + bool close_log_store(logdev_id_t logdev_id, logstore_id_t store_id) { // TODO: Implement this method return true; } @@ -121,7 +132,7 @@ class LogStoreService { * * @param store_id */ - void remove_log_store(const logstore_family_id_t family_id, const logstore_id_t store_id); + void remove_log_store(logdev_id_t logdev_id, logstore_id_t store_id); /** * @brief Schedule a truncate all the log stores physically on the device. @@ -131,24 +142,18 @@ class LogStoreService { * Default to false * @param dry_run: If the truncate is a real one or just dry run to simulate the truncation */ - void device_truncate(const device_truncate_cb_t& cb = nullptr, const bool wait_till_done = false, - const bool dry_run = false); + void device_truncate(const device_truncate_cb_t& cb = nullptr, bool wait_till_done = false, bool dry_run = false); - folly::Future< std::error_code > create_vdev(uint64_t size, logstore_family_id_t family, uint32_t chunk_size); - shared< VirtualDev > open_vdev(const vdev_info& vinfo, logstore_family_id_t family, bool load_existing); - shared< JournalVirtualDev > get_vdev(logstore_family_id_t family) const { - return (family == DATA_LOG_FAMILY_IDX) ? 
m_data_logdev_vdev : m_ctrl_logdev_vdev; - } + folly::Future< std::error_code > create_vdev(uint64_t size, uint32_t chunk_size); + std::shared_ptr< VirtualDev > open_vdev(const vdev_info& vinfo, bool load_existing); + std::shared_ptr< JournalVirtualDev > get_vdev() const { return m_logdev_vdev; } + std::vector< std::shared_ptr< LogDev > > get_all_logdevs(); + std::shared_ptr< LogDev > get_logdev(logdev_id_t id); nlohmann::json dump_log_store(const log_dump_req& dum_req); - nlohmann::json get_status(const int verbosity) const; + nlohmann::json get_status(int verbosity) const; LogStoreServiceMetrics& metrics() { return m_metrics; } - LogStoreFamily* data_log_family() { return m_logstore_families[DATA_LOG_FAMILY_IDX].get(); } - LogStoreFamily* ctrl_log_family() { return m_logstore_families[CTRL_LOG_FAMILY_IDX].get(); } - - LogDev& data_logdev(); - LogDev& ctrl_logdev(); uint32_t used_size() const; uint32_t total_size() const; @@ -156,13 +161,18 @@ class LogStoreService { iomgr::io_fiber_t truncate_thread() { return m_truncate_fiber; } private: + std::shared_ptr< LogDev > create_new_logdev_internal(logdev_id_t logdev_id); + void logdev_super_blk_found(const sisl::byte_view& buf, void* meta_cookie); + void rollback_super_blk_found(const sisl::byte_view& buf, void* meta_cookie); void start_threads(); void flush_if_needed(); private: - std::array< std::unique_ptr< LogStoreFamily >, num_log_families > m_logstore_families; - std::shared_ptr< JournalVirtualDev > m_data_logdev_vdev; - std::shared_ptr< JournalVirtualDev > m_ctrl_logdev_vdev; + std::unordered_map< logdev_id_t, std::shared_ptr< LogDev > > m_id_logdev_map; + std::unique_ptr< sisl::IDReserver > m_id_reserver; + folly::SharedMutexWritePriority m_logdev_map_mtx; + + std::shared_ptr< JournalVirtualDev > m_logdev_vdev; iomgr::io_fiber_t m_truncate_fiber; iomgr::io_fiber_t m_flush_fiber; LogStoreServiceMetrics m_metrics; diff --git a/src/lib/common/homestore_config.fbs b/src/lib/common/homestore_config.fbs index 
66181ed2b..1ac3545c7 100644 --- a/src/lib/common/homestore_config.fbs +++ b/src/lib/common/homestore_config.fbs @@ -24,12 +24,12 @@ table BlkAllocator { /* Count of free blks cache in-terms of device size */ free_blk_cache_count_by_vdev_percent: double = 80.0; - /* Percentage of overall memory allocated for blkallocator free blks cache. The memory allocated effictively is the + /* Percentage of overall memory allocated for blkallocator free blks cache. The memory allocated effictively is the * min of memory occupied by (free_blk_cache_size_by_vdev_percent, max_free_blk_cache_memory_percent) */ max_free_blk_cache_memory_percent: double = 1.0; /* Free blk cache slab distribution percentage - * An example assuming blk_size=4K is [4K, 8K, 16K, 32K, 64K, 128K, 256K, 512K, 1M, 2M, 4M, 8M, 16M] + * An example assuming blk_size=4K is [4K, 8K, 16K, 32K, 64K, 128K, 256K, 512K, 1M, 2M, 4M, 8M, 16M] * free_blk_slab_distribution : [double] = [20.0, 10.0, 10.0, 10.0, 35.0, 3.0, 3.0, 3.0, 2.0, 1.0, 1.0, 1.0, 1.0] */ free_blk_slab_distribution : [double]; @@ -54,7 +54,7 @@ table BlkAllocator { } table Btree { - max_nodes_to_rebalance: uint32 = 3; + max_nodes_to_rebalance: uint32 = 3; mem_btree_page_size: uint32 = 8192; } @@ -78,9 +78,9 @@ table Device { // Max completions to process per event in a thread max_completions_process_per_event_per_thread: uint32 = 200; - + // DIRECT_IO mode, switch for HDD IO mode; - direct_io_mode: bool = false; + direct_io_mode: bool = false; } table LogStore { @@ -107,8 +107,7 @@ table LogStore { try_flush_iteration: uint64 = 10240(hotswap); // Logdev flushes in multiples of this size, setting to 0 will make it use default device optimal size - flush_size_multiple_data_logdev: uint64 = 0; - flush_size_multiple_ctrl_logdev: uint64 = 512; + flush_size_multiple_logdev: uint64 = 512; // Logdev will flush the logs only in a dedicated thread. Turn this on, if flush IO doesn't want to // intervene with data IO path. 
@@ -137,7 +136,7 @@ table Generic { // number of threads for btree writes; num_btree_write_threads : uint32 = 2; - // percentage of cache used to create indx mempool. It should be more than 100 to + // percentage of cache used to create indx mempool. It should be more than 100 to // take into account some floating buffers in writeback cache. indx_mempool_percent : uint32 = 110; @@ -148,13 +147,13 @@ table Generic { table ResourceLimits { /* it is going to use 2 times of this space because of two concurrent cps */ dirty_buf_percent: uint32 = 1 (hotswap); - + /* it is going to use 2 times of this space because of two concurrent cps */ free_blk_cnt: uint32 = 10000000 (hotswap); free_blk_size_percent: uint32 = 2 (hotswap); - + /* Percentage of memory allocated for homestore cache */ - cache_size_percent: uint32 = 65; + cache_size_percent: uint32 = 65; /* precentage of memory used during recovery */ memory_in_recovery_precent: uint32 = 40; @@ -167,18 +166,18 @@ table ResourceLimits { } table MetaBlkStore { - // turn on/off compression feature + // turn on/off compression feature compress_feature_on : bool = true (hotswap); // turn on/off skip header check skip_header_size_check : bool = false (hotswap); - // Compress buffer larger than this memory limit in MB will not trigger compress; + // Compress buffer larger than this memory limit in MB will not trigger compress; max_compress_memory_size_mb: uint32 = 512 (hotswap); - + // Inital memory allocated for compress buffer init_compress_memory_size_mb: uint32 = 10 (hotswap); - + // Try to do compress only when input buffer is larger than this size min_compress_size_mb: uint32 = 1 (hotswap); @@ -188,7 +187,7 @@ table MetaBlkStore { // percentage of *free* root fs while dump to file for get_status; percent_of_free_space: uint32 = 10 (hotswap); - // meta sanity check interval + // meta sanity check interval sanity_check_interval: uint32 = 10 (hotswap); } diff --git a/src/lib/device/journal_vdev.hpp 
b/src/lib/device/journal_vdev.hpp index 899253a2a..13ec18c65 100644 --- a/src/lib/device/journal_vdev.hpp +++ b/src/lib/device/journal_vdev.hpp @@ -25,12 +25,11 @@ #include "device.h" #include "physical_dev.hpp" #include "virtual_dev.hpp" +#include namespace homestore { typedef std::function< void(const off_t ret_off) > alloc_next_blk_cb_t; using journal_id_t = uint64_t; -// Each logstore family is associated to a single logdevice. -using logdev_id_t = uint64_t; // Chunks used for journal vdev has journal related info stored in chunk private data. // Each log device has a list of journal chunk data with next_chunk. diff --git a/src/lib/homestore.cpp b/src/lib/homestore.cpp index 427fd6ddb..ba6d1eaa6 100644 --- a/src/lib/homestore.cpp +++ b/src/lib/homestore.cpp @@ -40,7 +40,6 @@ #include "device/virtual_dev.hpp" #include "common/resource_mgr.hpp" #include "meta/meta_sb.hpp" -#include "logstore/log_store_family.hpp" #include "replication/service/generic_repl_svc.h" /* @@ -78,13 +77,13 @@ HomeStore& HomeStore::with_index_service(std::unique_ptr< IndexServiceCallbacks } HomeStore& HomeStore::with_log_service() { - m_services.svcs |= HS_SERVICE::LOG_REPLICATED | HS_SERVICE::LOG_LOCAL; + m_services.svcs |= HS_SERVICE::LOG; return *this; } HomeStore& HomeStore::with_repl_data_service(cshared< ReplApplication >& repl_app, cshared< ChunkSelector >& custom_chunk_selector) { - m_services.svcs |= HS_SERVICE::REPLICATION | HS_SERVICE::LOG_REPLICATED | HS_SERVICE::LOG_LOCAL; + m_services.svcs |= HS_SERVICE::REPLICATION | HS_SERVICE::LOG; m_services.svcs &= ~HS_SERVICE::DATA; // ReplicationDataSvc or DataSvc are mutually exclusive s_repl_app = repl_app; s_custom_chunk_selector = std::move(custom_chunk_selector); @@ -166,12 +165,10 @@ void HomeStore::format_and_start(std::map< uint32_t, hs_format_params >&& format if ((svc_type & HS_SERVICE::META) && has_meta_service()) { m_meta_service->create_vdev(pct_to_size(fparams.size_pct, HSDevType::Fast), fparams.num_chunks); - } else if 
((svc_type & HS_SERVICE::LOG_REPLICATED) && has_log_service()) { - futs.emplace_back(m_log_service->create_vdev(pct_to_size(fparams.size_pct, HSDevType::Fast), - LogStoreService::DATA_LOG_FAMILY_IDX, fparams.chunk_size)); - } else if ((svc_type & HS_SERVICE::LOG_LOCAL) && has_log_service()) { - futs.emplace_back(m_log_service->create_vdev(pct_to_size(fparams.size_pct, HSDevType::Fast), - LogStoreService::CTRL_LOG_FAMILY_IDX, fparams.chunk_size)); + + } else if ((svc_type & HS_SERVICE::LOG) && has_log_service()) { + futs.emplace_back( + m_log_service->create_vdev(pct_to_size(fparams.size_pct, HSDevType::Fast), fparams.chunk_size)); } else if ((svc_type & HS_SERVICE::DATA) && has_data_service()) { m_data_service->create_vdev(pct_to_size(fparams.size_pct, HSDevType::Data), fparams.block_size, fparams.alloc_type, fparams.chunk_sel_type, fparams.num_chunks); @@ -309,7 +306,7 @@ bool HomeStore::has_repl_data_service() const { return m_services.svcs & HS_SERV bool HomeStore::has_meta_service() const { return m_services.svcs & HS_SERVICE::META; } bool HomeStore::has_log_service() const { auto const s = m_services.svcs; - return (s & (HS_SERVICE::LOG_REPLICATED | HS_SERVICE::LOG_LOCAL)); + return (s & HS_SERVICE::LOG); } #if 0 @@ -347,18 +344,9 @@ shared< VirtualDev > HomeStore::create_vdev_cb(const vdev_info& vinfo, bool load auto vdev_context = r_cast< const hs_vdev_context* >(vinfo.get_user_private()); switch (vdev_context->type) { - case hs_vdev_type_t::DATA_LOGDEV_VDEV: - if (has_log_service()) { - ret_vdev = m_log_service->open_vdev(vinfo, LogStoreService::DATA_LOG_FAMILY_IDX, load_existing); - } - break; - - case hs_vdev_type_t::CTRL_LOGDEV_VDEV: - if (has_log_service()) { - ret_vdev = m_log_service->open_vdev(vinfo, LogStoreService::CTRL_LOG_FAMILY_IDX, load_existing); - } + case hs_vdev_type_t::LOGDEV_VDEV: + if (has_log_service()) { ret_vdev = m_log_service->open_vdev(vinfo, load_existing); } break; - case hs_vdev_type_t::META_VDEV: if (has_meta_service()) { 
ret_vdev = m_meta_service->open_vdev(vinfo, load_existing); } break; diff --git a/src/lib/logstore/CMakeLists.txt b/src/lib/logstore/CMakeLists.txt index 7f2cd749c..9a93bde75 100644 --- a/src/lib/logstore/CMakeLists.txt +++ b/src/lib/logstore/CMakeLists.txt @@ -10,7 +10,6 @@ target_sources(hs_logdev PRIVATE log_group.cpp log_stream.cpp log_store.cpp - log_store_family.cpp log_store_service.cpp ) target_link_libraries(hs_logdev ${COMMON_DEPS}) \ No newline at end of file diff --git a/src/lib/logstore/log_dev.cpp b/src/lib/logstore/log_dev.cpp index 2bb8e6f24..83c284c3e 100644 --- a/src/lib/logstore/log_dev.cpp +++ b/src/lib/logstore/log_dev.cpp @@ -35,33 +35,23 @@ namespace homestore { SISL_LOGGING_DECL(logstore) -#define THIS_LOGDEV_LOG(level, msg, ...) HS_SUBMOD_LOG(level, logstore, , "logdev", m_family_id, msg, __VA_ARGS__) +#define THIS_LOGDEV_LOG(level, msg, ...) HS_SUBMOD_LOG(level, logstore, , "logdev", m_logdev_id, msg, __VA_ARGS__) #define THIS_LOGDEV_PERIODIC_LOG(level, msg, ...) 
\ - HS_PERIODIC_DETAILED_LOG(level, logstore, "logdev", m_family_id, , , msg, __VA_ARGS__) + HS_PERIODIC_DETAILED_LOG(level, logstore, "logdev", m_logdev_id, , , msg, __VA_ARGS__) static bool has_data_service() { return HomeStore::instance()->has_data_service(); } // static BlkDataService& data_service() { return HomeStore::instance()->data_service(); } -LogDev::LogDev(const logstore_family_id_t f_id, const std::string& logdev_name) : - m_family_id{f_id}, m_logdev_meta{logdev_name} { - m_flush_size_multiple = 0; - if (f_id == LogStoreService::DATA_LOG_FAMILY_IDX) { - m_flush_size_multiple = HS_DYNAMIC_CONFIG(logstore->flush_size_multiple_data_logdev); - } else if (f_id == LogStoreService::CTRL_LOG_FAMILY_IDX) { - m_flush_size_multiple = HS_DYNAMIC_CONFIG(logstore->flush_size_multiple_ctrl_logdev); - } +LogDev::LogDev(const logdev_id_t id) : m_logdev_id{id} { + m_flush_size_multiple = HS_DYNAMIC_CONFIG(logstore->flush_size_multiple_logdev); } LogDev::~LogDev() = default; void LogDev::start(bool format, JournalVirtualDev* vdev) { - HS_LOG_ASSERT((m_append_comp_cb != nullptr), "Expected Append callback to be registered"); - HS_LOG_ASSERT((m_store_found_cb != nullptr), "Expected Log store found callback to be registered"); - HS_LOG_ASSERT((m_logfound_cb != nullptr), "Expected Logs found callback to be registered"); - - // Each family has one journal descriptor. + // Each logdev has one journal descriptor. 
m_vdev = vdev; - m_vdev_jd = m_vdev->open(m_family_id); + m_vdev_jd = m_vdev->open(m_logdev_id); RELEASE_ASSERT(m_vdev_jd, "Journal descriptor is null"); if (m_flush_size_multiple == 0) { m_flush_size_multiple = m_vdev->optimal_page_size(); } @@ -76,8 +66,7 @@ void LogDev::start(bool format, JournalVirtualDev* vdev) { // First read the info block if (format) { HS_LOG_ASSERT(m_logdev_meta.is_empty(), "Expected meta to be not present"); - m_logdev_meta.create(); - + m_logdev_meta.create(m_logdev_id); m_vdev_jd->update_data_start_offset(0); } else { HS_LOG_ASSERT(!m_logdev_meta.is_empty(), "Expected meta data to be read already before loading"); @@ -85,7 +74,7 @@ void LogDev::start(bool format, JournalVirtualDev* vdev) { // Notify to the caller that a new log store was reserved earlier and it is being loaded, with its meta info for (const auto& spair : store_list) { - m_store_found_cb(spair.first, spair.second); + on_log_store_found(spair.first, spair.second); } THIS_LOGDEV_LOG(INFO, "get start vdev offset during recovery {} log indx {} ", @@ -105,10 +94,26 @@ void LogDev::start(bool format, JournalVirtualDev* vdev) { if (m_pending_flush_size.load() && !m_is_flushing.load(std::memory_order_relaxed)) { flush_if_needed(); } }, true /* wait_to_schedule */); + + handle_unopened_log_stores(format); + + { + // Also call the logstore to inform that start/replay is completed. 
+ folly::SharedMutexWritePriority::WriteHolder holder(m_store_map_mtx); + if (!format) { + for (auto& p : m_id_logstore_map) { + auto& lstore{p.second.log_store}; + if (lstore && lstore->get_log_replay_done_cb()) { + lstore->get_log_replay_done_cb()(lstore, lstore->seq_num() - 1); + lstore->truncate(lstore->truncated_upto()); + } + } + } + } } void LogDev::stop() { - THIS_LOGDEV_LOG(INFO, "Logdev stopping family {}", m_family_id); + THIS_LOGDEV_LOG(INFO, "Logdev stopping id {}", m_logdev_id); HS_LOG_ASSERT((m_pending_flush_size == 0), "LogDev stop attempted while writes to logdev are pending completion"); const bool locked_now = run_under_flush_lock([this]() { { @@ -130,6 +135,11 @@ void LogDev::stop() { // cancel the timer iomanager.cancel_timer(m_flush_timer_hdl, true); + { + folly::SharedMutexWritePriority::WriteHolder holder(m_store_map_mtx); + m_id_logstore_map.clear(); + } + m_log_records = nullptr; m_logdev_meta.reset(); m_log_idx.store(0); @@ -145,7 +155,7 @@ void LogDev::stop() { m_log_group_pool[i].stop(); } - THIS_LOGDEV_LOG(INFO, "LogDev stopped successfully family {}", m_family_id); + THIS_LOGDEV_LOG(INFO, "LogDev stopped successfully id {}", m_logdev_id); m_hs.reset(); } @@ -154,7 +164,7 @@ void LogDev::do_load(const off_t device_cursor) { logid_t loaded_from{-1}; off_t group_dev_offset = 0; - THIS_LOGDEV_LOG(TRACE, "LogDev::do_load start {} ", m_family_id); + THIS_LOGDEV_LOG(TRACE, "LogDev::do_load start {} ", m_logdev_id); do { const auto buf = lstream.next_group(&group_dev_offset); @@ -188,19 +198,18 @@ void LogDev::do_load(const off_t device_cursor) { b.move_forward(data_offset); b.set_size(rec->size); if (m_last_truncate_idx == -1) { m_last_truncate_idx = header->start_idx() + i; } - if (m_logfound_cb) { - // Validate if the id is present in rollback info - if (m_logdev_meta.is_rolled_back(rec->store_id, header->start_idx() + i)) { - THIS_LOGDEV_LOG( - DEBUG, "logstore_id[{}] log_idx={}, lsn={} has been rolledback, not notifying the logstore", - 
rec->store_id, (header->start_idx() + i), rec->store_seq_num); - } else { - THIS_LOGDEV_LOG(TRACE, "seq num {}, log indx {}, group dev offset {} size {}", rec->store_seq_num, - (header->start_idx() + i), group_dev_offset, rec->size); - m_logfound_cb(rec->store_id, rec->store_seq_num, {header->start_idx() + i, group_dev_offset}, - flush_ld_key, b, (header->nrecords() - (i + 1))); - } + // Validate if the id is present in rollback info + if (m_logdev_meta.is_rolled_back(rec->store_id, header->start_idx() + i)) { + THIS_LOGDEV_LOG(DEBUG, + "logstore_id[{}] log_idx={}, lsn={} has been rolledback, not notifying the logstore", + rec->store_id, (header->start_idx() + i), rec->store_seq_num); + } else { + THIS_LOGDEV_LOG(TRACE, "seq num {}, log indx {}, group dev offset {} size {}", rec->store_seq_num, + (header->start_idx() + i), group_dev_offset, rec->size); + on_logfound(rec->store_id, rec->store_seq_num, {header->start_idx() + i, group_dev_offset}, + flush_ld_key, b, (header->nrecords() - (i + 1))); } + ++i; } @@ -211,7 +220,7 @@ void LogDev::do_load(const off_t device_cursor) { // Update the tail offset with where we finally end up loading, so that new append entries can be written from // here. m_vdev_jd->update_tail_offset(group_dev_offset); - THIS_LOGDEV_LOG(TRACE, "LogDev::do_load end {} ", m_family_id); + THIS_LOGDEV_LOG(TRACE, "LogDev::do_load end {} ", m_logdev_id); } void LogDev::assert_next_pages(log_stream_reader& lstream) { @@ -224,8 +233,8 @@ void LogDev::assert_next_pages(log_stream_reader& lstream) { auto* header = r_cast< const log_group_header* >(buf.bytes()); HS_REL_ASSERT_GT(m_log_idx.load(std::memory_order_acquire), header->start_idx(), "Found a header with future log_idx after reaching end of log. 
Hence rbuf which was read " - "must have been corrupted, Family {} Header: {}", - m_family_id, *header); + "must have been corrupted, logdev id {} Header: {}", + m_logdev_id, *header); } } } @@ -410,7 +419,7 @@ bool LogDev::flush_if_needed(int64_t threshold_size) { void LogDev::do_flush(LogGroup* lg) { #ifdef _PRERELEASE if (iomgr_flip::instance()->delay_flip< int >( - "simulate_log_flush_delay", [this, lg]() { do_flush_write(lg); }, m_family_id)) { + "simulate_log_flush_delay", [this, lg]() { do_flush_write(lg); }, m_logdev_id)) { THIS_LOGDEV_LOG(INFO, "Delaying flush by rescheduling the async write"); return; } @@ -449,7 +458,7 @@ void LogDev::on_flush_completion(LogGroup* lg) { auto dev_offset = lg->m_log_dev_offset; for (auto idx = from_indx; idx <= upto_indx; ++idx) { auto& record = m_log_records->at(idx); - m_append_comp_cb(record.store_id, logdev_key{idx, dev_offset}, flush_ld_key, upto_indx - idx, record.context); + on_io_completion(record.store_id, logdev_key{idx, dev_offset}, flush_ld_key, upto_indx - idx, record.context); } lg->m_post_flush_process_done_time = Clock::now(); @@ -531,12 +540,14 @@ void LogDev::unlock_flush(bool do_flush) { uint64_t LogDev::truncate(const logdev_key& key) { HS_DBG_ASSERT_GE(key.idx, m_last_truncate_idx); uint64_t const num_records_to_truncate = static_cast< uint64_t >(key.idx - m_last_truncate_idx); + LOGINFO("LogDev::truncate {}", num_records_to_truncate); if (num_records_to_truncate > 0) { HS_PERIODIC_LOG(INFO, logstore, - "Truncating log device upto {} log_id={} vdev_offset={} truncated {} log records", m_family_id, - key.idx, key.dev_offset, num_records_to_truncate); + "Truncating log device upto logdev {} log_id={} vdev_offset={} truncated {} log records", + m_logdev_id, key.idx, key.dev_offset, num_records_to_truncate); m_log_records->truncate(key.idx); m_vdev_jd->truncate(key.dev_offset); + LOGINFO("LogDev::truncate {}", key.idx); m_last_truncate_idx = key.idx; { @@ -563,6 +574,7 @@ uint64_t LogDev::truncate(const 
logdev_key& key) { // We can remove the rollback records of those upto which logid is getting truncated m_logdev_meta.remove_rollback_record_upto(key.idx, false /* persist_now */); + LOGINFO("LogDev::truncate remove rollback {}", key.idx); m_logdev_meta.persist(); #ifdef _PRERELEASE if (garbage_collect && iomgr_flip::instance()->test_flip("logdev_abort_after_garbage")) { @@ -585,7 +597,266 @@ void LogDev::rollback(logstore_id_t store_id, logid_range_t id_range) { m_logdev_meta.add_rollback_record(store_id, id_range, true); } -void LogDev::get_status(const int verbosity, nlohmann::json& js) const { +/////////////////////////////// LogStore Section /////////////////////////////////////// + +void LogDev::handle_unopened_log_stores(bool format) { + for (auto it{std::begin(m_unopened_store_io)}; it != std::end(m_unopened_store_io); ++it) { + LOGINFO("skip log entries for store id {}-{}, ios {}", m_logdev_id, it->first, it->second); + } + m_unopened_store_io.clear(); + + // If there are any unopened storeids found, loop and check again if they are indeed open later. Unopened log store + // could be possible if the ids are deleted, but it is delayed to remove from store id reserver. In that case, + // do the remove from store id reserver now. + // TODO: At present we are assuming all unopened store ids could be removed. In future have a callback to this + // start routine, which takes the list of unopened store ids and can return a new set, which can be removed. 
+ { + folly::SharedMutexWritePriority::WriteHolder holder(m_store_map_mtx); + for (auto it{std::begin(m_unopened_store_id)}; it != std::end(m_unopened_store_id);) { + if (m_id_logstore_map.find(*it) == m_id_logstore_map.end()) { + // Not opened even on second time check, simply unreserve id + unreserve_store_id(*it); + } + it = m_unopened_store_id.erase(it); + } + } +} + +std::shared_ptr< HomeLogStore > LogDev::create_new_log_store(bool append_mode) { + auto const store_id = reserve_store_id(); + std::shared_ptr< HomeLogStore > lstore; + lstore = std::make_shared< HomeLogStore >(shared_from_this(), store_id, append_mode, 0); + + { + folly::SharedMutexWritePriority::WriteHolder holder(m_store_map_mtx); + const auto it = m_id_logstore_map.find(store_id); + HS_REL_ASSERT((it == m_id_logstore_map.end()), "store_id {}-{} already exists", m_logdev_id, store_id); + m_id_logstore_map.insert(std::pair(store_id, logstore_info{.log_store = lstore, .append_mode = append_mode})); + } + LOGINFO("Created log store id {}-{}", m_logdev_id, store_id); + return lstore; +} + +folly::Future< shared< HomeLogStore > > LogDev::open_log_store(logstore_id_t store_id, bool append_mode) { + folly::SharedMutexWritePriority::WriteHolder holder(m_store_map_mtx); + auto it = m_id_logstore_map.find(store_id); + if (it == m_id_logstore_map.end()) { + bool happened; + std::tie(it, happened) = m_id_logstore_map.insert(std::pair(store_id, + logstore_info{ + .log_store = nullptr, + .append_mode = append_mode, + })); + HS_REL_ASSERT_EQ(happened, true, "Unable to insert logstore into id_logstore_map"); + } + return it->second.promise.getFuture(); +} + +void LogDev::remove_log_store(logstore_id_t store_id) { + LOGINFO("Removing log store id {}-{}", m_logdev_id, store_id); + { + folly::SharedMutexWritePriority::WriteHolder holder(m_store_map_mtx); + auto ret = m_id_logstore_map.erase(store_id); + HS_REL_ASSERT((ret == 1), "try to remove invalid store_id {}-{}", m_logdev_id, store_id); + } + 
unreserve_store_id(store_id); +} + +void LogDev::device_truncate_under_lock(const std::shared_ptr< truncate_req >& treq) { + run_under_flush_lock([this, treq]() { + iomanager.run_on_forget(logstore_service().truncate_thread(), [this, treq]() { + const logdev_key trunc_upto = do_device_truncate(treq->dry_run); + bool done{false}; + if (treq->cb || treq->wait_till_done) { + { + std::lock_guard< std::mutex > lk{treq->mtx}; + done = (--treq->trunc_outstanding == 0); + treq->m_trunc_upto_result[m_logdev_id] = trunc_upto; + } + } + if (done) { + if (treq->cb) { treq->cb(treq->m_trunc_upto_result); } + if (treq->wait_till_done) { treq->cv.notify_one(); } + } + unlock_flush(); + }); + return false; // Do not release the flush lock yet, the scheduler will unlock it. + }); +} + +void LogDev::on_log_store_found(logstore_id_t store_id, const logstore_superblk& sb) { + folly::SharedMutexWritePriority::ReadHolder holder(m_store_map_mtx); + auto it = m_id_logstore_map.find(store_id); + if (it == m_id_logstore_map.end()) { + LOGERROR("Store Id {}-{} found but not opened yet, it will be discarded after logstore is started", m_logdev_id, + store_id); + m_unopened_store_id.insert(store_id); + m_unopened_store_io.insert(std::make_pair<>(store_id, 0)); + return; + } + + LOGINFO("Found a logstore store_id={}-{} with start seq_num={}, Creating a new HomeLogStore instance", m_logdev_id, + store_id, sb.m_first_seq_num); + logstore_info& info = it->second; + info.log_store = + std::make_shared< HomeLogStore >(shared_from_this(), store_id, info.append_mode, sb.m_first_seq_num); + info.promise.setValue(info.log_store); +} + +static thread_local std::vector< std::shared_ptr< HomeLogStore > > s_cur_flush_batch_stores; + +void LogDev::on_io_completion(logstore_id_t id, logdev_key ld_key, logdev_key flush_ld_key, + uint32_t nremaining_in_batch, void* ctx) { + auto* req = s_cast< logstore_req* >(ctx); + HomeLogStore* log_store = req->log_store; + + if (req->is_write) { + 
HS_LOG_ASSERT_EQ(log_store->get_store_id(), id, "Expecting store id in log store and io completion to match"); + log_store->on_write_completion(req, ld_key); + on_batch_completion(log_store, nremaining_in_batch, flush_ld_key); + } else { + log_store->on_read_completion(req, ld_key); + } +} + +void LogDev::on_logfound(logstore_id_t id, logstore_seq_num_t seq_num, logdev_key ld_key, logdev_key flush_ld_key, + log_buffer buf, uint32_t nremaining_in_batch) { + HomeLogStore* log_store{nullptr}; + + { + folly::SharedMutexWritePriority::ReadHolder holder(m_store_map_mtx); + auto const it = m_id_logstore_map.find(id); + if (it == m_id_logstore_map.end()) { + auto [unopened_it, inserted] = m_unopened_store_io.insert(std::make_pair<>(id, 0)); + if (inserted) { + // HS_REL_ASSERT(0, "log id {}-{} not found", m_logdev_id, id); + } + ++unopened_it->second; + return; + } + log_store = it->second.log_store.get(); + } + if (!log_store) { return; } + log_store->on_log_found(seq_num, ld_key, flush_ld_key, buf); + on_batch_completion(log_store, nremaining_in_batch, flush_ld_key); +} + +void LogDev::on_batch_completion(HomeLogStore* log_store, uint32_t nremaining_in_batch, logdev_key flush_ld_key) { + /* check if it is a first update on this log store */ + auto id = log_store->get_store_id(); + const auto it = m_last_flush_info.find(id); + if ((it == std::end(m_last_flush_info)) || (it->second != flush_ld_key.idx)) { + // first time completion in this batch for a given store_id + m_last_flush_info.insert_or_assign(id, flush_ld_key.idx); + if (it == std::end(m_last_flush_info)) { s_cur_flush_batch_stores.push_back(log_store->shared_from_this()); } + } + if (nremaining_in_batch == 0) { + // This batch is completed, call all log stores participated in this batch about the end of batch + HS_LOG_ASSERT_GT(s_cur_flush_batch_stores.size(), 0U, "Expecting one store to be flushed in batch"); + + for (auto& l : s_cur_flush_batch_stores) { + l->on_batch_completion(flush_ld_key); + } + 
s_cur_flush_batch_stores.clear(); + m_last_flush_info.clear(); + } +} + +logdev_key LogDev::do_device_truncate(bool dry_run) { + static thread_local std::vector< std::shared_ptr< HomeLogStore > > m_min_trunc_stores; + static thread_local std::vector< std::shared_ptr< HomeLogStore > > m_non_participating_stores; + + m_min_trunc_stores.clear(); + m_non_participating_stores.clear(); + logdev_key min_safe_ld_key = logdev_key::out_of_bound_ld_key(); + + std::string dbg_str{"Format [store_id:trunc_lsn:logidx:dev_trunc_pending?:active_writes_in_trucate?] "}; + + { + folly::SharedMutexWritePriority::ReadHolder holder(m_store_map_mtx); + for (auto& id_logstore : m_id_logstore_map) { + auto& store_ptr = id_logstore.second.log_store; + const auto& trunc_info = store_ptr->pre_device_truncation(); + + if (!trunc_info.pending_dev_truncation && !trunc_info.active_writes_not_part_of_truncation) { + // This log store neither has any pending device truncation nor active logstore io going on for now. + // Ignore this log store for min safe boundary calculation. 
+ fmt::format_to(std::back_inserter(dbg_str), "[{}:None] ", store_ptr->get_store_id()); + m_non_participating_stores.push_back(store_ptr); + continue; + } + + fmt::format_to(std::back_inserter(dbg_str), "[{}:{}:{}:{}:{}] ", store_ptr->get_store_id(), + trunc_info.seq_num.load(), trunc_info.ld_key.idx, trunc_info.pending_dev_truncation, + trunc_info.active_writes_not_part_of_truncation); + if (trunc_info.ld_key.idx > min_safe_ld_key.idx) { continue; } + + if (trunc_info.ld_key.idx < min_safe_ld_key.idx) { + // New minimum safe log entry + min_safe_ld_key = trunc_info.ld_key; + m_min_trunc_stores.clear(); + } + m_min_trunc_stores.push_back(store_ptr); + } + } + + if ((min_safe_ld_key == logdev_key::out_of_bound_ld_key()) || (min_safe_ld_key.idx < 0)) { + HS_PERIODIC_LOG( + INFO, logstore, + "[Logdev={}] No log store append on any log stores, skipping device truncation, all_logstore_info:<{}>", + m_logdev_id, dbg_str); + return min_safe_ld_key; + } + + // Got the safest log id to truncate and actually truncate up to the safe log idx to the log device + if (!dry_run) { truncate(min_safe_ld_key); } + HS_PERIODIC_LOG(INFO, logstore, + "[Logdev={}] LogDevice truncate, all_logstore_info:<{}> safe log dev key to truncate={}", + m_logdev_id, dbg_str, min_safe_ld_key); + + // We call post device truncation only to the log stores whose prepared truncation points are fully + // truncated or to stores which didn't participate in this device truncation. + for (auto& store_ptr : m_min_trunc_stores) { + store_ptr->post_device_truncation(min_safe_ld_key); + } + for (auto& store_ptr : m_non_participating_stores) { + store_ptr->post_device_truncation(min_safe_ld_key); + } + m_min_trunc_stores.clear(); // Not clearing here would keep holding shared_ptr refs to the stores. 
+ m_non_participating_stores.clear(); + + return min_safe_ld_key; +} + +nlohmann::json LogDev::dump_log_store(const log_dump_req& dump_req) { + nlohmann::json json_dump{}; // create root object + if (dump_req.log_store == nullptr) { + folly::SharedMutexWritePriority::ReadHolder holder(m_store_map_mtx); + for (auto& id_logstore : m_id_logstore_map) { + auto store_ptr{id_logstore.second.log_store}; + const std::string id{std::to_string(store_ptr->get_store_id())}; + // must use operator= construction as copy construction results in error + nlohmann::json val = store_ptr->dump_log_store(dump_req); + json_dump[id] = std::move(val); + } + } else { + const std::string id{std::to_string(dump_req.log_store->get_store_id())}; + // must use operator= construction as copy construction results in error + nlohmann::json val = dump_req.log_store->dump_log_store(dump_req); + json_dump[id] = std::move(val); + } + return json_dump; +} + +nlohmann::json LogDev::get_status(int verbosity) const { + nlohmann::json js; + auto unopened = nlohmann::json::array(); + for (const auto& l : m_unopened_store_id) { + unopened.push_back(l); + } + js["logstores_unopened"] = std::move(unopened); + + // Logdev status js["current_log_idx"] = m_log_idx.load(std::memory_order_relaxed); js["last_flush_log_idx"] = m_last_flush_idx; js["last_truncate_log_idx"] = m_last_truncate_idx; @@ -596,27 +867,21 @@ void LogDev::get_status(const int verbosity, nlohmann::json& js) const { js["logdev_sb_start_offset"] = m_logdev_meta.get_start_dev_offset(); js["logdev_sb_num_stores_reserved"] = m_logdev_meta.num_stores_reserved(); } + + // All logstores + { + folly::SharedMutexWritePriority::ReadHolder holder(m_store_map_mtx); + for (const auto& [id, lstore] : m_id_logstore_map) { + js["logstore_id_" + std::to_string(id)] = lstore.log_store->get_status(verbosity); + } + } + return js; } /////////////////////////////// LogDevMetadata Section /////////////////////////////////////// -LogDevMetadata::LogDevMetadata(const 
std::string& logdev_name) : - m_sb{logdev_name + "_logdev_sb"}, m_rollback_sb{logdev_name + "_rollback_sb"} { - meta_service().register_handler( - logdev_name + "_logdev_sb", - [this](meta_blk* mblk, sisl::byte_view buf, size_t size) { - logdev_super_blk_found(std::move(buf), voidptr_cast(mblk)); - }, - nullptr); - - meta_service().register_handler( - logdev_name + "_rollback_sb", - [this](meta_blk* mblk, sisl::byte_view buf, size_t size) { - rollback_super_blk_found(std::move(buf), voidptr_cast(mblk)); - }, - nullptr); -} +LogDevMetadata::LogDevMetadata() : m_sb{logdev_sb_meta_name}, m_rollback_sb{logdev_rollback_sb_meta_name} {} -logdev_superblk* LogDevMetadata::create() { +logdev_superblk* LogDevMetadata::create(logdev_id_t id) { logdev_superblk* sb = m_sb.create(logdev_sb_size_needed(0)); rollback_superblk* rsb = m_rollback_sb.create(rollback_superblk::size_needed(1)); @@ -624,7 +889,10 @@ logdev_superblk* LogDevMetadata::create() { std::fill_n(sb_area, store_capacity(), logstore_superblk::default_value()); m_id_reserver = std::make_unique< sisl::IDReserver >(); + m_sb->logdev_id = id; m_sb.write(); + + m_rollback_sb->logdev_id = id; m_rollback_sb.write(); return sb; } @@ -787,6 +1055,7 @@ void LogDevMetadata::remove_rollback_record_upto(logid_t upto_id, bool persist_n uint32_t n_removed{0}; for (auto i = m_rollback_sb->num_records; i > 0; --i) { auto& rec = m_rollback_sb->at(i - 1); + LOGINFO("Removing record sb {} {}", rec.idx_range.second, upto_id); if (rec.idx_range.second <= upto_id) { m_rollback_sb->remove_ith_record(i - 1); ++n_removed; @@ -795,6 +1064,7 @@ void LogDevMetadata::remove_rollback_record_upto(logid_t upto_id, bool persist_n if (n_removed) { for (auto it = m_rollback_info.begin(); it != m_rollback_info.end();) { + LOGINFO("Removing info {} {}", it->second.second, upto_id); if (it->second.second <= upto_id) { it = m_rollback_info.erase(it); } else { diff --git a/src/lib/logstore/log_dev.hpp b/src/lib/logstore/log_dev.hpp index 
a417c1fc6..bfb2afb1a 100644 --- a/src/lib/logstore/log_dev.hpp +++ b/src/lib/logstore/log_dev.hpp @@ -30,6 +30,7 @@ #include #include #include +#include #include #include @@ -416,6 +417,7 @@ struct logdev_superblk { uint32_t magic{LOGDEV_SB_MAGIC}; uint32_t version{LOGDEV_SB_VERSION}; + logdev_id_t logdev_id{0}; uint32_t num_stores{0}; uint64_t start_dev_offset{0}; logid_t key_idx{0}; @@ -455,6 +457,7 @@ struct rollback_superblk { uint32_t magic{ROLLBACK_SB_MAGIC}; uint32_t version{ROLLBACK_SB_VERSION}; + logdev_id_t logdev_id{0}; uint32_t num_records{0}; uint32_t get_magic() const { return magic; } @@ -488,14 +491,14 @@ class LogDevMetadata { friend class LogDev; public: - LogDevMetadata(const std::string& logdev_name); + LogDevMetadata(); LogDevMetadata(const LogDevMetadata&) = delete; LogDevMetadata& operator=(const LogDevMetadata&) = delete; LogDevMetadata(LogDevMetadata&&) noexcept = delete; LogDevMetadata& operator=(LogDevMetadata&&) noexcept = delete; ~LogDevMetadata() = default; - logdev_superblk* create(); + logdev_superblk* create(logdev_id_t id); void reset(); std::vector< std::pair< logstore_id_t, logstore_superblk > > load(); void persist(); @@ -522,11 +525,12 @@ class LogDevMetadata { uint32_t num_rollback_records(logstore_id_t store_id) const; bool is_rolled_back(logstore_id_t store_id, logid_t logid) const; + void logdev_super_blk_found(const sisl::byte_view& buf, void* meta_cookie); + void rollback_super_blk_found(const sisl::byte_view& buf, void* meta_cookie); + private: bool resize_logdev_sb_if_needed(); bool resize_rollback_sb_if_needed(); - void logdev_super_blk_found(const sisl::byte_view& buf, void* meta_cookie); - void rollback_super_blk_found(const sisl::byte_view& buf, void* meta_cookie); uint32_t logdev_sb_size_needed(uint32_t nstores) const { return sizeof(logdev_superblk) + (nstores * sizeof(logstore_superblk)); @@ -574,9 +578,25 @@ class log_stream_reader { uint64_t m_read_size_multiple; }; -struct meta_blk; +struct logstore_info { + 
std::shared_ptr< HomeLogStore > log_store; + bool append_mode; + folly::SharedPromise< std::shared_ptr< HomeLogStore > > promise{}; +}; +struct truncate_req { + std::mutex mtx; + std::condition_variable cv; + bool wait_till_done{false}; + bool dry_run{false}; + device_truncate_cb_t cb; + std::unordered_map< logdev_id_t, logdev_key > m_trunc_upto_result; + int trunc_outstanding{0}; +}; + +static std::string const logdev_sb_meta_name{"Logdev_sb"}; +static std::string const logdev_rollback_sb_meta_name{"Logdev_rollback_sb"}; -class LogDev { +class LogDev : public std::enable_shared_from_this< LogDev > { friend class HomeLogStore; public: @@ -592,7 +612,7 @@ class LogDev { return HS_DYNAMIC_CONFIG(logstore.flush_threshold_size) - sizeof(log_group_header); } - LogDev(logstore_family_id_t f_id, const std::string& metablk_name); + LogDev(logdev_id_t logdev_id); LogDev(const LogDev&) = delete; LogDev& operator=(const LogDev&) = delete; LogDev(LogDev&&) noexcept = delete; @@ -653,37 +673,6 @@ class LogDev { */ void load(uint64_t offset); - /** - * @brief Register the callback to receive upon completion of append. - * NOTE: This method is not thread safe. - * - * @param cb Callback to call upon completion of append. It will call with 2 parameters - * a) logdev_key: The key to access the log dev data. It can be treated as opaque (internally has log_id and device - * offset) - * b) Context: The context which was passed to append method. - */ - void register_append_cb(const log_append_comp_callback& cb) { m_append_comp_cb = cb; } - - /** - * @brief Register the callback to receive new logs during recovery from the device. - * NOTE: This method is not thread safe. - * - * @param cb Callback to call upon completion of append. It will call with 3 parameters - * a) logdev_key: The key to access the log dev data, which is retrieved from the device. 
It can be treated as - * opaque (internally has log_id and device offset) b) log_buffer: Opaque structure which contains the data and size - * of the log which key refers to. The underlying buffer it returns is ref counted and hence it need not be - * explicitly freed. - */ - void register_logfound_cb(const log_found_callback& cb) { m_logfound_cb = cb; } - - /** - * @brief Register the callback when a store is found during loading phase - * - * @param cb This callback is called only during load phase where it found a log store. The parameter is a store id - * used to register earlier. - */ - void register_store_found_cb(const store_found_callback& cb) { m_store_found_cb = cb; } - // callback from blkstore, registered at vdev creation; // void process_logdev_completions(const boost::intrusive_ptr< virtualdev_req >& vd_req); @@ -760,7 +749,9 @@ class LogDev { void update_store_superblk(logstore_id_t idx, const logstore_superblk& meta, bool persist_now); - void get_status(int verbosity, nlohmann::json& out_json) const; + nlohmann::json dump_log_store(const log_dump_req& dum_req); + nlohmann::json get_status(int verbosity) const; + bool flush_if_needed(int64_t threshold_size = -1); bool is_aligned_buf_needed(size_t size) const { @@ -773,6 +764,26 @@ class LogDev { LogDevMetadata& log_dev_meta() { return m_logdev_meta; } static bool can_flush_in_this_thread(); + // Logstore management. 
+ std::shared_ptr< HomeLogStore > create_new_log_store(bool append_mode = false); + folly::Future< shared< HomeLogStore > > open_log_store(logstore_id_t store_id, bool append_mode); + bool close_log_store(logstore_id_t store_id) { + // TODO: Implement this method + return true; + } + void remove_log_store(logstore_id_t store_id); + void on_io_completion(logstore_id_t id, logdev_key ld_key, logdev_key flush_idx, uint32_t nremaining_in_batch, + void* ctx); + void on_log_store_found(logstore_id_t store_id, const logstore_superblk& sb); + void on_logfound(logstore_id_t id, logstore_seq_num_t seq_num, logdev_key ld_key, logdev_key flush_ld_key, + log_buffer buf, uint32_t nremaining_in_batch); + void on_batch_completion(HomeLogStore* log_store, uint32_t nremaining_in_batch, logdev_key flush_ld_key); + void device_truncate_under_lock(const std::shared_ptr< truncate_req >& treq); + logdev_key do_device_truncate(bool dry_run = false); + void handle_unopened_log_stores(bool format); + + logdev_id_t get_id() { return m_logdev_id; } + private: LogGroup* make_log_group(uint32_t estimated_records) { m_log_group_pool[m_log_group_idx].reset(estimated_records); @@ -806,11 +817,17 @@ class LogDev { std::atomic< int64_t > m_pending_flush_size{0}; // How much flushable logs are pending std::atomic< bool > m_is_flushing{false}; // Is LogDev currently flushing (so far supports one flusher at a time) bool m_stopped{false}; // Is Logdev stopped. We don't need lock here, because it is updated under flush lock - logstore_family_id_t m_family_id; // The family id this logdev is part of + logdev_id_t m_logdev_id; JournalVirtualDev* m_vdev{nullptr}; shared< JournalVirtualDev::Descriptor > m_vdev_jd; // Journal descriptor. 
HomeStoreSafePtr m_hs; // Back pointer to homestore + folly::SharedMutexWritePriority m_store_map_mtx; + std::unordered_map< logstore_id_t, logstore_info > m_id_logstore_map; + std::unordered_map< logstore_id_t, uint64_t > m_unopened_store_io; + std::unordered_set< logstore_id_t > m_unopened_store_id; + std::unordered_map< logstore_id_t, logid_t > m_last_flush_info; + std::multimap< logid_t, logstore_id_t > m_garbage_store_ids; Clock::time_point m_last_flush_time; diff --git a/src/lib/logstore/log_store.cpp b/src/lib/logstore/log_store.cpp index f51e29944..089bb3286 100644 --- a/src/lib/logstore/log_store.cpp +++ b/src/lib/logstore/log_store.cpp @@ -23,7 +23,6 @@ #include #include #include "common/homestore_assert.hpp" -#include "log_store_family.hpp" #include "log_dev.hpp" namespace homestore { @@ -33,17 +32,17 @@ SISL_LOGGING_DECL(logstore) #define THIS_LOGSTORE_PERIODIC_LOG(level, msg, ...) \ HS_PERIODIC_DETAILED_LOG(level, logstore, "store", m_fq_name, , , msg, __VA_ARGS__) -HomeLogStore::HomeLogStore(LogStoreFamily& family, logstore_id_t id, bool append_mode, logstore_seq_num_t start_lsn) : +HomeLogStore::HomeLogStore(std::shared_ptr< LogDev > logdev, logstore_id_t id, bool append_mode, + logstore_seq_num_t start_lsn) : m_store_id{id}, - m_logstore_family{family}, - m_logdev{family.logdev()}, + m_logdev{logdev}, m_records{"HomeLogStoreRecords", start_lsn - 1}, m_append_mode{append_mode}, m_seq_num{start_lsn}, - m_fq_name{fmt::format("{}.{}", family.get_family_id(), id)}, + m_fq_name{fmt::format("{}.{}", logdev->get_id(), id)}, m_metrics{logstore_service().metrics()} { m_truncation_barriers.reserve(10000); - m_safe_truncation_boundary.ld_key = m_logdev.get_last_flush_ld_key(); + m_safe_truncation_boundary.ld_key = m_logdev->get_last_flush_ld_key(); m_safe_truncation_boundary.seq_num.store(start_lsn - 1, std::memory_order_release); } @@ -83,7 +82,7 @@ void HomeLogStore::write_async(logstore_req* req, const log_req_comp_cb_t& cb) { HS_LOG_ASSERT((cb || 
m_comp_cb), "Expected either cb is not null or default cb registered"); req->cb = (cb ? cb : m_comp_cb); req->start_time = Clock::now(); - if (req->seq_num == 0) { m_safe_truncation_boundary.ld_key = m_logdev.get_last_flush_ld_key(); } + if (req->seq_num == 0) { m_safe_truncation_boundary.ld_key = m_logdev->get_last_flush_ld_key(); } #ifndef NDEBUG const auto trunc_upto_lsn = truncated_upto(); if (req->seq_num <= trunc_upto_lsn) { @@ -96,7 +95,7 @@ void HomeLogStore::write_async(logstore_req* req, const log_req_comp_cb_t& cb) { m_records.create(req->seq_num); COUNTER_INCREMENT(m_metrics, logstore_append_count, 1); HISTOGRAM_OBSERVE(m_metrics, logstore_record_size, req->data.size()); - m_logdev.append_async(m_store_id, req->seq_num, req->data, static_cast< void* >(req)); + m_logdev->append_async(m_store_id, req->seq_num, req->data, static_cast< void* >(req)); } void HomeLogStore::write_async(logstore_seq_num_t seq_num, const sisl::io_blob& b, void* cookie, @@ -141,7 +140,7 @@ log_buffer HomeLogStore::read_sync(logstore_seq_num_t seq_num) { // ld_key.idx, ld_key.dev_offset); COUNTER_INCREMENT(m_metrics, logstore_read_count, 1); serialized_log_record header; - const auto b = m_logdev.read(ld_key, header); + const auto b = m_logdev->read(ld_key, header); HISTOGRAM_OBSERVE(m_metrics, logstore_read_latency, get_elapsed_time_us(start_time)); return b; } @@ -152,7 +151,7 @@ void HomeLogStore::read_async(logstore_req* req, const log_found_cb_t& cb) { auto record = m_records.at(req->seq_num); logdev_key ld_key = record.m_dev_key; req->cb = cb; - m_logdev.read_async(ld_key, (void*)req); + m_logdev->read_async(ld_key, (void*)req); } void HomeLogStore::read_async(logstore_seq_num_t seq_num, void* cookie, const log_found_cb_t& cb) { @@ -243,7 +242,7 @@ void HomeLogStore::truncate(logstore_seq_num_t upto_seq_num, bool in_memory_trun // First try to block the flushing of logdevice and if we are successfully able to do, then auto shared_this = shared_from_this(); - 
m_logdev.run_under_flush_lock([shared_this, upto_seq_num]() { + m_logdev->run_under_flush_lock([shared_this, upto_seq_num]() { shared_this->do_truncate(upto_seq_num); return true; }); @@ -255,7 +254,7 @@ void HomeLogStore::do_truncate(logstore_seq_num_t upto_seq_num) { m_safe_truncation_boundary.seq_num.store(upto_seq_num, std::memory_order_release); // Need to update the superblock with meta, we don't persist yet, will be done as part of log dev truncation - m_logdev.update_store_superblk(m_store_id, logstore_superblk{upto_seq_num + 1}, false /* persist_now */); + m_logdev->update_store_superblk(m_store_id, logstore_superblk{upto_seq_num + 1}, false /* persist_now */); const int ind = search_max_le(upto_seq_num); if (ind < 0) { @@ -346,7 +345,7 @@ nlohmann::json HomeLogStore::dump_log_store(const log_dump_req& dump_req) { nlohmann::json json_val = nlohmann::json::object(); serialized_log_record record_header; - const auto log_buffer{m_logdev.read(record.m_dev_key, record_header)}; + const auto log_buffer{m_logdev->read(record.m_dev_key, record_header)}; try { json_val["size"] = static_cast< uint32_t >(record_header.size); @@ -376,7 +375,7 @@ void HomeLogStore::foreach (int64_t start_idx, const std::function< bool(logstor m_records.foreach_all_completed(start_idx, [&](int64_t cur_idx, homestore::logstore_record& record) -> bool { // do a sync read serialized_log_record header; - auto log_buf = m_logdev.read(record.m_dev_key, header); + auto log_buf = m_logdev->read(record.m_dev_key, header); return cb(cur_idx, log_buf); }); } @@ -412,7 +411,7 @@ void HomeLogStore::flush_sync(logstore_seq_num_t upto_seq_num) { if (!m_records.status(upto_seq_num).is_active) { return; } // Step 3: Force a flush (with least threshold) - m_logdev.flush_if_needed(1); + m_logdev->flush_if_needed(1); // Step 4: Wait for completion m_sync_flush_cv.wait(lk, [this, upto_seq_num] { return !m_records.status(upto_seq_num).is_active; }); @@ -445,10 +444,10 @@ uint64_t 
HomeLogStore::rollback_async(logstore_seq_num_t to_lsn, on_rollback_cb_ m_records.at(from_lsn).m_dev_key.idx); // Get the logid range to rollback m_records.rollback(to_lsn); // Rollback all bitset records and from here on, we can't access any lsns beyond to_lsn - m_logdev.run_under_flush_lock([logid_range, to_lsn, this, comp_cb = std::move(cb)]() { + m_logdev->run_under_flush_lock([logid_range, to_lsn, this, comp_cb = std::move(cb)]() { iomanager.run_on_forget(logstore_service().truncate_thread(), [logid_range, to_lsn, this, comp_cb]() { // Rollback the log_ids in the range, for this log store (which persists this info in its superblk) - m_logdev.rollback(m_store_id, logid_range); + m_logdev->rollback(m_store_id, logid_range); // Remove all truncation barriers on rolled back lsns for (auto it = std::rbegin(m_truncation_barriers); it != std::rend(m_truncation_barriers); ++it) { @@ -460,7 +459,7 @@ uint64_t HomeLogStore::rollback_async(logstore_seq_num_t to_lsn, on_rollback_cb_ } m_flush_batch_max_lsn = invalid_lsn(); // Reset the flush batch for next batch. 
if (comp_cb) { comp_cb(to_lsn); } - m_logdev.unlock_flush(); + m_logdev->unlock_flush(); }); return false; }); @@ -478,7 +477,7 @@ nlohmann::json HomeLogStore::get_status(int verbosity) const { js["truncation_pending_on_device?"] = m_safe_truncation_boundary.pending_dev_truncation; js["truncation_parallel_to_writes?"] = m_safe_truncation_boundary.active_writes_not_part_of_truncation; js["logstore_records"] = m_records.get_status(verbosity); - js["logstore_sb_first_lsn"] = m_logdev.log_dev_meta().store_superblk(m_store_id).m_first_seq_num; + js["logstore_sb_first_lsn"] = m_logdev->log_dev_meta().store_superblk(m_store_id).m_first_seq_num; return js; } diff --git a/src/lib/logstore/log_store_family.cpp b/src/lib/logstore/log_store_family.cpp deleted file mode 100644 index 33e1e4313..000000000 --- a/src/lib/logstore/log_store_family.cpp +++ /dev/null @@ -1,336 +0,0 @@ -/********************************************************************************* - * Modifications Copyright 2017-2019 eBay Inc. - * - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed - * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR - * CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. 
- * - *********************************************************************************/ -#include -#include - -#include -#include -#include - -#include -#include - -#include "common/homestore_assert.hpp" -#include "log_store_family.hpp" -#include "log_dev.hpp" - -namespace homestore { -SISL_LOGGING_DECL(logstore) - -LogStoreFamily::LogStoreFamily(logstore_family_id_t f_id) : - m_family_id{f_id}, m_name{std::string("LogDevFamily") + std::to_string(f_id)}, m_log_dev{f_id, m_name} {} - -void LogStoreFamily::start(bool format, JournalVirtualDev* blk_store) { - m_log_dev.register_store_found_cb(bind_this(LogStoreFamily::on_log_store_found, 2)); - m_log_dev.register_append_cb(bind_this(LogStoreFamily::on_io_completion, 5)); - m_log_dev.register_logfound_cb(bind_this(LogStoreFamily::on_logfound, 6)); - - // Start the logdev, which loads the device in case of recovery. - m_log_dev.start(format, blk_store); - for (auto it{std::begin(m_unopened_store_io)}; it != std::end(m_unopened_store_io); ++it) { - LOGINFO("skip log entries for store id {}-{}, ios {}", m_family_id, it->first, it->second); - } - m_unopened_store_io.clear(); - - // If there are any unopened storeids found, loop and check again if they are indeed open later. Unopened log store - // could be possible if the ids are deleted, but it is delayed to remove from store id reserver. In that case, - // do the remove from store id reserver now. - // TODO: At present we are assuming all unopened store ids could be removed. In future have a callback to this - // start routine, which takes the list of unopened store ids and can return a new set, which can be removed. 
- { - folly::SharedMutexWritePriority::WriteHolder holder(m_store_map_mtx); - for (auto it{std::begin(m_unopened_store_id)}; it != std::end(m_unopened_store_id);) { - if (m_id_logstore_map.find(*it) == m_id_logstore_map.end()) { - // Not opened even on second time check, simply unreserve id - m_log_dev.unreserve_store_id(*it); - } - it = m_unopened_store_id.erase(it); - } - - // Also call the logstore to inform that start/replay is completed. - if (!format) { - for (auto& p : m_id_logstore_map) { - auto& lstore{p.second.log_store}; - if (lstore && lstore->get_log_replay_done_cb()) { - lstore->get_log_replay_done_cb()(lstore, lstore->seq_num() - 1); - lstore->truncate(lstore->truncated_upto()); - } - } - } - } -} - -void LogStoreFamily::stop() { - { - folly::SharedMutexWritePriority::WriteHolder holder(m_store_map_mtx); - m_id_logstore_map.clear(); - } - m_log_dev.stop(); -} - -std::shared_ptr< HomeLogStore > LogStoreFamily::create_new_log_store(bool append_mode) { - auto const store_id = m_log_dev.reserve_store_id(); - std::shared_ptr< HomeLogStore > lstore; - lstore = std::make_shared< HomeLogStore >(*this, store_id, append_mode, 0); - - { - folly::SharedMutexWritePriority::WriteHolder holder(m_store_map_mtx); - const auto it = m_id_logstore_map.find(store_id); - HS_REL_ASSERT((it == m_id_logstore_map.end()), "store_id {}-{} already exists", m_family_id, store_id); - m_id_logstore_map.insert(std::pair(store_id, logstore_info{.log_store = lstore, .append_mode = append_mode})); - } - LOGINFO("Created log store id {}-{}", m_family_id, store_id); - return lstore; -} - -folly::Future< shared< HomeLogStore > > LogStoreFamily::open_log_store(logstore_id_t store_id, bool append_mode) { - folly::SharedMutexWritePriority::WriteHolder holder(m_store_map_mtx); - auto it = m_id_logstore_map.find(store_id); - if (it == m_id_logstore_map.end()) { - bool happened; - std::tie(it, happened) = m_id_logstore_map.insert(std::pair(store_id, - logstore_info{ - .log_store = nullptr, - 
.append_mode = append_mode, - })); - HS_REL_ASSERT_EQ(happened, true, "Unable to insert logstore into id_logstore_map"); - } - return it->second.promise.getFuture(); -} - -void LogStoreFamily::remove_log_store(logstore_id_t store_id) { - LOGINFO("Removing log store id {}-{}", m_family_id, store_id); - - { - folly::SharedMutexWritePriority::WriteHolder holder(m_store_map_mtx); - auto ret = m_id_logstore_map.erase(store_id); - HS_REL_ASSERT((ret == 1), "try to remove invalid store_id {}-{}", m_family_id, store_id); - } - m_log_dev.unreserve_store_id(store_id); -} - -void LogStoreFamily::device_truncate(const std::shared_ptr< truncate_req >& treq) { - m_log_dev.run_under_flush_lock([this, treq]() { - iomanager.run_on_forget(logstore_service().truncate_thread(), [this, treq]() { - const logdev_key trunc_upto = do_device_truncate(treq->dry_run); - bool done{false}; - - if (treq->cb || treq->wait_till_done) { - { - std::lock_guard< std::mutex > lk{treq->mtx}; - done = (--treq->trunc_outstanding == 0); - treq->m_trunc_upto_result[m_family_id] = trunc_upto; - } - } - if (done) { - if (treq->cb) { treq->cb(treq->m_trunc_upto_result); } - if (treq->wait_till_done) { - std::lock_guard< std::mutex > lk{treq->mtx}; - treq->cv.notify_one(); - } - } - m_log_dev.unlock_flush(); - }); - return false; // Do not release the flush lock yet, the scheduler will unlock it. 
- }); -} - -void LogStoreFamily::on_log_store_found(logstore_id_t store_id, const logstore_superblk& sb) { - folly::SharedMutexWritePriority::ReadHolder holder(m_store_map_mtx); - auto it = m_id_logstore_map.find(store_id); - if (it == m_id_logstore_map.end()) { - LOGERROR("Store Id {}-{} found but not opened yet, it will be discarded after logstore is started", m_family_id, - store_id); - m_unopened_store_id.insert(store_id); - m_unopened_store_io.insert(std::make_pair<>(store_id, 0)); - return; - } - - LOGINFO("Found a logstore store_id={}-{} with start seq_num={}, Creating a new HomeLogStore instance", m_family_id, - store_id, sb.m_first_seq_num); - logstore_info& info = it->second; - info.log_store = std::make_shared< HomeLogStore >(*this, store_id, info.append_mode, sb.m_first_seq_num); - info.promise.setValue(info.log_store); -} - -static thread_local std::vector< std::shared_ptr< HomeLogStore > > s_cur_flush_batch_stores; - -void LogStoreFamily::on_io_completion(logstore_id_t id, logdev_key ld_key, logdev_key flush_ld_key, - uint32_t nremaining_in_batch, void* ctx) { - auto* req = s_cast< logstore_req* >(ctx); - HomeLogStore* log_store = req->log_store; - - if (req->is_write) { - HS_LOG_ASSERT_EQ(log_store->get_store_id(), id, "Expecting store id in log store and io completion to match"); - log_store->on_write_completion(req, ld_key); - on_batch_completion(log_store, nremaining_in_batch, flush_ld_key); - } else { - log_store->on_read_completion(req, ld_key); - } -} - -void LogStoreFamily::on_logfound(logstore_id_t id, logstore_seq_num_t seq_num, logdev_key ld_key, - logdev_key flush_ld_key, log_buffer buf, uint32_t nremaining_in_batch) { - HomeLogStore* log_store{nullptr}; - - { - folly::SharedMutexWritePriority::ReadHolder holder(m_store_map_mtx); - auto const it = m_id_logstore_map.find(id); - if (it == m_id_logstore_map.end()) { - auto [unopened_it, inserted] = m_unopened_store_io.insert(std::make_pair<>(id, 0)); - if (inserted) { - // HS_REL_ASSERT(0, 
"log id {}-{} not found", m_family_id, id); - } - ++unopened_it->second; - return; - } - log_store = it->second.log_store.get(); - } - if (!log_store) { return; } - log_store->on_log_found(seq_num, ld_key, flush_ld_key, buf); - on_batch_completion(log_store, nremaining_in_batch, flush_ld_key); -} - -void LogStoreFamily::on_batch_completion(HomeLogStore* log_store, uint32_t nremaining_in_batch, - logdev_key flush_ld_key) { - - /* check if it is a first update on this log store */ - auto id = log_store->get_store_id(); - const auto it = m_last_flush_info.find(id); - if ((it == std::end(m_last_flush_info)) || (it->second != flush_ld_key.idx)) { - // first time completion in this batch for a given store_id - m_last_flush_info.insert_or_assign(id, flush_ld_key.idx); - if (it == std::end(m_last_flush_info)) { s_cur_flush_batch_stores.push_back(log_store->shared_from_this()); } - } - if (nremaining_in_batch == 0) { - // This batch is completed, call all log stores participated in this batch about the end of batch - HS_LOG_ASSERT_GT(s_cur_flush_batch_stores.size(), 0U, "Expecting one store to be flushed in batch"); - - for (auto& l : s_cur_flush_batch_stores) { - l->on_batch_completion(flush_ld_key); - } - s_cur_flush_batch_stores.clear(); - m_last_flush_info.clear(); - } -} - -logdev_key LogStoreFamily::do_device_truncate(bool dry_run) { - static thread_local std::vector< std::shared_ptr< HomeLogStore > > m_min_trunc_stores; - static thread_local std::vector< std::shared_ptr< HomeLogStore > > m_non_participating_stores; - - m_min_trunc_stores.clear(); - m_non_participating_stores.clear(); - logdev_key min_safe_ld_key = logdev_key::out_of_bound_ld_key(); - - std::string dbg_str{"Format [store_id:trunc_lsn:logidx:dev_trunc_pending?:active_writes_in_trucate?] 
"}; - - { - folly::SharedMutexWritePriority::ReadHolder holder(m_store_map_mtx); - for (auto& id_logstore : m_id_logstore_map) { - auto& store_ptr = id_logstore.second.log_store; - const auto& trunc_info = store_ptr->pre_device_truncation(); - - if (!trunc_info.pending_dev_truncation && !trunc_info.active_writes_not_part_of_truncation) { - // This log store neither has any pending device truncation nor active logstore io going on for now. - // Ignore this log store for min safe boundary calculation. - fmt::format_to(std::back_inserter(dbg_str), "[{}:None] ", store_ptr->get_store_id()); - m_non_participating_stores.push_back(store_ptr); - continue; - } - - fmt::format_to(std::back_inserter(dbg_str), "[{}:{}:{}:{}:{}] ", store_ptr->get_store_id(), - trunc_info.seq_num.load(), trunc_info.ld_key.idx, trunc_info.pending_dev_truncation, - trunc_info.active_writes_not_part_of_truncation); - if (trunc_info.ld_key.idx > min_safe_ld_key.idx) { continue; } - - if (trunc_info.ld_key.idx < min_safe_ld_key.idx) { - // New minimum safe l entry - min_safe_ld_key = trunc_info.ld_key; - m_min_trunc_stores.clear(); - } - m_min_trunc_stores.push_back(store_ptr); - } - } - - if ((min_safe_ld_key == logdev_key::out_of_bound_ld_key()) || (min_safe_ld_key.idx < 0)) { - HS_PERIODIC_LOG( - INFO, logstore, - "[Family={}] No log store append on any log stores, skipping device truncation, all_logstore_info:<{}>", - m_family_id, dbg_str); - return min_safe_ld_key; - } - - // Got the safest log id to truncate and actually truncate upto the safe log idx to the log device - if (!dry_run) { m_log_dev.truncate(min_safe_ld_key); } - HS_PERIODIC_LOG(INFO, logstore, - "[Family={}] LogDevice truncate, all_logstore_info:<{}> safe log dev key to truncate={}", - m_family_id, dbg_str, min_safe_ld_key); - - // We call post device truncation only to the log stores whose prepared truncation points are fully - // truncated or to stores which didn't particpate in this device truncation. 
- for (auto& store_ptr : m_min_trunc_stores) { - store_ptr->post_device_truncation(min_safe_ld_key); - } - for (auto& store_ptr : m_non_participating_stores) { - store_ptr->post_device_truncation(min_safe_ld_key); - } - m_min_trunc_stores.clear(); // Not clearing here, would cause a shared_ptr ref holding. - m_non_participating_stores.clear(); - - return min_safe_ld_key; -} - -nlohmann::json LogStoreFamily::dump_log_store(const log_dump_req& dump_req) { - nlohmann::json json_dump{}; // create root object - if (dump_req.log_store == nullptr) { - folly::SharedMutexWritePriority::ReadHolder holder(m_store_map_mtx); - for (auto& id_logstore : m_id_logstore_map) { - auto store_ptr = id_logstore.second.log_store; - const std::string id = std::to_string(store_ptr->get_store_id()); - // must use operator= construction as copy construction results in error - nlohmann::json val = store_ptr->dump_log_store(dump_req); - json_dump[id] = std::move(val); - } - } else { - const std::string id{std::to_string(dump_req.log_store->get_store_id())}; - // must use operator= construction as copy construction results in error - nlohmann::json val = dump_req.log_store->dump_log_store(dump_req); - json_dump[id] = std::move(val); - } - return json_dump; -} - -nlohmann::json LogStoreFamily::get_status(int verbosity) const { - nlohmann::json js; - auto unopened = nlohmann::json::array(); - for (const auto& l : m_unopened_store_id) { - unopened.push_back(l); - } - js["logstores_unopened"] = std::move(unopened); - - // Logdev status - m_log_dev.get_status(verbosity, js); - - // All logstores - { - folly::SharedMutexWritePriority::ReadHolder holder(m_store_map_mtx); - for (const auto& [id, lstore] : m_id_logstore_map) { - js["logstore_id_" + std::to_string(id)] = lstore.log_store->get_status(verbosity); - } - } - return js; -} -} // namespace homestore diff --git a/src/lib/logstore/log_store_family.hpp b/src/lib/logstore/log_store_family.hpp deleted file mode 100644 index b451a70bf..000000000 --- 
a/src/lib/logstore/log_store_family.hpp +++ /dev/null @@ -1,114 +0,0 @@ -/********************************************************************************* - * Modifications Copyright 2017-2019 eBay Inc. - * - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed - * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR - * CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - * - *********************************************************************************/ -#pragma once - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include -#include - -#include -#include -#include "log_dev.hpp" - -namespace homestore { -struct log_dump_req; - -struct logstore_info { - std::shared_ptr< HomeLogStore > log_store; - bool append_mode; - folly::SharedPromise< std::shared_ptr< HomeLogStore > > promise{}; -}; - -struct truncate_req { - std::mutex mtx; - std::condition_variable cv; - bool wait_till_done{false}; - bool dry_run{false}; - LogStoreService::device_truncate_cb_t cb; - std::array< logdev_key, LogStoreService::num_log_families > m_trunc_upto_result; - int trunc_outstanding{0}; -}; - -class JournalVirtualDev; -class HomeLogStore; -struct meta_blk; - -class LogStoreFamily { - friend class LogStoreService; - friend class LogDev; - -public: - LogStoreFamily(const logstore_family_id_t f_id); - LogStoreFamily(const LogStoreFamily&) = delete; - LogStoreFamily(LogStoreFamily&&) noexcept = delete; - LogStoreFamily& operator=(const LogStoreFamily&) = delete; - LogStoreFamily& operator=(LogStoreFamily&&) noexcept = delete; - - void 
start(const bool format, JournalVirtualDev* blk_store); - void stop(); - - shared< HomeLogStore > create_new_log_store(bool append_mode = false); - folly::Future< shared< HomeLogStore > > open_log_store(logstore_id_t store_id, bool append_mode); - - bool close_log_store(logstore_id_t store_id) { - // TODO: Implement this method - return true; - } - void remove_log_store(logstore_id_t store_id); - - void device_truncate(const std::shared_ptr< truncate_req >& treq); - - nlohmann::json dump_log_store(const log_dump_req& dum_req); - - LogDev& logdev() { return m_log_dev; } - - nlohmann::json get_status(int verbosity) const; - std::string get_name() const { return m_name; } - - logstore_family_id_t get_family_id() const { return m_family_id; } - - logdev_key do_device_truncate(bool dry_run = false); - -private: - void on_log_store_found(logstore_id_t store_id, const logstore_superblk& meta); - void on_io_completion(logstore_id_t id, logdev_key ld_key, logdev_key flush_idx, uint32_t nremaining_in_batch, - void* ctx); - void on_logfound(logstore_id_t id, logstore_seq_num_t seq_num, logdev_key ld_key, logdev_key flush_ld_key, - log_buffer buf, uint32_t nremaining_in_batch); - void on_batch_completion(HomeLogStore* log_store, uint32_t nremaining_in_batch, logdev_key flush_ld_key); - -private: - mutable folly::SharedMutexWritePriority m_store_map_mtx; - std::unordered_map< logstore_id_t, logstore_info > m_id_logstore_map; - std::unordered_map< logstore_id_t, uint64_t > m_unopened_store_io; - std::unordered_set< logstore_id_t > m_unopened_store_id; - std::unordered_map< logstore_id_t, logid_t > m_last_flush_info; - logstore_family_id_t m_family_id; - std::string m_name; - LogDev m_log_dev; -}; -} // namespace homestore diff --git a/src/lib/logstore/log_store_service.cpp b/src/lib/logstore/log_store_service.cpp index e8f74d60e..90e35cf06 100644 --- a/src/lib/logstore/log_store_service.cpp +++ b/src/lib/logstore/log_store_service.cpp @@ -29,7 +29,6 @@ #include 
"common/homestore_status_mgr.hpp" #include "device/journal_vdev.hpp" #include "device/physical_dev.hpp" -#include "log_store_family.hpp" #include "log_dev.hpp" namespace homestore { @@ -38,30 +37,34 @@ SISL_LOGGING_DECL(logstore) LogStoreService& logstore_service() { return hs()->logstore_service(); } /////////////////////////////////////// LogStoreService Section /////////////////////////////////////// -LogStoreService::LogStoreService() : - m_logstore_families{std::make_unique< LogStoreFamily >(DATA_LOG_FAMILY_IDX), - std::make_unique< LogStoreFamily >(CTRL_LOG_FAMILY_IDX)} {} +LogStoreService::LogStoreService() { + m_id_reserver = std::make_unique< sisl::IDReserver >(); + meta_service().register_handler( + logdev_sb_meta_name, + [this](meta_blk* mblk, sisl::byte_view buf, size_t size) { + logdev_super_blk_found(std::move(buf), voidptr_cast(mblk)); + }, + nullptr); + + meta_service().register_handler( + logdev_rollback_sb_meta_name, + [this](meta_blk* mblk, sisl::byte_view buf, size_t size) { + rollback_super_blk_found(std::move(buf), voidptr_cast(mblk)); + }, + nullptr); +} -folly::Future< std::error_code > LogStoreService::create_vdev(uint64_t size, logstore_family_id_t family, - uint32_t chunk_size) { +folly::Future< std::error_code > LogStoreService::create_vdev(uint64_t size, uint32_t chunk_size) { const auto atomic_page_size = hs()->device_mgr()->atomic_page_size(HSDevType::Fast); hs_vdev_context hs_ctx; - std::string name; - - if (family == DATA_LOG_FAMILY_IDX) { - name = "data_logdev"; - hs_ctx.type = hs_vdev_type_t::DATA_LOGDEV_VDEV; - } else { - name = "ctrl_logdev"; - hs_ctx.type = hs_vdev_type_t::CTRL_LOGDEV_VDEV; - } + hs_ctx.type = hs_vdev_type_t::LOGDEV_VDEV; // reason we set alloc_type/chunk_sel_type here instead of by homestore logstore service consumer is because // consumer doesn't care or understands the underlying alloc/chunkSel for this service, if this changes in the // future, we can let consumer set it by then; auto vdev = - 
hs()->device_mgr()->create_vdev(vdev_parameters{.vdev_name = name, + hs()->device_mgr()->create_vdev(vdev_parameters{.vdev_name = "LogDev", .size_type = vdev_size_type_t::VDEV_SIZE_DYNAMIC, .vdev_size = 0, .num_chunks = 0, @@ -76,14 +79,10 @@ folly::Future< std::error_code > LogStoreService::create_vdev(uint64_t size, log return vdev->async_format(); } -shared< VirtualDev > LogStoreService::open_vdev(const vdev_info& vinfo, logstore_family_id_t family, - bool load_existing) { +std::shared_ptr< VirtualDev > LogStoreService::open_vdev(const vdev_info& vinfo, bool load_existing) { + RELEASE_ASSERT(m_logdev_vdev == nullptr, "Duplicate journal vdev"); auto vdev = std::make_shared< JournalVirtualDev >(*(hs()->device_mgr()), vinfo, nullptr); - if (family == DATA_LOG_FAMILY_IDX) { - m_data_logdev_vdev = std::dynamic_pointer_cast< JournalVirtualDev >(vdev); - } else { - m_ctrl_logdev_vdev = std::dynamic_pointer_cast< JournalVirtualDev >(vdev); - } + m_logdev_vdev = std::dynamic_pointer_cast< JournalVirtualDev >(vdev); return vdev; } @@ -93,49 +92,145 @@ void LogStoreService::start(bool format) { // Create an truncate thread loop which handles truncation which does sync IO start_threads(); - // Start the logstore families - m_logstore_families[DATA_LOG_FAMILY_IDX]->start(format, m_data_logdev_vdev.get()); - m_logstore_families[CTRL_LOG_FAMILY_IDX]->start(format, m_ctrl_logdev_vdev.get()); + for (auto& [logdev_id, logdev] : m_id_logdev_map) { + logdev->start(format, m_logdev_vdev.get()); + } } void LogStoreService::stop() { device_truncate(nullptr, true, false); - for (auto& f : m_logstore_families) { - f->stop(); + for (auto& [id, logdev] : m_id_logdev_map) { + logdev->stop(); + } + { + folly::SharedMutexWritePriority::WriteHolder holder(m_logdev_map_mtx); + m_id_logdev_map.clear(); + } + m_id_reserver.reset(); +} + +logdev_id_t LogStoreService::create_new_logdev() { + folly::SharedMutexWritePriority::WriteHolder holder(m_logdev_map_mtx); + logdev_id_t logdev_id = 
m_id_reserver->reserve(); + auto logdev = create_new_logdev_internal(logdev_id); + logdev->start(true /* format */, m_logdev_vdev.get()); + COUNTER_INCREMENT(m_metrics, logdevs_count, 1); + LOGINFO("Created log dev id {}", logdev_id); + return logdev_id; +} + +std::shared_ptr< LogDev > LogStoreService::create_new_logdev_internal(logdev_id_t logdev_id) { + auto logdev = std::make_shared< LogDev >(logdev_id); + const auto it = m_id_logdev_map.find(logdev_id); + HS_REL_ASSERT((it == m_id_logdev_map.end()), "logdev id {} already exists", logdev_id); + m_id_logdev_map.insert(std::make_pair<>(logdev_id, logdev)); + return logdev; +} + +void LogStoreService::open_logdev(logdev_id_t logdev_id) { + folly::SharedMutexWritePriority::WriteHolder holder(m_logdev_map_mtx); + const auto it = m_id_logdev_map.find(logdev_id); + if (it == m_id_logdev_map.end()) { + m_id_reserver->reserve(logdev_id); + create_new_logdev_internal(logdev_id); + } +} + +std::vector< std::shared_ptr< LogDev > > LogStoreService::get_all_logdevs() { + folly::SharedMutexWritePriority::ReadHolder holder(m_logdev_map_mtx); + std::vector< std::shared_ptr< LogDev > > res; + for (auto& [id, logdev] : m_id_logdev_map) { + res.push_back(logdev); } + return res; } -shared< HomeLogStore > LogStoreService::create_new_log_store(const logstore_family_id_t family_id, - const bool append_mode) { - HS_REL_ASSERT_LT(family_id, num_log_families); +std::shared_ptr< LogDev > LogStoreService::get_logdev(logdev_id_t id) { + folly::SharedMutexWritePriority::ReadHolder holder(m_logdev_map_mtx); + const auto it = m_id_logdev_map.find(id); + HS_REL_ASSERT((it != m_id_logdev_map.end()), "logdev id {} doesnt exists", id); + return it->second; +} + +void LogStoreService::logdev_super_blk_found(const sisl::byte_view& buf, void* meta_cookie) { + superblk< logdev_superblk > sb; + sb.load(buf, meta_cookie); + HS_REL_ASSERT_EQ(sb->get_magic(), logdev_superblk::LOGDEV_SB_MAGIC, "Invalid logdev metablk, magic mismatch"); + 
HS_REL_ASSERT_EQ(sb->get_version(), logdev_superblk::LOGDEV_SB_VERSION, "Invalid version of logdev metablk"); + { + folly::SharedMutexWritePriority::WriteHolder holder(m_logdev_map_mtx); + std::shared_ptr< LogDev > logdev; + auto id = sb->logdev_id; + const auto it = m_id_logdev_map.find(id); + if (it == m_id_logdev_map.end()) { + m_id_reserver->reserve(id); + logdev = create_new_logdev_internal(id); + } else { + logdev = it->second; + } + + logdev->log_dev_meta().logdev_super_blk_found(buf, meta_cookie); + } +} + +void LogStoreService::rollback_super_blk_found(const sisl::byte_view& buf, void* meta_cookie) { + superblk< rollback_superblk > rollback_sb; + rollback_sb.load(buf, meta_cookie); + HS_REL_ASSERT_EQ(rollback_sb->get_magic(), rollback_superblk::ROLLBACK_SB_MAGIC, "Rollback sb magic mismatch"); + HS_REL_ASSERT_EQ(rollback_sb->get_version(), rollback_superblk::ROLLBACK_SB_VERSION, + "Rollback sb version mismatch"); + { + folly::SharedMutexWritePriority::WriteHolder holder(m_logdev_map_mtx); + std::shared_ptr< LogDev > logdev; + auto id = rollback_sb->logdev_id; + const auto it = m_id_logdev_map.find(id); + if (it == m_id_logdev_map.end()) { + m_id_reserver->reserve(id); + logdev = create_new_logdev_internal(id); + } else { + logdev = it->second; + } + + logdev->log_dev_meta().rollback_super_blk_found(buf, meta_cookie); + } +} + +std::shared_ptr< HomeLogStore > LogStoreService::create_new_log_store(logdev_id_t logdev_id, bool append_mode) { + folly::SharedMutexWritePriority::ReadHolder holder(m_logdev_map_mtx); COUNTER_INCREMENT(m_metrics, logstores_count, 1); - return m_logstore_families[family_id]->create_new_log_store(append_mode); + const auto it = m_id_logdev_map.find(logdev_id); + HS_REL_ASSERT((it != m_id_logdev_map.end()), "logdev id {} doesnt exist", logdev_id); + return it->second->create_new_log_store(append_mode); } -folly::Future< shared< HomeLogStore > > LogStoreService::open_log_store(logstore_family_id_t family_id, - logstore_id_t store_id, 
bool append_mode) { - HS_REL_ASSERT_LT(family_id, num_log_families); +folly::Future< shared< HomeLogStore > > LogStoreService::open_log_store(logdev_id_t logdev_id, logstore_id_t store_id, + bool append_mode) { + folly::SharedMutexWritePriority::ReadHolder holder(m_logdev_map_mtx); + const auto it = m_id_logdev_map.find(logdev_id); + HS_REL_ASSERT((it != m_id_logdev_map.end()), "logdev id {} doesnt exist", logdev_id); COUNTER_INCREMENT(m_metrics, logstores_count, 1); - return m_logstore_families[family_id]->open_log_store(store_id, append_mode); + return it->second->open_log_store(store_id, append_mode); } -void LogStoreService::remove_log_store(const logstore_family_id_t family_id, const logstore_id_t store_id) { - HS_REL_ASSERT_LT(family_id, num_log_families); - m_logstore_families[family_id]->remove_log_store(store_id); +void LogStoreService::remove_log_store(logdev_id_t logdev_id, logstore_id_t store_id) { + folly::SharedMutexWritePriority::ReadHolder holder(m_logdev_map_mtx); + COUNTER_INCREMENT(m_metrics, logstores_count, 1); + const auto it = m_id_logdev_map.find(logdev_id); + HS_REL_ASSERT((it != m_id_logdev_map.end()), "logdev id {} doesnt exist", logdev_id); + it->second->remove_log_store(store_id); COUNTER_DECREMENT(m_metrics, logstores_count, 1); } -void LogStoreService::device_truncate(const device_truncate_cb_t& cb, const bool wait_till_done, const bool dry_run) { +void LogStoreService::device_truncate(const device_truncate_cb_t& cb, bool wait_till_done, bool dry_run) { const auto treq = std::make_shared< truncate_req >(); treq->wait_till_done = wait_till_done; treq->dry_run = dry_run; treq->cb = cb; - if (treq->wait_till_done) { treq->trunc_outstanding = m_logstore_families.size(); } + if (treq->wait_till_done) { treq->trunc_outstanding = m_id_logdev_map.size(); } - for (auto& l : m_logstore_families) { - l->device_truncate(treq); + for (auto& [id, logdev] : m_id_logdev_map) { + logdev->device_truncate_under_lock(treq); } - if (treq->wait_till_done) 
{ std::unique_lock< std::mutex > lk{treq->mtx}; treq->cv.wait(lk, [&] { return (treq->trunc_outstanding == 0); }); @@ -143,14 +238,11 @@ void LogStoreService::device_truncate(const device_truncate_cb_t& cb, const bool } void LogStoreService::flush_if_needed() { - for (auto& f : m_logstore_families) { - f->logdev().flush_if_needed(); + for (auto& [id, logdev] : m_id_logdev_map) { + logdev->flush_if_needed(); } } -LogDev& LogStoreService::data_logdev() { return data_log_family()->logdev(); } -LogDev& LogStoreService::ctrl_logdev() { return ctrl_log_family()->logdev(); } - void LogStoreService::start_threads() { struct Context { std::condition_variable cv; @@ -193,41 +285,32 @@ void LogStoreService::start_threads() { nlohmann::json LogStoreService::dump_log_store(const log_dump_req& dump_req) { nlohmann::json json_dump{}; // create root object if (dump_req.log_store == nullptr) { - for (auto& family : m_logstore_families) { - json_dump[family->get_name()] = family->dump_log_store(dump_req); + for (auto& [id, logdev] : m_id_logdev_map) { + json_dump[logdev->get_id()] = logdev->dump_log_store(dump_req); } } else { - auto& family = dump_req.log_store->get_family(); + auto logdev = dump_req.log_store->get_logdev(); // must use operator= construction as copy construction results in error - nlohmann::json val = family.dump_log_store(dump_req); - json_dump[family.get_name()] = std::move(val); + nlohmann::json val = logdev->dump_log_store(dump_req); + json_dump[logdev->get_id()] = std::move(val); } return json_dump; } nlohmann::json LogStoreService::get_status(const int verbosity) const { nlohmann::json js; - for (auto& l : m_logstore_families) { - js[l->get_name()] = l->get_status(verbosity); + for (auto& [id, logdev] : m_id_logdev_map) { + js[logdev->get_id()] = logdev->get_status(verbosity); } return js; } -uint32_t LogStoreService::used_size() const { - uint32_t sz{0}; - if (m_data_logdev_vdev) { sz += m_data_logdev_vdev->used_size(); } - if (m_ctrl_logdev_vdev) { sz += 
m_ctrl_logdev_vdev->used_size(); } - return sz; -} +uint32_t LogStoreService::used_size() const { return m_logdev_vdev->used_size(); } -uint32_t LogStoreService::total_size() const { - uint32_t sz{0}; - if (m_data_logdev_vdev) { sz += m_data_logdev_vdev->size(); } - if (m_ctrl_logdev_vdev) { sz += m_ctrl_logdev_vdev->size(); } - return sz; -} +uint32_t LogStoreService::total_size() const { return m_logdev_vdev->size(); } LogStoreServiceMetrics::LogStoreServiceMetrics() : sisl::MetricsGroup("LogStores", "AllLogStores") { + REGISTER_COUNTER(logdevs_count, "Total number of log devs", sisl::_publish_as::publish_as_gauge); REGISTER_COUNTER(logstores_count, "Total number of log stores", sisl::_publish_as::publish_as_gauge); REGISTER_COUNTER(logstore_append_count, "Total number of append requests to log stores", "logstore_op_count", {"op", "write"}); diff --git a/src/lib/replication/log_store/home_raft_log_store.cpp b/src/lib/replication/log_store/home_raft_log_store.cpp index a8718053d..7444169ee 100644 --- a/src/lib/replication/log_store/home_raft_log_store.cpp +++ b/src/lib/replication/log_store/home_raft_log_store.cpp @@ -56,31 +56,31 @@ static uint64_t extract_term(const log_buffer& log_bytes) { return (*r_cast< uint64_t const* >(raw_ptr)); } -HomeRaftLogStore::HomeRaftLogStore(logstore_id_t logstore_id) { +HomeRaftLogStore::HomeRaftLogStore(logdev_id_t logdev_id, logstore_id_t logstore_id) { m_dummy_log_entry = nuraft::cs_new< nuraft::log_entry >(0, nuraft::buffer::alloc(0), nuraft::log_val_type::app_log); if (logstore_id == UINT32_MAX) { - m_log_store = logstore_service().create_new_log_store(LogStoreService::DATA_LOG_FAMILY_IDX, true); + m_logdev_id = logstore_service().create_new_logdev(); + m_log_store = logstore_service().create_new_log_store(m_logdev_id, true); if (!m_log_store) { throw std::runtime_error("Failed to create log store"); } m_logstore_id = m_log_store->get_store_id(); - LOGDEBUGMOD(replication, "Opened new home log store id={}", m_logstore_id); + 
LOGDEBUGMOD(replication, "Opened new home log dev = {} store id={}", m_logdev_id, m_logstore_id); } else { + m_logdev_id = logdev_id; m_logstore_id = logstore_id; - LOGDEBUGMOD(replication, "Opening existing home log store id={}", logstore_id); - logstore_service() - .open_log_store(LogStoreService::DATA_LOG_FAMILY_IDX, logstore_id, true) - .thenValue([this](auto log_store) { - m_log_store = std::move(log_store); - DEBUG_ASSERT_EQ(m_logstore_id, m_log_store->get_store_id(), - "Mismatch in passed and create logstore id"); - REPL_STORE_LOG(DEBUG, "Home Log store created/opened successfully"); - }); + LOGDEBUGMOD(replication, "Opening existing home log dev = {} store id={}", m_logdev_id, logstore_id); + logstore_service().open_logdev(m_logdev_id); + logstore_service().open_log_store(m_logdev_id, logstore_id, true).thenValue([this](auto log_store) { + m_log_store = std::move(log_store); + DEBUG_ASSERT_EQ(m_logstore_id, m_log_store->get_store_id(), "Mismatch in passed and create logstore id"); + REPL_STORE_LOG(DEBUG, "Home Log store created/opened successfully"); + }); } } void HomeRaftLogStore::remove_store() { REPL_STORE_LOG(DEBUG, "Logstore is being physically removed"); - logstore_service().remove_log_store(LogStoreService::DATA_LOG_FAMILY_IDX, m_logstore_id); + logstore_service().remove_log_store(m_logdev_id, m_logstore_id); m_log_store.reset(); } diff --git a/src/lib/replication/log_store/home_raft_log_store.h b/src/lib/replication/log_store/home_raft_log_store.h index c49cef310..4e6288d1a 100644 --- a/src/lib/replication/log_store/home_raft_log_store.h +++ b/src/lib/replication/log_store/home_raft_log_store.h @@ -35,7 +35,7 @@ using raft_buf_ptr_t = nuraft::ptr< nuraft::buffer >; class HomeRaftLogStore : public nuraft::log_store { public: - HomeRaftLogStore(homestore::logstore_id_t logstore_id = UINT32_MAX); + HomeRaftLogStore(logdev_id_t logdev_id = UINT32_MAX, homestore::logstore_id_t logstore_id = UINT32_MAX); virtual ~HomeRaftLogStore() = default; void 
remove_store(); @@ -170,9 +170,11 @@ class HomeRaftLogStore : public nuraft::log_store { virtual ulong last_durable_index() override; logstore_id_t logstore_id() const { return m_logstore_id; } + logdev_id_t logdev_id() const { return m_logdev_id; } private: logstore_id_t m_logstore_id; + logdev_id_t m_logdev_id; shared< HomeLogStore > m_log_store; nuraft::ptr< nuraft::log_entry > m_dummy_log_entry; store_lsn_t m_last_durable_lsn{-1}; diff --git a/src/lib/replication/repl_dev/common.h b/src/lib/replication/repl_dev/common.h index aa6935581..a39e44c12 100644 --- a/src/lib/replication/repl_dev/common.h +++ b/src/lib/replication/repl_dev/common.h @@ -60,11 +60,12 @@ struct repl_dev_superblk { uint64_t magic{REPL_DEV_SB_MAGIC}; uint32_t version{REPL_DEV_SB_VERSION}; - uuid_t group_id; // group_id of this replica set - logstore_id_t data_journal_id; // Logstore id for the data journal - int64_t commit_lsn; // LSN upto which this replica has committed - int64_t checkpoint_lsn; // LSN upto which this replica have checkpointed the data - uint64_t group_ordinal; // Ordinal number which will be used to indicate the rdevXYZ for debugging + uuid_t group_id; // group_id of this replica set + logdev_id_t logdev_id; + logstore_id_t logstore_id; // Logstore id for the data journal + int64_t commit_lsn; // LSN upto which this replica has committed + int64_t checkpoint_lsn; // LSN upto which this replica have checkpointed the data + uint64_t group_ordinal; // Ordinal number which will be used to indicate the rdevXYZ for debugging uint64_t get_magic() const { return magic; } uint32_t get_version() const { return version; } diff --git a/src/lib/replication/repl_dev/raft_repl_dev.cpp b/src/lib/replication/repl_dev/raft_repl_dev.cpp index 398c8d19c..91b85ca4d 100644 --- a/src/lib/replication/repl_dev/raft_repl_dev.cpp +++ b/src/lib/replication/repl_dev/raft_repl_dev.cpp @@ -31,7 +31,8 @@ RaftReplDev::RaftReplDev(RaftReplService& svc, superblk< raft_repl_dev_superblk m_state_machine = 
std::make_shared< RaftStateMachine >(*this); if (load_existing) { - m_data_journal = std::make_shared< ReplLogStore >(*this, *m_state_machine, m_rd_sb->data_journal_id); + m_data_journal = + std::make_shared< ReplLogStore >(*this, *m_state_machine, m_rd_sb->logdev_id, m_rd_sb->logstore_id); m_next_dsn = m_rd_sb->last_applied_dsn + 1; m_commit_upto_lsn = m_rd_sb->commit_lsn; m_last_flushed_commit_lsn = m_commit_upto_lsn; @@ -44,7 +45,7 @@ RaftReplDev::RaftReplDev(RaftReplService& svc, superblk< raft_repl_dev_superblk if (m_rd_sb->is_timeline_consistent) { logstore_service() - .open_log_store(LogStoreService::CTRL_LOG_FAMILY_IDX, m_rd_sb->free_blks_journal_id, false) + .open_log_store(m_rd_sb->logdev_id, m_rd_sb->free_blks_journal_id, false) .thenValue([this](auto log_store) { m_free_blks_journal = std::move(log_store); m_rd_sb->free_blks_journal_id = m_free_blks_journal->get_store_id(); @@ -52,14 +53,14 @@ RaftReplDev::RaftReplDev(RaftReplService& svc, superblk< raft_repl_dev_superblk } } else { m_data_journal = std::make_shared< ReplLogStore >(*this, *m_state_machine); - m_rd_sb->data_journal_id = m_data_journal->logstore_id(); + m_rd_sb->logdev_id = m_data_journal->logdev_id(); + m_rd_sb->logstore_id = m_data_journal->logstore_id(); m_rd_sb->last_applied_dsn = 0; m_rd_sb->group_ordinal = s_next_group_ordinal.fetch_add(1); m_rdev_name = fmt::format("rdev{}", m_rd_sb->group_ordinal); if (m_rd_sb->is_timeline_consistent) { - m_free_blks_journal = - logstore_service().create_new_log_store(LogStoreService::CTRL_LOG_FAMILY_IDX, false /* append_mode */); + m_free_blks_journal = logstore_service().create_new_log_store(m_rd_sb->logdev_id, false /* append_mode */); m_rd_sb->free_blks_journal_id = m_free_blks_journal->get_store_id(); } m_rd_sb.write(); diff --git a/src/lib/replication/repl_dev/solo_repl_dev.cpp b/src/lib/replication/repl_dev/solo_repl_dev.cpp index 6277c2368..73348f53e 100644 --- a/src/lib/replication/repl_dev/solo_repl_dev.cpp +++ 
b/src/lib/replication/repl_dev/solo_repl_dev.cpp @@ -10,17 +10,19 @@ namespace homestore { SoloReplDev::SoloReplDev(superblk< repl_dev_superblk >&& rd_sb, bool load_existing) : m_rd_sb{std::move(rd_sb)}, m_group_id{m_rd_sb->group_id} { if (load_existing) { + logstore_service().open_logdev(m_rd_sb->logdev_id); logstore_service() - .open_log_store(LogStoreService::DATA_LOG_FAMILY_IDX, m_rd_sb->data_journal_id, true /* append_mode */) + .open_log_store(m_rd_sb->logdev_id, m_rd_sb->logstore_id, true /* append_mode */) .thenValue([this](auto log_store) { m_data_journal = std::move(log_store); - m_rd_sb->data_journal_id = m_data_journal->get_store_id(); + m_rd_sb->logstore_id = m_data_journal->get_store_id(); m_data_journal->register_log_found_cb(bind_this(SoloReplDev::on_log_found, 3)); }); } else { - m_data_journal = - logstore_service().create_new_log_store(LogStoreService::DATA_LOG_FAMILY_IDX, true /* append_mode */); - m_rd_sb->data_journal_id = m_data_journal->get_store_id(); + m_logdev_id = logstore_service().create_new_logdev(); + m_data_journal = logstore_service().create_new_log_store(m_logdev_id, true /* append_mode */); + m_rd_sb->logstore_id = m_data_journal->get_store_id(); + m_rd_sb->logdev_id = m_logdev_id; m_rd_sb.write(); } } diff --git a/src/lib/replication/repl_dev/solo_repl_dev.h b/src/lib/replication/repl_dev/solo_repl_dev.h index 86d609477..c96dc6877 100644 --- a/src/lib/replication/repl_dev/solo_repl_dev.h +++ b/src/lib/replication/repl_dev/solo_repl_dev.h @@ -29,6 +29,7 @@ class CP; class SoloReplDev : public ReplDev { private: + logdev_id_t m_logdev_id; std::shared_ptr< HomeLogStore > m_data_journal; superblk< repl_dev_superblk > m_rd_sb; uuid_t m_group_id; diff --git a/src/tests/log_store_benchmark.cpp b/src/tests/log_store_benchmark.cpp index b8d43e6ec..d6fb50a4c 100644 --- a/src/tests/log_store_benchmark.cpp +++ b/src/tests/log_store_benchmark.cpp @@ -55,8 +55,8 @@ class BenchLogStore { public: friend class SampleDB; BenchLogStore() { - 
m_log_store = - logstore_service().create_new_log_store(LogStoreService::DATA_LOG_FAMILY_IDX, true /* append_mode */); + m_logdev_id = logstore_service().create_new_logdev(); + m_log_store = logstore_service().create_new_log_store(m_logdev_id, true /* append_mode */); m_log_store->register_log_found_cb(bind_this(BenchLogStore::on_log_found, 3)); m_nth_entry.store(0); generate_rand_data(); @@ -141,6 +141,7 @@ class BenchLogStore { } private: + logdev_id_t m_logdev_id; std::shared_ptr< HomeLogStore > m_log_store; std::atomic< int32_t > m_outstanding{0}; std::atomic< int64_t > m_nth_entry{0}; @@ -155,7 +156,6 @@ class BenchLogStore { uint32_t m_iteration{1}; std::vector< std::string > m_data; - logstore_family_id_t m_family; logstore_id_t m_store_id; }; @@ -168,10 +168,8 @@ static void test_append(benchmark::State& state) { } static void setup() { - test_common::HSTestHelper::start_homestore("test_log_store", - {{HS_SERVICE::META, {.size_pct = 5.0}}, - {HS_SERVICE::LOG_REPLICATED, {.size_pct = 85.0}}, - {HS_SERVICE::LOG_LOCAL, {.size_pct = 2.0}}}); + test_common::HSTestHelper::start_homestore( + "test_log_store", {{HS_SERVICE::META, {.size_pct = 5.0}}, {HS_SERVICE::LOG, {.size_pct = 87.0}}}); } static void teardown() { test_common::HSTestHelper::shutdown_homestore(); } diff --git a/src/tests/test_common/homestore_test_common.hpp b/src/tests/test_common/homestore_test_common.hpp index 2cb946bd8..c30d30bb1 100644 --- a/src/tests/test_common/homestore_test_common.hpp +++ b/src/tests/test_common/homestore_test_common.hpp @@ -237,7 +237,7 @@ class HSTestHelper { hsi->with_data_service(tp.custom_chunk_selector); } else if (svc == HS_SERVICE::INDEX) { hsi->with_index_service(std::unique_ptr< IndexServiceCallbacks >(tp.index_svc_cbs)); - } else if ((svc == HS_SERVICE::LOG_REPLICATED) || (svc == HS_SERVICE::LOG_LOCAL)) { + } else if ((svc == HS_SERVICE::LOG)) { hsi->with_log_service(); } else if (svc == HS_SERVICE::REPLICATION) { hsi->with_repl_data_service(tp.repl_app, 
tp.custom_chunk_selector); @@ -248,14 +248,10 @@ class HSTestHelper { if (need_format) { hsi->format_and_start({{HS_SERVICE::META, {.size_pct = svc_params[HS_SERVICE::META].size_pct}}, - {HS_SERVICE::LOG_REPLICATED, - {.size_pct = 1, - .chunk_size = svc_params[HS_SERVICE::LOG_REPLICATED].chunk_size, - .vdev_size_type = svc_params[HS_SERVICE::LOG_REPLICATED].vdev_size_type}}, - {HS_SERVICE::LOG_LOCAL, - {.size_pct = 1, - .chunk_size = svc_params[HS_SERVICE::LOG_LOCAL].chunk_size, - .vdev_size_type = svc_params[HS_SERVICE::LOG_LOCAL].vdev_size_type}}, + {HS_SERVICE::LOG, + {.size_pct = svc_params[HS_SERVICE::LOG].size_pct, + .chunk_size = svc_params[HS_SERVICE::LOG].chunk_size, + .vdev_size_type = svc_params[HS_SERVICE::LOG].vdev_size_type}}, {HS_SERVICE::DATA, {.size_pct = svc_params[HS_SERVICE::DATA].size_pct, .num_chunks = svc_params[HS_SERVICE::DATA].num_chunks, diff --git a/src/tests/test_common/hs_repl_test_common.hpp b/src/tests/test_common/hs_repl_test_common.hpp index 005ea97db..1d78bc432 100644 --- a/src/tests/test_common/hs_repl_test_common.hpp +++ b/src/tests/test_common/hs_repl_test_common.hpp @@ -172,8 +172,7 @@ class HSReplTestHelper { name_ + std::to_string(replica_num_), {{HS_SERVICE::META, {.size_pct = 5.0}}, {HS_SERVICE::REPLICATION, {.size_pct = 60.0, .repl_app = std::make_unique< TestReplApplication >(*this)}}, - {HS_SERVICE::LOG_REPLICATED, {.size_pct = 20.0}}, - {HS_SERVICE::LOG_LOCAL, {.size_pct = 2.0}}}); + {HS_SERVICE::LOG, {.size_pct = 20.0}}}); } void teardown() { @@ -191,8 +190,7 @@ class HSReplTestHelper { test_common::HSTestHelper::start_homestore( name_ + std::to_string(replica_num_), {{HS_SERVICE::REPLICATION, {.repl_app = std::make_unique< TestReplApplication >(*this)}}, - {HS_SERVICE::LOG_REPLICATED, {}}, - {HS_SERVICE::LOG_LOCAL, {}}}, + {HS_SERVICE::LOG, {}}}, nullptr, true /* restart */); } @@ -202,8 +200,7 @@ class HSReplTestHelper { test_common::HSTestHelper::start_homestore( name_ + std::to_string(replica_num_), 
{{HS_SERVICE::REPLICATION, {.repl_app = std::make_unique< TestReplApplication >(*this)}}, - {HS_SERVICE::LOG_REPLICATED, {}}, - {HS_SERVICE::LOG_LOCAL, {}}}, + {HS_SERVICE::LOG, {}}}, nullptr, true /* restart */); }); } diff --git a/src/tests/test_home_raft_logstore.cpp b/src/tests/test_home_raft_logstore.cpp index d9c9df4c5..6c6ceb07c 100644 --- a/src/tests/test_home_raft_logstore.cpp +++ b/src/tests/test_home_raft_logstore.cpp @@ -162,6 +162,7 @@ class RaftLogStoreClient { } private: + homestore::logdev_id_t m_logdev_id{UINT32_MAX}; homestore::logstore_id_t m_store_id{UINT32_MAX}; std::unique_ptr< HomeRaftLogStore > m_rls; sisl::sparse_vector< std::string > m_shadow_log; @@ -173,15 +174,15 @@ class RaftLogStoreClient { class TestRaftLogStore : public ::testing::Test { public: void SetUp() { - test_common::HSTestHelper::start_homestore("test_home_raft_log_store", - {{HS_SERVICE::META, {.size_pct = 5.0}}, - {HS_SERVICE::LOG_REPLICATED, {.size_pct = 70.0}}, - {HS_SERVICE::LOG_LOCAL, {.size_pct = 2.0}}}); + test_common::HSTestHelper::start_homestore( + "test_home_raft_log_store", {{HS_SERVICE::META, {.size_pct = 5.0}}, {HS_SERVICE::LOG, {.size_pct = 72.0}}}); m_leader_store.m_rls = std::make_unique< HomeRaftLogStore >(); m_leader_store.m_store_id = m_leader_store.m_rls->logstore_id(); + m_leader_store.m_logdev_id = m_leader_store.m_rls->logdev_id(); m_follower_store.m_rls = std::make_unique< HomeRaftLogStore >(); m_follower_store.m_store_id = m_follower_store.m_rls->logstore_id(); + m_follower_store.m_logdev_id = m_follower_store.m_rls->logdev_id(); } void restart() { @@ -189,11 +190,12 @@ class TestRaftLogStore : public ::testing::Test { m_follower_store.m_rls.reset(); test_common::HSTestHelper::start_homestore( - "test_home_raft_log_store", - {{HS_SERVICE::META, {}}, {HS_SERVICE::LOG_REPLICATED, {}}, {HS_SERVICE::LOG_LOCAL, {}}}, + "test_home_raft_log_store", {{HS_SERVICE::META, {.size_pct = 5.0}}, {HS_SERVICE::LOG, {.size_pct = 72.0}}}, [this]() { - 
m_leader_store.m_rls = std::make_unique< HomeRaftLogStore >(m_leader_store.m_store_id); - m_follower_store.m_rls = std::make_unique< HomeRaftLogStore >(m_follower_store.m_store_id); + m_leader_store.m_rls = + std::make_unique< HomeRaftLogStore >(m_leader_store.m_logdev_id, m_leader_store.m_store_id); + m_follower_store.m_rls = + std::make_unique< HomeRaftLogStore >(m_follower_store.m_logdev_id, m_follower_store.m_store_id); }, true /* restart */); } diff --git a/src/tests/test_journal_vdev.cpp b/src/tests/test_journal_vdev.cpp index a34eae090..0b8ddc34f 100644 --- a/src/tests/test_journal_vdev.cpp +++ b/src/tests/test_journal_vdev.cpp @@ -71,27 +71,29 @@ class VDevJournalIOTest : public ::testing::Test { virtual void SetUp() override { auto const ndevices = SISL_OPTIONS["num_devs"].as< uint32_t >(); auto const dev_size = SISL_OPTIONS["dev_size_mb"].as< uint64_t >() * 1024 * 1024; - test_common::HSTestHelper::start_homestore( - "test_journal_vdev", - {{HS_SERVICE::META, {.size_pct = 15.0}}, - {HS_SERVICE::LOG_REPLICATED, - {.chunk_size = 32 * 1024 * 1024, .vdev_size_type = vdev_size_type_t::VDEV_SIZE_DYNAMIC}}, - {HS_SERVICE::LOG_LOCAL, - {.chunk_size = 32 * 1024 * 1024, .vdev_size_type = vdev_size_type_t::VDEV_SIZE_DYNAMIC}}}, - nullptr /* starting_cb */, false /* restart */); + test_common::HSTestHelper::start_homestore("test_journal_vdev", + { + {HS_SERVICE::META, {.size_pct = 15.0}}, + {HS_SERVICE::LOG, + {.size_pct = 50.0, + .chunk_size = 32 * 1024 * 1024, + .vdev_size_type = vdev_size_type_t::VDEV_SIZE_DYNAMIC}}, + }, + nullptr /* starting_cb */, false /* restart */); } virtual void TearDown() override { test_common::HSTestHelper::shutdown_homestore(); } void restart_homestore() { - test_common::HSTestHelper::start_homestore( - "test_journal_vdev", - {{HS_SERVICE::META, {.size_pct = 15.0}}, - {HS_SERVICE::LOG_REPLICATED, - {.chunk_size = 32 * 1024 * 1024, .vdev_size_type = vdev_size_type_t::VDEV_SIZE_DYNAMIC}}, - {HS_SERVICE::LOG_LOCAL, - {.chunk_size = 32 * 
1024 * 1024, .vdev_size_type = vdev_size_type_t::VDEV_SIZE_DYNAMIC}}}, - nullptr /* starting_cb */, true /* restart */); + test_common::HSTestHelper::start_homestore("test_journal_vdev", + { + {HS_SERVICE::META, {.size_pct = 15.0}}, + {HS_SERVICE::LOG, + {.size_pct = 50.0, + .chunk_size = 32 * 1024 * 1024, + .vdev_size_type = vdev_size_type_t::VDEV_SIZE_DYNAMIC}}, + }, + nullptr /* starting_cb */, true /* restart */); } }; @@ -107,7 +109,7 @@ class JournalDescriptorTest { std::shared_ptr< JournalVirtualDev::Descriptor > vdev_jd() { return m_vdev_jd; } void reinit() { - auto vdev = hs()->logstore_service().get_vdev(homestore::LogStoreService::DATA_LOG_FAMILY_IDX); + auto vdev = hs()->logstore_service().get_vdev(); m_vdev_jd = vdev->open(m_logdev_id); } diff --git a/src/tests/test_log_store.cpp b/src/tests/test_log_store.cpp index bf71cfa66..262270c88 100644 --- a/src/tests/test_log_store.cpp +++ b/src/tests/test_log_store.cpp @@ -49,7 +49,6 @@ #include #include "logstore/log_dev.hpp" -#include "logstore/log_store_family.hpp" #include "test_common/homestore_test_common.hpp" using namespace homestore; @@ -76,7 +75,7 @@ struct test_log_data { } }; -typedef std::function< void(logstore_family_id_t, logstore_seq_num_t, logdev_key) > test_log_store_comp_cb_t; +typedef std::function< void(logdev_id_t, logstore_seq_num_t, logdev_key) > test_log_store_comp_cb_t; class Timer { public: Timer() : beg_{clock_::now()} {} @@ -99,15 +98,15 @@ class SampleLogStoreClient { public: friend class SampleDB; - SampleLogStoreClient(std::shared_ptr< HomeLogStore > store, const logstore_family_id_t family_idx, + SampleLogStoreClient(std::shared_ptr< HomeLogStore > store, const logdev_id_t logdev_id, const test_log_store_comp_cb_t& cb) : - m_store_id{store->get_store_id()}, m_comp_cb{cb}, m_family{family_idx} { + m_store_id{store->get_store_id()}, m_comp_cb{cb}, m_logdev_id{logdev_id} { set_log_store(store); } - explicit SampleLogStoreClient(const logstore_family_id_t family_idx, const 
test_log_store_comp_cb_t& cb) : - SampleLogStoreClient(logstore_service().create_new_log_store(family_idx, false /* append_mode */), - family_idx, cb) {} + explicit SampleLogStoreClient(const logdev_id_t logdev_id, const test_log_store_comp_cb_t& cb) : + SampleLogStoreClient(logstore_service().create_new_log_store(logdev_id, false /* append_mode */), logdev_id, + cb) {} SampleLogStoreClient(const SampleLogStoreClient&) = delete; SampleLogStoreClient(SampleLogStoreClient&&) noexcept = delete; @@ -161,7 +160,7 @@ class SampleLogStoreClient { } else { std::free(voidptr_cast(d)); } - m_comp_cb(m_family, seq_num, ld_key); + m_comp_cb(m_logdev_id, seq_num, ld_key); }); } } @@ -308,13 +307,26 @@ class SampleLogStoreClient { } void rollback_validate(uint32_t num_lsns_to_rollback) { + std::mutex mtx; + std::condition_variable cv; + bool rollback_done = false; auto const upto_lsn = m_cur_lsn.fetch_sub(num_lsns_to_rollback) - num_lsns_to_rollback - 1; - m_log_store->rollback_async(upto_lsn, nullptr); - ASSERT_EQ(m_log_store->get_contiguous_completed_seq_num(-1), upto_lsn) - << "Last completed seq num is not reset after rollback"; - ASSERT_EQ(m_log_store->get_contiguous_issued_seq_num(-1), upto_lsn) - << "Last issued seq num is not reset after rollback"; - read_validate(true); + m_log_store->rollback_async(upto_lsn, [&](logstore_seq_num_t) { + ASSERT_EQ(m_log_store->get_contiguous_completed_seq_num(-1), upto_lsn) + << "Last completed seq num is not reset after rollback"; + ASSERT_EQ(m_log_store->get_contiguous_issued_seq_num(-1), upto_lsn) + << "Last issued seq num is not reset after rollback"; + read_validate(true); + { + std::unique_lock lock(mtx); + rollback_done = true; + } + cv.notify_one(); + }); + + // We wait till async rollback is finished as we do validation. 
+ std::unique_lock lock(mtx); + cv.wait(lock, [&rollback_done] { return rollback_done == true; }); } void read(const logstore_seq_num_t lsn) { @@ -323,8 +335,7 @@ class SampleLogStoreClient { } void rollback_record_count(uint32_t expected_count) { - auto actual_count = - m_log_store->get_family().logdev().log_dev_meta().num_rollback_records(m_log_store->get_store_id()); + auto actual_count = m_log_store->get_logdev()->log_dev_meta().num_rollback_records(m_log_store->get_store_id()); ASSERT_EQ(actual_count, expected_count); } @@ -403,7 +414,7 @@ class SampleLogStoreClient { folly::Synchronized< std::map< logstore_seq_num_t, bool > > m_hole_lsns; int64_t m_n_recovered_lsns = 0; int64_t m_n_recovered_truncated_lsns = 0; - logstore_family_id_t m_family; + logdev_id_t m_logdev_id; }; uint64_t SampleLogStoreClient::s_max_flush_multiple = 0; @@ -436,14 +447,14 @@ class SampleDB { test_common::HSTestHelper::start_homestore( "test_log_store", {{HS_SERVICE::META, {.size_pct = 5.0}}, - {HS_SERVICE::LOG_REPLICATED, {.size_pct = 5.0, .chunk_size = 32 * 1024 * 1024}}, - {HS_SERVICE::LOG_LOCAL, {.size_pct = 5.0, .chunk_size = 32 * 1024 * 1024}}}, + {HS_SERVICE::LOG, {.size_pct = 84.0, .chunk_size = 32 * 1024 * 1024}}}, [this, restart, n_log_stores]() { if (restart) { for (uint32_t i{0}; i < n_log_stores; ++i) { SampleLogStoreClient* client = m_log_store_clients[i].get(); + logstore_service().open_logdev(client->m_logdev_id); logstore_service() - .open_log_store(client->m_family, client->m_store_id, false /* append_mode */) + .open_log_store(client->m_logdev_id, client->m_store_id, false /* append_mode */) .thenValue([i, this, client](auto log_store) { client->set_log_store(log_store); }); } } @@ -451,15 +462,13 @@ class SampleDB { restart); if (!restart) { + auto logdev_id = logstore_service().create_new_logdev(); for (uint32_t i{0}; i < n_log_stores; ++i) { - auto family_idx = - ((i % 2) == 0) ? 
LogStoreService::DATA_LOG_FAMILY_IDX : LogStoreService::CTRL_LOG_FAMILY_IDX; m_log_store_clients.push_back(std::make_unique< SampleLogStoreClient >( - family_idx, bind_this(SampleDB::on_log_insert_completion, 3))); + logdev_id, bind_this(SampleDB::on_log_insert_completion, 3))); } SampleLogStoreClient::s_max_flush_multiple = - std::max(logstore_service().data_logdev().get_flush_size_multiple(), - logstore_service().ctrl_logdev().get_flush_size_multiple()); + logstore_service().get_logdev(logdev_id)->get_flush_size_multiple(); } } @@ -468,7 +477,8 @@ class SampleDB { if (cleanup) { m_log_store_clients.clear(); } } - void on_log_insert_completion(logstore_family_id_t fid, logstore_seq_num_t lsn, logdev_key ld_key) { + void on_log_insert_completion(logdev_id_t fid, logstore_seq_num_t lsn, logdev_key ld_key) { + if (m_highest_log_idx.count(fid) == 0) { m_highest_log_idx[fid] = std::atomic{-1}; } atomic_update_max(m_highest_log_idx[fid], ld_key.idx); if (m_io_closure) m_io_closure(fid, lsn, ld_key); } @@ -477,7 +487,7 @@ class SampleDB { bool removed{false}; for (auto it = std::begin(m_log_store_clients); it != std::end(m_log_store_clients); ++it) { if ((*it)->m_log_store->get_store_id() == store_id) { - logstore_service().remove_log_store((*it)->m_family, store_id); + logstore_service().remove_log_store((*it)->m_logdev_id, store_id); m_log_store_clients.erase(it); removed = true; break; @@ -486,7 +496,9 @@ class SampleDB { return removed; } - logid_t highest_log_idx(logstore_family_id_t fid) const { return m_highest_log_idx[fid].load(); } + logid_t highest_log_idx(logdev_id_t fid) { + return m_highest_log_idx.count(fid) ? 
m_highest_log_idx[fid].load() : -1; + } private: const static std::string s_fpath_root; @@ -494,7 +506,7 @@ class SampleDB { std::function< void() > m_on_schedule_io_cb; test_log_store_comp_cb_t m_io_closure; std::vector< std::unique_ptr< SampleLogStoreClient > > m_log_store_clients; - std::array< std::atomic< logid_t >, LogStoreService::num_log_families > m_highest_log_idx = {-1, -1}; + std::map< logdev_id_t, std::atomic< logid_t > > m_highest_log_idx; }; const std::string SampleDB::s_fpath_root{"/tmp/log_store_dev_"}; @@ -553,7 +565,7 @@ class LogStoreTest : public ::testing::Test { } } - void on_insert_completion([[maybe_unused]] logstore_family_id_t fid, logstore_seq_num_t lsn, logdev_key ld_key) { + void on_insert_completion([[maybe_unused]] logdev_id_t fid, logstore_seq_num_t lsn, logdev_key ld_key) { bool notify{false}; uint64_t waiting_to_issue{0}; { @@ -592,12 +604,12 @@ class LogStoreTest : public ::testing::Test { size_t dump_sz{0}; int64_t rec_count{0}; - for (logstore_family_id_t fid{0}; fid < LogStoreService::num_log_families; ++fid) { - auto* family = (fid == 0) ? logstore_service().data_log_family() : logstore_service().ctrl_log_family(); - nlohmann::json json_dump = family->dump_log_store(dump_req); + auto logdev_vec = logstore_service().get_all_logdevs(); + for (auto& logdev : logdev_vec) { + nlohmann::json json_dump = logdev->dump_log_store(dump_req); dump_sz += json_dump.size(); - LOGINFO("Printing json dump of all logstores in family_id{}. \n {}", fid, json_dump.dump()); + LOGINFO("Printing json dump of all logstores in logdev {}. 
\n {}", logdev->get_id(), json_dump.dump()); for (const auto& logdump : json_dump) { const auto itr = logdump.find("log_records"); if (itr != std::end(logdump)) { rec_count += static_cast< int64_t >(logdump["log_records"].size()); } @@ -613,7 +625,7 @@ class LogStoreTest : public ::testing::Test { if (lsc->m_log_store->get_store_id() != id) { continue; } log_dump_req dump_req; - const auto fid = lsc->m_family; + const auto fid = lsc->m_logdev_id; if (print_content) dump_req.verbosity_level = log_dump_verbosity::CONTENT; dump_req.log_store = lsc->m_log_store; @@ -621,9 +633,9 @@ class LogStoreTest : public ::testing::Test { dump_req.end_seq_num = end_seq; // must use operator= construction as copy construction results in error - auto* family = (fid == 0) ? logstore_service().data_log_family() : logstore_service().ctrl_log_family(); - nlohmann::json json_dump = family->dump_log_store(dump_req); - LOGINFO("Printing json dump of family_id={} logstore id {}, start_seq {}, end_seq {}, \n\n {}", fid, id, + auto logdev = lsc->m_log_store->get_logdev(); + nlohmann::json json_dump = logdev->dump_log_store(dump_req); + LOGINFO("Printing json dump of logdev={} logstore id {}, start_seq {}, end_seq {}, \n\n {}", fid, id, start_seq, end_seq, json_dump.dump()); const auto itr_id = json_dump.find(std::to_string(id)); if (itr_id != std::end(json_dump)) { @@ -641,11 +653,11 @@ class LogStoreTest : public ::testing::Test { } } - int find_garbage_upto(logstore_family_id_t family_idx, logid_t idx) { + int find_garbage_upto(logdev_id_t logdev_id, logid_t idx) { int count{0}; - auto it = std::begin(m_garbage_stores_upto[family_idx]); + auto it = std::begin(m_garbage_stores_upto[logdev_id]); - while (it != std::end(m_garbage_stores_upto[family_idx])) { + while (it != std::end(m_garbage_stores_upto[logdev_id])) { if (it->first > idx) { return count; } ++it; ++count; @@ -683,7 +695,7 @@ class LogStoreTest : public ::testing::Test { bool failed{false}; logstore_service().device_truncate( - 
[this, is_parallel_to_write, &failed](const auto& trunc_lds) { + [this, is_parallel_to_write, &failed](auto& trunc_lds) { bool expect_forward_progress{true}; uint32_t n_fully_truncated{0}; if (is_parallel_to_write) { @@ -700,14 +712,14 @@ class LogStoreTest : public ::testing::Test { } if (expect_forward_progress) { - for (logstore_family_id_t fid{0}; fid < trunc_lds.size(); ++fid) { - const auto trunc_loc = trunc_lds[fid]; + for (auto [fid, trunc_loc] : trunc_lds) { if (trunc_loc == logdev_key::out_of_bound_ld_key()) { LOGINFO("No forward progress for device truncation yet."); } else { // Validate the truncation is actually moving forward - if (trunc_loc.idx <= m_truncate_log_idx[fid].load()) { failed = true; } - ASSERT_GT(trunc_loc.idx, m_truncate_log_idx[fid].load()); + auto idx = m_truncate_log_idx.count(fid) ? m_truncate_log_idx[fid].load() : -1; + if (trunc_loc.idx <= idx) { failed = true; } + ASSERT_GT(trunc_loc.idx, idx); m_truncate_log_idx[fid].store(trunc_loc.idx); } } @@ -718,8 +730,10 @@ class LogStoreTest : public ::testing::Test { true /* wait_till_done */); ASSERT_FALSE(failed); - for (logstore_family_id_t fid{0}; fid < LogStoreService::num_log_families; ++fid) { - const auto upto_count = find_garbage_upto(fid, m_truncate_log_idx[fid].load()); + for (auto& logdev : logstore_service().get_all_logdevs()) { + auto fid = logdev->get_id(); + auto idx = m_truncate_log_idx.count(fid) ? 
m_truncate_log_idx[fid].load() : -1; + const auto upto_count = find_garbage_upto(fid, idx); std::remove_const_t< decltype(upto_count) > count{0}; for (auto it = std::begin(m_garbage_stores_upto[fid]); count < upto_count; ++count) { it = m_garbage_stores_upto[fid].erase(it); @@ -762,11 +776,11 @@ class LogStoreTest : public ::testing::Test { size_t actual_garbage_ids{0}; size_t exp_garbage_store_count{0}; - for (logstore_family_id_t fid{0}; fid < LogStoreService::num_log_families; ++fid) { + for (auto& logdev : logstore_service().get_all_logdevs()) { + auto fid = logdev->get_id(); std::vector< logstore_id_t > reg_ids, garbage_ids; - LogDev& ld = (fid == LogStoreService::DATA_LOG_FAMILY_IDX) ? logstore_service().data_logdev() - : logstore_service().ctrl_logdev(); - ld.get_registered_store_ids(reg_ids, garbage_ids); + + logdev->get_registered_store_ids(reg_ids, garbage_ids); actual_valid_ids += reg_ids.size() - garbage_ids.size(); actual_garbage_ids += garbage_ids.size(); @@ -782,7 +796,7 @@ class LogStoreTest : public ::testing::Test { void delete_validate(uint32_t idx) { auto& db = SampleDB::instance(); - auto fid = db.m_log_store_clients[idx]->m_family; + auto fid = db.m_log_store_clients[idx]->m_logdev_id; validate_num_stores(); const auto l_idx = db.highest_log_idx(fid); @@ -840,8 +854,8 @@ class LogStoreTest : public ::testing::Test { uint64_t m_nrecords_waiting_to_complete{0}; std::mutex m_pending_mtx; std::condition_variable m_pending_cv; - std::array< std::atomic< logid_t >, LogStoreService::num_log_families > m_truncate_log_idx = {-1, -1}; - std::array< std::map< logid_t, uint32_t >, LogStoreService::num_log_families > m_garbage_stores_upto; + std::map< logdev_id_t, std::atomic< logid_t > > m_truncate_log_idx; + std::map< logdev_id_t, std::map< logid_t, uint32_t > > m_garbage_stores_upto; std::array< uint32_t, 100 > m_store_distribution; uint32_t m_q_depth{64}; @@ -1232,8 +1246,8 @@ TEST_F(LogStoreTest, WriteSyncThenRead) { for (uint32_t iteration{0}; 
iteration < iterations; ++iteration) { LOGINFO("Iteration {}", iteration); - std::shared_ptr< HomeLogStore > tmp_log_store = - logstore_service().create_new_log_store(LogStoreService::DATA_LOG_FAMILY_IDX, false); + auto logdev_id = logstore_service().create_new_logdev(); + auto tmp_log_store = logstore_service().create_new_log_store(logdev_id, false); const auto store_id = tmp_log_store->get_store_id(); LOGINFO("Created new log store -> id {}", store_id); const unsigned count{10}; @@ -1261,7 +1275,7 @@ TEST_F(LogStoreTest, WriteSyncThenRead) { ASSERT_EQ(actual, expected) << "Data mismatch for LSN=" << store_id << ":" << i << " size=" << tl->size; } - logstore_service().remove_log_store(LogStoreService::DATA_LOG_FAMILY_IDX, store_id); + logstore_service().remove_log_store(logdev_id, store_id); LOGINFO("Remove logstore -> i {}", store_id); } } diff --git a/src/tests/test_solo_repl_dev.cpp b/src/tests/test_solo_repl_dev.cpp index 90b5bcca0..aefbf1170 100644 --- a/src/tests/test_solo_repl_dev.cpp +++ b/src/tests/test_solo_repl_dev.cpp @@ -159,8 +159,10 @@ class SoloReplDevTest : public testing::Test { "test_solo_repl_dev", {{HS_SERVICE::META, {.size_pct = 5.0}}, {HS_SERVICE::REPLICATION, {.size_pct = 60.0, .repl_app = std::make_unique< Application >(*this)}}, - {HS_SERVICE::LOG_REPLICATED, {.size_pct = 20.0}}, - {HS_SERVICE::LOG_LOCAL, {.size_pct = 2.0}}}); + {HS_SERVICE::LOG, + {.size_pct = 22.0, + .chunk_size = 32 * 1024 * 1024, + .vdev_size_type = vdev_size_type_t::VDEV_SIZE_DYNAMIC}}}); m_uuid1 = hs_utils::gen_random_uuid(); m_uuid2 = hs_utils::gen_random_uuid(); m_repl_dev1 = hs()->repl_service().create_repl_dev(m_uuid1, {}).get().value(); @@ -179,9 +181,12 @@ class SoloReplDevTest : public testing::Test { test_common::HSTestHelper::start_homestore( "test_solo_repl_dev", - {{HS_SERVICE::REPLICATION, {.repl_app = std::make_unique< Application >(*this)}}, - {HS_SERVICE::LOG_REPLICATED, {}}, - {HS_SERVICE::LOG_LOCAL, {}}}, + {{HS_SERVICE::META, {.size_pct = 5.0}}, + 
{HS_SERVICE::REPLICATION, {.size_pct = 60.0, .repl_app = std::make_unique< Application >(*this)}}, + {HS_SERVICE::LOG, + {.size_pct = 22.0, + .chunk_size = 32 * 1024 * 1024, + .vdev_size_type = vdev_size_type_t::VDEV_SIZE_DYNAMIC}}}, nullptr, true /* restart */); m_repl_dev1 = hs()->repl_service().get_repl_dev(m_uuid1).value();