From 8d92ff49f9d42b897053511f799f1bcc7dd940f8 Mon Sep 17 00:00:00 2001 From: Sanal P Date: Tue, 24 Oct 2023 14:01:11 -0700 Subject: [PATCH] Index write back cache fixes. Add locks for indexbuffer to avoid concurrency issues with cp flush and insert threads. Create per CP indexbuffer. Add locks in test for index btree shadow map as there are concurrent requests. Use shared ptr's in wb cache. --- .../homestore/btree/detail/btree_node_mgr.ipp | 2 +- .../homestore/btree/detail/prefix_node.hpp | 2 +- .../homestore/index/index_internal.hpp | 64 ++++++++++--- src/include/homestore/index/index_table.hpp | 35 ++++--- src/include/homestore/index/wb_cache_base.hpp | 4 +- src/include/homestore/index_service.hpp | 2 +- src/lib/checkpoint/cp.hpp | 4 +- src/lib/checkpoint/cp_mgr.cpp | 9 +- src/lib/index/index_cp.hpp | 95 +++++++++++++++--- src/lib/index/index_service.cpp | 15 ++- src/lib/index/wb_cache.cpp | 96 ++++++++++++------- src/lib/index/wb_cache.hpp | 14 +-- src/tests/btree_helpers/btree_test_helper.hpp | 50 ++++++++-- src/tests/test_index_btree.cpp | 66 ++++++++----- 14 files changed, 334 insertions(+), 124 deletions(-) diff --git a/src/include/homestore/btree/detail/btree_node_mgr.ipp b/src/include/homestore/btree/detail/btree_node_mgr.ipp index f1476574c..87ae2a6fc 100644 --- a/src/include/homestore/btree/detail/btree_node_mgr.ipp +++ b/src/include/homestore/btree/detail/btree_node_mgr.ipp @@ -307,7 +307,7 @@ BtreeNode* Btree< K, V >::init_node(uint8_t* node_buf, uint32_t node_ctx_size, b /* Note:- This function assumes that access of this node is thread safe. */ template < typename K, typename V > void Btree< K, V >::free_node(const BtreeNodePtr& node, locktype_t cur_lock, void* context) { - BT_NODE_LOG(DEBUG, node, "Freeing node"); + BT_NODE_LOG(TRACE, node, "Freeing node"); COUNTER_DECREMENT_IF_ELSE(m_metrics, node->is_leaf(), btree_leaf_node_count, btree_int_node_count, 1); if (cur_lock != locktype_t::NONE) { diff --git a/src/include/homestore/btree/detail/prefix_node.hpp b/src/include/homestore/btree/detail/prefix_node.hpp index 62003da7a..fd8b1956c 100644 --- a/src/include/homestore/btree/detail/prefix_node.hpp +++ b/src/include/homestore/btree/detail/prefix_node.hpp @@ -687,7 +687,7 @@ class FixedPrefixNode : public VariantNode< K, V > { --phdr->used_slots; prefix_bitset_.reset_bit(slot_num); if ((slot_num != 0) && (slot_num == phdr->tail_slot)) { - uint16_t prev_slot = prefix_bitset_.get_prev_set_bit(slot_num); + uint16_t prev_slot = prefix_bitset_.get_next_set_bit(slot_num); if (prev_slot != std::numeric_limits< uint16_t >::max()) { phdr->tail_slot = prev_slot; } } } diff --git a/src/include/homestore/index/index_internal.hpp b/src/include/homestore/index/index_internal.hpp index 2c8a09849..cd914d315 100644 --- a/src/include/homestore/index/index_internal.hpp +++ b/src/include/homestore/index/index_internal.hpp @@ -64,29 +64,70 @@ enum class index_buf_state_t : uint8_t { }; ///////////////////////// Btree Node and Buffer Portion ////////////////////////// + + +// Multiple IndexBuffer could point to the same NodeBuffer if its clean. +struct NodeBuffer; +typedef std::shared_ptr< NodeBuffer > NodeBufferPtr; +struct NodeBuffer { + uint8_t* m_bytes{nullptr}; // Actual data buffer + std::atomic< index_buf_state_t > m_state{index_buf_state_t::CLEAN}; // Is buffer yet to persist? + NodeBuffer(uint32_t buf_size, uint32_t align_size); + ~NodeBuffer(); +}; + +// IndexBuffer is for each CP. The dependent index buffers are chained using +// m_next_buffer and each buffer is flushed only its wait_for_leaders reaches 0 +// which means all its dependent buffers are flushed. struct IndexBuffer; typedef std::shared_ptr< IndexBuffer > IndexBufferPtr; - struct IndexBuffer { - uint8_t* m_node_buf{nullptr}; // Actual buffer - index_buf_state_t m_buf_state{index_buf_state_t::CLEAN}; // Is buffer yet to persist? - BlkId m_blkid; // BlkId where this needs to be persisted - std::weak_ptr< IndexBuffer > m_next_buffer; // Next buffer in the chain + NodeBufferPtr m_node_buf; + BlkId m_blkid; // BlkId where this needs to be persisted + std::weak_ptr< IndexBuffer > m_next_buffer; // Next buffer in the chain // Number of leader buffers we are waiting for before we write this buffer sisl::atomic_counter< int > m_wait_for_leaders{0}; + std::mutex m_mutex; IndexBuffer(BlkId blkid, uint32_t buf_size, uint32_t align_size); + IndexBuffer(NodeBufferPtr node_buf, BlkId blkid); ~IndexBuffer(); BlkId blkid() const { return m_blkid; } - uint8_t* raw_buffer() { return m_node_buf; } + uint8_t* raw_buffer() { + RELEASE_ASSERT(m_node_buf, "Node buffer null"); + return m_node_buf->m_bytes; + } + + bool is_clean() const { + RELEASE_ASSERT(m_node_buf, "Node buffer null"); + return (m_node_buf->m_state.load() == index_buf_state_t::CLEAN); + } + + index_buf_state_t state() { + RELEASE_ASSERT(m_node_buf, "Node buffer null"); + return m_node_buf->m_state; + } + + void set_state(index_buf_state_t state) { + RELEASE_ASSERT(m_node_buf, "Node buffer null"); + m_node_buf->m_state = state; + } - bool is_clean() const { return (m_buf_state == index_buf_state_t::CLEAN); } std::string to_string() const { - return fmt::format("IndexBuffer {} blkid={} state={} node_buf={} next_buffer={} wait_for={}", - reinterpret_cast< void* >(const_cast< IndexBuffer* >(this)), m_blkid.to_integer(), - static_cast< int >(m_buf_state), static_cast< void* >(m_node_buf), - voidptr_cast(m_next_buffer.lock().get()), m_wait_for_leaders.get()); + std::lock_guard lock{m_mutex}; + auto str = fmt::format("IndexBuffer {} blkid={}", reinterpret_cast< void* >(const_cast< IndexBuffer* >(this)), + m_blkid.to_integer()); + if (m_node_buf == nullptr) { + fmt::format_to(std::back_inserter(str), " node_buf=nullptr"); + } else { + fmt::format_to(std::back_inserter(str), " state={} node_buf={}", + static_cast< int >(m_node_buf->m_state.load()), static_cast< void* >(m_node_buf->m_bytes)); + } + fmt::format_to(std::back_inserter(str), " next_buffer={} wait_for={}", + m_next_buffer.lock() ? reinterpret_cast< void* >(m_next_buffer.lock().get()) : 0, + m_wait_for_leaders.get()); + return str; } }; @@ -97,7 +138,6 @@ struct IndexBtreeNode { public: IndexBufferPtr m_idx_buf; // Buffer backing this node cp_id_t m_last_mod_cp_id{-1}; // This node is previously modified by the cp id; - public: IndexBtreeNode(const IndexBufferPtr& buf) : m_idx_buf{buf} {} ~IndexBtreeNode() { m_idx_buf.reset(); } diff --git a/src/include/homestore/index/index_table.hpp b/src/include/homestore/index/index_table.hpp index 246557a62..0e59e5b70 100644 --- a/src/include/homestore/index/index_table.hpp +++ b/src/include/homestore/index/index_table.hpp @@ -114,7 +114,7 @@ class IndexTable : public IndexTableBase, public Btree< K, V > { // Need to put it in wb cache wb_cache().write_buf(node, idx_node->m_idx_buf, cp_ctx); idx_node->m_last_mod_cp_id = cp_ctx->id(); - LOGTRACEMOD(wbcache, "{}", idx_node->m_idx_buf->to_string()); + LOGTRACEMOD(wbcache, "add to dirty list cp {} {}", cp_ctx->id(), idx_node->m_idx_buf->to_string()); } node->set_checksum(this->m_bt_cfg); return btree_status_t::success; @@ -129,17 +129,28 @@ class IndexTable : public IndexTableBase, public Btree< K, V > { auto& left_child_buf = left_child_idx_node->m_idx_buf; auto& parent_buf = parent_idx_node->m_idx_buf; - LOGTRACEMOD(wbcache, "left {} parent {} ", left_child_buf->to_string(), parent_buf->to_string()); - // Write new nodes in the list as standalone outside transacted pairs. // Write the new right child nodes, left node and parent in order. + // Create the relationship of right child to the left node via prepend_to_chain below. + // Parent and left node are linked in the prepare_node_txn for (const auto& right_child_node : new_nodes) { auto right_child = IndexBtreeNode::convert(right_child_node.get()); write_node_impl(right_child_node, context); - wb_cache().prepend_to_chain(right_child->m_idx_buf, left_child_buf); - LOGTRACEMOD(wbcache, "right {} left {} ", right_child->m_idx_buf->to_string(), left_child_buf->to_string()); + wb_cache().prepend_to_chain(right_child->m_idx_buf, left_child_buf, cp_ctx); } + auto trace_index_bufs = [&]() { + std::string str; + str = fmt::format("cp {} left {} parent {}", cp_ctx->id(), left_child_buf->to_string(), + parent_buf->to_string()); + for (const auto& right_child_node : new_nodes) { + auto right_child = IndexBtreeNode::convert(right_child_node.get()); + fmt::format_to(std::back_inserter(str), " right {}", right_child->m_idx_buf->to_string()); + } + return str; + }; + + LOGTRACEMOD(wbcache, "{}", trace_index_bufs()); write_node_impl(left_child_node, context); write_node_impl(parent_node, context); @@ -181,18 +192,17 @@ class IndexTable : public IndexTableBase, public Btree< K, V > { // realloc_node(node); // } - // If the backing buffer is already in a clean state, we don't need to make a copy of it - if (idx_node->m_idx_buf->is_clean()) { return btree_status_t::success; } - - // Make a new btree buffer and copy the contents and swap it to make it the current node's buffer. The + // We create IndexBuffer for each CP. But if the backing buffer is already in a clean state + // we dont copy the node buffer. Copy buffer will handle it. If the node buffer is dirty, + // make a new btree buffer and copy the contents and swap it to make it the current node's buffer. The // buffer prior to this copy, would have been written and already added into the dirty buffer list. - idx_node->m_idx_buf = wb_cache().copy_buffer(idx_node->m_idx_buf); + idx_node->m_idx_buf = wb_cache().copy_buffer(idx_node->m_idx_buf, cp_ctx); idx_node->m_last_mod_cp_id = -1; node->m_phys_node_buf = idx_node->m_idx_buf->raw_buffer(); node->set_checksum(this->m_bt_cfg); - LOGTRACEMOD(wbcache, "buf {} ", idx_node->m_idx_buf->to_string()); + LOGTRACEMOD(wbcache, "cp {} {} ", cp_ctx->id(), idx_node->m_idx_buf->to_string()); #ifndef NO_CHECKSUM if (!node->verify_node(this->m_bt_cfg)) { @@ -221,6 +231,8 @@ class IndexTable : public IndexTableBase, public Btree< K, V > { auto& child_buf = child_idx_node->m_idx_buf; auto& parent_buf = parent_idx_node->m_idx_buf; + LOGTRACEMOD(wbcache, "cp {} left {} parent {} ", cp_ctx->id(), child_buf->to_string(), parent_buf->to_string()); + auto [child_copied, parent_copied] = wb_cache().create_chain(child_buf, parent_buf, cp_ctx); if (child_copied) { child_node->m_phys_node_buf = child_buf->raw_buffer(); @@ -231,7 +243,6 @@ class IndexTable : public IndexTableBase, public Btree< K, V > { parent_idx_node->m_last_mod_cp_id = -1; } - LOGTRACEMOD(wbcache, "child {} parent {} ", child_buf->to_string(), parent_buf->to_string()); return btree_status_t::success; } diff --git a/src/include/homestore/index/wb_cache_base.hpp b/src/include/homestore/index/wb_cache_base.hpp index f59d4b1ce..d53f3124f 100644 --- a/src/include/homestore/index/wb_cache_base.hpp +++ b/src/include/homestore/index/wb_cache_base.hpp @@ -63,7 +63,7 @@ class IndexWBCacheBase { /// @brief Prepend to the chain that was already created with second /// @param first /// @param second - virtual void prepend_to_chain(const IndexBufferPtr& first, const IndexBufferPtr& second) = 0; + virtual void prepend_to_chain(const IndexBufferPtr& first, const IndexBufferPtr& second, CPContext* cp_ctx) = 0; /// @brief Free the buffer allocated and remove it from wb cache /// @param buf @@ -73,7 +73,7 @@ class IndexWBCacheBase { /// @brief Copy buffer /// @param cur_buf /// @return - virtual IndexBufferPtr copy_buffer(const IndexBufferPtr& cur_buf) const = 0; + virtual IndexBufferPtr copy_buffer(const IndexBufferPtr& cur_buf, CPContext* context) const = 0; }; } // namespace homestore diff --git a/src/include/homestore/index_service.hpp b/src/include/homestore/index_service.hpp index 9952e3852..09543a5fb 100644 --- a/src/include/homestore/index_service.hpp +++ b/src/include/homestore/index_service.hpp @@ -22,7 +22,7 @@ #include #include #include - +#include namespace homestore { class IndexWBCacheBase; diff --git a/src/lib/checkpoint/cp.hpp b/src/lib/checkpoint/cp.hpp index 5644adac7..05bda5be7 100644 --- a/src/lib/checkpoint/cp.hpp +++ b/src/lib/checkpoint/cp.hpp @@ -21,7 +21,7 @@ #include #include - +#include #include "common/homestore_assert.hpp" /* @@ -73,7 +73,7 @@ struct CP { bool m_cp_waiting_to_trigger{false}; // it is waiting for previous cp to complete cp_id_t m_cp_id; std::array< std::unique_ptr< CPContext >, (size_t)cp_consumer_t::SENTINEL > m_contexts; - folly::Promise< bool > m_comp_promise; + folly::SharedPromise< bool > m_comp_promise; public: CP(CPManager* mgr) : m_cp_mgr{mgr} {} diff --git a/src/lib/checkpoint/cp_mgr.cpp b/src/lib/checkpoint/cp_mgr.cpp index 944c90222..c5838b966 100644 --- a/src/lib/checkpoint/cp_mgr.cpp +++ b/src/lib/checkpoint/cp_mgr.cpp @@ -142,8 +142,11 @@ folly::Future< bool > CPManager::trigger_cp_flush(bool force) { std::unique_lock< std::mutex > lk(trigger_cp_mtx); auto cur_cp = cp_guard(); HS_DBG_ASSERT_NE(cur_cp->m_cp_status, cp_status_t::cp_flush_prepare); - cur_cp->m_comp_promise = std::move(folly::Promise< bool >{}); - cur_cp->m_cp_waiting_to_trigger = true; + // If multiple threads call trigger, they all get the future from the same promise. + if (!cur_cp->m_cp_waiting_to_trigger) { + cur_cp->m_comp_promise = std::move(folly::SharedPromise< bool >{}); + cur_cp->m_cp_waiting_to_trigger = true; + } return cur_cp->m_comp_promise.getFuture(); } else { return folly::makeFuture< bool >(false); @@ -177,7 +180,7 @@ folly::Future< bool > CPManager::trigger_cp_flush(bool force) { // originally by the caller will be untouched and completed upto CP completion/ ret_fut = folly::makeFuture< bool >(true); } else { - cur_cp->m_comp_promise = std::move(folly::Promise< bool >{}); + cur_cp->m_comp_promise = std::move(folly::SharedPromise< bool >{}); ret_fut = cur_cp->m_comp_promise.getFuture(); } cur_cp->m_cp_status = cp_status_t::cp_flush_prepare; diff --git a/src/lib/index/index_cp.hpp b/src/lib/index/index_cp.hpp index 077c9824b..3e24966d2 100644 --- a/src/lib/index/index_cp.hpp +++ b/src/lib/index/index_cp.hpp @@ -18,6 +18,7 @@ #include #include #include +#include #include #include "checkpoint/cp.hpp" @@ -37,7 +38,6 @@ struct IndexCPContext : public CPContext { sisl::ThreadVector< IndexBufferPtr >* m_dirty_buf_list{nullptr}; sisl::ThreadVector< BlkId >* m_free_node_blkid_list{nullptr}; sisl::atomic_counter< int64_t > m_dirty_buf_count{0}; - IndexBufferPtr m_last_in_chain; std::mutex m_flush_buffer_mtx; flush_buffer_iterator m_buf_it; @@ -47,11 +47,6 @@ struct IndexCPContext : public CPContext { CPContext(cp_id), m_dirty_buf_list{dirty_list}, m_free_node_blkid_list{free_blkid_list} {} virtual ~IndexCPContext() { - auto it = m_dirty_buf_list->begin(true /* latest */); - IndexBufferPtr *tmp = nullptr; - while((tmp = m_dirty_buf_list->next(it)) != nullptr) { - tmp->reset(); - } m_dirty_buf_list->clear(); m_free_node_blkid_list->clear(); } @@ -62,11 +57,9 @@ struct IndexCPContext : public CPContext { } void add_to_dirty_list(const IndexBufferPtr& buf) { - buf->m_buf_state = index_buf_state_t::DIRTY; + buf->m_node_buf->m_state = index_buf_state_t::DIRTY; m_dirty_buf_list->push_back(buf); m_dirty_buf_count.increment(1); - m_last_in_chain = buf; - LOGTRACEMOD(wbcache, "{}", buf->to_string()); } void add_to_free_node_list(BlkId blkid) { m_free_node_blkid_list->push_back(blkid); } @@ -77,13 +70,91 @@ struct IndexCPContext : public CPContext { BlkId* next_blkid() { return m_free_node_blkid_list->next(m_buf_it.free_node_list_it); } std::string to_string() const { std::string str{ - fmt::format("IndexCPContext cpid={} dirty_buf_count={} dirty_buf_list_size={} blkid_list_size={}", id(), + fmt::format("IndexCPContext cpid={} dirty_buf_count={} dirty_buf_list_size={} blkid_list_size={}\n", id(), m_dirty_buf_count.get(), m_dirty_buf_list->size(), m_free_node_blkid_list->size())}; - // TODO dump all index buffers. + // Mapping from a node to all its parents in the graph. + // Display all buffers and its dependencies and state. + std::unordered_map< IndexBuffer*, std::vector< IndexBuffer* > > parents; + IndexBufferPtr* buf; + auto it = m_dirty_buf_list->begin(true /* latest */); + while ((buf = m_dirty_buf_list->next(it)) != nullptr) { + // Add this buf to his children. + parents[(*buf)->m_next_buffer.lock().get()].emplace_back(buf->get()); + } + + it = m_dirty_buf_list->begin(true /* latest */); + while ((buf = m_dirty_buf_list->next(it)) != nullptr) { + fmt::format_to(std::back_inserter(str), "{}", (*buf)->to_string()); + auto first = true; + for (const auto& p : parents[buf->get()]) { + if (first) { + fmt::format_to(std::back_inserter(str), "\nDepends:"); + first = false; + } + fmt::format_to(std::back_inserter(str), " {}({})", r_cast< void* >(p), s_cast< int >(p->state())); + } + fmt::format_to(std::back_inserter(str), "\n"); + } + return str; } -}; + + void check_cycle() { + // Use dfs to find if the graph is cycle + IndexBufferPtr* buf; + auto it = m_dirty_buf_list->begin(true /* latest */); + while ((buf = m_dirty_buf_list->next(it)) != nullptr) { + std::set< IndexBuffer* > visited; + check_cycle_recurse(*buf, visited); + } + } + + void check_cycle_recurse(IndexBufferPtr buf, std::set< IndexBuffer* >& visited) const { + if (visited.count(buf.get()) != 0) { + LOGERROR("Cycle found for {}", buf->to_string()); + for (auto& x : visited) { + LOGERROR("Path : {}", x->to_string()); + } + return; + } + + visited.insert(buf.get()); + if (buf->m_next_buffer.lock()) { check_cycle_recurse(buf->m_next_buffer.lock(), visited); } + } + + void check_wait_for_leaders() { + // Use the next buffer as indegree to find if wait_for_leaders is invalid. + std::unordered_map< IndexBuffer*, int > wait_for_leaders; + IndexBufferPtr* buf; + + // Store the wait for leader count for each buffer. + auto it = m_dirty_buf_list->begin(true /* latest */); + while ((buf = m_dirty_buf_list->next(it)) != nullptr) { + wait_for_leaders[(*buf).get()] = (*buf)->m_wait_for_leaders.get(); + } + + // Decrement the count using the next buffer. + it = m_dirty_buf_list->begin(true /* latest */); + while ((buf = m_dirty_buf_list->next(it)) != nullptr) { + auto next_buf = (*buf)->m_next_buffer.lock(); + if (next_buf.get() == nullptr) continue; + wait_for_leaders[next_buf.get()]--; + } + + bool issue = false; + for (const auto& [buf, waits] : wait_for_leaders) { + // Any value other than zero means the dependency graph is invalid. + if (waits != 0) { + issue = true; + LOGERROR("Leaders wait not zero cp {} buf {} waits {}", id(), buf->to_string(), waits); + } + } + + assert(issue == false); + } + +}; // namespace homestore class IndexWBCache; class IndexCPCallbacks : public CPCallbacks { diff --git a/src/lib/index/index_service.cpp b/src/lib/index/index_service.cpp index d3d0984b5..bfa96f8bc 100644 --- a/src/lib/index/index_service.cpp +++ b/src/lib/index/index_service.cpp @@ -82,7 +82,9 @@ void IndexService::stop() { HS_REL_ASSERT_EQ(success, true, "CP Flush failed"); LOGINFO("CP Flush completed"); - for (auto [id, tbl] : m_index_map) { tbl->destroy(); } + for (auto [id, tbl] : m_index_map) { + tbl->destroy(); + } } void IndexService::add_index_table(const std::shared_ptr< IndexTableBase >& tbl) { std::unique_lock lg(m_index_map_mtx); @@ -107,9 +109,16 @@ uint64_t IndexService::used_size() const { return size; } +NodeBuffer::NodeBuffer(uint32_t buf_size, uint32_t align_size) : + m_bytes{hs_utils::iobuf_alloc(buf_size, sisl::buftag::btree_node, align_size)} {} + +NodeBuffer::~NodeBuffer() { hs_utils::iobuf_free(m_bytes, sisl::buftag::btree_node); } + IndexBuffer::IndexBuffer(BlkId blkid, uint32_t buf_size, uint32_t align_size) : - m_node_buf{hs_utils::iobuf_alloc(buf_size, sisl::buftag::btree_node, align_size)}, m_blkid{blkid} {} + m_node_buf{std::make_shared< NodeBuffer >(buf_size, align_size)}, m_blkid{blkid} {} + +IndexBuffer::IndexBuffer(NodeBufferPtr node_buf, BlkId blkid) : m_node_buf(node_buf), m_blkid(blkid) {} -IndexBuffer::~IndexBuffer() { hs_utils::iobuf_free(m_node_buf, sisl::buftag::btree_node); } +IndexBuffer::~IndexBuffer() { m_node_buf.reset(); } } // namespace homestore diff --git a/src/lib/index/wb_cache.cpp b/src/lib/index/wb_cache.cpp index 21ad44345..4b7b4bb5a 100644 --- a/src/lib/index/wb_cache.cpp +++ b/src/lib/index/wb_cache.cpp @@ -88,7 +88,6 @@ BtreeNodePtr IndexWBCache::alloc_buf(node_initializer_t&& node_initializer) { // Alloc buffer and initialize the node auto idx_buf = std::make_shared< IndexBuffer >(blkid, m_node_size, m_vdev->align_size()); auto node = node_initializer(idx_buf); - LOGTRACEMOD(wbcache, "idx_buf {} blkid {}", static_cast< void* >(idx_buf.get()), blkid.to_integer()); // Add the node to the cache bool done = m_cache.insert(node); @@ -102,16 +101,31 @@ void IndexWBCache::realloc_buf(const IndexBufferPtr& buf) { } void IndexWBCache::write_buf(const BtreeNodePtr& node, const IndexBufferPtr& buf, CPContext* cp_ctx) { + // TODO upsert always returns false even if it succeeds. m_cache.upsert(node); r_cast< IndexCPContext* >(cp_ctx)->add_to_dirty_list(buf); resource_mgr().inc_dirty_buf_size(m_node_size); } -IndexBufferPtr IndexWBCache::copy_buffer(const IndexBufferPtr& cur_buf) const { - auto new_buf = std::make_shared< IndexBuffer >(cur_buf->m_blkid, m_node_size, m_vdev->align_size()); - std::memcpy(new_buf->raw_buffer(), cur_buf->raw_buffer(), m_node_size); - LOGTRACEMOD(wbcache, "new_buf {} cur_buf {} cur_buf_blkid {}", static_cast< void* >(new_buf.get()), - static_cast< void* >(cur_buf.get()), cur_buf->m_blkid.to_integer()); +IndexBufferPtr IndexWBCache::copy_buffer(const IndexBufferPtr& cur_buf, CPContext* cp_ctx) const { + IndexBufferPtr new_buf = nullptr; + bool copied = false; + + // When we copy the buffer we check if the node buffer is clean or not. If its clean + // we could reuse it otherwise create a copy. + if (cur_buf->is_clean()) { + // Refer to the same node buffer. + new_buf = std::make_shared< IndexBuffer >(cur_buf->m_node_buf, cur_buf->m_blkid); + } else { + // If its not clean, we do deep copy. + new_buf = std::make_shared< IndexBuffer >(cur_buf->m_blkid, m_node_size, m_vdev->align_size()); + std::memcpy(new_buf->raw_buffer(), cur_buf->raw_buffer(), m_node_size); + copied = true; + } + + LOGTRACEMOD(wbcache, "cp {} new_buf {} cur_buf {} cur_buf_blkid {} copied {}", cp_ctx->id(), + static_cast< void* >(new_buf.get()), static_cast< void* >(cur_buf.get()), cur_buf->m_blkid.to_integer(), + copied); return new_buf; } @@ -141,41 +155,43 @@ void IndexWBCache::read_buf(bnodeid_t id, BtreeNodePtr& node, node_initializer_t std::tuple< bool, bool > IndexWBCache::create_chain(IndexBufferPtr& second, IndexBufferPtr& third, CPContext* cp_ctx) { bool second_copied{false}, third_copied{false}; - + auto chain = second; + auto old_third = third; if (!second->is_clean()) { - auto new_second = copy_buffer(second); - LOGTRACEMOD(wbcache, "second copied blkid {} {} new_second {}", second->m_blkid.to_integer(), - static_cast< void* >(second.get()), static_cast< void* >(new_second.get())); + auto new_second = copy_buffer(second, cp_ctx); second = new_second; second_copied = true; } + if (!third->is_clean()) { - auto new_third = copy_buffer(third); - LOGTRACEMOD(wbcache, "third copied blkid {} {} new_third {}", third->m_blkid.to_integer(), - static_cast< void* >(third.get()), static_cast< void* >(new_third.get())); + auto new_third = copy_buffer(third, cp_ctx); third = new_third; third_copied = true; } // Append parent(third) to the left child(second). - prepend_to_chain(second, third); + second->m_next_buffer = third; + third->m_wait_for_leaders.increment(1); + if (chain != second) { + // We want buffers to be append to the end of the chain which are related. + // If we split a node multiple times in same or different CP's, each dirty buffer will be + // added to the end of that chain. + while (chain->m_next_buffer.lock() != nullptr) { + chain = chain->m_next_buffer.lock(); + } - // TODO the index buffer are added to end of the chain, instead add to the dependency. - auto& last_in_chain = r_cast< IndexCPContext* >(cp_ctx)->m_last_in_chain; - if (last_in_chain) { - // Add this to the end of the chain. - last_in_chain->m_next_buffer = second; + chain->m_next_buffer = second; second->m_wait_for_leaders.increment(1); } return {second_copied, third_copied}; } -void IndexWBCache::prepend_to_chain(const IndexBufferPtr& first, const IndexBufferPtr& second) { +void IndexWBCache::prepend_to_chain(const IndexBufferPtr& first, const IndexBufferPtr& second, CPContext* cp_ctx) { + assert(first->m_next_buffer.lock() != second); assert(first->m_next_buffer.lock() == nullptr); first->m_next_buffer = second; second->m_wait_for_leaders.increment(1); - LOGTRACEMOD(wbcache, "first {} second {}", first->to_string(), second->to_string()); } void IndexWBCache::free_buf(const IndexBufferPtr& buf, CPContext* cp_ctx) { @@ -190,12 +206,19 @@ void IndexWBCache::free_buf(const IndexBufferPtr& buf, CPContext* cp_ctx) { //////////////////// CP Related API section ///////////////////////////////// folly::Future< bool > IndexWBCache::async_cp_flush(CPContext* context) { IndexCPContext* cp_ctx = s_cast< IndexCPContext* >(context); - LOGTRACEMOD(wbcache, "cp_ctx {}", cp_ctx->to_string()); + LOGTRACEMOD(wbcache, "wb_cache cp_ctx {}", cp_ctx->to_string()); if (!cp_ctx->any_dirty_buffers()) { CP_PERIODIC_LOG(DEBUG, cp_ctx->id(), "Btree does not have any dirty buffers to flush"); return folly::makeFuture< bool >(true); // nothing to flush } +#ifndef NDEBUG + // Check no cycles or invalid wait_for_leader count in the dirty buffer + // dependency graph. + cp_ctx->check_wait_for_leaders(); + cp_ctx->check_cycle(); +#endif + cp_ctx->prepare_flush_iteration(); for (auto& fiber : m_cp_flush_fibers) { @@ -219,31 +242,33 @@ std::unique_ptr< CPContext > IndexWBCache::create_cp_context(cp_id_t cp_id) { m_free_blkid_list[cp_id_slot].get()); } -void IndexWBCache::do_flush_one_buf(IndexCPContext* cp_ctx, const IndexBufferPtr& buf, bool part_of_batch) { - LOGTRACEMOD(wbcache, "buf {}", buf->to_string()); - buf->m_buf_state = index_buf_state_t::FLUSHING; +void IndexWBCache::do_flush_one_buf(IndexCPContext* cp_ctx, IndexBufferPtr buf, bool part_of_batch) { + LOGTRACEMOD(wbcache, "cp {} buf {}", cp_ctx->id(), buf->to_string()); + { buf->set_state(index_buf_state_t::FLUSHING); } m_vdev->async_write(r_cast< const char* >(buf->raw_buffer()), m_node_size, buf->m_blkid, part_of_batch) - .thenValue([pbuf = buf.get(), cp_ctx](auto) { + .thenValue([buf, cp_ctx](auto) { auto& pthis = s_cast< IndexWBCache& >(wb_cache()); // Avoiding more than 16 bytes capture - pthis.process_write_completion(cp_ctx, pbuf); + pthis.process_write_completion(cp_ctx, buf); }); if (!part_of_batch) { m_vdev->submit_batch(); } } -void IndexWBCache::process_write_completion(IndexCPContext* cp_ctx, IndexBuffer* pbuf) { - LOGTRACEMOD(wbcache, "buf {}", pbuf->to_string()); +void IndexWBCache::process_write_completion(IndexCPContext* cp_ctx, IndexBufferPtr buf) { + LOGTRACEMOD(wbcache, "cp {} buf {}", cp_ctx->id(), buf->to_string()); resource_mgr().dec_dirty_buf_size(m_node_size); - auto [next_buf, has_more] = on_buf_flush_done(cp_ctx, pbuf); + auto [next_buf, has_more] = on_buf_flush_done(cp_ctx, buf); if (next_buf) { do_flush_one_buf(cp_ctx, next_buf, false); } else if (!has_more) { // We are done flushing the buffers, lets free the btree blocks and then flush the bitmap free_btree_blks_and_flush(cp_ctx); + } else { + } } -std::pair< IndexBufferPtr, bool > IndexWBCache::on_buf_flush_done(IndexCPContext* cp_ctx, IndexBuffer* buf) { +std::pair< IndexBufferPtr, bool > IndexWBCache::on_buf_flush_done(IndexCPContext* cp_ctx, IndexBufferPtr buf) { if (m_cp_flush_fibers.size() > 1) { std::unique_lock lg(m_flush_mtx); return on_buf_flush_done_internal(cp_ctx, buf); @@ -252,9 +277,9 @@ std::pair< IndexBufferPtr, bool > IndexWBCache::on_buf_flush_done(IndexCPContext } } -std::pair< IndexBufferPtr, bool > IndexWBCache::on_buf_flush_done_internal(IndexCPContext* cp_ctx, IndexBuffer* buf) { +std::pair< IndexBufferPtr, bool > IndexWBCache::on_buf_flush_done_internal(IndexCPContext* cp_ctx, IndexBufferPtr buf) { static thread_local std::vector< IndexBufferPtr > t_buf_list; - buf->m_buf_state = index_buf_state_t::CLEAN; + { buf->set_state(index_buf_state_t::CLEAN); } t_buf_list.clear(); @@ -275,10 +300,9 @@ void IndexWBCache::get_next_bufs(IndexCPContext* cp_ctx, uint32_t max_count, std } } -void IndexWBCache::get_next_bufs_internal(IndexCPContext* cp_ctx, uint32_t max_count, IndexBuffer* prev_flushed_buf, +void IndexWBCache::get_next_bufs_internal(IndexCPContext* cp_ctx, uint32_t max_count, IndexBufferPtr prev_flushed_buf, std::vector< IndexBufferPtr >& bufs) { uint32_t count{0}; - // First attempt to execute any follower buffer flush if (prev_flushed_buf) { auto next_buffer = prev_flushed_buf->m_next_buffer.lock(); @@ -310,7 +334,7 @@ void IndexWBCache::free_btree_blks_and_flush(IndexCPContext* cp_ctx) { // Pick a CP Manager blocking IO fiber to execute the cp flush of vdev iomanager.run_on_forget(hs()->cp_mgr().pick_blocking_io_fiber(), [this, cp_ctx]() { - LOGTRACEMOD(wbcache, "Initiating CP flush"); + LOGTRACEMOD(wbcache, "Initiating vdev cp flush"); m_vdev->cp_flush(nullptr); // This is a blocking io call cp_ctx->complete(true); }); diff --git a/src/lib/index/wb_cache.hpp b/src/lib/index/wb_cache.hpp index ad5255be9..a2db3f99a 100644 --- a/src/lib/index/wb_cache.hpp +++ b/src/lib/index/wb_cache.hpp @@ -53,23 +53,23 @@ class IndexWBCache : public IndexWBCacheBase { void write_buf(const BtreeNodePtr& node, const IndexBufferPtr& buf, CPContext* cp_ctx) override; void read_buf(bnodeid_t id, BtreeNodePtr& node, node_initializer_t&& node_initializer) override; std::tuple< bool, bool > create_chain(IndexBufferPtr& second, IndexBufferPtr& third, CPContext* cp_ctx) override; - void prepend_to_chain(const IndexBufferPtr& first, const IndexBufferPtr& second) override; + void prepend_to_chain(const IndexBufferPtr& first, const IndexBufferPtr& second, CPContext* cp_ctx) override; void free_buf(const IndexBufferPtr& buf, CPContext* cp_ctx) override; //////////////////// CP Related API section ///////////////////////////////// folly::Future< bool > async_cp_flush(CPContext* context); std::unique_ptr< CPContext > create_cp_context(cp_id_t cp_id); - IndexBufferPtr copy_buffer(const IndexBufferPtr& cur_buf) const; + IndexBufferPtr copy_buffer(const IndexBufferPtr& cur_buf, CPContext *cp_ctx) const; private: void start_flush_threads(); - void process_write_completion(IndexCPContext* cp_ctx, IndexBuffer* pbuf); - void do_flush_one_buf(IndexCPContext* cp_ctx, const IndexBufferPtr& buf, bool part_of_batch); - std::pair< IndexBufferPtr, bool > on_buf_flush_done(IndexCPContext* cp_ctx, IndexBuffer* buf); - std::pair< IndexBufferPtr, bool > on_buf_flush_done_internal(IndexCPContext* cp_ctx, IndexBuffer* buf); + void process_write_completion(IndexCPContext* cp_ctx, IndexBufferPtr pbuf); + void do_flush_one_buf(IndexCPContext* cp_ctx, const IndexBufferPtr buf, bool part_of_batch); + std::pair< IndexBufferPtr, bool > on_buf_flush_done(IndexCPContext* cp_ctx, IndexBufferPtr buf); + std::pair< IndexBufferPtr, bool > on_buf_flush_done_internal(IndexCPContext* cp_ctx, IndexBufferPtr buf); void get_next_bufs(IndexCPContext* cp_ctx, uint32_t max_count, std::vector< IndexBufferPtr >& bufs); - void get_next_bufs_internal(IndexCPContext* cp_ctx, uint32_t max_count, IndexBuffer* prev_flushed_buf, + void get_next_bufs_internal(IndexCPContext* cp_ctx, uint32_t max_count, IndexBufferPtr prev_flushed_buf, std::vector< IndexBufferPtr >& bufs); void free_btree_blks_and_flush(IndexCPContext* cp_ctx); diff --git a/src/tests/btree_helpers/btree_test_helper.hpp b/src/tests/btree_helpers/btree_test_helper.hpp index 3bc943fc0..62cf24fa2 100644 --- a/src/tests/btree_helpers/btree_test_helper.hpp +++ b/src/tests/btree_helpers/btree_test_helper.hpp @@ -70,6 +70,7 @@ struct BtreeTestHelper : public testing::Test { protected: std::shared_ptr< typename T::BtreeType > m_bt; ShadowMap< K, V > m_shadow_map; + std::mutex m_shadow_map_mutex; BtreeConfig m_cfg{g_node_size}; RangeScheduler m_range_scheduler; uint32_t m_max_range_input{1000}; @@ -130,6 +131,7 @@ struct BtreeTestHelper : public testing::Test { preq.enable_route_tracing(); ASSERT_EQ(m_bt->put(preq), btree_status_t::success) << "range_put failed for " << start_k << "-" << end_k; + std::lock_guard lock{m_shadow_map_mutex}; if (update) { m_shadow_map.range_update(start_key, nkeys, value); m_range_scheduler.remove_keys_from_working(start_k, end_k); @@ -160,6 +162,7 @@ struct BtreeTestHelper : public testing::Test { auto rreq = BtreeSingleRemoveRequest{pk.get(), existing_v.get()}; bool removed = (m_bt->remove(rreq) == btree_status_t::success); + std::lock_guard lock{m_shadow_map_mutex}; ASSERT_EQ(removed, m_shadow_map.exists(*pk)) << "Removal of key " << pk->key() << " status doesn't match with shadow"; @@ -179,6 +182,7 @@ struct BtreeTestHelper : public testing::Test { } void range_remove_existing(uint32_t start_k, uint32_t count) { + std::lock_guard lock{m_shadow_map_mutex}; auto [start_key, end_key] = m_shadow_map.pick_existing_range(K{start_k}, count); do_range_remove(start_k, end_key.key(), true /* removing_all_existing */); } @@ -187,10 +191,12 @@ struct BtreeTestHelper : public testing::Test { static std::uniform_int_distribution< uint32_t > s_rand_range_generator{2, 5}; auto const [start_k, end_k] = m_range_scheduler.pick_random_existing_keys(s_rand_range_generator(m_re)); + std::lock_guard lock{m_shadow_map_mutex}; do_range_remove(start_k, end_k, true /* only_existing */); } void range_remove_any(uint32_t start_k, uint32_t end_k) { + std::lock_guard lock{m_shadow_map_mutex}; do_range_remove(start_k, end_k, false /* removing_all_existing */); } @@ -202,6 +208,7 @@ struct BtreeTestHelper : public testing::Test { } void do_query(uint32_t start_k, uint32_t end_k, uint32_t batch_size) { + std::lock_guard lock{m_shadow_map_mutex}; std::vector< std::pair< K, V > > out_vector; uint32_t remaining = m_shadow_map.num_elems_in_range(start_k, end_k); auto it = m_shadow_map.map_const().lower_bound(K{start_k}); @@ -248,6 +255,7 @@ struct BtreeTestHelper : public testing::Test { ////////////////////// All get operation variants /////////////////////////////// void get_all() const { + std::lock_guard lock{m_shadow_map_mutex}; for (const auto& [key, value] : m_shadow_map.map_const()) { auto copy_key = std::make_unique< K >(); *copy_key = key; @@ -267,6 +275,7 @@ struct BtreeTestHelper : public testing::Test { auto req = BtreeSingleGetRequest{pk.get(), out_v.get()}; const auto status = m_bt->get(req); + std::lock_guard lock{m_shadow_map_mutex}; if (status == btree_status_t::success) { m_shadow_map.validate_data(req.key(), (const V&)req.value()); } else { @@ -280,6 +289,8 @@ struct BtreeTestHelper : public testing::Test { auto req = BtreeGetAnyRequest< K >{BtreeKeyRange< K >{K{start_k}, true, K{end_k}, true}, out_k.get(), out_v.get()}; const auto status = m_bt->get(req); + + std::lock_guard lock{m_shadow_map_mutex}; if (status == btree_status_t::success) { ASSERT_EQ(m_shadow_map.exists_in_range(*(K*)req.m_outkey, start_k, end_k), true) << "Get Any returned key=" << *(K*)req.m_outkey << " which is not in range " << start_k << "-" << end_k @@ -303,14 +314,32 @@ struct BtreeTestHelper : public testing::Test { void print_keys() const { m_bt->print_tree_keys(); } void compare_files(const std::string& before, const std::string& after) { - std::ifstream b(before); - std::ifstream a(after); - std::ostringstream ss_before, ss_after; - ss_before << b.rdbuf(); - ss_after << a.rdbuf(); - std::string s1 = ss_before.str(); - std::string s2 = ss_after.str(); - ASSERT_EQ(s1, s2) << "Mismatch in btree structure"; + std::ifstream b(before, std::ifstream::ate); + std::ifstream a(after, std::ifstream::ate); + if (a.fail() || b.fail()) { + LOGINFO("Failed to open file"); + assert(false); + } + if (a.tellg() != b.tellg()) { + LOGINFO("Mismatch in btree files"); + assert(false); + } + + int64_t pending = a.tellg(); + const int64_t batch_size = 4096; + a.seekg(0, ifstream::beg); + b.seekg(0, ifstream::beg); + char a_buffer[batch_size], b_buffer[batch_size]; + while (pending > 0) { + auto count = std::min(pending, batch_size); + a.read(a_buffer, count); + b.read(b_buffer, count); + if (std::memcmp(a_buffer, b_buffer, count) != 0) { + LOGINFO("Mismatch in btree files"); + assert(false); + } + pending -= count; + } } private: @@ -320,6 +349,7 @@ struct BtreeTestHelper : public testing::Test { auto sreq = BtreeSinglePutRequest{&key, &value, put_type, existing_v.get()}; bool done = (m_bt->put(sreq) == btree_status_t::success); + std::lock_guard lock{m_shadow_map_mutex}; if (put_type == btree_put_type::INSERT) { ASSERT_EQ(done, !m_shadow_map.exists(key)); } else { @@ -336,6 +366,8 @@ struct BtreeTestHelper : public testing::Test { auto rreq = BtreeRangeRemoveRequest< K >{BtreeKeyRange< K >{start_key, true, end_key, true}}; auto const ret = m_bt->remove(rreq); + + std::lock_guard lock{m_shadow_map_mutex}; m_shadow_map.range_erase(start_key, end_key); if (all_existing) { @@ -380,4 +412,4 @@ struct BtreeTestHelper : public testing::Test { } LOGINFO("ALL parallel jobs joined"); } -}; \ No newline at end of file +}; diff --git a/src/tests/test_index_btree.cpp b/src/tests/test_index_btree.cpp index 14062cd1f..4a73e629f 100644 --- a/src/tests/test_index_btree.cpp +++ b/src/tests/test_index_btree.cpp @@ -42,13 +42,15 @@ SISL_OPTIONS_ENABLE(logging, test_index_btree, iomgr, test_common_setup) SISL_LOGGING_DECL(test_index_btree) std::vector< std::string > test_common::HSTestHelper::s_dev_names; -// TODO increase num_entries to 65k as io mgr page size is 512 and its slow. + +// TODO Add tests to do write,remove after recovery. +// TODO Test with var len key with io mgr page size is 512. SISL_OPTION_GROUP(test_index_btree, (num_iters, "", "num_iters", "number of iterations for rand ops", ::cxxopts::value< uint32_t >()->default_value("1500"), "number"), (num_entries, "", "num_entries", "number of entries to test with", - ::cxxopts::value< uint32_t >()->default_value("15000"), "number"), + ::cxxopts::value< uint32_t >()->default_value("65000"), "number"), (seed, "", "seed", "random engine seed, use random if not defined", ::cxxopts::value< uint64_t >()->default_value("0"), "number")) @@ -159,12 +161,9 @@ struct BtreeTest : public BtreeTestHelper< TestType > { } }; -// TODO sanal fix the varkey issue. -// using BtreeTypes = testing::Types< FixedLenBtreeTest, VarKeySizeBtreeTest, VarValueSizeBtreeTest, -// VarObjSizeBtreeTest -// >; - -using BtreeTypes = testing::Types< FixedLenBtreeTest >; +using BtreeTypes = testing::Types< FixedLenBtreeTest, VarKeySizeBtreeTest, VarValueSizeBtreeTest, +VarObjSizeBtreeTest +>; TYPED_TEST_SUITE(BtreeTest, BtreeTypes); @@ -225,7 +224,6 @@ TYPED_TEST(BtreeTest, RandomInsert) { this->get_all(); } -#if 0 TYPED_TEST(BtreeTest, SequentialRemove) { LOGINFO("SequentialRemove test start"); // Forward sequential insert @@ -280,7 +278,6 @@ TYPED_TEST(BtreeTest, RandomRemove) { } this->get_all(); } -#endif TYPED_TEST(BtreeTest, RangeUpdate) { LOGINFO("RangeUpdate test start"); @@ -309,23 +306,29 @@ TYPED_TEST(BtreeTest, CpFlush) { for (uint32_t i = 0; i < num_entries; ++i) { this->put(i, btree_put_type::INSERT); } + + // Remove some of the entries. + for (uint32_t i = 0; i < num_entries; i += 10) { + this->remove_one(i); + } + LOGINFO("Query {} entries and validate with pagination of 75 entries", num_entries / 2); this->do_query(0, num_entries / 2 - 1, 75); - this->print(std::string("before.txt")); - LOGINFO("Trigger checkpoint flush."); test_common::HSTestHelper::trigger_cp(true /* wait */); LOGINFO("Query {} entries and validate with pagination of 75 entries", num_entries); this->do_query(0, num_entries - 1, 75); + this->print(std::string("before.txt")); + this->destroy_btree(); // Restart homestore. m_bt is updated by the TestIndexServiceCallback. this->restart_homestore(); - std::this_thread::sleep_for(std::chrono::seconds{3}); + std::this_thread::sleep_for(std::chrono::seconds{1}); LOGINFO("Restarted homestore with index recovered"); this->print(std::string("after.txt")); @@ -373,7 +376,7 @@ TYPED_TEST(BtreeTest, MultipleCpFlush) { // Restart homestore. m_bt is updated by the TestIndexServiceCallback. this->restart_homestore(); - std::this_thread::sleep_for(std::chrono::seconds{3}); + std::this_thread::sleep_for(std::chrono::seconds{1}); LOGINFO(" Restarted homestore with index recovered"); this->print(std::string("after.txt")); @@ -388,24 +391,41 @@ TYPED_TEST(BtreeTest, ThreadedCpFlush) { LOGINFO("ThreadedCpFlush test start"); const auto num_entries = SISL_OPTIONS["num_entries"].as< uint32_t >(); - bool stop_cp_flush = false; - auto io_thread = std::thread([this, num_entries] { + bool stop = false; + std::atomic< uint32_t > last_index{0}; + auto insert_io_thread = std::thread([this, num_entries, &last_index] { LOGINFO("Do Forward sequential insert for {} entries", num_entries); + uint32_t j = 0; for (uint32_t i = 0; i < num_entries; ++i) { this->put(i, btree_put_type::INSERT); + last_index = i; } }); - auto cp_flush_thread = std::thread([this, &stop_cp_flush] { - while (!stop_cp_flush) { - LOGINFO("Trigger checkpoint flush wait=false."); - test_common::HSTestHelper::trigger_cp(false /* wait */); + auto remove_io_thread = std::thread([this, &stop, num_entries, &last_index] { + LOGINFO("Do random removes for {} entries", num_entries); + while (!stop) { + std::this_thread::sleep_for(std::chrono::milliseconds{10}); + // Remove a random entry. + std::uniform_int_distribution< uint32_t > rand{0, last_index.load()}; + auto rm_idx = rand(g_re); + LOGINFO("Removing entry {}", rm_idx); + this->remove_one(rm_idx); + } + }); + + auto cp_flush_thread = std::thread([this, &stop] { + while (!stop) { std::this_thread::sleep_for(std::chrono::seconds{1}); + LOGINFO("Trigger checkpoint flush wait=true."); + test_common::HSTestHelper::trigger_cp(true /* wait */); + LOGINFO("Trigger checkpoint flush wait=true done."); } }); - io_thread.join(); - stop_cp_flush = true; + insert_io_thread.join(); + stop = true; + remove_io_thread.join(); cp_flush_thread.join(); LOGINFO("Trigger checkpoint flush wait=true."); @@ -420,7 +440,7 @@ TYPED_TEST(BtreeTest, ThreadedCpFlush) { // Restart homestore. m_bt is updated by the TestIndexServiceCallback. this->restart_homestore(); - std::this_thread::sleep_for(std::chrono::seconds{3}); + std::this_thread::sleep_for(std::chrono::seconds{1}); LOGINFO(" Restarted homestore with index recovered"); this->print(std::string("after.txt"));