From 3b545dfad895e11ba1f4e339407cf85ddead503e Mon Sep 17 00:00:00 2001 From: Harihara Kadayam Date: Tue, 31 Oct 2023 15:43:31 -0700 Subject: [PATCH] Persistence of allocated blocks during CP and moved to C++-20. Without this commit, homestore 4.x doesn't persist the bitmap of allocated blks during CP. This commit handles that gap. On top of that, the free blk id collection is moved to VirtualDev layer, so that both index and data service have a common blk id collection. This blkid collection is essential so that all free blk ids are moved to the next CP after the current CP is flushed. Changed it to C++-20 standard which resulted in a few compilation errors, fixed that. --- CMakeLists.txt | 3 +- conanfile.py | 2 +- src/include/homestore/checkpoint/cp_mgr.hpp | 7 +- src/lib/blkalloc/append_blk_allocator.cpp | 14 +--- src/lib/blkalloc/blk_cache.h | 64 ++++++++++--------- src/lib/blkdata_svc/blkdata_service.cpp | 5 +- src/lib/blkdata_svc/data_svc_cp.cpp | 6 +- src/lib/checkpoint/cp_mgr.cpp | 2 + src/lib/device/chunk.cpp | 2 - src/lib/device/chunk.h | 2 - src/lib/device/device_manager.cpp | 1 - src/lib/device/virtual_dev.cpp | 53 +++++++++------ src/lib/device/virtual_dev.hpp | 15 +++-- src/lib/index/index_cp.cpp | 2 +- src/lib/index/index_cp.hpp | 55 ++++++---------- src/lib/index/wb_cache.cpp | 51 +++++---------- src/lib/index/wb_cache.hpp | 8 +-- src/lib/meta/meta_blk_service.cpp | 2 +- .../replication/repl_dev/solo_repl_dev.cpp | 1 + src/tests/test_blk_read_tracker.cpp | 8 +-- src/tests/test_cp_mgr.cpp | 4 +- src/tests/test_device_manager.cpp | 2 +- src/tests/test_mem_btree.cpp | 9 ++- src/tests/test_meta_blk_mgr.cpp | 2 +- 24 files changed, 148 insertions(+), 172 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index bd27696ae..1e55c5b2f 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -8,7 +8,7 @@ set_property(GLOBAL PROPERTY USE_FOLDERS ON) # turn on folder hierarchies include (cmake/Flags.cmake) -set(CMAKE_CXX_STANDARD 17) +set(CMAKE_CXX_STANDARD 20) 
enable_testing() if(EXISTS ${CMAKE_CURRENT_BINARY_DIR}/conanbuildinfo.cmake) @@ -107,6 +107,7 @@ else() endif() endif() +add_flags("-g") add_subdirectory(src) # build info diff --git a/conanfile.py b/conanfile.py index a2c858e77..5eb61e71c 100644 --- a/conanfile.py +++ b/conanfile.py @@ -5,7 +5,7 @@ class HomestoreConan(ConanFile): name = "homestore" - version = "4.5.9" + version = "4.5.10" homepage = "https://github.com/eBay/Homestore" description = "HomeStore Storage Engine" diff --git a/src/include/homestore/checkpoint/cp_mgr.hpp b/src/include/homestore/checkpoint/cp_mgr.hpp index bd3714cc1..15f0d67e7 100644 --- a/src/include/homestore/checkpoint/cp_mgr.hpp +++ b/src/include/homestore/checkpoint/cp_mgr.hpp @@ -56,15 +56,14 @@ VENUM(cp_consumer_t, uint8_t, struct CP; class CPContext { -private: - cp_id_t m_cp_id; +protected: CP* m_cp; folly::Promise< bool > m_flush_comp; public: - CPContext(cp_id_t id) : m_cp_id{id} {} - cp_id_t id() const { return m_cp_id; } + CPContext(CP* cp) : m_cp{cp} {} CP* cp() { return m_cp; } + cp_id_t id() const; void complete(bool status) { m_flush_comp.setValue(status); } folly::Future< bool > get_future() { return m_flush_comp.getFuture(); } diff --git a/src/lib/blkalloc/append_blk_allocator.cpp b/src/lib/blkalloc/append_blk_allocator.cpp index 471b21442..5852682d9 100644 --- a/src/lib/blkalloc/append_blk_allocator.cpp +++ b/src/lib/blkalloc/append_blk_allocator.cpp @@ -119,19 +119,11 @@ BlkAllocStatus AppendBlkAllocator::alloc(blk_count_t nblks, const blk_alloc_hint return BlkAllocStatus::SUCCESS; } -BlkAllocStatus AppendBlkAllocator::alloc_on_disk(BlkId const&) { - DEBUG_ASSERT(false, "alloc_on_disk called on non-persisted allocator"); - return BlkAllocStatus::SUCCESS; -} +BlkAllocStatus AppendBlkAllocator::alloc_on_disk(BlkId const&) { return BlkAllocStatus::SUCCESS; } -void AppendBlkAllocator::free_on_disk(BlkId const&) { - DEBUG_ASSERT(false, "free_on_disk called on non-persisted allocator"); -} +void 
AppendBlkAllocator::free_on_disk(BlkId const&) {} -bool AppendBlkAllocator::is_blk_alloced_on_disk(BlkId const&, bool) const { - DEBUG_ASSERT(false, "is_blk_alloced_on_disk called on non-persisted allocator"); - return false; -} +bool AppendBlkAllocator::is_blk_alloced_on_disk(BlkId const&, bool) const { return false; } // // cp_flush doesn't need CPGuard as it is triggered by CPMgr which already handles the reference check; diff --git a/src/lib/blkalloc/blk_cache.h b/src/lib/blkalloc/blk_cache.h index 132230465..7e78bcc6b 100644 --- a/src/lib/blkalloc/blk_cache.h +++ b/src/lib/blkalloc/blk_cache.h @@ -156,7 +156,24 @@ struct blk_cache_refill_status { void mark_refill_done() { slab_refilled_count = slab_required_count; } }; +} // namespace homestore + +namespace fmt { +template <> +struct formatter< homestore::blk_cache_refill_status > { + template < typename ParseContext > + constexpr auto parse(ParseContext& ctx) { + return ctx.begin(); + } + + template < typename FormatContext > + auto format(const homestore::blk_cache_refill_status& s, FormatContext& ctx) { + return format_to(ctx.out(), "{}/{}", s.slab_refilled_count, s.slab_required_count); + } +}; +} // namespace fmt +namespace homestore { struct blk_cache_fill_session { uint64_t session_id; std::vector< blk_cache_refill_status > slab_requirements; // A slot for each slab about count of required/refilled @@ -231,7 +248,24 @@ struct SlabCacheConfig { } std::string get_name() const { return m_name; } }; +} // namespace homestore + +namespace fmt { +template <> +struct formatter< homestore::SlabCacheConfig > { + template < typename ParseContext > + constexpr auto parse(ParseContext& ctx) { + return ctx.begin(); + } + template < typename FormatContext > + auto format(const homestore::SlabCacheConfig& s, FormatContext& ctx) { + return format_to(ctx.out(), "{}", s.to_string()); + } +}; +} // namespace fmt + +namespace homestore { class FreeBlkCache { public: FreeBlkCache() = default; @@ -281,32 +315,4 @@ class 
FreeBlkCache { return nblks_to_round_down_slab_tbl[nblks]; } }; -} // namespace homestore - -namespace fmt { -template <> -struct formatter< homestore::blk_cache_refill_status > { - template < typename ParseContext > - constexpr auto parse(ParseContext& ctx) { - return ctx.begin(); - } - - template < typename FormatContext > - auto format(const homestore::blk_cache_refill_status& s, FormatContext& ctx) { - return format_to(ctx.out(), "{}/{}", s.slab_refilled_count, s.slab_required_count); - } -}; - -template <> -struct formatter< homestore::SlabCacheConfig > { - template < typename ParseContext > - constexpr auto parse(ParseContext& ctx) { - return ctx.begin(); - } - - template < typename FormatContext > - auto format(const homestore::SlabCacheConfig& s, FormatContext& ctx) { - return format_to(ctx.out(), "{}", s.to_string()); - } -}; -} // namespace fmt +} // namespace homestore \ No newline at end of file diff --git a/src/lib/blkdata_svc/blkdata_service.cpp b/src/lib/blkdata_svc/blkdata_service.cpp index 8a260ecdb..50d92d6e5 100644 --- a/src/lib/blkdata_svc/blkdata_service.cpp +++ b/src/lib/blkdata_svc/blkdata_service.cpp @@ -212,7 +212,10 @@ folly::Future< std::error_code > BlkDataService::async_free_blk(MultiBlkId const auto f = promise.getFuture(); m_blk_read_tracker->wait_on(bids, [this, bids, p = std::move(promise)]() mutable { - m_vdev->free_blk(bids); + { + auto cpg = hs()->cp_mgr().cp_guard(); + m_vdev->free_blk(bids, s_cast< VDevCPContext* >(cpg.context(cp_consumer_t::BLK_DATA_SVC))); + } p.setValue(std::error_code{}); }); return f; diff --git a/src/lib/blkdata_svc/data_svc_cp.cpp b/src/lib/blkdata_svc/data_svc_cp.cpp index b76a4dc85..104a32d22 100644 --- a/src/lib/blkdata_svc/data_svc_cp.cpp +++ b/src/lib/blkdata_svc/data_svc_cp.cpp @@ -22,21 +22,21 @@ namespace homestore { DataSvcCPCallbacks::DataSvcCPCallbacks(shared< VirtualDev > vdev) : m_vdev{vdev} {} std::unique_ptr< CPContext > DataSvcCPCallbacks::on_switchover_cp(CP* cur_cp, CP* new_cp) { - 
return m_vdev->create_cp_context(new_cp->id()); + return m_vdev->create_cp_context(new_cp); } folly::Future< bool > DataSvcCPCallbacks::cp_flush(CP* cp) { // Pick a CP Manager blocking IO fiber to execute the cp flush of vdev // iomanager.run_on_forget(hs()->cp_mgr().pick_blocking_io_fiber(), [this, cp]() { auto cp_ctx = s_cast< VDevCPContext* >(cp->context(cp_consumer_t::BLK_DATA_SVC)); - m_vdev->cp_flush(cp); // this is a blocking io call + m_vdev->cp_flush(cp_ctx); // this is a blocking io call cp_ctx->complete(true); //}); return folly::makeFuture< bool >(true); } -void DataSvcCPCallbacks::cp_cleanup(CP* cp) { m_vdev->cp_cleanup(cp); } +void DataSvcCPCallbacks::cp_cleanup(CP* cp) {} int DataSvcCPCallbacks::cp_progress_percent() { return m_vdev->cp_progress_percent(); } diff --git a/src/lib/checkpoint/cp_mgr.cpp b/src/lib/checkpoint/cp_mgr.cpp index 944c90222..89921620a 100644 --- a/src/lib/checkpoint/cp_mgr.cpp +++ b/src/lib/checkpoint/cp_mgr.cpp @@ -405,4 +405,6 @@ void CPWatchdog::cp_watchdog_timer() { } } +cp_id_t CPContext::id() const { return m_cp->id(); } + } // namespace homestore diff --git a/src/lib/device/chunk.cpp b/src/lib/device/chunk.cpp index f1ddbe097..0853571a1 100644 --- a/src/lib/device/chunk.cpp +++ b/src/lib/device/chunk.cpp @@ -22,8 +22,6 @@ namespace homestore { Chunk::Chunk(PhysicalDev* pdev, const chunk_info& cinfo, uint32_t chunk_slot) : m_chunk_info{cinfo}, m_pdev{pdev}, m_chunk_slot{chunk_slot}, m_stream_id{pdev->chunk_to_stream_id(cinfo)} {} -void Chunk::cp_flush(CP* cp) { blk_allocator_mutable()->cp_flush(cp); } - std::string Chunk::to_string() const { return fmt::format("chunk_id={}, vdev_id={}, start_offset={}, size={}, end_of_chunk={}, slot_num_in_pdev={} " "pdev_ordinal={} vdev_ordinal={} stream_id={}", diff --git a/src/lib/device/chunk.h b/src/lib/device/chunk.h index 71150b521..bb36b8449 100644 --- a/src/lib/device/chunk.h +++ b/src/lib/device/chunk.h @@ -38,8 +38,6 @@ class Chunk { Chunk& operator=(Chunk&&) noexcept = 
delete; virtual ~Chunk() = default; - void cp_flush(CP* cp); - /////////////// Pointer Getters //////////////////// const PhysicalDev* physical_dev() const { return m_pdev; } PhysicalDev* physical_dev_mutable() { return m_pdev; }; diff --git a/src/lib/device/device_manager.cpp b/src/lib/device/device_manager.cpp index c5d0d4f3f..ecab5562c 100644 --- a/src/lib/device/device_manager.cpp +++ b/src/lib/device/device_manager.cpp @@ -66,7 +66,6 @@ DeviceManager::DeviceManager(const std::vector< dev_info >& devs, vdev_create_cb if (is_hdd(dev_info.dev_name)) { HomeStoreStaticConfig::instance().hdd_drive_present = true; found_hdd_dev = true; - LOGINFO("HDD device found: {}"); break; } } diff --git a/src/lib/device/virtual_dev.cpp b/src/lib/device/virtual_dev.cpp index 04a0d3930..2f1311002 100644 --- a/src/lib/device/virtual_dev.cpp +++ b/src/lib/device/virtual_dev.cpp @@ -200,7 +200,7 @@ BlkAllocStatus VirtualDev::alloc_blks(blk_count_t nblks, blk_alloc_hints const& } if ((status != BlkAllocStatus::SUCCESS) && !((status == BlkAllocStatus::PARTIAL) && hints.partial_alloc_ok)) { - LOGERROR("nblks={} failed to alloc after trying to alloc on every chunks {} and devices {}.", nblks); + LOGERROR("nblks={} failed to alloc after trying to alloc on every chunks and devices", nblks); COUNTER_INCREMENT(m_metrics, vdev_num_alloc_failure, 1); } @@ -252,14 +252,25 @@ BlkAllocStatus VirtualDev::alloc_blks_from_chunk(blk_count_t nblks, blk_alloc_hi return status; } -void VirtualDev::free_blk(BlkId const& b) { - if (b.is_multi()) { - MultiBlkId const& mb = r_cast< MultiBlkId const& >(b); - Chunk* chunk = m_dmgr.get_chunk_mutable(mb.chunk_num()); - chunk->blk_allocator_mutable()->free(mb); +void VirtualDev::free_blk(BlkId const& bid, VDevCPContext* vctx) { + auto do_free_action = [this](auto const& b, VDevCPContext* vctx) { + if (vctx) { + vctx->m_free_blkid_list.push_back(b); + } else { + BlkAllocator* allocator = m_dmgr.get_chunk_mutable(b.chunk_num())->blk_allocator_mutable(); + if 
(m_auto_recovery) { allocator->free_on_disk(b); } + allocator->free(b); + } + }; + + if (bid.is_multi()) { + MultiBlkId const& mbid = r_cast< MultiBlkId const& >(bid); + auto it = mbid.iterate(); + while (auto const b = it.next()) { + do_free_action(*b, vctx); + } } else { - Chunk* chunk = m_dmgr.get_chunk_mutable(b.chunk_num()); - chunk->blk_allocator_mutable()->free(b); + do_free_action(bid, vctx); } } @@ -541,25 +552,29 @@ void VirtualDev::update_vdev_private(const sisl::blob& private_data) { } ///////////////////////// VirtualDev Checkpoint methods ///////////////////////////// +VDevCPContext::VDevCPContext(CP* cp) : CPContext(cp) {} -VDevCPContext::VDevCPContext(cp_id_t cp_id) : CPContext(cp_id) {} +std::unique_ptr< CPContext > VirtualDev::create_cp_context(CP* cp) { return std::make_unique< VDevCPContext >(cp); } -std::unique_ptr< CPContext > VirtualDev::create_cp_context(cp_id_t cp_id) { - return std::make_unique< VDevCPContext >(cp_id); -} +void VirtualDev::cp_flush(VDevCPContext* v_cp_ctx) { + CP* cp = v_cp_ctx->cp(); -void VirtualDev::cp_flush(CP* cp) { - // pass down cp so that underlying componnents can get their customized CP context if needed; - m_chunk_selector->foreach_chunks([this, cp](cshared< Chunk >& chunk) { chunk->cp_flush(cp); }); + // pass down cp so that underlying components can get their customized CP context if needed; + m_chunk_selector->foreach_chunks( + [this, cp](cshared< Chunk >& chunk) { chunk->blk_allocator_mutable()->cp_flush(cp); }); + + // All of the blkids which were captured in the current vdev cp context will now be freed and hence available for + // allocation on the new CP dirty collection session which is ongoing + for (auto const& b : v_cp_ctx->m_free_blkid_list) { + BlkAllocator* allocator = m_dmgr.get_chunk_mutable(b.chunk_num())->blk_allocator_mutable(); + if (m_auto_recovery) { allocator->free_on_disk(b); } + allocator->free(b); + } } // sync-ops during cp_flush, so return 100; int VirtualDev::cp_progress_percent() { 
return 100; } -void VirtualDev::cp_cleanup(CP*) { - // no-op; -} - ///////////////////////// VirtualDev Private Methods ///////////////////////////// uint64_t VirtualDev::to_dev_offset(BlkId const& b, Chunk** chunk) const { *chunk = m_dmgr.get_chunk_mutable(b.chunk_num()); diff --git a/src/lib/device/virtual_dev.hpp b/src/lib/device/virtual_dev.hpp index 0fcb9f709..d28a67f35 100644 --- a/src/lib/device/virtual_dev.hpp +++ b/src/lib/device/virtual_dev.hpp @@ -30,6 +30,7 @@ #include #include #include +#include #include #include @@ -79,6 +80,7 @@ struct blkalloc_cp; class VirtualDev; ENUM(vdev_event_t, uint8_t, SIZE_THRESHOLD_REACHED, VDEV_ERRORED_OUT); using vdev_event_cb_t = std::function< void(VirtualDev&, vdev_event_t, const std::string&) >; +class VDevCPContext; class VirtualDev { protected: @@ -150,7 +152,7 @@ class VirtualDev { /// @return Allocation Status virtual BlkAllocStatus commit_blk(BlkId const& blkid); - virtual void free_blk(BlkId const& b); + virtual void free_blk(BlkId const& b, VDevCPContext* vctx = nullptr); /////////////////////// Write API related methods ///////////////////////////// /// @brief Asynchornously write the buffer to the device on a given blkid @@ -255,14 +257,12 @@ class VirtualDev { /// @brief /// /// @param cp - void cp_flush(CP* cp); - - void cp_cleanup(CP* cp); + void cp_flush(VDevCPContext* v_cp_ctx); /// @brief : percentage CP has been progressed, this api is normally used for cp watchdog; int cp_progress_percent(); - std::unique_ptr< CPContext > create_cp_context(cp_id_t cp_id); + std::unique_ptr< CPContext > create_cp_context(CP* cp); ////////////////////////// Standard Getters /////////////////////////////// virtual uint64_t available_blks() const; @@ -295,7 +295,10 @@ class VirtualDev { // place holder for future needs in which components underlying virtualdev needs cp flush context; class VDevCPContext : public CPContext { public: - VDevCPContext(cp_id_t cp_id); + sisl::ConcurrentInsertVector< BlkId > m_free_blkid_list; 
+ +public: + VDevCPContext(CP* cp); virtual ~VDevCPContext() = default; }; diff --git a/src/lib/index/index_cp.cpp b/src/lib/index/index_cp.cpp index 3add1647e..ab7ff8f12 100644 --- a/src/lib/index/index_cp.cpp +++ b/src/lib/index/index_cp.cpp @@ -5,7 +5,7 @@ namespace homestore { IndexCPCallbacks::IndexCPCallbacks(IndexWBCache* wb_cache) : m_wb_cache{wb_cache} {} std::unique_ptr< CPContext > IndexCPCallbacks::on_switchover_cp(CP* cur_cp, CP* new_cp) { - return m_wb_cache->create_cp_context(new_cp->id()); + return std::make_unique< IndexCPContext >(new_cp); } folly::Future< bool > IndexCPCallbacks::cp_flush(CP* cp) { diff --git a/src/lib/index/index_cp.hpp b/src/lib/index/index_cp.hpp index 077c9824b..f06f53091 100644 --- a/src/lib/index/index_cp.hpp +++ b/src/lib/index/index_cp.hpp @@ -15,70 +15,53 @@ *********************************************************************************/ #pragma once #include -#include +#include #include #include #include #include "checkpoint/cp.hpp" +#include "device/virtual_dev.hpp" SISL_LOGGING_DECL(wbcache) namespace homestore { -struct flush_buffer_iterator { - sisl::thread_vector_iterator dirty_buf_list_it; - sisl::thread_vector_iterator free_node_list_it; -}; - -struct IndexCPContext : public CPContext { +struct IndexCPContext : public VDevCPContext { public: std::atomic< uint64_t > m_num_nodes_added{0}; std::atomic< uint64_t > m_num_nodes_removed{0}; - sisl::ThreadVector< IndexBufferPtr >* m_dirty_buf_list{nullptr}; - sisl::ThreadVector< BlkId >* m_free_node_blkid_list{nullptr}; + sisl::ConcurrentInsertVector< IndexBufferPtr > m_dirty_buf_list; sisl::atomic_counter< int64_t > m_dirty_buf_count{0}; IndexBufferPtr m_last_in_chain; std::mutex m_flush_buffer_mtx; - flush_buffer_iterator m_buf_it; + sisl::ConcurrentInsertVector< IndexBufferPtr >::iterator m_dirty_buf_it; public: - IndexCPContext(cp_id_t cp_id, sisl::ThreadVector< IndexBufferPtr >* dirty_list, - sisl::ThreadVector< BlkId >* free_blkid_list) : - CPContext(cp_id), 
m_dirty_buf_list{dirty_list}, m_free_node_blkid_list{free_blkid_list} {} - - virtual ~IndexCPContext() { - auto it = m_dirty_buf_list->begin(true /* latest */); - IndexBufferPtr *tmp = nullptr; - while((tmp = m_dirty_buf_list->next(it)) != nullptr) { - tmp->reset(); - } - m_dirty_buf_list->clear(); - m_free_node_blkid_list->clear(); - } - - void prepare_flush_iteration() { - m_buf_it.dirty_buf_list_it = m_dirty_buf_list->begin(true /* latest */); - m_buf_it.free_node_list_it = m_free_node_blkid_list->begin(true /* latest */); - } + IndexCPContext(CP* cp) : VDevCPContext(cp) {} + virtual ~IndexCPContext() = default; void add_to_dirty_list(const IndexBufferPtr& buf) { buf->m_buf_state = index_buf_state_t::DIRTY; - m_dirty_buf_list->push_back(buf); + m_dirty_buf_list.push_back(buf); m_dirty_buf_count.increment(1); m_last_in_chain = buf; LOGTRACEMOD(wbcache, "{}", buf->to_string()); } - void add_to_free_node_list(BlkId blkid) { m_free_node_blkid_list->push_back(blkid); } - bool any_dirty_buffers() const { return !m_dirty_buf_count.testz(); } - IndexBufferPtr* next_dirty() { return m_dirty_buf_list->next(m_buf_it.dirty_buf_list_it); } - BlkId* next_blkid() { return m_free_node_blkid_list->next(m_buf_it.free_node_list_it); } + void prepare_flush_iteration() { m_dirty_buf_it = m_dirty_buf_list.begin(); } + + std::optional< IndexBufferPtr > next_dirty() { + if (m_dirty_buf_it == m_dirty_buf_list.end()) { return std::nullopt; } + IndexBufferPtr ret = *m_dirty_buf_it; + ++m_dirty_buf_it; + return ret; + } + std::string to_string() const { - std::string str{ - fmt::format("IndexCPContext cpid={} dirty_buf_count={} dirty_buf_list_size={} blkid_list_size={}", id(), - m_dirty_buf_count.get(), m_dirty_buf_list->size(), m_free_node_blkid_list->size())}; + std::string str{fmt::format("IndexCPContext cpid={} dirty_buf_count={} dirty_buf_list_size={}", m_cp->id(), + m_dirty_buf_count.get(), m_dirty_buf_list.size())}; // TODO dump all index buffers. 
return str; diff --git a/src/lib/index/wb_cache.cpp b/src/lib/index/wb_cache.cpp index 21ad44345..a00a46dfa 100644 --- a/src/lib/index/wb_cache.cpp +++ b/src/lib/index/wb_cache.cpp @@ -43,10 +43,6 @@ IndexWBCache::IndexWBCache(const std::shared_ptr< VirtualDev >& vdev, const std: }}, m_node_size{node_size} { start_flush_threads(); - for (size_t i{0}; i < MAX_CP_COUNT; ++i) { - m_dirty_list[i] = std::make_unique< sisl::ThreadVector< IndexBufferPtr > >(); - m_free_blkid_list[i] = std::make_unique< sisl::ThreadVector< BlkId > >(); - } } void IndexWBCache::start_flush_threads() { @@ -93,6 +89,9 @@ BtreeNodePtr IndexWBCache::alloc_buf(node_initializer_t&& node_initializer) { // Add the node to the cache bool done = m_cache.insert(node); HS_REL_ASSERT_EQ(done, true, "Unable to add alloc'd node to cache, low memory or duplicate inserts?"); + + // The entire index is updated in the commit path, so we alloc the blk and commit them right away + m_vdev->commit_blk(blkid); return node; } @@ -184,12 +183,11 @@ void IndexWBCache::free_buf(const IndexBufferPtr& buf, CPContext* cp_ctx) { HS_REL_ASSERT_EQ(done, true, "Race on cache removal of btree blkid?"); resource_mgr().inc_free_blk(m_node_size); - r_cast< IndexCPContext* >(cp_ctx)->add_to_free_node_list(buf->m_blkid); + m_vdev->free_blk(buf->m_blkid, s_cast< VDevCPContext* >(cp_ctx)); } //////////////////// CP Related API section ///////////////////////////////// -folly::Future< bool > IndexWBCache::async_cp_flush(CPContext* context) { - IndexCPContext* cp_ctx = s_cast< IndexCPContext* >(context); +folly::Future< bool > IndexWBCache::async_cp_flush(IndexCPContext* cp_ctx) { LOGTRACEMOD(wbcache, "cp_ctx {}", cp_ctx->to_string()); if (!cp_ctx->any_dirty_buffers()) { CP_PERIODIC_LOG(DEBUG, cp_ctx->id(), "Btree does not have any dirty buffers to flush"); @@ -213,12 +211,6 @@ folly::Future< bool > IndexWBCache::async_cp_flush(CPContext* context) { return std::move(cp_ctx->get_future()); } -std::unique_ptr< CPContext > 
IndexWBCache::create_cp_context(cp_id_t cp_id) { - size_t const cp_id_slot = cp_id % MAX_CP_COUNT; - return std::make_unique< IndexCPContext >(cp_id, m_dirty_list[cp_id_slot].get(), - m_free_blkid_list[cp_id_slot].get()); -} - void IndexWBCache::do_flush_one_buf(IndexCPContext* cp_ctx, const IndexBufferPtr& buf, bool part_of_batch) { LOGTRACEMOD(wbcache, "buf {}", buf->to_string()); buf->m_buf_state = index_buf_state_t::FLUSHING; @@ -238,8 +230,13 @@ void IndexWBCache::process_write_completion(IndexCPContext* cp_ctx, IndexBuffer* if (next_buf) { do_flush_one_buf(cp_ctx, next_buf, false); } else if (!has_more) { - // We are done flushing the buffers, lets free the btree blocks and then flush the bitmap - free_btree_blks_and_flush(cp_ctx); + // We are done flushing the buffers, We flush the vdev to persist the vdev bitmaps and free blks + // Pick a CP Manager blocking IO fiber to execute the cp flush of vdev + iomanager.run_on_forget(hs()->cp_mgr().pick_blocking_io_fiber(), [this, cp_ctx]() { + LOGTRACEMOD(wbcache, "Initiating CP flush"); + m_vdev->cp_flush(cp_ctx); // This is a blocking io call + cp_ctx->complete(true); + }); } } @@ -290,11 +287,11 @@ void IndexWBCache::get_next_bufs_internal(IndexCPContext* cp_ctx, uint32_t max_c // If we still have room to push the next buffer, take it from the main list while (count < max_count) { - IndexBufferPtr* ppbuf = cp_ctx->next_dirty(); - if (ppbuf == nullptr) { break; } // End of list - IndexBufferPtr buf = *ppbuf; - if (buf->m_wait_for_leaders.testz()) { - bufs.emplace_back(std::move(buf)); + std::optional< IndexBufferPtr > buf = cp_ctx->next_dirty(); + if (!buf) { break; } // End of list + + if ((*buf)->m_wait_for_leaders.testz()) { + bufs.emplace_back(std::move(*buf)); ++count; } else { // There is some leader buffer still flushing, once done its completion will flush this buffer @@ -302,20 +299,6 @@ void IndexWBCache::get_next_bufs_internal(IndexCPContext* cp_ctx, uint32_t max_c } } -void 
IndexWBCache::free_btree_blks_and_flush(IndexCPContext* cp_ctx) { - BlkId* pbid; - while ((pbid = cp_ctx->next_blkid()) != nullptr) { - m_vdev->free_blk(*pbid); - } - - // Pick a CP Manager blocking IO fiber to execute the cp flush of vdev - iomanager.run_on_forget(hs()->cp_mgr().pick_blocking_io_fiber(), [this, cp_ctx]() { - LOGTRACEMOD(wbcache, "Initiating CP flush"); - m_vdev->cp_flush(nullptr); // This is a blocking io call - cp_ctx->complete(true); - }); -} - IndexBtreeNode* IndexBtreeNode::convert(BtreeNode* bt_node) { return r_cast< IndexBtreeNode* >(bt_node->get_node_context()); } diff --git a/src/lib/index/wb_cache.hpp b/src/lib/index/wb_cache.hpp index ad5255be9..7639d0714 100644 --- a/src/lib/index/wb_cache.hpp +++ b/src/lib/index/wb_cache.hpp @@ -38,9 +38,6 @@ class IndexWBCache : public IndexWBCacheBase { sisl::SimpleCache< BlkId, BtreeNodePtr > m_cache; uint32_t m_node_size; - // Dirty buffer list arranged in a dependent list fashion - std::unique_ptr< sisl::ThreadVector< IndexBufferPtr > > m_dirty_list[MAX_CP_COUNT]; - std::unique_ptr< sisl::ThreadVector< BlkId > > m_free_blkid_list[MAX_CP_COUNT]; // Free'd btree blkids per cp std::vector< iomgr::io_fiber_t > m_cp_flush_fibers; std::mutex m_flush_mtx; @@ -57,8 +54,7 @@ class IndexWBCache : public IndexWBCacheBase { void free_buf(const IndexBufferPtr& buf, CPContext* cp_ctx) override; //////////////////// CP Related API section ///////////////////////////////// - folly::Future< bool > async_cp_flush(CPContext* context); - std::unique_ptr< CPContext > create_cp_context(cp_id_t cp_id); + folly::Future< bool > async_cp_flush(IndexCPContext* context); IndexBufferPtr copy_buffer(const IndexBufferPtr& cur_buf) const; private: @@ -71,7 +67,5 @@ class IndexWBCache : public IndexWBCacheBase { void get_next_bufs(IndexCPContext* cp_ctx, uint32_t max_count, std::vector< IndexBufferPtr >& bufs); void get_next_bufs_internal(IndexCPContext* cp_ctx, uint32_t max_count, IndexBuffer* prev_flushed_buf, std::vector< 
IndexBufferPtr >& bufs); - void free_btree_blks_and_flush(IndexCPContext* cp_ctx); - }; } // namespace homestore diff --git a/src/lib/meta/meta_blk_service.cpp b/src/lib/meta/meta_blk_service.cpp index 02ba4b836..e19a99daf 100644 --- a/src/lib/meta/meta_blk_service.cpp +++ b/src/lib/meta/meta_blk_service.cpp @@ -374,7 +374,7 @@ void MetaBlkService::register_handler(meta_sub_type type, const meta_blk_found_c m_sub_info[type].cb = cb; m_sub_info[type].comp_cb = comp_cb; m_sub_info[type].do_crc = do_crc ? 1 : 0; - HS_LOG(INFO, metablk, "[type={}] registered with do_crc: {}", type, do_crc); + HS_LOG(DEBUG, metablk, "[type={}] registered with do_crc: {}", type, do_crc); } void MetaBlkService::add_sub_sb(meta_sub_type type, const uint8_t* context_data, uint64_t sz, void*& cookie) { diff --git a/src/lib/replication/repl_dev/solo_repl_dev.cpp b/src/lib/replication/repl_dev/solo_repl_dev.cpp index 653ef5302..7620c0b98 100644 --- a/src/lib/replication/repl_dev/solo_repl_dev.cpp +++ b/src/lib/replication/repl_dev/solo_repl_dev.cpp @@ -83,6 +83,7 @@ void SoloReplDev::write_journal(intrusive< repl_req_ctx > rreq) { auto cur_lsn = m_commit_upto.load(); if (cur_lsn < lsn) { m_commit_upto.compare_exchange_strong(cur_lsn, lsn); } + data_service().commit_blk(rreq->local_blkid); m_listener->on_commit(rreq->lsn, rreq->header, rreq->key, rreq->local_blkid, rreq); }); } diff --git a/src/tests/test_blk_read_tracker.cpp b/src/tests/test_blk_read_tracker.cpp index f7a030ac5..4c656ac0b 100644 --- a/src/tests/test_blk_read_tracker.cpp +++ b/src/tests/test_blk_read_tracker.cpp @@ -157,18 +157,18 @@ TEST_F(BlkReadTrackerTest, TestInsertWithWaiter) { * */ TEST_F(BlkReadTrackerTest, TestInsRmWithWaiterOnSameBid) { BlkId b{16, 20, 0}; - LOGINFO("Step 1: read blkid: {} into hash map.", b.to_string()); + LOGINFO("Step 1: read blkid: {} into hash map.", b); get_inst()->insert(b); bool called{false}; - LOGINFO("Step 2: free blkid: {} to be completed on reading"); + LOGINFO("Step 2: free blkid: {} to 
be completed on reading", b); get_inst()->wait_on(b, [&called, &b]() { LOGMSG_ASSERT_EQ(called, false, "not expecting wait_on callback to be called more than once!"); called = true; - LOGINFO("wait_on callback triggered on blkid: {};", b.to_string()); + LOGINFO("wait_on callback triggered on blkid: {};", b); }); - LOGINFO("Step 3: try to do read completed on blkid: {}. ", b.to_string()); + LOGINFO("Step 3: try to do read completed on blkid: {}. ", b); get_inst()->remove(b); // cb should be called in same thread serving remove; diff --git a/src/tests/test_cp_mgr.cpp b/src/tests/test_cp_mgr.cpp index 0db53f176..ad0e8a60d 100644 --- a/src/tests/test_cp_mgr.cpp +++ b/src/tests/test_cp_mgr.cpp @@ -41,7 +41,7 @@ SISL_OPTION_GROUP(test_cp_mgr, class TestCPContext : public CPContext { public: - TestCPContext(cp_id_t id) : CPContext{id} {} + TestCPContext(CP* cp) : CPContext{cp} {} virtual ~TestCPContext() = default; void add() { @@ -69,7 +69,7 @@ class TestCPContext : public CPContext { class TestCPCallbacks : public CPCallbacks { public: std::unique_ptr< CPContext > on_switchover_cp(CP*, CP* new_cp) override { - return std::make_unique< TestCPContext >(new_cp->id()); + return std::make_unique< TestCPContext >(new_cp); } folly::Future< bool > cp_flush(CP* cp) override { diff --git a/src/tests/test_device_manager.cpp b/src/tests/test_device_manager.cpp index 8b1363688..13bef9d6b 100644 --- a/src/tests/test_device_manager.cpp +++ b/src/tests/test_device_manager.cpp @@ -143,7 +143,7 @@ TEST_F(DeviceMgrTest, StripedVDevCreation) { uint32_t size_pct = 2; uint64_t remain_size = avail_size; - LOGINFO("Step 1: Creating {} vdevs with combined size as {}", in_bytes(avail_size)); + LOGINFO("Step 1: Creating {} vdevs with combined size as {}", num_test_vdevs, in_bytes(avail_size)); for (uint32_t i = 0; i < num_test_vdevs; ++i) { std::string name = "test_vdev_" + std::to_string(i + 1); uint64_t size = std::min(remain_size, (avail_size * size_pct) / 100); diff --git 
a/src/tests/test_mem_btree.cpp b/src/tests/test_mem_btree.cpp index 495cb8fd3..12129e538 100644 --- a/src/tests/test_mem_btree.cpp +++ b/src/tests/test_mem_btree.cpp @@ -502,7 +502,7 @@ TYPED_TEST(BtreeTest, RandomRemoveRange) { static thread_local std::uniform_int_distribution< uint32_t > s_rand_key_generator{0, 2 * num_entries}; // this->print_keys(); LOGINFO("Step 2: Do range remove for maximum of {} iterations", num_iters); - for (uint32_t i{0}; i < num_iters && this->m_shadow_map.size() > 0; ++i) { + for (uint32_t i{0}; i< num_iters&& this->m_shadow_map.size() > 0; ++i) { uint32_t key1 = s_rand_key_generator(g_re); uint32_t key2 = s_rand_key_generator(g_re); uint32_t start_key = std::min(key1, key2); @@ -541,13 +541,12 @@ class BtreeConcurrentTest : public testing::Test { void print_keys() const { m_bt->print_tree_keys(); } void execute(const std::vector< std::pair< std::string, int > >& op_list) { - LOGINFO("Starting iomgr with {} threads", SISL_OPTIONS["n_threads"].as< uint32_t >()); ioenvironment.with_iomgr(iomgr::iomgr_params{.num_threads = SISL_OPTIONS["n_threads"].as< uint32_t >(), - false, + .is_spdk = false, .num_fibers = 1 + SISL_OPTIONS["n_fibers"].as< uint32_t >(), - 0, - 0}); + .app_mem_size_mb = 0, + .hugepage_size_mb = 0}); std::mutex mtx; iomanager.run_on_wait(iomgr::reactor_regex::all_io, [this, &mtx]() { auto fv = iomanager.sync_io_capable_fibers(); diff --git a/src/tests/test_meta_blk_mgr.cpp b/src/tests/test_meta_blk_mgr.cpp index 1e4c211f0..deda07a82 100644 --- a/src/tests/test_meta_blk_mgr.cpp +++ b/src/tests/test_meta_blk_mgr.cpp @@ -560,7 +560,7 @@ class VMetaBlkMgrTest : public ::testing::Test { freq.set_count(1); freq.set_percent(100); m_fc.inject_noreturn_flip(flip_name, {null_cond}, freq); - LOGDEBUG("Flip " + flip_name + " set"); + LOGDEBUG("Flip {} set", flip_name); } #endif