From e74e78f46475da33f77f3d997d7aa71b428fd97f Mon Sep 17 00:00:00 2001
From: Sanal
Date: Fri, 10 Nov 2023 16:27:59 -0800
Subject: [PATCH 1/3] Index write back cache fixes. (#207)

Add an atomic state to IndexBuffer to avoid concurrency issues between the
CP flush and insert threads. Create one IndexBuffer per CP. Create NodeBuffer,
which points to the actual data buffer; several IndexBuffers can point to the
same NodeBuffer. Add locks in the index btree test's shadow map, as there are
concurrent requests. Use shared_ptrs in the wb cache.
---
 conanfile.py                                  |  2 +-
 .../homestore/btree/detail/btree_node_mgr.ipp |  2 +-
 .../homestore/index/index_internal.hpp        | 61 +++++++++---
 src/include/homestore/index/index_table.hpp   | 33 ++++---
 src/include/homestore/index/wb_cache_base.hpp |  6 +-
 src/include/homestore/index_service.hpp       |  2 +-
 src/lib/checkpoint/cp.hpp                     |  4 +-
 src/lib/checkpoint/cp_mgr.cpp                 |  9 +-
 src/lib/index/index_cp.hpp                    | 98 +++++++++++++++++--
 src/lib/index/index_service.cpp               | 15 ++-
 src/lib/index/wb_cache.cpp                    | 91 ++++++++++-------
 src/lib/index/wb_cache.hpp                    | 14 +--
 src/tests/btree_helpers/btree_test_helper.hpp | 70 ++++++++-----
 src/tests/btree_helpers/shadow_map.hpp        | 67 ++++++++++++-
 src/tests/test_index_btree.cpp                | 66 ++++++++-----
 src/tests/test_mem_btree.cpp                  | 15 +--
 16 files changed, 412 insertions(+), 143 deletions(-)

diff --git a/conanfile.py b/conanfile.py
index db9fc4a7a..b4477e155 100644
--- a/conanfile.py
+++ b/conanfile.py
@@ -5,7 +5,7 @@
 class HomestoreConan(ConanFile):
     name = "homestore"
-    version = "4.8.1"
+    version = "4.8.2"
     homepage = "https://github.com/eBay/Homestore"
     description = "HomeStore Storage Engine"

diff --git a/src/include/homestore/btree/detail/btree_node_mgr.ipp b/src/include/homestore/btree/detail/btree_node_mgr.ipp
index e81da107f..826c29789 100644
--- a/src/include/homestore/btree/detail/btree_node_mgr.ipp
+++ b/src/include/homestore/btree/detail/btree_node_mgr.ipp
@@ -305,7 +305,7 @@ BtreeNode* Btree< K, V >::init_node(uint8_t* node_buf, uint32_t node_ctx_size, b
 /* Note:- This function assumes that access of this node is thread safe. */
 template < typename K, typename V >
 void Btree< K, V >::free_node(const BtreeNodePtr& node, locktype_t cur_lock, void* context) {
-    BT_NODE_LOG(DEBUG, node, "Freeing node");
+    BT_NODE_LOG(TRACE, node, "Freeing node");
     COUNTER_DECREMENT_IF_ELSE(m_metrics, node->is_leaf(), btree_leaf_node_count, btree_int_node_count, 1);

     if (cur_lock != locktype_t::NONE) {

diff --git a/src/include/homestore/index/index_internal.hpp b/src/include/homestore/index/index_internal.hpp
index 2c8a09849..756f47bbd 100644
--- a/src/include/homestore/index/index_internal.hpp
+++ b/src/include/homestore/index/index_internal.hpp
@@ -64,29 +64,68 @@ enum class index_buf_state_t : uint8_t {
 };

 ///////////////////////// Btree Node and Buffer Portion //////////////////////////
+
+// Multiple IndexBuffers can point to the same NodeBuffer if it is clean.
+struct NodeBuffer;
+typedef std::shared_ptr< NodeBuffer > NodeBufferPtr;
+struct NodeBuffer {
+    uint8_t* m_bytes{nullptr};                                          // Actual data buffer
+    std::atomic< index_buf_state_t > m_state{index_buf_state_t::CLEAN}; // Is buffer yet to persist?
+    NodeBuffer(uint32_t buf_size, uint32_t align_size);
+    ~NodeBuffer();
+};
+
+// An IndexBuffer is created per CP. Dependent index buffers are chained via
+// m_next_buffer, and a buffer is flushed only once its wait_for_leaders count
+// reaches 0, i.e. once every buffer it waits on has itself been flushed.
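To make the NodeBuffer/IndexBuffer relationship concrete before the struct definitions that follow, here is a minimal, self-contained sketch of the intended sharing and copy-on-write behavior. Every name in it is an illustrative stand-in, not one of the classes added by this patch:

```cpp
// Simplified model: several per-CP handles alias one data buffer while it is
// CLEAN; once it turns DIRTY, a later CP takes a deep copy instead of an alias.
#include <atomic>
#include <cstdint>
#include <cstring>
#include <iostream>
#include <memory>
#include <vector>

enum class buf_state : uint8_t { CLEAN, DIRTY, FLUSHING };

struct node_buffer {                      // stand-in for NodeBuffer
    std::vector< uint8_t > bytes;         // actual node data
    std::atomic< buf_state > state{buf_state::CLEAN};
    explicit node_buffer(size_t sz) : bytes(sz, 0) {}
};

struct index_buffer {                     // stand-in for the per-CP IndexBuffer
    std::shared_ptr< node_buffer > node;
};

int main() {
    auto nb = std::make_shared< node_buffer >(4096);
    index_buffer cp1{nb};                 // handle created in CP 1
    index_buffer cp2{nb};                 // CP 2 aliases the same CLEAN buffer
    std::cout << "owners while clean: " << nb.use_count() << "\n"; // 3: nb, cp1, cp2

    cp1.node->state = buf_state::DIRTY;        // CP 1 modifies the node,
    if (cp2.node->state != buf_state::CLEAN) { // so CP 2 must take a private copy
        auto fresh = std::make_shared< node_buffer >(cp2.node->bytes.size());
        std::memcpy(fresh->bytes.data(), cp2.node->bytes.data(), fresh->bytes.size());
        cp2.node = std::move(fresh);
    }
    std::cout << "owners after copy: " << nb.use_count() << "\n";  // 2: nb, cp1
}
```

In the patch itself the bytes live in an aligned iobuf rather than a vector, and the state moves from CLEAN to DIRTY in add_to_dirty_list, to FLUSHING in do_flush_one_buf, and back to CLEAN once the write completes.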
struct IndexBuffer; typedef std::shared_ptr< IndexBuffer > IndexBufferPtr; - struct IndexBuffer { - uint8_t* m_node_buf{nullptr}; // Actual buffer - index_buf_state_t m_buf_state{index_buf_state_t::CLEAN}; // Is buffer yet to persist? - BlkId m_blkid; // BlkId where this needs to be persisted - std::weak_ptr< IndexBuffer > m_next_buffer; // Next buffer in the chain + NodeBufferPtr m_node_buf; + BlkId m_blkid; // BlkId where this needs to be persisted + std::weak_ptr< IndexBuffer > m_next_buffer; // Next buffer in the chain // Number of leader buffers we are waiting for before we write this buffer sisl::atomic_counter< int > m_wait_for_leaders{0}; IndexBuffer(BlkId blkid, uint32_t buf_size, uint32_t align_size); + IndexBuffer(NodeBufferPtr node_buf, BlkId blkid); ~IndexBuffer(); BlkId blkid() const { return m_blkid; } - uint8_t* raw_buffer() { return m_node_buf; } + uint8_t* raw_buffer() { + RELEASE_ASSERT(m_node_buf, "Node buffer null blkid {}", m_blkid.to_integer()); + return m_node_buf->m_bytes; + } + + bool is_clean() const { + RELEASE_ASSERT(m_node_buf, "Node buffer null blkid {}", m_blkid.to_integer()); + return (m_node_buf->m_state.load() == index_buf_state_t::CLEAN); + } + + index_buf_state_t state() const { + RELEASE_ASSERT(m_node_buf, "Node buffer null blkid {}", m_blkid.to_integer()); + return m_node_buf->m_state; + } + + void set_state(index_buf_state_t state) { + RELEASE_ASSERT(m_node_buf, "Node buffer null blkid {}", m_blkid.to_integer()); + m_node_buf->m_state = state; + } - bool is_clean() const { return (m_buf_state == index_buf_state_t::CLEAN); } std::string to_string() const { - return fmt::format("IndexBuffer {} blkid={} state={} node_buf={} next_buffer={} wait_for={}", - reinterpret_cast< void* >(const_cast< IndexBuffer* >(this)), m_blkid.to_integer(), - static_cast< int >(m_buf_state), static_cast< void* >(m_node_buf), - voidptr_cast(m_next_buffer.lock().get()), m_wait_for_leaders.get()); + auto str = fmt::format("IndexBuffer {} blkid={}", reinterpret_cast< void* >(const_cast< IndexBuffer* >(this)), + m_blkid.to_integer()); + if (m_node_buf == nullptr) { + fmt::format_to(std::back_inserter(str), " node_buf=nullptr"); + } else { + fmt::format_to(std::back_inserter(str), " state={} node_buf={}", + static_cast< int >(m_node_buf->m_state.load()), static_cast< void* >(m_node_buf->m_bytes)); + } + fmt::format_to(std::back_inserter(str), " next_buffer={} wait_for={}", + m_next_buffer.lock() ? 
reinterpret_cast< void* >(m_next_buffer.lock().get()) : 0,
                       m_wait_for_leaders.get());
        return str;
    }
};

diff --git a/src/include/homestore/index/index_table.hpp b/src/include/homestore/index/index_table.hpp
index 246557a62..bce7e8f36 100644
--- a/src/include/homestore/index/index_table.hpp
+++ b/src/include/homestore/index/index_table.hpp
@@ -114,7 +114,7 @@ class IndexTable : public IndexTableBase, public Btree< K, V > {
             // Need to put it in wb cache
             wb_cache().write_buf(node, idx_node->m_idx_buf, cp_ctx);
             idx_node->m_last_mod_cp_id = cp_ctx->id();
-            LOGTRACEMOD(wbcache, "{}", idx_node->m_idx_buf->to_string());
+            LOGTRACEMOD(wbcache, "add to dirty list cp {} {}", cp_ctx->id(), idx_node->m_idx_buf->to_string());
         }
         node->set_checksum(this->m_bt_cfg);
         return btree_status_t::success;
@@ -129,17 +129,28 @@ class IndexTable : public IndexTableBase, public Btree< K, V > {
         auto& left_child_buf = left_child_idx_node->m_idx_buf;
         auto& parent_buf = parent_idx_node->m_idx_buf;

-        LOGTRACEMOD(wbcache, "left {} parent {} ", left_child_buf->to_string(), parent_buf->to_string());
-
         // Write new nodes in the list as standalone outside transacted pairs.
         // Write the new right child nodes, left node and parent in order.
+        // Create the relationship of each right child to the left node via prepend_to_chain below.
+        // Parent and left node are linked in prepare_node_txn.
         for (const auto& right_child_node : new_nodes) {
             auto right_child = IndexBtreeNode::convert(right_child_node.get());
             write_node_impl(right_child_node, context);
             wb_cache().prepend_to_chain(right_child->m_idx_buf, left_child_buf);
-            LOGTRACEMOD(wbcache, "right {} left {} ", right_child->m_idx_buf->to_string(), left_child_buf->to_string());
         }

+        auto trace_index_bufs = [&]() {
+            std::string str;
+            str = fmt::format("cp {} left {} parent {}", cp_ctx->id(), left_child_buf->to_string(),
+                              parent_buf->to_string());
+            for (const auto& right_child_node : new_nodes) {
+                auto right_child = IndexBtreeNode::convert(right_child_node.get());
+                fmt::format_to(std::back_inserter(str), " right {}", right_child->m_idx_buf->to_string());
+            }
+            return str;
+        };
+
+        LOGTRACEMOD(wbcache, "{}", trace_index_bufs());
         write_node_impl(left_child_node, context);
         write_node_impl(parent_node, context);
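The chaining above, together with create_chain() in prepare_node_txn() below, yields a small dependency graph per split. As a worked example under assumed buffer names (a standalone sketch whose counting mirrors prepend_to_chain and wait_for_leaders, not the real classes): a split that adds right children R1 and R2 under left child L with parent P creates the edges R1 -> L, R2 -> L and L -> P, so R1 and R2 can flush immediately, then L, then P:

```cpp
// Standalone sketch: wait_for_leaders is the in-degree over m_next_buffer edges.
#include <cstdio>
#include <map>
#include <string>
#include <utility>
#include <vector>

int main() {
    // m_next_buffer edges created for one split: right children -> left -> parent
    std::vector< std::pair< std::string, std::string > > next_edges = {
        {"R1", "L"}, {"R2", "L"},   // prepend_to_chain(right_child, left_child)
        {"L", "P"},                 // create_chain(left_child, parent)
    };

    std::map< std::string, int > wait_for_leaders; // in-degree per buffer
    for (auto& [from, to] : next_edges) { wait_for_leaders[to]++; }

    for (const char* b : {"R1", "R2", "L", "P"}) {
        std::printf("%s waits on %d leader(s)\n", b, wait_for_leaders[b]);
    }
    // R1/R2 wait on 0 and flush immediately; L waits on 2; P waits on 1.
}
```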
@@ -181,18 +192,17 @@ class IndexTable : public IndexTableBase, public Btree< K, V > {
         //     realloc_node(node);
         // }

-        // If the backing buffer is already in a clean state, we don't need to make a copy of it
-        if (idx_node->m_idx_buf->is_clean()) { return btree_status_t::success; }
-
-        // Make a new btree buffer and copy the contents and swap it to make it the current node's buffer. The
+        // We create an IndexBuffer for each CP, but if the backing buffer is already in a clean state
+        // we don't copy the node buffer; copy_buffer() handles that. If the node buffer is dirty, we
+        // make a new btree buffer, copy the contents, and swap it in as the current node's buffer. The
         // buffer prior to this copy would have been written and already added into the dirty buffer list.
-        idx_node->m_idx_buf = wb_cache().copy_buffer(idx_node->m_idx_buf);
+        idx_node->m_idx_buf = wb_cache().copy_buffer(idx_node->m_idx_buf, cp_ctx);
         idx_node->m_last_mod_cp_id = -1;

         node->m_phys_node_buf = idx_node->m_idx_buf->raw_buffer();
         node->set_checksum(this->m_bt_cfg);

-        LOGTRACEMOD(wbcache, "buf {} ", idx_node->m_idx_buf->to_string());
+        LOGTRACEMOD(wbcache, "cp {} {} ", cp_ctx->id(), idx_node->m_idx_buf->to_string());

 #ifndef NO_CHECKSUM
         if (!node->verify_node(this->m_bt_cfg)) {
@@ -221,6 +231,8 @@ class IndexTable : public IndexTableBase, public Btree< K, V > {
         auto& child_buf = child_idx_node->m_idx_buf;
         auto& parent_buf = parent_idx_node->m_idx_buf;

+        LOGTRACEMOD(wbcache, "cp {} left {} parent {} ", cp_ctx->id(), child_buf->to_string(), parent_buf->to_string());
+
         auto [child_copied, parent_copied] = wb_cache().create_chain(child_buf, parent_buf, cp_ctx);
         if (child_copied) {
             child_node->m_phys_node_buf = child_buf->raw_buffer();
@@ -231,7 +243,6 @@ class IndexTable : public IndexTableBase, public Btree< K, V > {
             parent_idx_node->m_last_mod_cp_id = -1;
         }

-        LOGTRACEMOD(wbcache, "child {} parent {} ", child_buf->to_string(), parent_buf->to_string());
         return btree_status_t::success;
     }

diff --git a/src/include/homestore/index/wb_cache_base.hpp b/src/include/homestore/index/wb_cache_base.hpp
index f59d4b1ce..caee6c557 100644
--- a/src/include/homestore/index/wb_cache_base.hpp
+++ b/src/include/homestore/index/wb_cache_base.hpp
@@ -52,13 +52,13 @@ class IndexWBCacheBase {

     /// @brief Start a chain of related btree buffers. Typically a chain is created from the second and third pair and
     /// then the first is prepended to the chain. In case the second buffer is already with the WB cache, it will create a
-    /// new buffer for both second and third.
+    /// new buffer for both second and third. We append the buffers to a list forming the dependency chain.
     /// @param second Second btree buffer in the chain. It will be updated to a copy of the second buffer if the buffer already
     /// has dependencies.
     /// @param third Third btree buffer in the chain. It will be updated to a copy of the third buffer if the buffer already
     /// has dependencies.
    /// @return Returns whether the second and third buffers had to be copied
-    virtual std::tuple< bool, bool > create_chain(IndexBufferPtr& second, IndexBufferPtr& third, CPContext* cp_ctx) = 0;
+    virtual std::pair< bool, bool > create_chain(IndexBufferPtr& second, IndexBufferPtr& third, CPContext* cp_ctx) = 0;

     /// @brief Prepend to the chain that was already created with second
     /// @param first
     /// @param second
@@ -73,7 +73,7 @@
     /// @brief Copy buffer
     /// @param cur_buf
     /// @return
-    virtual IndexBufferPtr copy_buffer(const IndexBufferPtr& cur_buf) const = 0;
+    virtual IndexBufferPtr copy_buffer(const IndexBufferPtr& cur_buf, const CPContext* context) const = 0;
 };

 } // namespace homestore

diff --git a/src/include/homestore/index_service.hpp b/src/include/homestore/index_service.hpp
index 9952e3852..09543a5fb 100644
--- a/src/include/homestore/index_service.hpp
+++ b/src/include/homestore/index_service.hpp
@@ -22,7 +22,7 @@
 #include
 #include
 #include
-
+#include
 namespace homestore {

 class IndexWBCacheBase;

diff --git a/src/lib/checkpoint/cp.hpp b/src/lib/checkpoint/cp.hpp
index 5644adac7..05bda5be7 100644
--- a/src/lib/checkpoint/cp.hpp
+++ b/src/lib/checkpoint/cp.hpp
@@ -21,7 +21,7 @@
 #include
 #include
-
+#include
 #include "common/homestore_assert.hpp"

 /*
@@ -73,7 +73,7 @@ struct CP {
     bool m_cp_waiting_to_trigger{false}; // it is waiting for previous cp to complete
     cp_id_t m_cp_id;
     std::array< std::unique_ptr< CPContext >, (size_t)cp_consumer_t::SENTINEL > m_contexts;
-    folly::Promise< bool > m_comp_promise;
+    folly::SharedPromise< bool > m_comp_promise;

 public:
     CP(CPManager* mgr) : m_cp_mgr{mgr} {}
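The reason m_comp_promise changes type above is that folly::Promise vends exactly one future, while several threads may now call trigger_cp_flush() on the same CP, as the cp_mgr.cpp hunk below handles. The standard library has the same distinction; a sketch of the multi-waiter behavior using std::shared_future as an illustrative analogue (this is not the folly API itself):

```cpp
// Many waiters, one completion: the standard-library analogue of SharedPromise.
#include <future>
#include <iostream>
#include <thread>
#include <vector>

int main() {
    std::promise< bool > completion;            // set exactly once by the flush path
    std::shared_future< bool > done = completion.get_future().share();

    std::vector< std::thread > waiters;
    for (int i = 0; i < 3; ++i) {               // three concurrent trigger callers
        waiters.emplace_back([done, i] {        // each holds the same shared future
            std::cout << "caller " << i << " got " << done.get() << "\n";
        });
    }

    completion.set_value(true);                 // CP flush completes once
    for (auto& t : waiters) { t.join(); }
}
```

With a plain (non-shared) promise, only the last caller's future would be valid, which is exactly the race the guard below avoids.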
diff --git a/src/lib/checkpoint/cp_mgr.cpp b/src/lib/checkpoint/cp_mgr.cpp
index 89921620a..d26424248 100644
--- a/src/lib/checkpoint/cp_mgr.cpp
+++ b/src/lib/checkpoint/cp_mgr.cpp
@@ -142,8 +142,11 @@ folly::Future< bool > CPManager::trigger_cp_flush(bool force) {
         std::unique_lock< std::mutex > lk(trigger_cp_mtx);
         auto cur_cp = cp_guard();
         HS_DBG_ASSERT_NE(cur_cp->m_cp_status, cp_status_t::cp_flush_prepare);
-        cur_cp->m_comp_promise = std::move(folly::Promise< bool >{});
-        cur_cp->m_cp_waiting_to_trigger = true;
+        // If multiple threads call trigger, they all get the future from the same promise.
+        if (!cur_cp->m_cp_waiting_to_trigger) {
+            cur_cp->m_comp_promise = std::move(folly::SharedPromise< bool >{});
+            cur_cp->m_cp_waiting_to_trigger = true;
+        }
         return cur_cp->m_comp_promise.getFuture();
     } else {
         return folly::makeFuture< bool >(false);
     }
@@ -177,7 +180,7 @@ folly::Future< bool > CPManager::trigger_cp_flush(bool force) {
             // originally by the caller will be untouched and completed up to CP completion.
             ret_fut = folly::makeFuture< bool >(true);
         } else {
-            cur_cp->m_comp_promise = std::move(folly::Promise< bool >{});
+            cur_cp->m_comp_promise = std::move(folly::SharedPromise< bool >{});
             ret_fut = cur_cp->m_comp_promise.getFuture();
         }

         cur_cp->m_cp_status = cp_status_t::cp_flush_prepare;

diff --git a/src/lib/index/index_cp.hpp b/src/lib/index/index_cp.hpp
index f06f53091..d7aad88dc 100644
--- a/src/lib/index/index_cp.hpp
+++ b/src/lib/index/index_cp.hpp
@@ -18,6 +18,7 @@
 #include
 #include
 #include
+#include
 #include

 #include "checkpoint/cp.hpp"
@@ -32,20 +33,18 @@ struct IndexCPContext : public VDevCPContext {
     std::atomic< uint64_t > m_num_nodes_removed{0};
     sisl::ConcurrentInsertVector< IndexBufferPtr > m_dirty_buf_list;
     sisl::atomic_counter< int64_t > m_dirty_buf_count{0};
-    IndexBufferPtr m_last_in_chain;
     std::mutex m_flush_buffer_mtx;
     sisl::ConcurrentInsertVector< IndexBufferPtr >::iterator m_dirty_buf_it;

 public:
+    IndexCPContext(CP* cp) : VDevCPContext(cp) {}
     virtual ~IndexCPContext() = default;

     void add_to_dirty_list(const IndexBufferPtr& buf) {
-        buf->m_buf_state = index_buf_state_t::DIRTY;
         m_dirty_buf_list.push_back(buf);
+        buf->set_state(index_buf_state_t::DIRTY);
         m_dirty_buf_count.increment(1);
-        m_last_in_chain = buf;
-        LOGTRACEMOD(wbcache, "{}", buf->to_string());
     }

     bool any_dirty_buffers() const { return !m_dirty_buf_count.testz(); }
@@ -59,15 +58,102 @@ struct IndexCPContext : public VDevCPContext {
         return ret;
     }

-    std::string to_string() const {
+    std::string to_string() {
         std::string str{fmt::format("IndexCPContext cpid={} dirty_buf_count={} dirty_buf_list_size={}", m_cp->id(),
                                     m_dirty_buf_count.get(), m_dirty_buf_list.size())};

-        // TODO dump all index buffers.
+        // Map each buffer to all of its parents (the buffers pointing to it) in the graph.
+        // Display every buffer with its dependencies and state.
+        std::unordered_map< IndexBuffer*, std::vector< IndexBuffer* > > parents;
+
+        auto it = m_dirty_buf_list.begin();
+        while (it != m_dirty_buf_list.end()) {
+            // Register this buf as a dependent of its next buffer.
+            IndexBufferPtr buf = *it;
+            parents[buf->m_next_buffer.lock().get()].emplace_back(buf.get());
+            ++it;
+        }
+
+        it = m_dirty_buf_list.begin();
+        while (it != m_dirty_buf_list.end()) {
+            IndexBufferPtr buf = *it;
+            fmt::format_to(std::back_inserter(str), "{}", buf->to_string());
+            auto first = true;
+            for (const auto& p : parents[buf.get()]) {
+                if (first) {
+                    fmt::format_to(std::back_inserter(str), "\nDepends:");
+                    first = false;
+                }
+                fmt::format_to(std::back_inserter(str), " {}({})", r_cast< void* >(p), s_cast< int >(p->state()));
+            }
+            fmt::format_to(std::back_inserter(str), "\n");
+            ++it;
+        }
+
+        return str;
+    }
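The dump above has to invert the m_next_buffer links: each buffer records only which buffer it must flush before, while the printout wants, for each buffer, the set of buffers that have to flush first. A standalone sketch of that inversion over hypothetical buffer names:

```cpp
// Invert child -> next edges into next -> {children} for display purposes.
#include <iostream>
#include <map>
#include <string>
#include <utility>
#include <vector>

int main() {
    // Each pair is (buffer, its m_next_buffer target).
    std::vector< std::pair< std::string, std::string > > next_edges = {
        {"A", "C"}, {"B", "C"}, {"C", "D"}};

    std::map< std::string, std::vector< std::string > > parents; // next -> dependents
    for (auto& [child, next] : next_edges) { parents[next].push_back(child); }

    for (auto& [buf, deps] : parents) {
        std::cout << buf << " depends on:";
        for (auto& d : deps) { std::cout << ' ' << d; }
        std::cout << '\n';
    }
    // Prints: "C depends on: A B" and "D depends on: C".
}
```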
+
+    void check_cycle() {
+        // Use DFS to check whether the dependency graph contains a cycle.
+        auto it = m_dirty_buf_list.begin();
+        while (it != m_dirty_buf_list.end()) {
+            IndexBufferPtr buf = *it;
+            std::set< IndexBuffer* > visited;
+            check_cycle_recurse(buf, visited);
+            ++it;
+        }
+    }
+
+    void check_cycle_recurse(IndexBufferPtr buf, std::set< IndexBuffer* >& visited) const {
+        if (visited.count(buf.get()) != 0) {
+            LOGERROR("Cycle found for {}", buf->to_string());
+            for (auto& x : visited) {
+                LOGERROR("Path : {}", x->to_string());
+            }
+            return;
+        }
+
+        visited.insert(buf.get());
+        if (buf->m_next_buffer.lock()) { check_cycle_recurse(buf->m_next_buffer.lock(), visited); }
+    }
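Since every buffer has at most one outgoing m_next_buffer edge, each walk in check_cycle() follows a simple chain, and revisiting a buffer along one walk is exactly a cycle. A compact standalone model of that check, using raw pointers and an iterative walk instead of weak_ptrs and recursion:

```cpp
// Standalone sketch of the cycle check over single next links.
#include <cstdio>
#include <set>

struct buf {
    int id;
    buf* next{nullptr}; // stand-in for m_next_buffer
};

static bool has_cycle(buf* start) {
    std::set< buf* > visited;
    for (buf* cur = start; cur != nullptr; cur = cur->next) {
        if (!visited.insert(cur).second) { return true; } // seen before -> cycle
    }
    return false;
}

int main() {
    buf a{1}, b{2}, c{3};
    a.next = &b; b.next = &c;                        // clean chain: a -> b -> c
    std::printf("chain cycle? %d\n", has_cycle(&a)); // 0
    c.next = &a;                                     // corrupt the graph: c -> a
    std::printf("cycle now? %d\n", has_cycle(&a));   // 1
}
```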
+
+    void check_wait_for_leaders() {
+        // Use the next-buffer links as in-degree edges to validate the wait_for_leaders counts.
+        std::unordered_map< IndexBuffer*, int > wait_for_leaders;
+        IndexBufferPtr buf;
+
+        // Store the wait-for-leaders count of each buffer.
+        auto it = m_dirty_buf_list.begin();
+        while (it != m_dirty_buf_list.end()) {
+            buf = *it;
+            wait_for_leaders[buf.get()] = buf->m_wait_for_leaders.get();
+            ++it;
+        }
+
+        // Decrement the count through each next buffer. The iterator is advanced
+        // before the continue, otherwise the loop would never terminate.
+        it = m_dirty_buf_list.begin();
+        while (it != m_dirty_buf_list.end()) {
+            buf = *it;
+            ++it;
+            auto next_buf = buf->m_next_buffer.lock();
+            if (next_buf.get() == nullptr) { continue; }
+            wait_for_leaders[next_buf.get()]--;
+        }
+
+        bool issue = false;
+        for (const auto& [buf, waits] : wait_for_leaders) {
+            // Any value other than zero means the dependency graph is invalid.
+            if (waits != 0) {
+                issue = true;
+                LOGERROR("Leaders wait not zero cp {} buf {} waits {}", id(), buf->to_string(), waits);
+            }
+        }
+
+        RELEASE_ASSERT_EQ(issue, false, "Found issue with wait_for_leaders");
+    }
 };
+
 class IndexWBCache;
 class IndexCPCallbacks : public CPCallbacks {
 public:

diff --git a/src/lib/index/index_service.cpp b/src/lib/index/index_service.cpp
index d3d0984b5..bfa96f8bc 100644
--- a/src/lib/index/index_service.cpp
+++ b/src/lib/index/index_service.cpp
@@ -82,7 +82,9 @@ void IndexService::stop() {
     HS_REL_ASSERT_EQ(success, true, "CP Flush failed");
     LOGINFO("CP Flush completed");

-    for (auto [id, tbl] : m_index_map) { tbl->destroy(); }
+    for (auto [id, tbl] : m_index_map) {
+        tbl->destroy();
+    }
 }

 void IndexService::add_index_table(const std::shared_ptr< IndexTableBase >& tbl) {
     std::unique_lock lg(m_index_map_mtx);
@@ -107,9 +109,16 @@ uint64_t IndexService::used_size() const {
     return size;
 }

+NodeBuffer::NodeBuffer(uint32_t buf_size, uint32_t align_size) :
+        m_bytes{hs_utils::iobuf_alloc(buf_size, sisl::buftag::btree_node, align_size)} {}
+
+NodeBuffer::~NodeBuffer() { hs_utils::iobuf_free(m_bytes, sisl::buftag::btree_node); }
+
 IndexBuffer::IndexBuffer(BlkId blkid, uint32_t buf_size, uint32_t align_size) :
-        m_node_buf{hs_utils::iobuf_alloc(buf_size, sisl::buftag::btree_node, align_size)}, m_blkid{blkid} {}
+        m_node_buf{std::make_shared< NodeBuffer >(buf_size, align_size)}, m_blkid{blkid} {}
+
+IndexBuffer::IndexBuffer(NodeBufferPtr node_buf, BlkId blkid) : m_node_buf(node_buf), m_blkid(blkid) {}

-IndexBuffer::~IndexBuffer() { hs_utils::iobuf_free(m_node_buf, sisl::buftag::btree_node); }
+IndexBuffer::~IndexBuffer() { m_node_buf.reset(); }

 } // namespace homestore

diff --git a/src/lib/index/wb_cache.cpp b/src/lib/index/wb_cache.cpp
index a00a46dfa..12cbbfc31 100644
--- a/src/lib/index/wb_cache.cpp
+++ b/src/lib/index/wb_cache.cpp
@@ -84,7 +84,6 @@ BtreeNodePtr IndexWBCache::alloc_buf(node_initializer_t&& node_initializer) {
     // Alloc buffer and initialize the node
     auto idx_buf = std::make_shared< IndexBuffer >(blkid, m_node_size, m_vdev->align_size());
     auto node = node_initializer(idx_buf);
-    LOGTRACEMOD(wbcache, "idx_buf {} blkid {}", static_cast< void* >(idx_buf.get()), blkid.to_integer());

     // Add the node to the cache
     bool done = m_cache.insert(node);
@@ -101,16 +100,31 @@ void IndexWBCache::realloc_buf(const IndexBufferPtr& buf) {
 }

 void IndexWBCache::write_buf(const BtreeNodePtr& node, const IndexBufferPtr& buf, CPContext* cp_ctx) {
+    // TODO: upsert always returns false even if it succeeds.
     m_cache.upsert(node);
     r_cast< IndexCPContext* >(cp_ctx)->add_to_dirty_list(buf);
     resource_mgr().inc_dirty_buf_size(m_node_size);
 }

-IndexBufferPtr IndexWBCache::copy_buffer(const IndexBufferPtr& cur_buf) const {
-    auto new_buf = std::make_shared< IndexBuffer >(cur_buf->m_blkid, m_node_size, m_vdev->align_size());
-    std::memcpy(new_buf->raw_buffer(), cur_buf->raw_buffer(), m_node_size);
-    LOGTRACEMOD(wbcache, "new_buf {} cur_buf {} cur_buf_blkid {}", static_cast< void* >(new_buf.get()),
-                static_cast< void* >(cur_buf.get()), cur_buf->m_blkid.to_integer());
+IndexBufferPtr IndexWBCache::copy_buffer(const IndexBufferPtr& cur_buf, const CPContext* cp_ctx) const {
+    IndexBufferPtr new_buf = nullptr;
+    bool copied = false;
+
+    // When we copy the buffer, we check whether the node buffer is clean. If it is
+    // clean we can reuse it; otherwise we create a copy.
+    if (cur_buf->is_clean()) {
+        // Refer to the same node buffer.
+        new_buf = std::make_shared< IndexBuffer >(cur_buf->m_node_buf, cur_buf->m_blkid);
+    } else {
+        // If it is not clean, we do a deep copy.
+        new_buf = std::make_shared< IndexBuffer >(cur_buf->m_blkid, m_node_size, m_vdev->align_size());
+        std::memcpy(new_buf->raw_buffer(), cur_buf->raw_buffer(), m_node_size);
+        copied = true;
+    }
+
+    LOGTRACEMOD(wbcache, "cp {} new_buf {} cur_buf {} cur_buf_blkid {} copied {}", cp_ctx->id(),
+                static_cast< void* >(new_buf.get()), static_cast< void* >(cur_buf.get()),
+                cur_buf->m_blkid.to_integer(), copied);
     return new_buf;
 }

@@ -138,32 +152,34 @@ void IndexWBCache::read_buf(bnodeid_t id, BtreeNodePtr& node, node_initializer_t
     }
 }

-std::tuple< bool, bool > IndexWBCache::create_chain(IndexBufferPtr& second, IndexBufferPtr& third, CPContext* cp_ctx) {
+std::pair< bool, bool > IndexWBCache::create_chain(IndexBufferPtr& second, IndexBufferPtr& third, CPContext* cp_ctx) {
     bool second_copied{false}, third_copied{false};
-
+    auto chain = second;
+    auto old_third = third;
     if (!second->is_clean()) {
-        auto new_second = copy_buffer(second);
-        LOGTRACEMOD(wbcache, "second copied blkid {} {} new_second {}", second->m_blkid.to_integer(),
-                    static_cast< void* >(second.get()), static_cast< void* >(new_second.get()));
+        auto new_second = copy_buffer(second, cp_ctx);
         second = new_second;
         second_copied = true;
     }
+
     if (!third->is_clean()) {
-        auto new_third = copy_buffer(third);
-        LOGTRACEMOD(wbcache, "third copied blkid {} {} new_third {}", third->m_blkid.to_integer(),
-                    static_cast< void* >(third.get()), static_cast< void* >(new_third.get()));
+        auto new_third = copy_buffer(third, cp_ctx);
         third = new_third;
         third_copied = true;
    }

     // Append the parent (third) to the left child (second).
-    prepend_to_chain(second, third);
+    second->m_next_buffer = third;
+    third->m_wait_for_leaders.increment(1);

+    if (chain != second) {
+        // Related buffers should be appended to the end of the chain.
+        // If we split a node multiple times in the same or different CPs, each dirty buffer
+        // will be added to the end of that chain.
+        while (chain->m_next_buffer.lock() != nullptr) {
+            chain = chain->m_next_buffer.lock();
+        }

-    // TODO the index buffer are added to end of the chain, instead add to the dependency.
-    auto& last_in_chain = r_cast< IndexCPContext* >(cp_ctx)->m_last_in_chain;
-    if (last_in_chain) {
-        // Add this to the end of the chain.
- last_in_chain->m_next_buffer = second; + chain->m_next_buffer = second; second->m_wait_for_leaders.increment(1); } @@ -171,10 +187,10 @@ std::tuple< bool, bool > IndexWBCache::create_chain(IndexBufferPtr& second, Inde } void IndexWBCache::prepend_to_chain(const IndexBufferPtr& first, const IndexBufferPtr& second) { + assert(first->m_next_buffer.lock() != second); assert(first->m_next_buffer.lock() == nullptr); first->m_next_buffer = second; second->m_wait_for_leaders.increment(1); - LOGTRACEMOD(wbcache, "first {} second {}", first->to_string(), second->to_string()); } void IndexWBCache::free_buf(const IndexBufferPtr& buf, CPContext* cp_ctx) { @@ -187,6 +203,7 @@ void IndexWBCache::free_buf(const IndexBufferPtr& buf, CPContext* cp_ctx) { } //////////////////// CP Related API section ///////////////////////////////// + folly::Future< bool > IndexWBCache::async_cp_flush(IndexCPContext* cp_ctx) { LOGTRACEMOD(wbcache, "cp_ctx {}", cp_ctx->to_string()); if (!cp_ctx->any_dirty_buffers()) { @@ -194,6 +211,13 @@ folly::Future< bool > IndexWBCache::async_cp_flush(IndexCPContext* cp_ctx) { return folly::makeFuture< bool >(true); // nothing to flush } +#ifndef NDEBUG + // Check no cycles or invalid wait_for_leader count in the dirty buffer + // dependency graph. + // cp_ctx->check_wait_for_leaders(); + // cp_ctx->check_cycle(); +#endif + cp_ctx->prepare_flush_iteration(); for (auto& fiber : m_cp_flush_fibers) { @@ -211,22 +235,22 @@ folly::Future< bool > IndexWBCache::async_cp_flush(IndexCPContext* cp_ctx) { return std::move(cp_ctx->get_future()); } -void IndexWBCache::do_flush_one_buf(IndexCPContext* cp_ctx, const IndexBufferPtr& buf, bool part_of_batch) { - LOGTRACEMOD(wbcache, "buf {}", buf->to_string()); - buf->m_buf_state = index_buf_state_t::FLUSHING; +void IndexWBCache::do_flush_one_buf(IndexCPContext* cp_ctx, IndexBufferPtr buf, bool part_of_batch) { + LOGTRACEMOD(wbcache, "cp {} buf {}", cp_ctx->id(), buf->to_string()); + buf->set_state(index_buf_state_t::FLUSHING); m_vdev->async_write(r_cast< const char* >(buf->raw_buffer()), m_node_size, buf->m_blkid, part_of_batch) - .thenValue([pbuf = buf.get(), cp_ctx](auto) { + .thenValue([buf, cp_ctx](auto) { auto& pthis = s_cast< IndexWBCache& >(wb_cache()); // Avoiding more than 16 bytes capture - pthis.process_write_completion(cp_ctx, pbuf); + pthis.process_write_completion(cp_ctx, buf); }); if (!part_of_batch) { m_vdev->submit_batch(); } } -void IndexWBCache::process_write_completion(IndexCPContext* cp_ctx, IndexBuffer* pbuf) { - LOGTRACEMOD(wbcache, "buf {}", pbuf->to_string()); +void IndexWBCache::process_write_completion(IndexCPContext* cp_ctx, IndexBufferPtr buf) { + LOGTRACEMOD(wbcache, "cp {} buf {}", cp_ctx->id(), buf->to_string()); resource_mgr().dec_dirty_buf_size(m_node_size); - auto [next_buf, has_more] = on_buf_flush_done(cp_ctx, pbuf); + auto [next_buf, has_more] = on_buf_flush_done(cp_ctx, buf); if (next_buf) { do_flush_one_buf(cp_ctx, next_buf, false); } else if (!has_more) { @@ -240,7 +264,7 @@ void IndexWBCache::process_write_completion(IndexCPContext* cp_ctx, IndexBuffer* } } -std::pair< IndexBufferPtr, bool > IndexWBCache::on_buf_flush_done(IndexCPContext* cp_ctx, IndexBuffer* buf) { +std::pair< IndexBufferPtr, bool > IndexWBCache::on_buf_flush_done(IndexCPContext* cp_ctx, IndexBufferPtr& buf) { if (m_cp_flush_fibers.size() > 1) { std::unique_lock lg(m_flush_mtx); return on_buf_flush_done_internal(cp_ctx, buf); @@ -249,9 +273,10 @@ std::pair< IndexBufferPtr, bool > IndexWBCache::on_buf_flush_done(IndexCPContext } } -std::pair< 
IndexBufferPtr, bool > IndexWBCache::on_buf_flush_done_internal(IndexCPContext* cp_ctx, IndexBuffer* buf) { +std::pair< IndexBufferPtr, bool > IndexWBCache::on_buf_flush_done_internal(IndexCPContext* cp_ctx, + IndexBufferPtr& buf) { static thread_local std::vector< IndexBufferPtr > t_buf_list; - buf->m_buf_state = index_buf_state_t::CLEAN; + buf->set_state(index_buf_state_t::CLEAN); t_buf_list.clear(); @@ -272,7 +297,7 @@ void IndexWBCache::get_next_bufs(IndexCPContext* cp_ctx, uint32_t max_count, std } } -void IndexWBCache::get_next_bufs_internal(IndexCPContext* cp_ctx, uint32_t max_count, IndexBuffer* prev_flushed_buf, +void IndexWBCache::get_next_bufs_internal(IndexCPContext* cp_ctx, uint32_t max_count, IndexBufferPtr prev_flushed_buf, std::vector< IndexBufferPtr >& bufs) { uint32_t count{0}; diff --git a/src/lib/index/wb_cache.hpp b/src/lib/index/wb_cache.hpp index 7639d0714..9a652ce2a 100644 --- a/src/lib/index/wb_cache.hpp +++ b/src/lib/index/wb_cache.hpp @@ -49,23 +49,23 @@ class IndexWBCache : public IndexWBCacheBase { void realloc_buf(const IndexBufferPtr& buf) override; void write_buf(const BtreeNodePtr& node, const IndexBufferPtr& buf, CPContext* cp_ctx) override; void read_buf(bnodeid_t id, BtreeNodePtr& node, node_initializer_t&& node_initializer) override; - std::tuple< bool, bool > create_chain(IndexBufferPtr& second, IndexBufferPtr& third, CPContext* cp_ctx) override; + std::pair< bool, bool > create_chain(IndexBufferPtr& second, IndexBufferPtr& third, CPContext* cp_ctx) override; void prepend_to_chain(const IndexBufferPtr& first, const IndexBufferPtr& second) override; void free_buf(const IndexBufferPtr& buf, CPContext* cp_ctx) override; //////////////////// CP Related API section ///////////////////////////////// folly::Future< bool > async_cp_flush(IndexCPContext* context); - IndexBufferPtr copy_buffer(const IndexBufferPtr& cur_buf) const; + IndexBufferPtr copy_buffer(const IndexBufferPtr& cur_buf, const CPContext *cp_ctx) const; private: void start_flush_threads(); - void process_write_completion(IndexCPContext* cp_ctx, IndexBuffer* pbuf); - void do_flush_one_buf(IndexCPContext* cp_ctx, const IndexBufferPtr& buf, bool part_of_batch); - std::pair< IndexBufferPtr, bool > on_buf_flush_done(IndexCPContext* cp_ctx, IndexBuffer* buf); - std::pair< IndexBufferPtr, bool > on_buf_flush_done_internal(IndexCPContext* cp_ctx, IndexBuffer* buf); + void process_write_completion(IndexCPContext* cp_ctx, IndexBufferPtr pbuf); + void do_flush_one_buf(IndexCPContext* cp_ctx, const IndexBufferPtr buf, bool part_of_batch); + std::pair< IndexBufferPtr, bool > on_buf_flush_done(IndexCPContext* cp_ctx, IndexBufferPtr& buf); + std::pair< IndexBufferPtr, bool > on_buf_flush_done_internal(IndexCPContext* cp_ctx, IndexBufferPtr& buf); void get_next_bufs(IndexCPContext* cp_ctx, uint32_t max_count, std::vector< IndexBufferPtr >& bufs); - void get_next_bufs_internal(IndexCPContext* cp_ctx, uint32_t max_count, IndexBuffer* prev_flushed_buf, + void get_next_bufs_internal(IndexCPContext* cp_ctx, uint32_t max_count, IndexBufferPtr prev_flushed_buf, std::vector< IndexBufferPtr >& bufs); }; } // namespace homestore diff --git a/src/tests/btree_helpers/btree_test_helper.hpp b/src/tests/btree_helpers/btree_test_helper.hpp index 3bc943fc0..1072ef981 100644 --- a/src/tests/btree_helpers/btree_test_helper.hpp +++ b/src/tests/btree_helpers/btree_test_helper.hpp @@ -41,7 +41,7 @@ struct BtreeTestHelper : public testing::Test { using mutex = iomgr::FiberManagerLib::shared_mutex; using op_func_t = std::function< 
void(void) >; - BtreeTestHelper() : testing::Test(), m_range_scheduler{SISL_OPTIONS["num_entries"].as< uint32_t >()} {} + BtreeTestHelper() : testing::Test(), m_shadow_map{SISL_OPTIONS["num_entries"].as< uint32_t >()} {} void SetUp() override { m_cfg.m_leaf_node_type = T::leaf_node_type; @@ -71,7 +71,6 @@ struct BtreeTestHelper : public testing::Test { std::shared_ptr< typename T::BtreeType > m_bt; ShadowMap< K, V > m_shadow_map; BtreeConfig m_cfg{g_node_size}; - RangeScheduler m_range_scheduler; uint32_t m_max_range_input{1000}; bool m_is_multi_threaded{false}; @@ -94,7 +93,6 @@ struct BtreeTestHelper : public testing::Test { iomanager.run_on_forget(m_fibers[i], [this, start_range, end_range, &test_count]() { for (uint32_t i = start_range; i < end_range; i++) { put(i, btree_put_type::INSERT); - m_range_scheduler.put_key(i); } { std::unique_lock lg(m_test_done_mtx); @@ -114,7 +112,7 @@ struct BtreeTestHelper : public testing::Test { void put(uint64_t k, btree_put_type put_type) { do_put(k, put_type, V::generate_rand()); } void put_random() { - auto [start_k, end_k] = m_range_scheduler.pick_random_non_existing_keys(1); + auto [start_k, end_k] = m_shadow_map.pick_random_non_existing_keys(1); RELEASE_ASSERT_EQ(start_k, end_k, "Range scheduler pick_random_non_existing_keys issue"); do_put(start_k, btree_put_type::INSERT, V::generate_rand()); @@ -132,10 +130,8 @@ struct BtreeTestHelper : public testing::Test { if (update) { m_shadow_map.range_update(start_key, nkeys, value); - m_range_scheduler.remove_keys_from_working(start_k, end_k); } else { m_shadow_map.range_upsert(start_k, nkeys, value); - m_range_scheduler.put_keys(start_k, end_k); } } @@ -146,8 +142,8 @@ struct BtreeTestHelper : public testing::Test { static thread_local std::uniform_int_distribution< uint32_t > s_rand_range_generator{1, 50}; auto const [start_k, end_k] = is_update - ? m_range_scheduler.pick_random_existing_keys(s_rand_range_generator(m_re)) - : m_range_scheduler.pick_random_non_working_keys(s_rand_range_generator(m_re)); + ? 
m_shadow_map.pick_random_existing_keys(s_rand_range_generator(m_re)) + : m_shadow_map.pick_random_non_working_keys(s_rand_range_generator(m_re)); range_put(start_k, end_k, V::generate_rand(), is_update); } @@ -167,15 +163,13 @@ struct BtreeTestHelper : public testing::Test { m_shadow_map.validate_data(rreq.key(), (const V&)rreq.value()); m_shadow_map.erase(rreq.key()); } - m_range_scheduler.remove_key(k); } void remove_random() { - auto const [start_k, end_k] = m_range_scheduler.pick_random_existing_keys(1); + auto const [start_k, end_k] = m_shadow_map.pick_random_existing_keys(1); RELEASE_ASSERT_EQ(start_k, end_k, "Range scheduler pick_random_existing_keys issue"); remove_one(start_k); - m_range_scheduler.remove_key(start_k); } void range_remove_existing(uint32_t start_k, uint32_t count) { @@ -186,7 +180,7 @@ struct BtreeTestHelper : public testing::Test { void range_remove_existing_random() { static std::uniform_int_distribution< uint32_t > s_rand_range_generator{2, 5}; - auto const [start_k, end_k] = m_range_scheduler.pick_random_existing_keys(s_rand_range_generator(m_re)); + auto const [start_k, end_k] = m_shadow_map.pick_random_existing_keys(s_rand_range_generator(m_re)); do_range_remove(start_k, end_k, true /* only_existing */); } @@ -203,6 +197,7 @@ struct BtreeTestHelper : public testing::Test { void do_query(uint32_t start_k, uint32_t end_k, uint32_t batch_size) { std::vector< std::pair< K, V > > out_vector; + m_shadow_map.guard().lock(); uint32_t remaining = m_shadow_map.num_elems_in_range(start_k, end_k); auto it = m_shadow_map.map_const().lower_bound(K{start_k}); @@ -234,21 +229,23 @@ struct BtreeTestHelper : public testing::Test { ASSERT_EQ(ret, btree_status_t::success) << "Expected success on query"; ASSERT_EQ(out_vector.size(), 0) << "Received incorrect value on empty query pagination"; + m_shadow_map.guard().unlock(); + if (start_k < m_max_range_input) { - m_range_scheduler.remove_keys_from_working(start_k, std::min(end_k, m_max_range_input - 1)); + m_shadow_map.remove_keys_from_working(start_k, std::min(end_k, m_max_range_input - 1)); } } void query_random() { static thread_local std::uniform_int_distribution< uint32_t > s_rand_range_generator{1, 100}; - auto const [start_k, end_k] = m_range_scheduler.pick_random_non_working_keys(s_rand_range_generator(m_re)); + auto const [start_k, end_k] = m_shadow_map.pick_random_non_working_keys(s_rand_range_generator(m_re)); do_query(start_k, end_k, 79); } ////////////////////// All get operation variants /////////////////////////////// void get_all() const { - for (const auto& [key, value] : m_shadow_map.map_const()) { + m_shadow_map.foreach ([this](K key, V value) { auto copy_key = std::make_unique< K >(); *copy_key = key; auto out_v = std::make_unique< V >(); @@ -258,7 +255,7 @@ struct BtreeTestHelper : public testing::Test { ASSERT_EQ(ret, btree_status_t::success) << "Missing key " << key << " in btree but present in shadow map"; ASSERT_EQ((const V&)req.value(), value) << "Found value in btree doesn't return correct data for key=" << key; - } + }); } void get_specific(uint32_t k) const { @@ -280,6 +277,7 @@ struct BtreeTestHelper : public testing::Test { auto req = BtreeGetAnyRequest< K >{BtreeKeyRange< K >{K{start_k}, true, K{end_k}, true}, out_k.get(), out_v.get()}; const auto status = m_bt->get(req); + if (status == btree_status_t::success) { ASSERT_EQ(m_shadow_map.exists_in_range(*(K*)req.m_outkey, start_k, end_k), true) << "Get Any returned key=" << *(K*)req.m_outkey << " which is not in range " << start_k << "-" << end_k @@ 
-303,14 +301,32 @@ struct BtreeTestHelper : public testing::Test {
     void print_keys() const { m_bt->print_tree_keys(); }

     void compare_files(const std::string& before, const std::string& after) {
-        std::ifstream b(before);
-        std::ifstream a(after);
-        std::ostringstream ss_before, ss_after;
-        ss_before << b.rdbuf();
-        ss_after << a.rdbuf();
-        std::string s1 = ss_before.str();
-        std::string s2 = ss_after.str();
-        ASSERT_EQ(s1, s2) << "Mismatch in btree structure";
+        std::ifstream b(before, std::ifstream::ate);
+        std::ifstream a(after, std::ifstream::ate);
+        if (a.fail() || b.fail()) {
+            LOGINFO("Failed to open file");
+            assert(false);
+        }
+        if (a.tellg() != b.tellg()) {
+            LOGINFO("Mismatch in btree files");
+            assert(false);
+        }
+
+        int64_t pending = a.tellg();
+        const int64_t batch_size = 4096;
+        a.seekg(0, std::ifstream::beg);
+        b.seekg(0, std::ifstream::beg);
+        char a_buffer[batch_size], b_buffer[batch_size];
+        while (pending > 0) {
+            auto count = std::min(pending, batch_size);
+            a.read(a_buffer, count);
+            b.read(b_buffer, count);
+            if (std::memcmp(a_buffer, b_buffer, count) != 0) {
+                LOGINFO("Mismatch in btree files");
+                assert(false);
+            }
+            pending -= count;
+        }
     }

 private:
@@ -327,7 +343,6 @@ struct BtreeTestHelper : public testing::Test {
         }

         m_shadow_map.put_and_check(key, value, *existing_v, done);
-        m_range_scheduler.put_key(k);
     }

     void do_range_remove(uint64_t start_k, uint64_t end_k, bool all_existing) {
@@ -336,6 +351,7 @@ struct BtreeTestHelper : public testing::Test {
         auto rreq = BtreeRangeRemoveRequest< K >{BtreeKeyRange< K >{start_key, true, end_key, true}};
         auto const ret = m_bt->remove(rreq);
+
         m_shadow_map.range_erase(start_key, end_key);

         if (all_existing) {
@@ -344,7 +360,7 @@ struct BtreeTestHelper : public testing::Test {
         }

         if (start_k < m_max_range_input) {
-            m_range_scheduler.remove_keys(start_k, std::min(end_k, uint64_cast(m_max_range_input - 1)));
+            m_shadow_map.remove_keys(start_k, std::min(end_k, uint64_cast(m_max_range_input - 1)));
         }
     }

@@ -380,4 +396,4 @@ struct BtreeTestHelper : public testing::Test {
     }
     LOGINFO("ALL parallel jobs joined");
 }
-};
\ No newline at end of file
+};

diff --git a/src/tests/btree_helpers/shadow_map.hpp b/src/tests/btree_helpers/shadow_map.hpp
index 1e7418122..f8c40e140 100644
--- a/src/tests/btree_helpers/shadow_map.hpp
+++ b/src/tests/btree_helpers/shadow_map.hpp
@@ -7,26 +7,36 @@ template < typename K, typename V >
 class ShadowMap {
 private:
     std::map< K, V > m_map;
+    RangeScheduler m_range_scheduler;
+    using mutex = iomgr::FiberManagerLib::shared_mutex;
+    mutable mutex m_mutex; // mutable so that const readers can take shared locks

 public:
+    ShadowMap(uint32_t num_keys) : m_range_scheduler(num_keys) {}
+
     void put_and_check(const K& key, const V& val, const V& old_val, bool expected_success) {
+        std::lock_guard lock{m_mutex};
         auto const [it, happened] = m_map.insert(std::make_pair(key, val));
         ASSERT_EQ(happened, expected_success) << "Testcase issue, expected inserted slots to be in shadow map";
         if (!happened) {
             ASSERT_EQ(old_val, it->second) << "Put: Existing value doesn't return correct data for key: " << it->first;
         }
+        m_range_scheduler.put_key(key.key());
     }

     void range_upsert(uint64_t start_k, uint32_t count, const V& val) {
+        std::lock_guard lock{m_mutex};
         for (uint32_t i{0}; i < count; ++i) {
             K key{start_k + i};
             V range_value{val};
             if constexpr (std::is_same_v< V, TestIntervalValue >) { range_value.shift(i); }
             m_map.insert_or_assign(key, range_value);
         }
+        m_range_scheduler.put_keys(start_k, start_k + count - 1);
     }

     void range_update(const K& start_key, uint32_t count, const V& new_val) {
+        std::lock_guard
lock{m_mutex}; auto const start_it = m_map.lower_bound(start_key); auto it = start_it; uint32_t c = 0; @@ -34,9 +44,11 @@ class ShadowMap { it->second = new_val; ++it; } + m_range_scheduler.remove_keys_from_working(start_key.key(), start_key.key() + count - 1); } std::pair< K, K > pick_existing_range(const K& start_key, uint32_t max_count) const { + std::shared_lock lock{m_mutex}; auto const start_it = m_map.lower_bound(start_key); auto it = start_it; uint32_t count = 0; @@ -46,9 +58,13 @@ class ShadowMap { return std::pair(start_it->first, it->first); } - bool exists(const K& key) const { return m_map.find(key) != m_map.end(); } + bool exists(const K& key) const { + std::shared_lock lock{m_mutex}; + return m_map.find(key) != m_map.end(); + } bool exists_in_range(const K& key, uint64_t start_k, uint64_t end_k) const { + std::shared_lock lock{m_mutex}; const auto itlower = m_map.lower_bound(K{start_k}); const auto itupper = m_map.upper_bound(K{end_k}); auto it = itlower; @@ -59,7 +75,10 @@ class ShadowMap { return false; } - uint64_t size() const { return m_map.size(); } + uint64_t size() const { + std::shared_lock lock{m_mutex}; + return m_map.size(); + } uint32_t num_elems_in_range(uint64_t start_k, uint64_t end_k) const { const auto itlower = m_map.lower_bound(K{start_k}); @@ -68,29 +87,71 @@ class ShadowMap { } void validate_data(const K& key, const V& btree_val) const { + std::shared_lock lock{m_mutex}; const auto r = m_map.find(key); ASSERT_NE(r, m_map.end()) << "Key " << key.to_string() << " is not present in shadow map"; ASSERT_EQ(btree_val, r->second) << "Found value in btree doesn't return correct data for key=" << r->first; } - void erase(const K& key) { m_map.erase(key); } + void erase(const K& key) { + std::lock_guard lock{m_mutex}; + m_map.erase(key); + m_range_scheduler.remove_key(key.key()); + } void range_erase(const K& start_key, uint32_t count) { + std::lock_guard lock{m_mutex}; auto const it = m_map.lower_bound(start_key); uint32_t i{0}; while ((it != m_map.cend()) && (i++ < count)) { it = m_map.erase(it); } + m_range_scheduler.remove_keys(start_key.key(), start_key.key() + count); } void range_erase(const K& start_key, const K& end_key) { + std::lock_guard lock{m_mutex}; auto it = m_map.lower_bound(start_key); auto const end_it = m_map.upper_bound(end_key); while ((it != m_map.cend()) && (it != end_it)) { it = m_map.erase(it); } + m_range_scheduler.remove_keys(start_key.key(), end_key.key()); } + mutex& guard() { return m_mutex; } std::map< K, V >& map() { return m_map; } const std::map< K, V >& map_const() const { return m_map; } + + void foreach (std::function< void(K, V) > func) const { + std::shared_lock lock{m_mutex}; + for (const auto& [key, value] : m_map) { + func(key, value); + } + } + + std::pair< uint32_t, uint32_t > pick_random_non_existing_keys(uint32_t max_keys) { + std::shared_lock lock{m_mutex}; + return m_range_scheduler.pick_random_non_existing_keys(max_keys); + } + + std::pair< uint32_t, uint32_t > pick_random_existing_keys(uint32_t max_keys) { + std::shared_lock lock{m_mutex}; + return m_range_scheduler.pick_random_existing_keys(max_keys); + } + + std::pair< uint32_t, uint32_t > pick_random_non_working_keys(uint32_t max_keys) { + std::shared_lock lock{m_mutex}; + return m_range_scheduler.pick_random_non_working_keys(max_keys); + } + + void remove_keys_from_working(uint32_t s, uint32_t e) { + std::lock_guard lock{m_mutex}; + m_range_scheduler.remove_keys_from_working(s, e); + } + + void remove_keys(uint32_t start_key, uint32_t end_key) { + 
std::lock_guard lock{m_mutex};
+        m_range_scheduler.remove_keys(start_key, end_key);
+    }
 };

diff --git a/src/tests/test_index_btree.cpp b/src/tests/test_index_btree.cpp
index 14062cd1f..4bab73ad0 100644
--- a/src/tests/test_index_btree.cpp
+++ b/src/tests/test_index_btree.cpp
@@ -42,13 +42,15 @@ SISL_OPTIONS_ENABLE(logging, test_index_btree, iomgr, test_common_setup)
 SISL_LOGGING_DECL(test_index_btree)

 std::vector< std::string > test_common::HSTestHelper::s_dev_names;
-// TODO increase num_entries to 65k as io mgr page size is 512 and its slow.
+
+// TODO: Add tests which write/remove after recovery.
+// TODO: Test with var-len keys when the io mgr page size is 512.
 SISL_OPTION_GROUP(test_index_btree,
                   (num_iters, "", "num_iters", "number of iterations for rand ops",
-                   ::cxxopts::value< uint32_t >()->default_value("1500"), "number"),
+                   ::cxxopts::value< uint32_t >()->default_value("500"), "number"),
                   (num_entries, "", "num_entries", "number of entries to test with",
-                   ::cxxopts::value< uint32_t >()->default_value("15000"), "number"),
+                   ::cxxopts::value< uint32_t >()->default_value("5000"), "number"),
                   (seed, "", "seed", "random engine seed, use random if not defined",
                    ::cxxopts::value< uint64_t >()->default_value("0"), "number"))
@@ -159,12 +161,7 @@ struct BtreeTest : public BtreeTestHelper< TestType > {
     }
 };

-// TODO sanal fix the varkey issue.
-// using BtreeTypes = testing::Types< FixedLenBtreeTest, VarKeySizeBtreeTest, VarValueSizeBtreeTest,
-// VarObjSizeBtreeTest
-// >;
-
-using BtreeTypes = testing::Types< FixedLenBtreeTest >;
+using BtreeTypes = testing::Types< FixedLenBtreeTest, VarKeySizeBtreeTest, VarValueSizeBtreeTest, VarObjSizeBtreeTest >;

 TYPED_TEST_SUITE(BtreeTest, BtreeTypes);

@@ -225,7 +222,6 @@ TYPED_TEST(BtreeTest, RandomInsert) {
     this->get_all();
 }

-#if 0
 TYPED_TEST(BtreeTest, SequentialRemove) {
     LOGINFO("SequentialRemove test start");
     // Forward sequential insert
@@ -280,7 +276,6 @@ TYPED_TEST(BtreeTest, RandomRemove) {
     }
     this->get_all();
 }
-#endif

 TYPED_TEST(BtreeTest, RangeUpdate) {
     LOGINFO("RangeUpdate test start");
@@ -309,23 +304,29 @@ TYPED_TEST(BtreeTest, CpFlush) {
     for (uint32_t i = 0; i < num_entries; ++i) {
         this->put(i, btree_put_type::INSERT);
     }
+
+    // Remove some of the entries.
+    for (uint32_t i = 0; i < num_entries; i += 10) {
+        this->remove_one(i);
+    }
+
     LOGINFO("Query {} entries and validate with pagination of 75 entries", num_entries / 2);
     this->do_query(0, num_entries / 2 - 1, 75);

-    this->print(std::string("before.txt"));
-
     LOGINFO("Trigger checkpoint flush.");
     test_common::HSTestHelper::trigger_cp(true /* wait */);

     LOGINFO("Query {} entries and validate with pagination of 75 entries", num_entries);
     this->do_query(0, num_entries - 1, 75);

+    this->print(std::string("before.txt"));
+
+    this->destroy_btree();
+
     // Restart homestore. m_bt is updated by the TestIndexServiceCallback.
     this->restart_homestore();
-    std::this_thread::sleep_for(std::chrono::seconds{3});
+    std::this_thread::sleep_for(std::chrono::seconds{1});
     LOGINFO("Restarted homestore with index recovered");
     this->print(std::string("after.txt"));
@@ -373,7 +374,7 @@ TYPED_TEST(BtreeTest, MultipleCpFlush) {
     // Restart homestore. m_bt is updated by the TestIndexServiceCallback.
    this->restart_homestore();
-    std::this_thread::sleep_for(std::chrono::seconds{3});
+    std::this_thread::sleep_for(std::chrono::seconds{1});
     LOGINFO(" Restarted homestore with index recovered");
     this->print(std::string("after.txt"));
@@ -388,24 +389,41 @@ TYPED_TEST(BtreeTest, ThreadedCpFlush) {
     LOGINFO("ThreadedCpFlush test start");

     const auto num_entries = SISL_OPTIONS["num_entries"].as< uint32_t >();
-    bool stop_cp_flush = false;
-    auto io_thread = std::thread([this, num_entries] {
+    std::atomic< bool > stop{false};
+    std::atomic< uint32_t > last_index{0};
+    auto insert_io_thread = std::thread([this, num_entries, &last_index] {
         LOGINFO("Do Forward sequential insert for {} entries", num_entries);
         for (uint32_t i = 0; i < num_entries; ++i) {
             this->put(i, btree_put_type::INSERT);
+            last_index = i;
         }
     });
-    auto cp_flush_thread = std::thread([this, &stop_cp_flush] {
-        while (!stop_cp_flush) {
-            LOGINFO("Trigger checkpoint flush wait=false.");
-            test_common::HSTestHelper::trigger_cp(false /* wait */);
+    auto remove_io_thread = std::thread([this, &stop, num_entries, &last_index] {
+        LOGINFO("Do random removes for {} entries", num_entries);
+        while (!stop) {
+            std::this_thread::sleep_for(std::chrono::milliseconds{10});
+            // Remove a random entry.
+            std::uniform_int_distribution< uint32_t > rand{0, last_index.load()};
+            auto rm_idx = rand(g_re);
+            LOGINFO("Removing entry {}", rm_idx);
+            this->remove_one(rm_idx);
+        }
+    });
+
+    auto cp_flush_thread = std::thread([this, &stop] {
+        while (!stop) {
             std::this_thread::sleep_for(std::chrono::seconds{1});
+            LOGINFO("Trigger checkpoint flush wait=false.");
+            test_common::HSTestHelper::trigger_cp(false /* wait */);
+            LOGINFO("Trigger checkpoint flush wait=false done.");
         }
     });

-    io_thread.join();
-    stop_cp_flush = true;
+    insert_io_thread.join();
+    stop = true;
+    remove_io_thread.join();
     cp_flush_thread.join();

     LOGINFO("Trigger checkpoint flush wait=true.");
@@ -420,7 +438,7 @@ TYPED_TEST(BtreeTest, ThreadedCpFlush) {
     // Restart homestore. m_bt is updated by the TestIndexServiceCallback.
this->restart_homestore(); - std::this_thread::sleep_for(std::chrono::seconds{3}); + std::this_thread::sleep_for(std::chrono::seconds{1}); LOGINFO(" Restarted homestore with index recovered"); this->print(std::string("after.txt")); diff --git a/src/tests/test_mem_btree.cpp b/src/tests/test_mem_btree.cpp index 68bf4003d..bb794fca3 100644 --- a/src/tests/test_mem_btree.cpp +++ b/src/tests/test_mem_btree.cpp @@ -41,14 +41,15 @@ SISL_OPTION_GROUP( (num_entries, "", "num_entries", "number of entries to test with", ::cxxopts::value< uint32_t >()->default_value("10000"), "number"), (disable_merge, "", "disable_merge", "disable_merge", ::cxxopts::value< bool >()->default_value("0"), ""), - (n_threads, "", "n_threads", "number of threads", ::cxxopts::value< uint32_t >()->default_value("2"), "number"), - (n_fibers, "", "n_fibers", "number of fibers", ::cxxopts::value< uint32_t >()->default_value("10"), "number"), + (n_threads, "", "num_threads", "number of threads", ::cxxopts::value< uint32_t >()->default_value("2"), "number"), + (n_fibers, "", "num_fibers", "number of fibers", ::cxxopts::value< uint32_t >()->default_value("10"), "number"), (operation_list, "", "operation_list", "operation list instead of default created following by percentage", ::cxxopts::value< std::vector< std::string > >(), "operations [...]"), (preload_size, "", "preload_size", "number of entries to preload tree with", ::cxxopts::value< uint32_t >()->default_value("1000"), "number"), (seed, "", "seed", "random engine seed, use random if not defined", - ::cxxopts::value< uint64_t >()->default_value("0"), "number")) + ::cxxopts::value< uint64_t >()->default_value("0"), "number"), + (run_time, "", "run_time", "run time for io", ::cxxopts::value< uint32_t >()->default_value("360000"), "seconds")) struct FixedLenBtreeTest { using BtreeType = MemBtree< TestFixedKey, TestFixedValue >; @@ -265,7 +266,7 @@ TYPED_TEST(BtreeTest, RandomRemoveRange) { this->put(i, btree_put_type::INSERT); } // generate keys including out of bound - static thread_local std::uniform_int_distribution< uint32_t > s_rand_key_generator{0, 2 * num_entries}; + static thread_local std::uniform_int_distribution< uint32_t > s_rand_key_generator{0, num_entries}; // this->print_keys(); LOGINFO("Step 2: Do range remove for maximum of {} iterations", num_iters); for (uint32_t i{0}; (i < num_iters) && this->m_shadow_map.size(); ++i) { @@ -289,10 +290,10 @@ struct BtreeConcurrentTest : public BtreeTestHelper< TestType > { BtreeConcurrentTest() { this->m_is_multi_threaded = true; } void SetUp() override { - LOGINFO("Starting iomgr with {} threads", SISL_OPTIONS["n_threads"].as< uint32_t >()); - ioenvironment.with_iomgr(iomgr::iomgr_params{.num_threads = SISL_OPTIONS["n_threads"].as< uint32_t >(), + LOGINFO("Starting iomgr with {} threads", SISL_OPTIONS["num_threads"].as< uint32_t >()); + ioenvironment.with_iomgr(iomgr::iomgr_params{.num_threads = SISL_OPTIONS["num_threads"].as< uint32_t >(), .is_spdk = false, - .num_fibers = 1 + SISL_OPTIONS["n_fibers"].as< uint32_t >(), + .num_fibers = 1 + SISL_OPTIONS["num_fibers"].as< uint32_t >(), .app_mem_size_mb = 0, .hugepage_size_mb = 0}); From 02b16cca7ed27e519932c4a5c21119d880308bc7 Mon Sep 17 00:00:00 2001 From: Mehdi Hosseini <116847813+shosseinimotlagh@users.noreply.github.com> Date: Mon, 13 Nov 2023 12:56:31 -0800 Subject: [PATCH 2/3] Long running for index service. 
 (#219)

* Main changes:
  - add a concurrent UT for the index tree
  - change btree_test.py (add device list and run time)
  - introduce run_time
---
 conanfile.py                                  |  2 +-
 src/tests/btree_helpers/btree_test_helper.hpp |  7 +-
 .../test_common/homestore_test_common.hpp     | 11 ++-
 src/tests/test_index_btree.cpp                | 97 +++++++++++++++++++
 src/tests/test_mem_btree.cpp                  |  4 +-
 src/tests/test_scripts/btree_test.py          | 35 ++++---
 6 files changed, 136 insertions(+), 20 deletions(-)

diff --git a/conanfile.py b/conanfile.py
index b4477e155..3ce5f3196 100644
--- a/conanfile.py
+++ b/conanfile.py
@@ -5,7 +5,7 @@
 class HomestoreConan(ConanFile):
     name = "homestore"
-    version = "4.8.2"
+    version = "4.8.3"
     homepage = "https://github.com/eBay/Homestore"
     description = "HomeStore Storage Engine"

diff --git a/src/tests/btree_helpers/btree_test_helper.hpp b/src/tests/btree_helpers/btree_test_helper.hpp
index 1072ef981..606aceba0 100644
--- a/src/tests/btree_helpers/btree_test_helper.hpp
+++ b/src/tests/btree_helpers/btree_test_helper.hpp
@@ -51,6 +51,7 @@ struct BtreeTestHelper : public testing::Test {

         if (m_is_multi_threaded) {
             std::mutex mtx;
+            m_run_time = SISL_OPTIONS["run_time"].as< uint32_t >();
             iomanager.run_on_wait(iomgr::reactor_regex::all_io, [this, &mtx]() {
                 auto fv = iomanager.sync_io_capable_fibers();
                 std::unique_lock lg(mtx);
@@ -73,6 +74,7 @@ struct BtreeTestHelper : public testing::Test {
     BtreeConfig m_cfg{g_node_size};
     uint32_t m_max_range_input{1000};
     bool m_is_multi_threaded{false};
+    uint32_t m_run_time{0};

     std::map< std::string, op_func_t > m_operations;
     std::vector< iomgr::io_fiber_t > m_fibers;
@@ -378,8 +380,9 @@ struct BtreeTestHelper : public testing::Test {

         // Construct a weighted distribution based on the input frequencies
         std::discrete_distribution< uint32_t > s_rand_op_generator(weights.begin(), weights.end());
-
-        for (uint32_t i = 0; i < num_iters_per_thread; i++) {
+        auto start_time = Clock::now();
+        auto time_to_stop = [this, start_time]() { return (get_elapsed_time_sec(start_time) > m_run_time); };
+        for (uint32_t i = 0; i < num_iters_per_thread && !time_to_stop(); i++) {
             uint32_t op_idx = s_rand_op_generator(re);
             (this->m_operations[op_list[op_idx].first])();
         }

diff --git a/src/tests/test_common/homestore_test_common.hpp b/src/tests/test_common/homestore_test_common.hpp
index bdac774eb..84062b8a6 100644
--- a/src/tests/test_common/homestore_test_common.hpp
+++ b/src/tests/test_common/homestore_test_common.hpp
@@ -39,6 +39,8 @@ const std::string USER_WANT_DIRECT_IO{"USER_WANT_DIRECT_IO"}; // u
 SISL_OPTION_GROUP(test_common_setup,
                   (num_threads, "", "num_threads", "number of threads",
                    ::cxxopts::value< uint32_t >()->default_value("2"), "number"),
+                  (num_fibers, "", "num_fibers", "number of fibers per thread",
+                   ::cxxopts::value< uint32_t >()->default_value("2"), "number"),
                   (num_devs, "", "num_devs", "number of devices to create",
                    ::cxxopts::value< uint32_t >()->default_value("2"), "number"),
                   (dev_size_mb, "", "dev_size_mb", "size of each device in MB",
@@ -111,7 +113,8 @@ class HSTestHelper {
                                 hs_before_services_starting_cb_t cb = nullptr, bool restart = false) {
         auto const ndevices = SISL_OPTIONS["num_devs"].as< uint32_t >();
         auto const dev_size = SISL_OPTIONS["dev_size_mb"].as< uint64_t >() * 1024 * 1024;
-        auto nthreads = SISL_OPTIONS["num_threads"].as< uint32_t >();
+        auto num_threads = SISL_OPTIONS["num_threads"].as< uint32_t >();
+        auto num_fibers = SISL_OPTIONS["num_fibers"].as< uint32_t >();
         auto is_spdk = SISL_OPTIONS["spdk"].as< bool >();

         if (restart) {
@@ -145,11 +148,11 @@ class HSTestHelper {
        if (is_spdk) {
            LOGINFO("Spdk with more than 2 threads will overburden test systems, changing num_threads to 2");
-            nthreads = 2;
+            num_threads = 2;
        }

-        LOGINFO("Starting iomgr with {} threads, spdk: {}", nthreads, is_spdk);
-        ioenvironment.with_iomgr(iomgr::iomgr_params{.num_threads = nthreads, .is_spdk = is_spdk});
+        LOGINFO("Starting iomgr with {} threads, spdk: {}", num_threads, is_spdk);
+        ioenvironment.with_iomgr(
+            iomgr::iomgr_params{.num_threads = num_threads, .is_spdk = is_spdk, .num_fibers = num_fibers});

         auto const http_port = SISL_OPTIONS["http_port"].as< int >();
         if (http_port != 0) {

diff --git a/src/tests/test_index_btree.cpp b/src/tests/test_index_btree.cpp
index 4bab73ad0..e110a931d 100644
--- a/src/tests/test_index_btree.cpp
+++ b/src/tests/test_index_btree.cpp
@@ -51,6 +51,12 @@ SISL_OPTION_GROUP(test_index_btree,
                    ::cxxopts::value< uint32_t >()->default_value("500"), "number"),
                   (num_entries, "", "num_entries", "number of entries to test with",
                    ::cxxopts::value< uint32_t >()->default_value("5000"), "number"),
+                  (run_time, "", "run_time", "run time for io",
+                   ::cxxopts::value< uint32_t >()->default_value("360000"), "seconds"),
+                  (disable_merge, "", "disable_merge", "disable_merge", ::cxxopts::value< bool >()->default_value("0"), ""),
+                  (operation_list, "", "operation_list", "operation list instead of default created following by percentage",
+                   ::cxxopts::value< std::vector< std::string > >(), "operations [...]"),
+                  (preload_size, "", "preload_size", "number of entries to preload tree with",
+                   ::cxxopts::value< uint32_t >()->default_value("1000"), "number"),
                   (seed, "", "seed", "random engine seed, use random if not defined",
                    ::cxxopts::value< uint64_t >()->default_value("0"), "number"))
@@ -449,6 +455,97 @@ TYPED_TEST(BtreeTest, ThreadedCpFlush) {
     LOGINFO("ThreadedCpFlush test end");
 }

+template < typename TestType >
+struct BtreeConcurrentTest : public BtreeTestHelper< TestType > {
+    using T = TestType;
+    using K = typename TestType::KeyType;
+    using V = typename TestType::ValueType;
+
+    class TestIndexServiceCallbacks : public IndexServiceCallbacks {
+    public:
+        TestIndexServiceCallbacks(BtreeConcurrentTest* test) : m_test(test) {}
+        std::shared_ptr< IndexTableBase > on_index_table_found(const superblk< index_table_sb >& sb) override {
+            LOGINFO("Index table recovered");
+            LOGINFO("Root bnode_id {} version {}", sb->root_node, sb->link_version);
+            m_test->m_bt = std::make_shared< typename T::BtreeType >(sb, m_test->m_cfg);
+            return m_test->m_bt;
+        }
+
+    private:
+        BtreeConcurrentTest* m_test;
+    };
+
+    BtreeConcurrentTest() { this->m_is_multi_threaded = true; }
+
+    void SetUp() override {
+        test_common::HSTestHelper::start_homestore(
+            "test_index_btree",
+            {{HS_SERVICE::META, {.size_pct = 10.0}},
+             {HS_SERVICE::INDEX, {.size_pct = 70.0, .index_svc_cbs = new TestIndexServiceCallbacks(this)}}});
+
+        LOGINFO("Node size {} ", hs()->index_service().node_size());
+        this->m_cfg = BtreeConfig(hs()->index_service().node_size());
+
+        auto uuid = boost::uuids::random_generator()();
+        auto parent_uuid = boost::uuids::random_generator()();
+
+        // Test cp flush of write back.
+        HS_SETTINGS_FACTORY().modifiable_settings([](auto& s) {
+            s.generic.cache_max_throttle_cnt = 10000;
+            HS_SETTINGS_FACTORY().save();
+        });
+        homestore::hs()->resource_mgr().reset_dirty_buf_qd();
+
+        // Create the index table and attach it to the index service.
+
+TYPED_TEST_SUITE(BtreeConcurrentTest, BtreeTypes);
+TYPED_TEST(BtreeConcurrentTest, ConcurrentAllOps) {
+    // Note: range_put is not supported for non-extent keys
+    std::vector< std::string > input_ops = {"put:20", "remove:20", "range_put:20", "range_remove:20", "query:20"};
+    std::vector< std::pair< std::string, int > > ops;
+    if (SISL_OPTIONS.count("operation_list")) {
+        input_ops = SISL_OPTIONS["operation_list"].as< std::vector< std::string > >();
+    }
+    int total = std::accumulate(input_ops.begin(), input_ops.end(), 0, [](int sum, const auto& str) {
+        std::vector< std::string > tokens;
+        boost::split(tokens, str, boost::is_any_of(":"));
+        if (tokens.size() == 2) {
+            try {
+                return sum + std::stoi(tokens[1]);
+            } catch (const std::exception&) {
+                // Invalid frequency, ignore this element
+            }
+        }
+        return sum; // Ignore malformed strings
+    });
+
+    std::transform(input_ops.begin(), input_ops.end(), std::back_inserter(ops), [total](const auto& str) {
+        std::vector< std::string > tokens;
+        boost::split(tokens, str, boost::is_any_of(":"));
+        if (tokens.size() == 2) {
+            try {
+                return std::make_pair(tokens[0], (int)(100.0 * std::stoi(tokens[1]) / total));
+            } catch (const std::exception&) {
+                // Invalid frequency, ignore this element
+            }
+        }
+        return std::make_pair(std::string(), 0);
+    });
+
+    this->multi_op_execute(ops);
+}
+
 int main(int argc, char* argv[]) {
     int parsed_argc{argc};
     ::testing::InitGoogleTest(&parsed_argc, argv);
diff --git a/src/tests/test_mem_btree.cpp b/src/tests/test_mem_btree.cpp
index bb794fca3..94e78d53c 100644
--- a/src/tests/test_mem_btree.cpp
+++ b/src/tests/test_mem_btree.cpp
@@ -41,8 +41,8 @@ SISL_OPTION_GROUP(
     (num_entries, "", "num_entries", "number of entries to test with",
      ::cxxopts::value< uint32_t >()->default_value("10000"), "number"),
     (disable_merge, "", "disable_merge", "disable_merge", ::cxxopts::value< bool >()->default_value("0"), ""),
-    (n_threads, "", "num_threads", "number of threads", ::cxxopts::value< uint32_t >()->default_value("2"), "number"),
-    (n_fibers, "", "num_fibers", "number of fibers", ::cxxopts::value< uint32_t >()->default_value("10"), "number"),
+    (num_threads, "", "num_threads", "number of threads", ::cxxopts::value< uint32_t >()->default_value("2"), "number"),
+    (num_fibers, "", "num_fibers", "number of fibers", ::cxxopts::value< uint32_t >()->default_value("10"), "number"),
     (operation_list, "", "operation_list", "operation list instead of default created following by percentage",
      ::cxxopts::value< std::vector< std::string > >(), "operations [...]"),
     (preload_size, "", "preload_size", "number of entries to preload tree with",
diff --git a/src/tests/test_scripts/btree_test.py b/src/tests/test_scripts/btree_test.py
index e87059e12..ed0389f4b 100644
--- a/src/tests/test_scripts/btree_test.py
+++ b/src/tests/test_scripts/btree_test.py
@@ -12,16 +12,18 @@
 opts, args = getopt.getopt(sys.argv[1:], 'tdlme:',
                            ['test_suits=', 'dirpath=', 'op_list=', 'log_mods=', 'threads=', 'fibers=', 'preload_size=',
-                            'op_list=', 'num_entries=', 'num_iters='])
+                            'op_list=', 'num_entries=', 'num_iters=', 'dev_list=', 'run_time='])

 test_suits = ""
 dirpath = "./"
 op_list = ""
 log_mods = ""
-threads = " --n_threads=10"
-fibers = " --n_fibers=10"
-preload_size = " --preload_size=2000"
-num_entries = " --num_entries=10000"
-num_iters = " --num_iters=1000000"
+threads = " --num_threads=10"
+fibers = " --num_fibers=10"
+preload_size = " --preload_size=16384"
+num_entries = " --num_entries=65536"
+num_iters = " --num_iters=10000000"
+run_time = " --run_time=36000"
+dev_list = ""

 for opt, arg in opts:
     if opt in ('-t', '--test_suits'):
@@ -38,13 +40,13 @@
         log_mods = arg
         print("log_mods (%s)" % arg)
     if opt in ('-f', '--fibers'):
-        fibers = " --n_fibers=" + arg
+        fibers = " --num_fibers=" + arg
         print("number of fibers per thread (%s)" % arg)
     if opt in ('-p', '--preload_size'):
         preload_size = " --preload_size=" + arg
         print("preload_size = (%s)" % arg)
     if opt in ('-t', '--threads'):
-        threads = " --n_threads=" + arg
+        threads = " --num_threads=" + arg
         print("number of threads (%s)" % arg)
     if opt in ('-n', '--num_entries'):
         num_entries = " --num_entries=" + arg
@@ -52,19 +54,30 @@
     if opt in ('-i', '--num_iters'):
         num_iters = " --num_iters=" + arg
         print("number of iterations (%s)" % arg)
+    if opt in ('-r', '--run_time'):
+        run_time = " --run_time=" + arg
+        print("total run time (%s)" % arg)
+    if opt in ('-v', '--dev_list'):
+        dev_list = arg
+        print("device list (%s)" % arg)

 operations = ""
 if bool(op_list and op_list.strip()):
     operations = ''.join([f' --operation_list={op}' for op in op_list.split()])

-btree_options = num_entries + num_iters + preload_size + fibers + threads + operations
+addln_opts = ' '
+if bool(dev_list and dev_list.strip()):
+    addln_opts += ' --device_list '
+    addln_opts += dev_list
+
+btree_options = num_entries + num_iters + preload_size + fibers + threads + operations + run_time + addln_opts

 def normal():
     print("normal test started with (%s)" % btree_options)
     # " --operation_list=query:20 --operation_list=put:20 --operation_list=remove:20"
-    cmd_opts = " --gtest_filter=BtreeConcurrentTest/*.AllTree" + btree_options + " " + log_mods
-    subprocess.check_call(dirpath + "test_mem_btree " + cmd_opts, stderr=subprocess.STDOUT, shell=True)
+    cmd_opts = " --gtest_filter=BtreeConcurrentTest/*.ConcurrentAllOps" + btree_options + " " + log_mods
+    subprocess.check_call(dirpath + "test_index_btree " + cmd_opts, stderr=subprocess.STDOUT, shell=True)
     print("normal test completed")

From 6897d3eca84506c0cbb36ec77fd38da03dc6aa80 Mon Sep 17 00:00:00 2001
From: Jie Yao
Date: Wed, 15 Nov 2023 02:07:15 +0800
Subject: [PATCH 3/3] fix NDEBUG (#228)

---
 src/include/homestore/btree/detail/prefix_node.hpp | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/include/homestore/btree/detail/prefix_node.hpp b/src/include/homestore/btree/detail/prefix_node.hpp
index 62003da7a..a72b13adb 100644
--- a/src/include/homestore/btree/detail/prefix_node.hpp
+++ b/src/include/homestore/btree/detail/prefix_node.hpp
@@ -771,7 +771,7 @@ class FixedPrefixNode : public VariantNode< K, V > {
         phdr->tail_slot = phdr->used_slots;
     }

-#ifdef _DEBUG
+#ifndef NDEBUG
     void validate_sanity() {
         uint32_t i{0};
         // validate if keys are in ascending order