From 96137b84596c9c3d90ac955998b7dace60ec6ad1 Mon Sep 17 00:00:00 2001 From: Sanal P Date: Thu, 25 Apr 2024 13:58:26 -0700 Subject: [PATCH] Remove logdev which are unopened during recovery. Removing repl dev will remove the logdev. If logdev or log store are not opened during, they are marked as ununsed and deleted. Remove chunks from the journal descriptor. Remove the logdev metablk. --- src/include/homestore/logstore_service.hpp | 9 +++ src/lib/device/device_manager.cpp | 5 +- src/lib/device/journal_vdev.cpp | 34 +++++++-- src/lib/device/journal_vdev.hpp | 5 ++ src/lib/device/physical_dev.cpp | 2 +- src/lib/logstore/log_dev.cpp | 24 ++++--- src/lib/logstore/log_dev.hpp | 15 ++-- src/lib/logstore/log_store_service.cpp | 73 +++++++++++++++---- src/tests/test_log_dev.cpp | 81 ++++++++++++++++++++++ 9 files changed, 214 insertions(+), 34 deletions(-) diff --git a/src/include/homestore/logstore_service.hpp b/src/include/homestore/logstore_service.hpp index 44e1acac1..24af7ef2b 100644 --- a/src/include/homestore/logstore_service.hpp +++ b/src/include/homestore/logstore_service.hpp @@ -91,6 +91,13 @@ class LogStoreService { */ void open_logdev(logdev_id_t logdev_id); + /** + * @brief Destroy a log dev. + * + * @param logdev_id: Logdev ID + */ + void destroy_log_dev(logdev_id_t logdev_id); + /** * @brief Create a brand new log store (both in-memory and on device) and returns its instance. It also book * keeps the created log store and user can get this instance of log store by using logstore_id @@ -168,6 +175,7 @@ class LogStoreService { private: std::shared_ptr< LogDev > create_new_logdev_internal(logdev_id_t logdev_id); + void delete_unopened_logdevs(); void logdev_super_blk_found(const sisl::byte_view& buf, void* meta_cookie); void rollback_super_blk_found(const sisl::byte_view& buf, void* meta_cookie); void start_threads(); @@ -182,6 +190,7 @@ class LogStoreService { iomgr::io_fiber_t m_truncate_fiber; iomgr::io_fiber_t m_flush_fiber; LogStoreServiceMetrics m_metrics; + std::unordered_set< logdev_id_t > m_unopened_logdev; }; extern LogStoreService& logstore_service(); diff --git a/src/lib/device/device_manager.cpp b/src/lib/device/device_manager.cpp index 4e96a2088..f38738966 100644 --- a/src/lib/device/device_manager.cpp +++ b/src/lib/device/device_manager.cpp @@ -273,7 +273,8 @@ shared< VirtualDev > DeviceManager::create_vdev(vdev_parameters&& vparam) { vparam.vdev_name, in_bytes(input_chunk_size), in_bytes(vparam.chunk_size)); } - vparam.vdev_size = sisl::round_down(vparam.vdev_size, vparam.chunk_size); + // For dynamic size vdev, size starts with zero. + vparam.vdev_size = 0; if (input_vdev_size != vparam.vdev_size) { LOGINFO("{} Virtual device is attempted to be created with size={}, it needs to be rounded to new_size={}", vparam.vdev_name, in_bytes(input_vdev_size), in_bytes(vparam.vdev_size)); @@ -285,7 +286,6 @@ shared< VirtualDev > DeviceManager::create_vdev(vdev_parameters&& vparam) { RELEASE_ASSERT(vparam.chunk_size >= min_chunk_size, "chunk_size should be greater than or equal to min_chunk_size"); RELEASE_ASSERT(vparam.num_chunks <= max_num_chunks, "num_chunks should be less than or equal to max_num_chunks"); - RELEASE_ASSERT(input_vdev_size >= vparam.vdev_size, "vdev_size should be less than or equal to input_vdev_size"); LOGINFO( "New Virtal Dev={} of size={} with id={} is attempted to be created with multi_pdev_opts={}. The params are " @@ -512,6 +512,7 @@ static void populate_vdev_info(const vdev_parameters& vparam, uint32_t vdev_id, out_info->set_user_private(vparam.context_data); out_info->alloc_type = s_cast< uint8_t >(vparam.alloc_type); out_info->chunk_sel_type = s_cast< uint8_t >(vparam.chunk_sel_type); + out_info->size_type = vparam.size_type; out_info->compute_checksum(); } diff --git a/src/lib/device/journal_vdev.cpp b/src/lib/device/journal_vdev.cpp index bc20bd5f3..247f0a16b 100644 --- a/src/lib/device/journal_vdev.cpp +++ b/src/lib/device/journal_vdev.cpp @@ -125,7 +125,19 @@ void JournalVirtualDev::init() { if (!visited_chunks.count(chunk->chunk_id())) { orphan_chunks.push_back(chunk); } } - for (auto& chunk : orphan_chunks) { + // Remove the orphan chunks. + if (!orphan_chunks.empty()) { + LOGINFOMOD(journalvdev, "Removing orphan chunks"); + remove_journal_chunks(orphan_chunks); + } + + // Start the chunk pool. + m_chunk_pool->start(); + LOGINFO("Journal vdev init done"); +} + +void JournalVirtualDev::remove_journal_chunks(std::vector< shared< Chunk > >& chunks) { + for (auto& chunk : chunks) { auto* data = r_cast< JournalChunkPrivate* >(const_cast< uint8_t* >(chunk->user_private())); auto chunk_id = chunk->chunk_id(); auto logdev_id = data->logdev_id; @@ -135,14 +147,9 @@ void JournalVirtualDev::init() { *data = JournalChunkPrivate{}; update_chunk_private(chunk, data); - LOGINFOMOD(journalvdev, "Removing orphan chunk {} found for logdev {} next {}.", chunk_id, logdev_id, - next_chunk); + LOGINFOMOD(journalvdev, "Removing chunk {} found for logdev {} next {}.", chunk_id, logdev_id, next_chunk); m_dmgr.remove_chunk_locked(chunk); } - - // Start the chunk pool. - m_chunk_pool->start(); - LOGINFO("Journal vdev init done"); } void JournalVirtualDev::update_chunk_private(shared< Chunk >& chunk, JournalChunkPrivate* private_data) { @@ -171,6 +178,19 @@ shared< JournalVirtualDev::Descriptor > JournalVirtualDev::open(logdev_id_t logd return it->second; } +void JournalVirtualDev::destroy(logdev_id_t logdev_id) { + auto it = m_journal_descriptors.find(logdev_id); + if (it == m_journal_descriptors.end()) { + LOGERROR("logdev not found log_dev={}", logdev_id); + return; + } + + // Remove all the chunks. + remove_journal_chunks(it->second->m_journal_chunks); + m_journal_descriptors.erase(it); + LOGINFOMOD(journalvdev, "Journal vdev destroyed log_dev={}", logdev_id); +} + void JournalVirtualDev::Descriptor::append_chunk() { // Get a new chunk from the pool. auto new_chunk = m_vdev.m_chunk_pool->dequeue(); diff --git a/src/lib/device/journal_vdev.hpp b/src/lib/device/journal_vdev.hpp index b4f3fb163..388799a87 100644 --- a/src/lib/device/journal_vdev.hpp +++ b/src/lib/device/journal_vdev.hpp @@ -401,6 +401,9 @@ class JournalVirtualDev : public VirtualDev { // where log entries are stored. It also mantains offsets, size etc. shared< Descriptor > open(logdev_id_t id); + // Destroy a logdev and release all the chunks related to the logdev. + void destroy(logdev_id_t id); + /** * @brief Get the status of the journal vdev and its internal structures * @param log_level: Log level to do verbosity. @@ -410,7 +413,9 @@ class JournalVirtualDev : public VirtualDev { uint64_t used_size() const override; uint64_t available_blks() const override; + uint64_t num_descriptors() const { return m_journal_descriptors.size(); } + void remove_journal_chunks(std::vector< shared< Chunk > >& chunks); void update_chunk_private(shared< Chunk >& chunk, JournalChunkPrivate* chunk_private); uint64_t get_end_of_chunk(shared< Chunk >& chunk) const; diff --git a/src/lib/device/physical_dev.cpp b/src/lib/device/physical_dev.cpp index b431cc384..5ebb1963d 100644 --- a/src/lib/device/physical_dev.cpp +++ b/src/lib/device/physical_dev.cpp @@ -246,7 +246,7 @@ std::vector< shared< Chunk > > PhysicalDev::create_chunks(const std::vector< uin auto chunk = std::make_shared< Chunk >(this, *cinfo, cslot); ret_chunks.push_back(chunk); - get_stream(chunk).m_chunks_map.insert(std::pair{chunk_ids[cit], std::move(chunk)}); + get_stream(chunk).m_chunks_map.insert(std::pair{chunk_ids[cit], chunk}); HS_LOG(INFO, device, "Creating chunk {}", chunk->to_string()); cinfo->~chunk_info(); } diff --git a/src/lib/logstore/log_dev.cpp b/src/lib/logstore/log_dev.cpp index 88e98f915..b09c6eb45 100644 --- a/src/lib/logstore/log_dev.cpp +++ b/src/lib/logstore/log_dev.cpp @@ -42,18 +42,16 @@ SISL_LOGGING_DECL(logstore) static bool has_data_service() { return HomeStore::instance()->has_data_service(); } // static BlkDataService& data_service() { return HomeStore::instance()->data_service(); } -LogDev::LogDev(const logdev_id_t id) : m_logdev_id{id} { +LogDev::LogDev(const logdev_id_t id, JournalVirtualDev* vdev) : m_logdev_id{id}, m_vdev(vdev) { m_flush_size_multiple = HS_DYNAMIC_CONFIG(logstore->flush_size_multiple_logdev); -} - -LogDev::~LogDev() = default; - -void LogDev::start(bool format, JournalVirtualDev* vdev) { // Each logdev has one journal descriptor. - m_vdev = vdev; m_vdev_jd = m_vdev->open(m_logdev_id); RELEASE_ASSERT(m_vdev_jd, "Journal descriptor is null"); +} +LogDev::~LogDev() = default; + +void LogDev::start(bool format) { if (m_flush_size_multiple == 0) { m_flush_size_multiple = m_vdev->optimal_page_size(); } THIS_LOGDEV_LOG(INFO, "Initializing logdev with flush size multiple={}", m_flush_size_multiple); @@ -106,7 +104,7 @@ void LogDev::start(bool format, JournalVirtualDev* vdev) { } void LogDev::stop() { - THIS_LOGDEV_LOG(INFO, "Logdev stopping id {}", m_logdev_id); + THIS_LOGDEV_LOG(INFO, "Logdev stopping log_dev={}", m_logdev_id); HS_LOG_ASSERT((m_pending_flush_size == 0), "LogDev stop attempted while writes to logdev are pending completion"); const bool locked_now = run_under_flush_lock([this]() { { @@ -151,6 +149,11 @@ void LogDev::stop() { m_hs.reset(); } +void LogDev::destroy() { + THIS_LOGDEV_LOG(INFO, "Logdev destroy metablks log_dev={}", m_logdev_id); + m_logdev_meta.destroy(); +} + void LogDev::start_timer() { // Currently only tests set it to 0. if (HS_DYNAMIC_CONFIG(logstore.flush_timer_frequency_us) == 0) { return; } @@ -920,6 +923,11 @@ logdev_superblk* LogDevMetadata::create(logdev_id_t id) { return sb; } +void LogDevMetadata::destroy() { + m_rollback_sb.destroy(); + m_sb.destroy(); +} + void LogDevMetadata::reset() { m_id_reserver.reset(); m_store_info.clear(); diff --git a/src/lib/logstore/log_dev.hpp b/src/lib/logstore/log_dev.hpp index 6cdad9df3..7affcce96 100644 --- a/src/lib/logstore/log_dev.hpp +++ b/src/lib/logstore/log_dev.hpp @@ -501,6 +501,7 @@ class LogDevMetadata { ~LogDevMetadata() = default; logdev_superblk* create(logdev_id_t id); + void destroy(); void reset(); std::vector< std::pair< logstore_id_t, logstore_superblk > > load(); void persist(); @@ -614,7 +615,7 @@ class LogDev : public std::enable_shared_from_this< LogDev > { return HS_DYNAMIC_CONFIG(logstore.flush_threshold_size) - sizeof(log_group_header); } - LogDev(logdev_id_t logdev_id); + LogDev(logdev_id_t logdev_id, JournalVirtualDev* vdev); LogDev(const LogDev&) = delete; LogDev& operator=(const LogDev&) = delete; LogDev(LogDev&&) noexcept = delete; @@ -626,9 +627,8 @@ class LogDev : public std::enable_shared_from_this< LogDev > { * to the recovery. It is expected that all callbacks are registered before calling the start. * * @param format: Do we need to format the logdev or not. - * @param blk_store: The blk_store associated to this logdev */ - void start(bool format, JournalVirtualDev* vdev); + void start(bool format); /** * @brief Stop the logdev. It resets all the parameters it is using and thus can be started later @@ -636,6 +636,12 @@ class LogDev : public std::enable_shared_from_this< LogDev > { */ void stop(); + /** + * @brief Destroy the logdev metablks. + * + */ + void destroy(); + /** * @brief Start the flush timer. * @@ -798,6 +804,7 @@ class LogDev : public std::enable_shared_from_this< LogDev > { void handle_unopened_log_stores(bool format); logdev_id_t get_id() { return m_logdev_id; } shared< JournalVirtualDev::Descriptor > get_journal_descriptor() const { return m_vdev_jd; } + bool is_stopped() { return m_stopped; } // bool ready_for_truncate() const { return m_vdev_jd->ready_for_truncate(); } @@ -854,7 +861,7 @@ class LogDev : public std::enable_shared_from_this< LogDev > { std::atomic< logid_t > m_log_idx{0}; // Generator of log idx std::atomic< int64_t > m_pending_flush_size{0}; // How much flushable logs are pending std::atomic< bool > m_is_flushing{false}; // Is LogDev currently flushing (so far supports one flusher at a time) - bool m_stopped{false}; // Is Logdev stopped. We don't need lock here, because it is updated under flush lock + bool m_stopped{true}; // Is Logdev stopped. We don't need lock here, because it is updated under flush lock logdev_id_t m_logdev_id; JournalVirtualDev* m_vdev{nullptr}; shared< JournalVirtualDev::Descriptor > m_vdev_jd; // Journal descriptor. diff --git a/src/lib/logstore/log_store_service.cpp b/src/lib/logstore/log_store_service.cpp index 46a85e6de..f1b0dd9b4 100644 --- a/src/lib/logstore/log_store_service.cpp +++ b/src/lib/logstore/log_store_service.cpp @@ -97,11 +97,13 @@ std::shared_ptr< VirtualDev > LogStoreService::open_vdev(const vdev_info& vinfo, void LogStoreService::start(bool format) { // hs()->status_mgr()->register_status_cb("LogStore", bind_this(LogStoreService::get_status, 1)); + delete_unopened_logdevs(); + // Create an truncate thread loop which handles truncation which does sync IO start_threads(); for (auto& [logdev_id, logdev] : m_id_logdev_map) { - logdev->start(format, m_logdev_vdev.get()); + logdev->start(format); } } @@ -121,13 +123,45 @@ logdev_id_t LogStoreService::create_new_logdev() { folly::SharedMutexWritePriority::WriteHolder holder(m_logdev_map_mtx); logdev_id_t logdev_id = m_id_reserver->reserve(); auto logdev = create_new_logdev_internal(logdev_id); - logdev->start(true /* format */, m_logdev_vdev.get()); + logdev->start(true /* format */); COUNTER_INCREMENT(m_metrics, logdevs_count, 1); + LOGINFO("Created log_dev={}", logdev_id); return logdev_id; } +void LogStoreService::destroy_log_dev(logdev_id_t logdev_id) { + folly::SharedMutexWritePriority::WriteHolder holder(m_logdev_map_mtx); + const auto it = m_id_logdev_map.find(logdev_id); + if (it == m_id_logdev_map.end()) { return; } + + // Stop the logdev and release all the chunks from the journal vdev. + auto& logdev = it->second; + if (!logdev->is_stopped()) { + // Stop the logdev if its started. + logdev->stop(); + } + + // First release all chunks. + m_logdev_vdev->destroy(logdev_id); + + // Destroy the metablks for logdev. + logdev->destroy(); + + m_id_logdev_map.erase(it); + COUNTER_DECREMENT(m_metrics, logdevs_count, 1); + LOGINFO("Removed log_dev={}", logdev_id); +} + +void LogStoreService::delete_unopened_logdevs() { + for (auto logdev_id : m_unopened_logdev) { + LOGINFO("Deleting unopened logdev={}", logdev_id); + destroy_log_dev(logdev_id); + } + m_unopened_logdev.clear(); +} + std::shared_ptr< LogDev > LogStoreService::create_new_logdev_internal(logdev_id_t logdev_id) { - auto logdev = std::make_shared< LogDev >(logdev_id); + auto logdev = std::make_shared< LogDev >(logdev_id, m_logdev_vdev.get()); const auto it = m_id_logdev_map.find(logdev_id); HS_REL_ASSERT((it == m_id_logdev_map.end()), "logdev id {} already exists", logdev_id); m_id_logdev_map.insert(std::make_pair<>(logdev_id, logdev)); @@ -139,8 +173,10 @@ void LogStoreService::open_logdev(logdev_id_t logdev_id) { const auto it = m_id_logdev_map.find(logdev_id); if (it == m_id_logdev_map.end()) { m_id_reserver->reserve(logdev_id); - create_new_logdev_internal(logdev_id); + auto logdev = std::make_shared< LogDev >(logdev_id, m_logdev_vdev.get()); + m_id_logdev_map.emplace(logdev_id, logdev); } + LOGDEBUGMOD(logstore, "Opened log_dev={}", logdev_id); } std::vector< std::shared_ptr< LogDev > > LogStoreService::get_all_logdevs() { @@ -168,12 +204,19 @@ void LogStoreService::logdev_super_blk_found(const sisl::byte_view& buf, void* m folly::SharedMutexWritePriority::WriteHolder holder(m_logdev_map_mtx); std::shared_ptr< LogDev > logdev; auto id = sb->logdev_id; + LOGDEBUGMOD(logstore, "Log dev superblk found logdev={}", id); const auto it = m_id_logdev_map.find(id); if (it == m_id_logdev_map.end()) { - m_id_reserver->reserve(id); - logdev = create_new_logdev_internal(id); - } else { + LOGERROR("logdev={} found but not opened yet, it will be discarded after logstore is started", id); + m_unopened_logdev.insert(id); + } + + // We could update the logdev map either with logdev or rollback superblks found callbacks. + if (it != m_id_logdev_map.end()) { logdev = it->second; + } else { + logdev = std::make_shared< LogDev >(id, m_logdev_vdev.get()); + m_id_logdev_map.emplace(id, logdev); } logdev->log_dev_meta().logdev_super_blk_found(buf, meta_cookie); @@ -190,12 +233,18 @@ void LogStoreService::rollback_super_blk_found(const sisl::byte_view& buf, void* folly::SharedMutexWritePriority::WriteHolder holder(m_logdev_map_mtx); std::shared_ptr< LogDev > logdev; auto id = rollback_sb->logdev_id; + LOGDEBUGMOD(logstore, "Log dev rollback superblk found logdev={}", id); const auto it = m_id_logdev_map.find(id); if (it == m_id_logdev_map.end()) { - m_id_reserver->reserve(id); - logdev = create_new_logdev_internal(id); - } else { + LOGERROR("logdev={} found but not opened yet, it will be discarded after logstore is started", id); + m_unopened_logdev.insert(id); + } + + if (it != m_id_logdev_map.end()) { logdev = it->second; + } else { + logdev = std::make_shared< LogDev >(id, m_logdev_vdev.get()); + m_id_logdev_map.emplace(id, logdev); } logdev->log_dev_meta().rollback_super_blk_found(buf, meta_cookie); @@ -203,7 +252,7 @@ void LogStoreService::rollback_super_blk_found(const sisl::byte_view& buf, void* } std::shared_ptr< HomeLogStore > LogStoreService::create_new_log_store(logdev_id_t logdev_id, bool append_mode) { - folly::SharedMutexWritePriority::ReadHolder holder(m_logdev_map_mtx); + folly::SharedMutexWritePriority::WriteHolder holder(m_logdev_map_mtx); COUNTER_INCREMENT(m_metrics, logstores_count, 1); const auto it = m_id_logdev_map.find(logdev_id); HS_REL_ASSERT((it != m_id_logdev_map.end()), "logdev id {} doesnt exist", logdev_id); @@ -220,7 +269,7 @@ folly::Future< shared< HomeLogStore > > LogStoreService::open_log_store(logdev_i } void LogStoreService::remove_log_store(logdev_id_t logdev_id, logstore_id_t store_id) { - folly::SharedMutexWritePriority::ReadHolder holder(m_logdev_map_mtx); + folly::SharedMutexWritePriority::WriteHolder holder(m_logdev_map_mtx); COUNTER_INCREMENT(m_metrics, logstores_count, 1); const auto it = m_id_logdev_map.find(logdev_id); HS_REL_ASSERT((it != m_id_logdev_map.end()), "logdev id {} doesnt exist", logdev_id); diff --git a/src/tests/test_log_dev.cpp b/src/tests/test_log_dev.cpp index c876fd6ac..7cb76af7e 100644 --- a/src/tests/test_log_dev.cpp +++ b/src/tests/test_log_dev.cpp @@ -336,6 +336,87 @@ TEST_F(LogDevTest, Rollback) { rollback_records_validate(log_store, 0 /* expected_count */); } +TEST_F(LogDevTest, CreateRemoveLogDev) { + auto num_logdev = SISL_OPTIONS["num_logdevs"].as< uint32_t >(); + std::vector< std::shared_ptr< HomeLogStore > > log_stores; + auto vdev = logstore_service().get_vdev(); + + // Create log dev, logstore, write some io. Delete all of them and + // verify the size of vdev and count of logdev. + auto log_devs = logstore_service().get_all_logdevs(); + ASSERT_EQ(log_devs.size(), 0); + ASSERT_EQ(logstore_service().used_size(), 0); + ASSERT_EQ(vdev->num_descriptors(), 0); + + for (uint32_t i{0}; i < num_logdev; ++i) { + auto id = logstore_service().create_new_logdev(); + s_max_flush_multiple = logstore_service().get_logdev(id)->get_flush_size_multiple(); + auto store = logstore_service().create_new_log_store(id, false); + log_stores.push_back(store); + } + + // Used size is still 0. + ASSERT_EQ(logstore_service().used_size(), 0); + ASSERT_EQ(vdev->num_descriptors(), num_logdev); + + log_devs = logstore_service().get_all_logdevs(); + ASSERT_EQ(log_devs.size(), num_logdev); + + for (auto& log_store : log_stores) { + const unsigned count{10}; + for (unsigned i{0}; i < count; ++i) { + // Insert new entry. + insert_sync(log_store, i); + // Verify the entry. + read_verify(log_store, i); + } + } + + // Used size should be non zero. + ASSERT_GT(logstore_service().used_size(), 0); + + for (auto& store : log_stores) { + logstore_service().remove_log_store(store->get_logdev()->get_id(), store->get_store_id()); + } + for (auto& store : log_stores) { + logstore_service().destroy_log_dev(store->get_logdev()->get_id()); + } + + // Test we released all chunks + log_devs = logstore_service().get_all_logdevs(); + ASSERT_EQ(log_devs.size(), 0); + ASSERT_EQ(vdev->num_descriptors(), 0); + ASSERT_EQ(logstore_service().used_size(), 0); + + // Test deletion of unopened logdev. + std::set< logdev_id_t > id_set, unopened_id_set; + for (uint32_t i{0}; i < num_logdev; ++i) { + auto id = logstore_service().create_new_logdev(); + id_set.insert(id); + if (i >= num_logdev / 2) { unopened_id_set.insert(id); } + } + + // Restart homestore with only half of the logdev's open. Rest will be deleted + // as there are unopened. + auto restart = [&]() { + auto starting_cb = [&]() { + auto it = id_set.begin(); + for (uint32_t i{0}; i < id_set.size() / 2; i++, it++) { + logstore_service().open_logdev(*it); + } + }; + start_homestore(true /* restart */, starting_cb); + }; + LOGINFO("Restart homestore"); + restart(); + + log_devs = logstore_service().get_all_logdevs(); + ASSERT_EQ(log_devs.size(), id_set.size() / 2); + for (auto& logdev : log_devs) { + ASSERT_EQ(unopened_id_set.count(logdev->get_id()), 0); + } +} + SISL_OPTION_GROUP(test_log_dev, (num_logdevs, "", "num_logdevs", "number of log devs", ::cxxopts::value< uint32_t >()->default_value("4"), "number"),