diff --git a/conanfile.py b/conanfile.py index 19e28962b..783fbbd23 100644 --- a/conanfile.py +++ b/conanfile.py @@ -5,7 +5,7 @@ class HomestoreConan(ConanFile): name = "homestore" - version = "4.8.1" + version = "4.8.2" homepage = "https://github.com/eBay/Homestore" description = "HomeStore Storage Engine" diff --git a/src/include/homestore/btree/detail/btree_node_mgr.ipp b/src/include/homestore/btree/detail/btree_node_mgr.ipp index e81da107f..826c29789 100644 --- a/src/include/homestore/btree/detail/btree_node_mgr.ipp +++ b/src/include/homestore/btree/detail/btree_node_mgr.ipp @@ -305,7 +305,7 @@ BtreeNode* Btree< K, V >::init_node(uint8_t* node_buf, uint32_t node_ctx_size, b /* Note:- This function assumes that access of this node is thread safe. */ template < typename K, typename V > void Btree< K, V >::free_node(const BtreeNodePtr& node, locktype_t cur_lock, void* context) { - BT_NODE_LOG(DEBUG, node, "Freeing node"); + BT_NODE_LOG(TRACE, node, "Freeing node"); COUNTER_DECREMENT_IF_ELSE(m_metrics, node->is_leaf(), btree_leaf_node_count, btree_int_node_count, 1); if (cur_lock != locktype_t::NONE) { diff --git a/src/include/homestore/index/index_internal.hpp b/src/include/homestore/index/index_internal.hpp index 2c8a09849..756f47bbd 100644 --- a/src/include/homestore/index/index_internal.hpp +++ b/src/include/homestore/index/index_internal.hpp @@ -64,29 +64,68 @@ enum class index_buf_state_t : uint8_t { }; ///////////////////////// Btree Node and Buffer Portion ////////////////////////// + + +// Multiple IndexBuffer could point to the same NodeBuffer if its clean. +struct NodeBuffer; +typedef std::shared_ptr< NodeBuffer > NodeBufferPtr; +struct NodeBuffer { + uint8_t* m_bytes{nullptr}; // Actual data buffer + std::atomic< index_buf_state_t > m_state{index_buf_state_t::CLEAN}; // Is buffer yet to persist? + NodeBuffer(uint32_t buf_size, uint32_t align_size); + ~NodeBuffer(); +}; + +// IndexBuffer is for each CP. The dependent index buffers are chained using +// m_next_buffer and each buffer is flushed only its wait_for_leaders reaches 0 +// which means all its dependent buffers are flushed. struct IndexBuffer; typedef std::shared_ptr< IndexBuffer > IndexBufferPtr; - struct IndexBuffer { - uint8_t* m_node_buf{nullptr}; // Actual buffer - index_buf_state_t m_buf_state{index_buf_state_t::CLEAN}; // Is buffer yet to persist? - BlkId m_blkid; // BlkId where this needs to be persisted - std::weak_ptr< IndexBuffer > m_next_buffer; // Next buffer in the chain + NodeBufferPtr m_node_buf; + BlkId m_blkid; // BlkId where this needs to be persisted + std::weak_ptr< IndexBuffer > m_next_buffer; // Next buffer in the chain // Number of leader buffers we are waiting for before we write this buffer sisl::atomic_counter< int > m_wait_for_leaders{0}; IndexBuffer(BlkId blkid, uint32_t buf_size, uint32_t align_size); + IndexBuffer(NodeBufferPtr node_buf, BlkId blkid); ~IndexBuffer(); BlkId blkid() const { return m_blkid; } - uint8_t* raw_buffer() { return m_node_buf; } + uint8_t* raw_buffer() { + RELEASE_ASSERT(m_node_buf, "Node buffer null blkid {}", m_blkid.to_integer()); + return m_node_buf->m_bytes; + } + + bool is_clean() const { + RELEASE_ASSERT(m_node_buf, "Node buffer null blkid {}", m_blkid.to_integer()); + return (m_node_buf->m_state.load() == index_buf_state_t::CLEAN); + } + + index_buf_state_t state() const { + RELEASE_ASSERT(m_node_buf, "Node buffer null blkid {}", m_blkid.to_integer()); + return m_node_buf->m_state; + } + + void set_state(index_buf_state_t state) { + RELEASE_ASSERT(m_node_buf, "Node buffer null blkid {}", m_blkid.to_integer()); + m_node_buf->m_state = state; + } - bool is_clean() const { return (m_buf_state == index_buf_state_t::CLEAN); } std::string to_string() const { - return fmt::format("IndexBuffer {} blkid={} state={} node_buf={} next_buffer={} wait_for={}", - reinterpret_cast< void* >(const_cast< IndexBuffer* >(this)), m_blkid.to_integer(), - static_cast< int >(m_buf_state), static_cast< void* >(m_node_buf), - voidptr_cast(m_next_buffer.lock().get()), m_wait_for_leaders.get()); + auto str = fmt::format("IndexBuffer {} blkid={}", reinterpret_cast< void* >(const_cast< IndexBuffer* >(this)), + m_blkid.to_integer()); + if (m_node_buf == nullptr) { + fmt::format_to(std::back_inserter(str), " node_buf=nullptr"); + } else { + fmt::format_to(std::back_inserter(str), " state={} node_buf={}", + static_cast< int >(m_node_buf->m_state.load()), static_cast< void* >(m_node_buf->m_bytes)); + } + fmt::format_to(std::back_inserter(str), " next_buffer={} wait_for={}", + m_next_buffer.lock() ? reinterpret_cast< void* >(m_next_buffer.lock().get()) : 0, + m_wait_for_leaders.get()); + return str; } }; diff --git a/src/include/homestore/index/index_table.hpp b/src/include/homestore/index/index_table.hpp index 246557a62..bce7e8f36 100644 --- a/src/include/homestore/index/index_table.hpp +++ b/src/include/homestore/index/index_table.hpp @@ -114,7 +114,7 @@ class IndexTable : public IndexTableBase, public Btree< K, V > { // Need to put it in wb cache wb_cache().write_buf(node, idx_node->m_idx_buf, cp_ctx); idx_node->m_last_mod_cp_id = cp_ctx->id(); - LOGTRACEMOD(wbcache, "{}", idx_node->m_idx_buf->to_string()); + LOGTRACEMOD(wbcache, "add to dirty list cp {} {}", cp_ctx->id(), idx_node->m_idx_buf->to_string()); } node->set_checksum(this->m_bt_cfg); return btree_status_t::success; @@ -129,17 +129,28 @@ class IndexTable : public IndexTableBase, public Btree< K, V > { auto& left_child_buf = left_child_idx_node->m_idx_buf; auto& parent_buf = parent_idx_node->m_idx_buf; - LOGTRACEMOD(wbcache, "left {} parent {} ", left_child_buf->to_string(), parent_buf->to_string()); - // Write new nodes in the list as standalone outside transacted pairs. // Write the new right child nodes, left node and parent in order. + // Create the relationship of right child to the left node via prepend_to_chain below. + // Parent and left node are linked in the prepare_node_txn for (const auto& right_child_node : new_nodes) { auto right_child = IndexBtreeNode::convert(right_child_node.get()); write_node_impl(right_child_node, context); wb_cache().prepend_to_chain(right_child->m_idx_buf, left_child_buf); - LOGTRACEMOD(wbcache, "right {} left {} ", right_child->m_idx_buf->to_string(), left_child_buf->to_string()); } + auto trace_index_bufs = [&]() { + std::string str; + str = fmt::format("cp {} left {} parent {}", cp_ctx->id(), left_child_buf->to_string(), + parent_buf->to_string()); + for (const auto& right_child_node : new_nodes) { + auto right_child = IndexBtreeNode::convert(right_child_node.get()); + fmt::format_to(std::back_inserter(str), " right {}", right_child->m_idx_buf->to_string()); + } + return str; + }; + + LOGTRACEMOD(wbcache, "{}", trace_index_bufs()); write_node_impl(left_child_node, context); write_node_impl(parent_node, context); @@ -181,18 +192,17 @@ class IndexTable : public IndexTableBase, public Btree< K, V > { // realloc_node(node); // } - // If the backing buffer is already in a clean state, we don't need to make a copy of it - if (idx_node->m_idx_buf->is_clean()) { return btree_status_t::success; } - - // Make a new btree buffer and copy the contents and swap it to make it the current node's buffer. The + // We create IndexBuffer for each CP. But if the backing buffer is already in a clean state + // we dont copy the node buffer. Copy buffer will handle it. If the node buffer is dirty, + // make a new btree buffer and copy the contents and swap it to make it the current node's buffer. The // buffer prior to this copy, would have been written and already added into the dirty buffer list. - idx_node->m_idx_buf = wb_cache().copy_buffer(idx_node->m_idx_buf); + idx_node->m_idx_buf = wb_cache().copy_buffer(idx_node->m_idx_buf, cp_ctx); idx_node->m_last_mod_cp_id = -1; node->m_phys_node_buf = idx_node->m_idx_buf->raw_buffer(); node->set_checksum(this->m_bt_cfg); - LOGTRACEMOD(wbcache, "buf {} ", idx_node->m_idx_buf->to_string()); + LOGTRACEMOD(wbcache, "cp {} {} ", cp_ctx->id(), idx_node->m_idx_buf->to_string()); #ifndef NO_CHECKSUM if (!node->verify_node(this->m_bt_cfg)) { @@ -221,6 +231,8 @@ class IndexTable : public IndexTableBase, public Btree< K, V > { auto& child_buf = child_idx_node->m_idx_buf; auto& parent_buf = parent_idx_node->m_idx_buf; + LOGTRACEMOD(wbcache, "cp {} left {} parent {} ", cp_ctx->id(), child_buf->to_string(), parent_buf->to_string()); + auto [child_copied, parent_copied] = wb_cache().create_chain(child_buf, parent_buf, cp_ctx); if (child_copied) { child_node->m_phys_node_buf = child_buf->raw_buffer(); @@ -231,7 +243,6 @@ class IndexTable : public IndexTableBase, public Btree< K, V > { parent_idx_node->m_last_mod_cp_id = -1; } - LOGTRACEMOD(wbcache, "child {} parent {} ", child_buf->to_string(), parent_buf->to_string()); return btree_status_t::success; } diff --git a/src/include/homestore/index/wb_cache_base.hpp b/src/include/homestore/index/wb_cache_base.hpp index f59d4b1ce..caee6c557 100644 --- a/src/include/homestore/index/wb_cache_base.hpp +++ b/src/include/homestore/index/wb_cache_base.hpp @@ -52,13 +52,13 @@ class IndexWBCacheBase { /// @brief Start a chain of related btree buffers. Typically a chain is creating from second and third pairs and /// then first is prepended to the chain. In case the second buffer is already with the WB cache, it will create a - /// new buffer for both second and third. + /// new buffer for both second and third. We append the buffers to a list in dependency chain. /// @param second Second btree buffer in the chain. It will be updated to copy of second buffer if buffer already /// has dependencies. /// @param third Thrid btree buffer in the chain. It will be updated to copy of third buffer if buffer already /// has dependencies. /// @return Returns if the buffer had to be copied - virtual std::tuple< bool, bool > create_chain(IndexBufferPtr& second, IndexBufferPtr& third, CPContext* cp_ctx) = 0; + virtual std::pair< bool, bool > create_chain(IndexBufferPtr& second, IndexBufferPtr& third, CPContext* cp_ctx) = 0; /// @brief Prepend to the chain that was already created with second /// @param first @@ -73,7 +73,7 @@ class IndexWBCacheBase { /// @brief Copy buffer /// @param cur_buf /// @return - virtual IndexBufferPtr copy_buffer(const IndexBufferPtr& cur_buf) const = 0; + virtual IndexBufferPtr copy_buffer(const IndexBufferPtr& cur_buf, const CPContext* context) const = 0; }; } // namespace homestore diff --git a/src/include/homestore/index_service.hpp b/src/include/homestore/index_service.hpp index 9952e3852..09543a5fb 100644 --- a/src/include/homestore/index_service.hpp +++ b/src/include/homestore/index_service.hpp @@ -22,7 +22,7 @@ #include #include #include - +#include namespace homestore { class IndexWBCacheBase; diff --git a/src/lib/checkpoint/cp.hpp b/src/lib/checkpoint/cp.hpp index 5644adac7..05bda5be7 100644 --- a/src/lib/checkpoint/cp.hpp +++ b/src/lib/checkpoint/cp.hpp @@ -21,7 +21,7 @@ #include #include - +#include #include "common/homestore_assert.hpp" /* @@ -73,7 +73,7 @@ struct CP { bool m_cp_waiting_to_trigger{false}; // it is waiting for previous cp to complete cp_id_t m_cp_id; std::array< std::unique_ptr< CPContext >, (size_t)cp_consumer_t::SENTINEL > m_contexts; - folly::Promise< bool > m_comp_promise; + folly::SharedPromise< bool > m_comp_promise; public: CP(CPManager* mgr) : m_cp_mgr{mgr} {} diff --git a/src/lib/checkpoint/cp_mgr.cpp b/src/lib/checkpoint/cp_mgr.cpp index 89921620a..d26424248 100644 --- a/src/lib/checkpoint/cp_mgr.cpp +++ b/src/lib/checkpoint/cp_mgr.cpp @@ -142,8 +142,11 @@ folly::Future< bool > CPManager::trigger_cp_flush(bool force) { std::unique_lock< std::mutex > lk(trigger_cp_mtx); auto cur_cp = cp_guard(); HS_DBG_ASSERT_NE(cur_cp->m_cp_status, cp_status_t::cp_flush_prepare); - cur_cp->m_comp_promise = std::move(folly::Promise< bool >{}); - cur_cp->m_cp_waiting_to_trigger = true; + // If multiple threads call trigger, they all get the future from the same promise. + if (!cur_cp->m_cp_waiting_to_trigger) { + cur_cp->m_comp_promise = std::move(folly::SharedPromise< bool >{}); + cur_cp->m_cp_waiting_to_trigger = true; + } return cur_cp->m_comp_promise.getFuture(); } else { return folly::makeFuture< bool >(false); @@ -177,7 +180,7 @@ folly::Future< bool > CPManager::trigger_cp_flush(bool force) { // originally by the caller will be untouched and completed upto CP completion/ ret_fut = folly::makeFuture< bool >(true); } else { - cur_cp->m_comp_promise = std::move(folly::Promise< bool >{}); + cur_cp->m_comp_promise = std::move(folly::SharedPromise< bool >{}); ret_fut = cur_cp->m_comp_promise.getFuture(); } cur_cp->m_cp_status = cp_status_t::cp_flush_prepare; diff --git a/src/lib/index/index_cp.hpp b/src/lib/index/index_cp.hpp index f06f53091..d7aad88dc 100644 --- a/src/lib/index/index_cp.hpp +++ b/src/lib/index/index_cp.hpp @@ -18,6 +18,7 @@ #include #include #include +#include #include #include "checkpoint/cp.hpp" @@ -32,20 +33,18 @@ struct IndexCPContext : public VDevCPContext { std::atomic< uint64_t > m_num_nodes_removed{0}; sisl::ConcurrentInsertVector< IndexBufferPtr > m_dirty_buf_list; sisl::atomic_counter< int64_t > m_dirty_buf_count{0}; - IndexBufferPtr m_last_in_chain; std::mutex m_flush_buffer_mtx; sisl::ConcurrentInsertVector< IndexBufferPtr >::iterator m_dirty_buf_it; public: + IndexCPContext(CP* cp) : VDevCPContext(cp) {} virtual ~IndexCPContext() = default; void add_to_dirty_list(const IndexBufferPtr& buf) { - buf->m_buf_state = index_buf_state_t::DIRTY; m_dirty_buf_list.push_back(buf); + buf->set_state(index_buf_state_t::DIRTY); m_dirty_buf_count.increment(1); - m_last_in_chain = buf; - LOGTRACEMOD(wbcache, "{}", buf->to_string()); } bool any_dirty_buffers() const { return !m_dirty_buf_count.testz(); } @@ -59,15 +58,102 @@ struct IndexCPContext : public VDevCPContext { return ret; } - std::string to_string() const { + std::string to_string() { std::string str{fmt::format("IndexCPContext cpid={} dirty_buf_count={} dirty_buf_list_size={}", m_cp->id(), m_dirty_buf_count.get(), m_dirty_buf_list.size())}; - // TODO dump all index buffers. + // Mapping from a node to all its parents in the graph. + // Display all buffers and its dependencies and state. + std::unordered_map< IndexBuffer*, std::vector< IndexBuffer* > > parents; + + auto it = m_dirty_buf_list.begin(); + while (it != m_dirty_buf_list.end()) { + // Add this buf to his children. + IndexBufferPtr buf = *it; + parents[buf->m_next_buffer.lock().get()].emplace_back(buf.get()); + ++it; + } + + it = m_dirty_buf_list.begin(); + while (it != m_dirty_buf_list.end()) { + IndexBufferPtr buf = *it; + fmt::format_to(std::back_inserter(str), "{}", buf->to_string()); + auto first = true; + for (const auto& p : parents[buf.get()]) { + if (first) { + fmt::format_to(std::back_inserter(str), "\nDepends:"); + first = false; + } + fmt::format_to(std::back_inserter(str), " {}({})", r_cast< void* >(p), s_cast< int >(p->state())); + } + fmt::format_to(std::back_inserter(str), "\n"); + ++it; + } + return str; } + + void check_cycle() { + // Use dfs to find if the graph is cycle + auto it = m_dirty_buf_list.begin(); + while (it != m_dirty_buf_list.end()) { + IndexBufferPtr buf = *it;; + std::set< IndexBuffer* > visited; + check_cycle_recurse(buf, visited); + ++it; + } + } + + void check_cycle_recurse(IndexBufferPtr buf, std::set< IndexBuffer* >& visited) const { + if (visited.count(buf.get()) != 0) { + LOGERROR("Cycle found for {}", buf->to_string()); + for (auto& x : visited) { + LOGERROR("Path : {}", x->to_string()); + } + return; + } + + visited.insert(buf.get()); + if (buf->m_next_buffer.lock()) { check_cycle_recurse(buf->m_next_buffer.lock(), visited); } + } + + void check_wait_for_leaders() { + // Use the next buffer as indegree to find if wait_for_leaders is invalid. + std::unordered_map< IndexBuffer*, int > wait_for_leaders; + IndexBufferPtr buf; + + // Store the wait for leader count for each buffer. + auto it = m_dirty_buf_list.begin(); + while (it != m_dirty_buf_list.end()) { + buf = *it; + wait_for_leaders[buf.get()] = buf->m_wait_for_leaders.get(); + ++it; + } + + // Decrement the count using the next buffer. + it = m_dirty_buf_list.begin(); + while (it != m_dirty_buf_list.end()) { + buf = *it; + auto next_buf = buf->m_next_buffer.lock(); + if (next_buf.get() == nullptr) continue; + wait_for_leaders[next_buf.get()]--; + ++it; + } + + bool issue = false; + for (const auto& [buf, waits] : wait_for_leaders) { + // Any value other than zero means the dependency graph is invalid. + if (waits != 0) { + issue = true; + LOGERROR("Leaders wait not zero cp {} buf {} waits {}", id(), buf->to_string(), waits); + } + } + + RELEASE_ASSERT_EQ(issue, false, "Found issue with wait_for_leaders"); + } }; + class IndexWBCache; class IndexCPCallbacks : public CPCallbacks { public: diff --git a/src/lib/index/index_service.cpp b/src/lib/index/index_service.cpp index d3d0984b5..bfa96f8bc 100644 --- a/src/lib/index/index_service.cpp +++ b/src/lib/index/index_service.cpp @@ -82,7 +82,9 @@ void IndexService::stop() { HS_REL_ASSERT_EQ(success, true, "CP Flush failed"); LOGINFO("CP Flush completed"); - for (auto [id, tbl] : m_index_map) { tbl->destroy(); } + for (auto [id, tbl] : m_index_map) { + tbl->destroy(); + } } void IndexService::add_index_table(const std::shared_ptr< IndexTableBase >& tbl) { std::unique_lock lg(m_index_map_mtx); @@ -107,9 +109,16 @@ uint64_t IndexService::used_size() const { return size; } +NodeBuffer::NodeBuffer(uint32_t buf_size, uint32_t align_size) : + m_bytes{hs_utils::iobuf_alloc(buf_size, sisl::buftag::btree_node, align_size)} {} + +NodeBuffer::~NodeBuffer() { hs_utils::iobuf_free(m_bytes, sisl::buftag::btree_node); } + IndexBuffer::IndexBuffer(BlkId blkid, uint32_t buf_size, uint32_t align_size) : - m_node_buf{hs_utils::iobuf_alloc(buf_size, sisl::buftag::btree_node, align_size)}, m_blkid{blkid} {} + m_node_buf{std::make_shared< NodeBuffer >(buf_size, align_size)}, m_blkid{blkid} {} + +IndexBuffer::IndexBuffer(NodeBufferPtr node_buf, BlkId blkid) : m_node_buf(node_buf), m_blkid(blkid) {} -IndexBuffer::~IndexBuffer() { hs_utils::iobuf_free(m_node_buf, sisl::buftag::btree_node); } +IndexBuffer::~IndexBuffer() { m_node_buf.reset(); } } // namespace homestore diff --git a/src/lib/index/wb_cache.cpp b/src/lib/index/wb_cache.cpp index a00a46dfa..12cbbfc31 100644 --- a/src/lib/index/wb_cache.cpp +++ b/src/lib/index/wb_cache.cpp @@ -84,7 +84,6 @@ BtreeNodePtr IndexWBCache::alloc_buf(node_initializer_t&& node_initializer) { // Alloc buffer and initialize the node auto idx_buf = std::make_shared< IndexBuffer >(blkid, m_node_size, m_vdev->align_size()); auto node = node_initializer(idx_buf); - LOGTRACEMOD(wbcache, "idx_buf {} blkid {}", static_cast< void* >(idx_buf.get()), blkid.to_integer()); // Add the node to the cache bool done = m_cache.insert(node); @@ -101,16 +100,31 @@ void IndexWBCache::realloc_buf(const IndexBufferPtr& buf) { } void IndexWBCache::write_buf(const BtreeNodePtr& node, const IndexBufferPtr& buf, CPContext* cp_ctx) { + // TODO upsert always returns false even if it succeeds. m_cache.upsert(node); r_cast< IndexCPContext* >(cp_ctx)->add_to_dirty_list(buf); resource_mgr().inc_dirty_buf_size(m_node_size); } -IndexBufferPtr IndexWBCache::copy_buffer(const IndexBufferPtr& cur_buf) const { - auto new_buf = std::make_shared< IndexBuffer >(cur_buf->m_blkid, m_node_size, m_vdev->align_size()); - std::memcpy(new_buf->raw_buffer(), cur_buf->raw_buffer(), m_node_size); - LOGTRACEMOD(wbcache, "new_buf {} cur_buf {} cur_buf_blkid {}", static_cast< void* >(new_buf.get()), - static_cast< void* >(cur_buf.get()), cur_buf->m_blkid.to_integer()); +IndexBufferPtr IndexWBCache::copy_buffer(const IndexBufferPtr& cur_buf, const CPContext* cp_ctx) const { + IndexBufferPtr new_buf = nullptr; + bool copied = false; + + // When we copy the buffer we check if the node buffer is clean or not. If its clean + // we could reuse it otherwise create a copy. + if (cur_buf->is_clean()) { + // Refer to the same node buffer. + new_buf = std::make_shared< IndexBuffer >(cur_buf->m_node_buf, cur_buf->m_blkid); + } else { + // If its not clean, we do deep copy. + new_buf = std::make_shared< IndexBuffer >(cur_buf->m_blkid, m_node_size, m_vdev->align_size()); + std::memcpy(new_buf->raw_buffer(), cur_buf->raw_buffer(), m_node_size); + copied = true; + } + + LOGTRACEMOD(wbcache, "cp {} new_buf {} cur_buf {} cur_buf_blkid {} copied {}", cp_ctx->id(), + static_cast< void* >(new_buf.get()), static_cast< void* >(cur_buf.get()), cur_buf->m_blkid.to_integer(), + copied); return new_buf; } @@ -138,32 +152,34 @@ void IndexWBCache::read_buf(bnodeid_t id, BtreeNodePtr& node, node_initializer_t } } -std::tuple< bool, bool > IndexWBCache::create_chain(IndexBufferPtr& second, IndexBufferPtr& third, CPContext* cp_ctx) { +std::pair< bool, bool > IndexWBCache::create_chain(IndexBufferPtr& second, IndexBufferPtr& third, CPContext* cp_ctx) { bool second_copied{false}, third_copied{false}; - + auto chain = second; + auto old_third = third; if (!second->is_clean()) { - auto new_second = copy_buffer(second); - LOGTRACEMOD(wbcache, "second copied blkid {} {} new_second {}", second->m_blkid.to_integer(), - static_cast< void* >(second.get()), static_cast< void* >(new_second.get())); + auto new_second = copy_buffer(second, cp_ctx); second = new_second; second_copied = true; } + if (!third->is_clean()) { - auto new_third = copy_buffer(third); - LOGTRACEMOD(wbcache, "third copied blkid {} {} new_third {}", third->m_blkid.to_integer(), - static_cast< void* >(third.get()), static_cast< void* >(new_third.get())); + auto new_third = copy_buffer(third, cp_ctx); third = new_third; third_copied = true; } // Append parent(third) to the left child(second). - prepend_to_chain(second, third); + second->m_next_buffer = third; + third->m_wait_for_leaders.increment(1); + if (chain != second) { + // We want buffers to be append to the end of the chain which are related. + // If we split a node multiple times in same or different CP's, each dirty buffer will be + // added to the end of that chain. + while (chain->m_next_buffer.lock() != nullptr) { + chain = chain->m_next_buffer.lock(); + } - // TODO the index buffer are added to end of the chain, instead add to the dependency. - auto& last_in_chain = r_cast< IndexCPContext* >(cp_ctx)->m_last_in_chain; - if (last_in_chain) { - // Add this to the end of the chain. - last_in_chain->m_next_buffer = second; + chain->m_next_buffer = second; second->m_wait_for_leaders.increment(1); } @@ -171,10 +187,10 @@ std::tuple< bool, bool > IndexWBCache::create_chain(IndexBufferPtr& second, Inde } void IndexWBCache::prepend_to_chain(const IndexBufferPtr& first, const IndexBufferPtr& second) { + assert(first->m_next_buffer.lock() != second); assert(first->m_next_buffer.lock() == nullptr); first->m_next_buffer = second; second->m_wait_for_leaders.increment(1); - LOGTRACEMOD(wbcache, "first {} second {}", first->to_string(), second->to_string()); } void IndexWBCache::free_buf(const IndexBufferPtr& buf, CPContext* cp_ctx) { @@ -187,6 +203,7 @@ void IndexWBCache::free_buf(const IndexBufferPtr& buf, CPContext* cp_ctx) { } //////////////////// CP Related API section ///////////////////////////////// + folly::Future< bool > IndexWBCache::async_cp_flush(IndexCPContext* cp_ctx) { LOGTRACEMOD(wbcache, "cp_ctx {}", cp_ctx->to_string()); if (!cp_ctx->any_dirty_buffers()) { @@ -194,6 +211,13 @@ folly::Future< bool > IndexWBCache::async_cp_flush(IndexCPContext* cp_ctx) { return folly::makeFuture< bool >(true); // nothing to flush } +#ifndef NDEBUG + // Check no cycles or invalid wait_for_leader count in the dirty buffer + // dependency graph. + // cp_ctx->check_wait_for_leaders(); + // cp_ctx->check_cycle(); +#endif + cp_ctx->prepare_flush_iteration(); for (auto& fiber : m_cp_flush_fibers) { @@ -211,22 +235,22 @@ folly::Future< bool > IndexWBCache::async_cp_flush(IndexCPContext* cp_ctx) { return std::move(cp_ctx->get_future()); } -void IndexWBCache::do_flush_one_buf(IndexCPContext* cp_ctx, const IndexBufferPtr& buf, bool part_of_batch) { - LOGTRACEMOD(wbcache, "buf {}", buf->to_string()); - buf->m_buf_state = index_buf_state_t::FLUSHING; +void IndexWBCache::do_flush_one_buf(IndexCPContext* cp_ctx, IndexBufferPtr buf, bool part_of_batch) { + LOGTRACEMOD(wbcache, "cp {} buf {}", cp_ctx->id(), buf->to_string()); + buf->set_state(index_buf_state_t::FLUSHING); m_vdev->async_write(r_cast< const char* >(buf->raw_buffer()), m_node_size, buf->m_blkid, part_of_batch) - .thenValue([pbuf = buf.get(), cp_ctx](auto) { + .thenValue([buf, cp_ctx](auto) { auto& pthis = s_cast< IndexWBCache& >(wb_cache()); // Avoiding more than 16 bytes capture - pthis.process_write_completion(cp_ctx, pbuf); + pthis.process_write_completion(cp_ctx, buf); }); if (!part_of_batch) { m_vdev->submit_batch(); } } -void IndexWBCache::process_write_completion(IndexCPContext* cp_ctx, IndexBuffer* pbuf) { - LOGTRACEMOD(wbcache, "buf {}", pbuf->to_string()); +void IndexWBCache::process_write_completion(IndexCPContext* cp_ctx, IndexBufferPtr buf) { + LOGTRACEMOD(wbcache, "cp {} buf {}", cp_ctx->id(), buf->to_string()); resource_mgr().dec_dirty_buf_size(m_node_size); - auto [next_buf, has_more] = on_buf_flush_done(cp_ctx, pbuf); + auto [next_buf, has_more] = on_buf_flush_done(cp_ctx, buf); if (next_buf) { do_flush_one_buf(cp_ctx, next_buf, false); } else if (!has_more) { @@ -240,7 +264,7 @@ void IndexWBCache::process_write_completion(IndexCPContext* cp_ctx, IndexBuffer* } } -std::pair< IndexBufferPtr, bool > IndexWBCache::on_buf_flush_done(IndexCPContext* cp_ctx, IndexBuffer* buf) { +std::pair< IndexBufferPtr, bool > IndexWBCache::on_buf_flush_done(IndexCPContext* cp_ctx, IndexBufferPtr& buf) { if (m_cp_flush_fibers.size() > 1) { std::unique_lock lg(m_flush_mtx); return on_buf_flush_done_internal(cp_ctx, buf); @@ -249,9 +273,10 @@ std::pair< IndexBufferPtr, bool > IndexWBCache::on_buf_flush_done(IndexCPContext } } -std::pair< IndexBufferPtr, bool > IndexWBCache::on_buf_flush_done_internal(IndexCPContext* cp_ctx, IndexBuffer* buf) { +std::pair< IndexBufferPtr, bool > IndexWBCache::on_buf_flush_done_internal(IndexCPContext* cp_ctx, + IndexBufferPtr& buf) { static thread_local std::vector< IndexBufferPtr > t_buf_list; - buf->m_buf_state = index_buf_state_t::CLEAN; + buf->set_state(index_buf_state_t::CLEAN); t_buf_list.clear(); @@ -272,7 +297,7 @@ void IndexWBCache::get_next_bufs(IndexCPContext* cp_ctx, uint32_t max_count, std } } -void IndexWBCache::get_next_bufs_internal(IndexCPContext* cp_ctx, uint32_t max_count, IndexBuffer* prev_flushed_buf, +void IndexWBCache::get_next_bufs_internal(IndexCPContext* cp_ctx, uint32_t max_count, IndexBufferPtr prev_flushed_buf, std::vector< IndexBufferPtr >& bufs) { uint32_t count{0}; diff --git a/src/lib/index/wb_cache.hpp b/src/lib/index/wb_cache.hpp index 7639d0714..9a652ce2a 100644 --- a/src/lib/index/wb_cache.hpp +++ b/src/lib/index/wb_cache.hpp @@ -49,23 +49,23 @@ class IndexWBCache : public IndexWBCacheBase { void realloc_buf(const IndexBufferPtr& buf) override; void write_buf(const BtreeNodePtr& node, const IndexBufferPtr& buf, CPContext* cp_ctx) override; void read_buf(bnodeid_t id, BtreeNodePtr& node, node_initializer_t&& node_initializer) override; - std::tuple< bool, bool > create_chain(IndexBufferPtr& second, IndexBufferPtr& third, CPContext* cp_ctx) override; + std::pair< bool, bool > create_chain(IndexBufferPtr& second, IndexBufferPtr& third, CPContext* cp_ctx) override; void prepend_to_chain(const IndexBufferPtr& first, const IndexBufferPtr& second) override; void free_buf(const IndexBufferPtr& buf, CPContext* cp_ctx) override; //////////////////// CP Related API section ///////////////////////////////// folly::Future< bool > async_cp_flush(IndexCPContext* context); - IndexBufferPtr copy_buffer(const IndexBufferPtr& cur_buf) const; + IndexBufferPtr copy_buffer(const IndexBufferPtr& cur_buf, const CPContext *cp_ctx) const; private: void start_flush_threads(); - void process_write_completion(IndexCPContext* cp_ctx, IndexBuffer* pbuf); - void do_flush_one_buf(IndexCPContext* cp_ctx, const IndexBufferPtr& buf, bool part_of_batch); - std::pair< IndexBufferPtr, bool > on_buf_flush_done(IndexCPContext* cp_ctx, IndexBuffer* buf); - std::pair< IndexBufferPtr, bool > on_buf_flush_done_internal(IndexCPContext* cp_ctx, IndexBuffer* buf); + void process_write_completion(IndexCPContext* cp_ctx, IndexBufferPtr pbuf); + void do_flush_one_buf(IndexCPContext* cp_ctx, const IndexBufferPtr buf, bool part_of_batch); + std::pair< IndexBufferPtr, bool > on_buf_flush_done(IndexCPContext* cp_ctx, IndexBufferPtr& buf); + std::pair< IndexBufferPtr, bool > on_buf_flush_done_internal(IndexCPContext* cp_ctx, IndexBufferPtr& buf); void get_next_bufs(IndexCPContext* cp_ctx, uint32_t max_count, std::vector< IndexBufferPtr >& bufs); - void get_next_bufs_internal(IndexCPContext* cp_ctx, uint32_t max_count, IndexBuffer* prev_flushed_buf, + void get_next_bufs_internal(IndexCPContext* cp_ctx, uint32_t max_count, IndexBufferPtr prev_flushed_buf, std::vector< IndexBufferPtr >& bufs); }; } // namespace homestore diff --git a/src/tests/btree_helpers/btree_test_helper.hpp b/src/tests/btree_helpers/btree_test_helper.hpp index 3bc943fc0..1072ef981 100644 --- a/src/tests/btree_helpers/btree_test_helper.hpp +++ b/src/tests/btree_helpers/btree_test_helper.hpp @@ -41,7 +41,7 @@ struct BtreeTestHelper : public testing::Test { using mutex = iomgr::FiberManagerLib::shared_mutex; using op_func_t = std::function< void(void) >; - BtreeTestHelper() : testing::Test(), m_range_scheduler{SISL_OPTIONS["num_entries"].as< uint32_t >()} {} + BtreeTestHelper() : testing::Test(), m_shadow_map{SISL_OPTIONS["num_entries"].as< uint32_t >()} {} void SetUp() override { m_cfg.m_leaf_node_type = T::leaf_node_type; @@ -71,7 +71,6 @@ struct BtreeTestHelper : public testing::Test { std::shared_ptr< typename T::BtreeType > m_bt; ShadowMap< K, V > m_shadow_map; BtreeConfig m_cfg{g_node_size}; - RangeScheduler m_range_scheduler; uint32_t m_max_range_input{1000}; bool m_is_multi_threaded{false}; @@ -94,7 +93,6 @@ struct BtreeTestHelper : public testing::Test { iomanager.run_on_forget(m_fibers[i], [this, start_range, end_range, &test_count]() { for (uint32_t i = start_range; i < end_range; i++) { put(i, btree_put_type::INSERT); - m_range_scheduler.put_key(i); } { std::unique_lock lg(m_test_done_mtx); @@ -114,7 +112,7 @@ struct BtreeTestHelper : public testing::Test { void put(uint64_t k, btree_put_type put_type) { do_put(k, put_type, V::generate_rand()); } void put_random() { - auto [start_k, end_k] = m_range_scheduler.pick_random_non_existing_keys(1); + auto [start_k, end_k] = m_shadow_map.pick_random_non_existing_keys(1); RELEASE_ASSERT_EQ(start_k, end_k, "Range scheduler pick_random_non_existing_keys issue"); do_put(start_k, btree_put_type::INSERT, V::generate_rand()); @@ -132,10 +130,8 @@ struct BtreeTestHelper : public testing::Test { if (update) { m_shadow_map.range_update(start_key, nkeys, value); - m_range_scheduler.remove_keys_from_working(start_k, end_k); } else { m_shadow_map.range_upsert(start_k, nkeys, value); - m_range_scheduler.put_keys(start_k, end_k); } } @@ -146,8 +142,8 @@ struct BtreeTestHelper : public testing::Test { static thread_local std::uniform_int_distribution< uint32_t > s_rand_range_generator{1, 50}; auto const [start_k, end_k] = is_update - ? m_range_scheduler.pick_random_existing_keys(s_rand_range_generator(m_re)) - : m_range_scheduler.pick_random_non_working_keys(s_rand_range_generator(m_re)); + ? m_shadow_map.pick_random_existing_keys(s_rand_range_generator(m_re)) + : m_shadow_map.pick_random_non_working_keys(s_rand_range_generator(m_re)); range_put(start_k, end_k, V::generate_rand(), is_update); } @@ -167,15 +163,13 @@ struct BtreeTestHelper : public testing::Test { m_shadow_map.validate_data(rreq.key(), (const V&)rreq.value()); m_shadow_map.erase(rreq.key()); } - m_range_scheduler.remove_key(k); } void remove_random() { - auto const [start_k, end_k] = m_range_scheduler.pick_random_existing_keys(1); + auto const [start_k, end_k] = m_shadow_map.pick_random_existing_keys(1); RELEASE_ASSERT_EQ(start_k, end_k, "Range scheduler pick_random_existing_keys issue"); remove_one(start_k); - m_range_scheduler.remove_key(start_k); } void range_remove_existing(uint32_t start_k, uint32_t count) { @@ -186,7 +180,7 @@ struct BtreeTestHelper : public testing::Test { void range_remove_existing_random() { static std::uniform_int_distribution< uint32_t > s_rand_range_generator{2, 5}; - auto const [start_k, end_k] = m_range_scheduler.pick_random_existing_keys(s_rand_range_generator(m_re)); + auto const [start_k, end_k] = m_shadow_map.pick_random_existing_keys(s_rand_range_generator(m_re)); do_range_remove(start_k, end_k, true /* only_existing */); } @@ -203,6 +197,7 @@ struct BtreeTestHelper : public testing::Test { void do_query(uint32_t start_k, uint32_t end_k, uint32_t batch_size) { std::vector< std::pair< K, V > > out_vector; + m_shadow_map.guard().lock(); uint32_t remaining = m_shadow_map.num_elems_in_range(start_k, end_k); auto it = m_shadow_map.map_const().lower_bound(K{start_k}); @@ -234,21 +229,23 @@ struct BtreeTestHelper : public testing::Test { ASSERT_EQ(ret, btree_status_t::success) << "Expected success on query"; ASSERT_EQ(out_vector.size(), 0) << "Received incorrect value on empty query pagination"; + m_shadow_map.guard().unlock(); + if (start_k < m_max_range_input) { - m_range_scheduler.remove_keys_from_working(start_k, std::min(end_k, m_max_range_input - 1)); + m_shadow_map.remove_keys_from_working(start_k, std::min(end_k, m_max_range_input - 1)); } } void query_random() { static thread_local std::uniform_int_distribution< uint32_t > s_rand_range_generator{1, 100}; - auto const [start_k, end_k] = m_range_scheduler.pick_random_non_working_keys(s_rand_range_generator(m_re)); + auto const [start_k, end_k] = m_shadow_map.pick_random_non_working_keys(s_rand_range_generator(m_re)); do_query(start_k, end_k, 79); } ////////////////////// All get operation variants /////////////////////////////// void get_all() const { - for (const auto& [key, value] : m_shadow_map.map_const()) { + m_shadow_map.foreach ([this](K key, V value) { auto copy_key = std::make_unique< K >(); *copy_key = key; auto out_v = std::make_unique< V >(); @@ -258,7 +255,7 @@ struct BtreeTestHelper : public testing::Test { ASSERT_EQ(ret, btree_status_t::success) << "Missing key " << key << " in btree but present in shadow map"; ASSERT_EQ((const V&)req.value(), value) << "Found value in btree doesn't return correct data for key=" << key; - } + }); } void get_specific(uint32_t k) const { @@ -280,6 +277,7 @@ struct BtreeTestHelper : public testing::Test { auto req = BtreeGetAnyRequest< K >{BtreeKeyRange< K >{K{start_k}, true, K{end_k}, true}, out_k.get(), out_v.get()}; const auto status = m_bt->get(req); + if (status == btree_status_t::success) { ASSERT_EQ(m_shadow_map.exists_in_range(*(K*)req.m_outkey, start_k, end_k), true) << "Get Any returned key=" << *(K*)req.m_outkey << " which is not in range " << start_k << "-" << end_k @@ -303,14 +301,32 @@ struct BtreeTestHelper : public testing::Test { void print_keys() const { m_bt->print_tree_keys(); } void compare_files(const std::string& before, const std::string& after) { - std::ifstream b(before); - std::ifstream a(after); - std::ostringstream ss_before, ss_after; - ss_before << b.rdbuf(); - ss_after << a.rdbuf(); - std::string s1 = ss_before.str(); - std::string s2 = ss_after.str(); - ASSERT_EQ(s1, s2) << "Mismatch in btree structure"; + std::ifstream b(before, std::ifstream::ate); + std::ifstream a(after, std::ifstream::ate); + if (a.fail() || b.fail()) { + LOGINFO("Failed to open file"); + assert(false); + } + if (a.tellg() != b.tellg()) { + LOGINFO("Mismatch in btree files"); + assert(false); + } + + int64_t pending = a.tellg(); + const int64_t batch_size = 4096; + a.seekg(0, ifstream::beg); + b.seekg(0, ifstream::beg); + char a_buffer[batch_size], b_buffer[batch_size]; + while (pending > 0) { + auto count = std::min(pending, batch_size); + a.read(a_buffer, count); + b.read(b_buffer, count); + if (std::memcmp(a_buffer, b_buffer, count) != 0) { + LOGINFO("Mismatch in btree files"); + assert(false); + } + pending -= count; + } } private: @@ -327,7 +343,6 @@ struct BtreeTestHelper : public testing::Test { } m_shadow_map.put_and_check(key, value, *existing_v, done); - m_range_scheduler.put_key(k); } void do_range_remove(uint64_t start_k, uint64_t end_k, bool all_existing) { @@ -336,6 +351,7 @@ struct BtreeTestHelper : public testing::Test { auto rreq = BtreeRangeRemoveRequest< K >{BtreeKeyRange< K >{start_key, true, end_key, true}}; auto const ret = m_bt->remove(rreq); + m_shadow_map.range_erase(start_key, end_key); if (all_existing) { @@ -344,7 +360,7 @@ struct BtreeTestHelper : public testing::Test { } if (start_k < m_max_range_input) { - m_range_scheduler.remove_keys(start_k, std::min(end_k, uint64_cast(m_max_range_input - 1))); + m_shadow_map.remove_keys(start_k, std::min(end_k, uint64_cast(m_max_range_input - 1))); } } @@ -380,4 +396,4 @@ struct BtreeTestHelper : public testing::Test { } LOGINFO("ALL parallel jobs joined"); } -}; \ No newline at end of file +}; diff --git a/src/tests/btree_helpers/shadow_map.hpp b/src/tests/btree_helpers/shadow_map.hpp index 1e7418122..f8c40e140 100644 --- a/src/tests/btree_helpers/shadow_map.hpp +++ b/src/tests/btree_helpers/shadow_map.hpp @@ -7,26 +7,36 @@ template < typename K, typename V > class ShadowMap { private: std::map< K, V > m_map; + RangeScheduler m_range_scheduler; + using mutex = iomgr::FiberManagerLib::shared_mutex; + mutex m_mutex; public: + ShadowMap(uint32_t num_keys) : m_range_scheduler(num_keys) {} + void put_and_check(const K& key, const V& val, const V& old_val, bool expected_success) { + std::lock_guard lock{m_mutex}; auto const [it, happened] = m_map.insert(std::make_pair(key, val)); ASSERT_EQ(happened, expected_success) << "Testcase issue, expected inserted slots to be in shadow map"; if (!happened) { ASSERT_EQ(old_val, it->second) << "Put: Existing value doesn't return correct data for key: " << it->first; } + m_range_scheduler.put_key(key.key()); } void range_upsert(uint64_t start_k, uint32_t count, const V& val) { + std::lock_guard lock{m_mutex}; for (uint32_t i{0}; i < count; ++i) { K key{start_k + i}; V range_value{val}; if constexpr (std::is_same_v< V, TestIntervalValue >) { range_value.shift(i); } m_map.insert_or_assign(key, range_value); } + m_range_scheduler.put_keys(start_k, start_k + count - 1); } void range_update(const K& start_key, uint32_t count, const V& new_val) { + std::lock_guard lock{m_mutex}; auto const start_it = m_map.lower_bound(start_key); auto it = start_it; uint32_t c = 0; @@ -34,9 +44,11 @@ class ShadowMap { it->second = new_val; ++it; } + m_range_scheduler.remove_keys_from_working(start_key.key(), start_key.key() + count - 1); } std::pair< K, K > pick_existing_range(const K& start_key, uint32_t max_count) const { + std::shared_lock lock{m_mutex}; auto const start_it = m_map.lower_bound(start_key); auto it = start_it; uint32_t count = 0; @@ -46,9 +58,13 @@ class ShadowMap { return std::pair(start_it->first, it->first); } - bool exists(const K& key) const { return m_map.find(key) != m_map.end(); } + bool exists(const K& key) const { + std::shared_lock lock{m_mutex}; + return m_map.find(key) != m_map.end(); + } bool exists_in_range(const K& key, uint64_t start_k, uint64_t end_k) const { + std::shared_lock lock{m_mutex}; const auto itlower = m_map.lower_bound(K{start_k}); const auto itupper = m_map.upper_bound(K{end_k}); auto it = itlower; @@ -59,7 +75,10 @@ class ShadowMap { return false; } - uint64_t size() const { return m_map.size(); } + uint64_t size() const { + std::shared_lock lock{m_mutex}; + return m_map.size(); + } uint32_t num_elems_in_range(uint64_t start_k, uint64_t end_k) const { const auto itlower = m_map.lower_bound(K{start_k}); @@ -68,29 +87,71 @@ class ShadowMap { } void validate_data(const K& key, const V& btree_val) const { + std::shared_lock lock{m_mutex}; const auto r = m_map.find(key); ASSERT_NE(r, m_map.end()) << "Key " << key.to_string() << " is not present in shadow map"; ASSERT_EQ(btree_val, r->second) << "Found value in btree doesn't return correct data for key=" << r->first; } - void erase(const K& key) { m_map.erase(key); } + void erase(const K& key) { + std::lock_guard lock{m_mutex}; + m_map.erase(key); + m_range_scheduler.remove_key(key.key()); + } void range_erase(const K& start_key, uint32_t count) { + std::lock_guard lock{m_mutex}; auto const it = m_map.lower_bound(start_key); uint32_t i{0}; while ((it != m_map.cend()) && (i++ < count)) { it = m_map.erase(it); } + m_range_scheduler.remove_keys(start_key.key(), start_key.key() + count); } void range_erase(const K& start_key, const K& end_key) { + std::lock_guard lock{m_mutex}; auto it = m_map.lower_bound(start_key); auto const end_it = m_map.upper_bound(end_key); while ((it != m_map.cend()) && (it != end_it)) { it = m_map.erase(it); } + m_range_scheduler.remove_keys(start_key.key(), end_key.key()); } + mutex& guard() { return m_mutex; } std::map< K, V >& map() { return m_map; } const std::map< K, V >& map_const() const { return m_map; } + + void foreach (std::function< void(K, V) > func) const { + std::shared_lock lock{m_mutex}; + for (const auto& [key, value] : m_map) { + func(key, value); + } + } + + std::pair< uint32_t, uint32_t > pick_random_non_existing_keys(uint32_t max_keys) { + std::shared_lock lock{m_mutex}; + return m_range_scheduler.pick_random_non_existing_keys(max_keys); + } + + std::pair< uint32_t, uint32_t > pick_random_existing_keys(uint32_t max_keys) { + std::shared_lock lock{m_mutex}; + return m_range_scheduler.pick_random_existing_keys(max_keys); + } + + std::pair< uint32_t, uint32_t > pick_random_non_working_keys(uint32_t max_keys) { + std::shared_lock lock{m_mutex}; + return m_range_scheduler.pick_random_non_working_keys(max_keys); + } + + void remove_keys_from_working(uint32_t s, uint32_t e) { + std::lock_guard lock{m_mutex}; + m_range_scheduler.remove_keys_from_working(s, e); + } + + void remove_keys(uint32_t start_key, uint32_t end_key) { + std::lock_guard lock{m_mutex}; + m_range_scheduler.remove_keys(start_key, end_key); + } }; diff --git a/src/tests/test_index_btree.cpp b/src/tests/test_index_btree.cpp index 14062cd1f..4bab73ad0 100644 --- a/src/tests/test_index_btree.cpp +++ b/src/tests/test_index_btree.cpp @@ -42,13 +42,15 @@ SISL_OPTIONS_ENABLE(logging, test_index_btree, iomgr, test_common_setup) SISL_LOGGING_DECL(test_index_btree) std::vector< std::string > test_common::HSTestHelper::s_dev_names; -// TODO increase num_entries to 65k as io mgr page size is 512 and its slow. + +// TODO Add tests to do write,remove after recovery. +// TODO Test with var len key with io mgr page size is 512. SISL_OPTION_GROUP(test_index_btree, (num_iters, "", "num_iters", "number of iterations for rand ops", - ::cxxopts::value< uint32_t >()->default_value("1500"), "number"), + ::cxxopts::value< uint32_t >()->default_value("500"), "number"), (num_entries, "", "num_entries", "number of entries to test with", - ::cxxopts::value< uint32_t >()->default_value("15000"), "number"), + ::cxxopts::value< uint32_t >()->default_value("5000"), "number"), (seed, "", "seed", "random engine seed, use random if not defined", ::cxxopts::value< uint64_t >()->default_value("0"), "number")) @@ -159,12 +161,7 @@ struct BtreeTest : public BtreeTestHelper< TestType > { } }; -// TODO sanal fix the varkey issue. -// using BtreeTypes = testing::Types< FixedLenBtreeTest, VarKeySizeBtreeTest, VarValueSizeBtreeTest, -// VarObjSizeBtreeTest -// >; - -using BtreeTypes = testing::Types< FixedLenBtreeTest >; +using BtreeTypes = testing::Types< FixedLenBtreeTest, VarKeySizeBtreeTest, VarValueSizeBtreeTest, VarObjSizeBtreeTest >; TYPED_TEST_SUITE(BtreeTest, BtreeTypes); @@ -225,7 +222,6 @@ TYPED_TEST(BtreeTest, RandomInsert) { this->get_all(); } -#if 0 TYPED_TEST(BtreeTest, SequentialRemove) { LOGINFO("SequentialRemove test start"); // Forward sequential insert @@ -280,7 +276,6 @@ TYPED_TEST(BtreeTest, RandomRemove) { } this->get_all(); } -#endif TYPED_TEST(BtreeTest, RangeUpdate) { LOGINFO("RangeUpdate test start"); @@ -309,23 +304,29 @@ TYPED_TEST(BtreeTest, CpFlush) { for (uint32_t i = 0; i < num_entries; ++i) { this->put(i, btree_put_type::INSERT); } + + // Remove some of the entries. + for (uint32_t i = 0; i < num_entries; i += 10) { + this->remove_one(i); + } + LOGINFO("Query {} entries and validate with pagination of 75 entries", num_entries / 2); this->do_query(0, num_entries / 2 - 1, 75); - this->print(std::string("before.txt")); - LOGINFO("Trigger checkpoint flush."); test_common::HSTestHelper::trigger_cp(true /* wait */); LOGINFO("Query {} entries and validate with pagination of 75 entries", num_entries); this->do_query(0, num_entries - 1, 75); + this->print(std::string("before.txt")); + this->destroy_btree(); // Restart homestore. m_bt is updated by the TestIndexServiceCallback. this->restart_homestore(); - std::this_thread::sleep_for(std::chrono::seconds{3}); + std::this_thread::sleep_for(std::chrono::seconds{1}); LOGINFO("Restarted homestore with index recovered"); this->print(std::string("after.txt")); @@ -373,7 +374,7 @@ TYPED_TEST(BtreeTest, MultipleCpFlush) { // Restart homestore. m_bt is updated by the TestIndexServiceCallback. this->restart_homestore(); - std::this_thread::sleep_for(std::chrono::seconds{3}); + std::this_thread::sleep_for(std::chrono::seconds{1}); LOGINFO(" Restarted homestore with index recovered"); this->print(std::string("after.txt")); @@ -388,24 +389,41 @@ TYPED_TEST(BtreeTest, ThreadedCpFlush) { LOGINFO("ThreadedCpFlush test start"); const auto num_entries = SISL_OPTIONS["num_entries"].as< uint32_t >(); - bool stop_cp_flush = false; - auto io_thread = std::thread([this, num_entries] { + bool stop = false; + std::atomic< uint32_t > last_index{0}; + auto insert_io_thread = std::thread([this, num_entries, &last_index] { LOGINFO("Do Forward sequential insert for {} entries", num_entries); + uint32_t j = 0; for (uint32_t i = 0; i < num_entries; ++i) { this->put(i, btree_put_type::INSERT); + last_index = i; } }); - auto cp_flush_thread = std::thread([this, &stop_cp_flush] { - while (!stop_cp_flush) { - LOGINFO("Trigger checkpoint flush wait=false."); - test_common::HSTestHelper::trigger_cp(false /* wait */); + auto remove_io_thread = std::thread([this, &stop, num_entries, &last_index] { + LOGINFO("Do random removes for {} entries", num_entries); + while (!stop) { + std::this_thread::sleep_for(std::chrono::milliseconds{10}); + // Remove a random entry. + std::uniform_int_distribution< uint32_t > rand{0, last_index.load()}; + auto rm_idx = rand(g_re); + LOGINFO("Removing entry {}", rm_idx); + this->remove_one(rm_idx); + } + }); + + auto cp_flush_thread = std::thread([this, &stop] { + while (!stop) { std::this_thread::sleep_for(std::chrono::seconds{1}); + LOGINFO("Trigger checkpoint flush wait=true."); + test_common::HSTestHelper::trigger_cp(false /* wait */); + LOGINFO("Trigger checkpoint flush wait=true done."); } }); - io_thread.join(); - stop_cp_flush = true; + insert_io_thread.join(); + stop = true; + remove_io_thread.join(); cp_flush_thread.join(); LOGINFO("Trigger checkpoint flush wait=true."); @@ -420,7 +438,7 @@ TYPED_TEST(BtreeTest, ThreadedCpFlush) { // Restart homestore. m_bt is updated by the TestIndexServiceCallback. this->restart_homestore(); - std::this_thread::sleep_for(std::chrono::seconds{3}); + std::this_thread::sleep_for(std::chrono::seconds{1}); LOGINFO(" Restarted homestore with index recovered"); this->print(std::string("after.txt")); diff --git a/src/tests/test_mem_btree.cpp b/src/tests/test_mem_btree.cpp index 68bf4003d..bb794fca3 100644 --- a/src/tests/test_mem_btree.cpp +++ b/src/tests/test_mem_btree.cpp @@ -41,14 +41,15 @@ SISL_OPTION_GROUP( (num_entries, "", "num_entries", "number of entries to test with", ::cxxopts::value< uint32_t >()->default_value("10000"), "number"), (disable_merge, "", "disable_merge", "disable_merge", ::cxxopts::value< bool >()->default_value("0"), ""), - (n_threads, "", "n_threads", "number of threads", ::cxxopts::value< uint32_t >()->default_value("2"), "number"), - (n_fibers, "", "n_fibers", "number of fibers", ::cxxopts::value< uint32_t >()->default_value("10"), "number"), + (n_threads, "", "num_threads", "number of threads", ::cxxopts::value< uint32_t >()->default_value("2"), "number"), + (n_fibers, "", "num_fibers", "number of fibers", ::cxxopts::value< uint32_t >()->default_value("10"), "number"), (operation_list, "", "operation_list", "operation list instead of default created following by percentage", ::cxxopts::value< std::vector< std::string > >(), "operations [...]"), (preload_size, "", "preload_size", "number of entries to preload tree with", ::cxxopts::value< uint32_t >()->default_value("1000"), "number"), (seed, "", "seed", "random engine seed, use random if not defined", - ::cxxopts::value< uint64_t >()->default_value("0"), "number")) + ::cxxopts::value< uint64_t >()->default_value("0"), "number"), + (run_time, "", "run_time", "run time for io", ::cxxopts::value< uint32_t >()->default_value("360000"), "seconds")) struct FixedLenBtreeTest { using BtreeType = MemBtree< TestFixedKey, TestFixedValue >; @@ -265,7 +266,7 @@ TYPED_TEST(BtreeTest, RandomRemoveRange) { this->put(i, btree_put_type::INSERT); } // generate keys including out of bound - static thread_local std::uniform_int_distribution< uint32_t > s_rand_key_generator{0, 2 * num_entries}; + static thread_local std::uniform_int_distribution< uint32_t > s_rand_key_generator{0, num_entries}; // this->print_keys(); LOGINFO("Step 2: Do range remove for maximum of {} iterations", num_iters); for (uint32_t i{0}; (i < num_iters) && this->m_shadow_map.size(); ++i) { @@ -289,10 +290,10 @@ struct BtreeConcurrentTest : public BtreeTestHelper< TestType > { BtreeConcurrentTest() { this->m_is_multi_threaded = true; } void SetUp() override { - LOGINFO("Starting iomgr with {} threads", SISL_OPTIONS["n_threads"].as< uint32_t >()); - ioenvironment.with_iomgr(iomgr::iomgr_params{.num_threads = SISL_OPTIONS["n_threads"].as< uint32_t >(), + LOGINFO("Starting iomgr with {} threads", SISL_OPTIONS["num_threads"].as< uint32_t >()); + ioenvironment.with_iomgr(iomgr::iomgr_params{.num_threads = SISL_OPTIONS["num_threads"].as< uint32_t >(), .is_spdk = false, - .num_fibers = 1 + SISL_OPTIONS["n_fibers"].as< uint32_t >(), + .num_fibers = 1 + SISL_OPTIONS["num_fibers"].as< uint32_t >(), .app_mem_size_mb = 0, .hugepage_size_mb = 0});