diff --git a/src/include/homestore/btree/detail/btree_node_mgr.ipp b/src/include/homestore/btree/detail/btree_node_mgr.ipp index f1476574c..87ae2a6fc 100644 --- a/src/include/homestore/btree/detail/btree_node_mgr.ipp +++ b/src/include/homestore/btree/detail/btree_node_mgr.ipp @@ -307,7 +307,7 @@ BtreeNode* Btree< K, V >::init_node(uint8_t* node_buf, uint32_t node_ctx_size, b /* Note:- This function assumes that access of this node is thread safe. */ template < typename K, typename V > void Btree< K, V >::free_node(const BtreeNodePtr& node, locktype_t cur_lock, void* context) { - BT_NODE_LOG(DEBUG, node, "Freeing node"); + BT_NODE_LOG(TRACE, node, "Freeing node"); COUNTER_DECREMENT_IF_ELSE(m_metrics, node->is_leaf(), btree_leaf_node_count, btree_int_node_count, 1); if (cur_lock != locktype_t::NONE) { diff --git a/src/include/homestore/btree/detail/prefix_node.hpp b/src/include/homestore/btree/detail/prefix_node.hpp index 62003da7a..fd8b1956c 100644 --- a/src/include/homestore/btree/detail/prefix_node.hpp +++ b/src/include/homestore/btree/detail/prefix_node.hpp @@ -687,7 +687,7 @@ class FixedPrefixNode : public VariantNode< K, V > { --phdr->used_slots; prefix_bitset_.reset_bit(slot_num); if ((slot_num != 0) && (slot_num == phdr->tail_slot)) { - uint16_t prev_slot = prefix_bitset_.get_prev_set_bit(slot_num); + uint16_t prev_slot = prefix_bitset_.get_next_set_bit(slot_num); if (prev_slot != std::numeric_limits< uint16_t >::max()) { phdr->tail_slot = prev_slot; } } } diff --git a/src/include/homestore/index/index_internal.hpp b/src/include/homestore/index/index_internal.hpp index 2c8a09849..cd914d315 100644 --- a/src/include/homestore/index/index_internal.hpp +++ b/src/include/homestore/index/index_internal.hpp @@ -64,29 +64,70 @@ enum class index_buf_state_t : uint8_t { }; ///////////////////////// Btree Node and Buffer Portion ////////////////////////// + + +// Multiple IndexBuffer could point to the same NodeBuffer if its clean. +struct NodeBuffer; +typedef std::shared_ptr< NodeBuffer > NodeBufferPtr; +struct NodeBuffer { + uint8_t* m_bytes{nullptr}; // Actual data buffer + std::atomic< index_buf_state_t > m_state{index_buf_state_t::CLEAN}; // Is buffer yet to persist? + NodeBuffer(uint32_t buf_size, uint32_t align_size); + ~NodeBuffer(); +}; + +// IndexBuffer is for each CP. The dependent index buffers are chained using +// m_next_buffer and each buffer is flushed only its wait_for_leaders reaches 0 +// which means all its dependent buffers are flushed. struct IndexBuffer; typedef std::shared_ptr< IndexBuffer > IndexBufferPtr; - struct IndexBuffer { - uint8_t* m_node_buf{nullptr}; // Actual buffer - index_buf_state_t m_buf_state{index_buf_state_t::CLEAN}; // Is buffer yet to persist? 
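The hunk above splits the old single-buffer IndexBuffer into a shared NodeBuffer (the bytes plus an atomic state) and per-CP IndexBuffer views that may all point at the same clean NodeBuffer. A minimal standalone sketch of that ownership model, with illustrative names rather than the HomeStore types:

#include <atomic>
#include <cstdint>
#include <iostream>
#include <memory>
#include <vector>

enum class buf_state : uint8_t { CLEAN, DIRTY, FLUSHING };

// Shared backing bytes; several per-CP views may point at the same clean copy.
struct node_buffer {
    explicit node_buffer(size_t size) : bytes(size, 0) {}
    std::vector<uint8_t> bytes;
    std::atomic<buf_state> state{buf_state::CLEAN};
};

// Per-checkpoint view of a node. Clean views share one node_buffer; a dirty
// view gets its own deep copy so the in-flight image stays stable.
struct index_buffer {
    std::shared_ptr<node_buffer> node_buf;
};

index_buffer make_cp_view(const index_buffer& cur, size_t node_size) {
    if (cur.node_buf->state.load() == buf_state::CLEAN) {
        return index_buffer{cur.node_buf};                 // share, no copy
    }
    auto copy = std::make_shared<node_buffer>(node_size);  // deep copy when dirty
    copy->bytes = cur.node_buf->bytes;
    return index_buffer{copy};
}

int main() {
    index_buffer cp1{std::make_shared<node_buffer>(4096)};
    auto cp2 = make_cp_view(cp1, 4096);
    std::cout << "shared after clean copy: " << (cp1.node_buf == cp2.node_buf) << '\n'; // 1
    cp1.node_buf->state = buf_state::DIRTY;
    auto cp3 = make_cp_view(cp1, 4096);
    std::cout << "shared after dirty copy: " << (cp1.node_buf == cp3.node_buf) << '\n'; // 0
}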
- BlkId m_blkid; // BlkId where this needs to be persisted - std::weak_ptr< IndexBuffer > m_next_buffer; // Next buffer in the chain + NodeBufferPtr m_node_buf; + BlkId m_blkid; // BlkId where this needs to be persisted + std::weak_ptr< IndexBuffer > m_next_buffer; // Next buffer in the chain // Number of leader buffers we are waiting for before we write this buffer sisl::atomic_counter< int > m_wait_for_leaders{0}; + std::mutex m_mutex; IndexBuffer(BlkId blkid, uint32_t buf_size, uint32_t align_size); + IndexBuffer(NodeBufferPtr node_buf, BlkId blkid); ~IndexBuffer(); BlkId blkid() const { return m_blkid; } - uint8_t* raw_buffer() { return m_node_buf; } + uint8_t* raw_buffer() { + RELEASE_ASSERT(m_node_buf, "Node buffer null"); + return m_node_buf->m_bytes; + } + + bool is_clean() const { + RELEASE_ASSERT(m_node_buf, "Node buffer null"); + return (m_node_buf->m_state.load() == index_buf_state_t::CLEAN); + } + + index_buf_state_t state() { + RELEASE_ASSERT(m_node_buf, "Node buffer null"); + return m_node_buf->m_state; + } + + void set_state(index_buf_state_t state) { + RELEASE_ASSERT(m_node_buf, "Node buffer null"); + m_node_buf->m_state = state; + } - bool is_clean() const { return (m_buf_state == index_buf_state_t::CLEAN); } std::string to_string() const { - return fmt::format("IndexBuffer {} blkid={} state={} node_buf={} next_buffer={} wait_for={}", - reinterpret_cast< void* >(const_cast< IndexBuffer* >(this)), m_blkid.to_integer(), - static_cast< int >(m_buf_state), static_cast< void* >(m_node_buf), - voidptr_cast(m_next_buffer.lock().get()), m_wait_for_leaders.get()); + std::lock_guard lock{m_mutex}; + auto str = fmt::format("IndexBuffer {} blkid={}", reinterpret_cast< void* >(const_cast< IndexBuffer* >(this)), + m_blkid.to_integer()); + if (m_node_buf == nullptr) { + fmt::format_to(std::back_inserter(str), " node_buf=nullptr"); + } else { + fmt::format_to(std::back_inserter(str), " state={} node_buf={}", + static_cast< int >(m_node_buf->m_state.load()), static_cast< void* >(m_node_buf->m_bytes)); + } + fmt::format_to(std::back_inserter(str), " next_buffer={} wait_for={}", + m_next_buffer.lock() ? 
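The m_next_buffer / m_wait_for_leaders pair above forms a flush dependency chain: a buffer is written only after every buffer that lists it as its next buffer (its leaders) has completed. A small standalone model of that counting, with simplified stand-in types:

#include <cassert>
#include <iostream>
#include <memory>

// Each buffer points at the buffer that must be written after it (m_next_buffer)
// and counts how many such leaders it still waits for (m_wait_for_leaders).
struct buf {
    int wait_for_leaders{0};
    std::weak_ptr<buf> next;
};

void link(const std::shared_ptr<buf>& leader, const std::shared_ptr<buf>& follower) {
    leader->next = follower;
    ++follower->wait_for_leaders;
}

// Called when `b` has been persisted; returns the follower if it became ready.
std::shared_ptr<buf> on_flushed(const std::shared_ptr<buf>& b) {
    if (auto nxt = b->next.lock(); nxt && --nxt->wait_for_leaders == 0) { return nxt; }
    return nullptr;
}

int main() {
    auto left = std::make_shared<buf>(), right = std::make_shared<buf>(), parent = std::make_shared<buf>();
    link(left, parent);
    link(right, parent);                  // parent waits for both children
    assert(on_flushed(left) == nullptr);  // still waiting on right
    assert(on_flushed(right) == parent);  // now parent can be flushed
    std::cout << "parent ready\n";
}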
reinterpret_cast< void* >(m_next_buffer.lock().get()) : 0, + m_wait_for_leaders.get()); + return str; } }; @@ -97,7 +138,6 @@ struct IndexBtreeNode { public: IndexBufferPtr m_idx_buf; // Buffer backing this node cp_id_t m_last_mod_cp_id{-1}; // This node is previously modified by the cp id; - public: IndexBtreeNode(const IndexBufferPtr& buf) : m_idx_buf{buf} {} ~IndexBtreeNode() { m_idx_buf.reset(); } diff --git a/src/include/homestore/index/index_table.hpp b/src/include/homestore/index/index_table.hpp index 246557a62..0e59e5b70 100644 --- a/src/include/homestore/index/index_table.hpp +++ b/src/include/homestore/index/index_table.hpp @@ -114,7 +114,7 @@ class IndexTable : public IndexTableBase, public Btree< K, V > { // Need to put it in wb cache wb_cache().write_buf(node, idx_node->m_idx_buf, cp_ctx); idx_node->m_last_mod_cp_id = cp_ctx->id(); - LOGTRACEMOD(wbcache, "{}", idx_node->m_idx_buf->to_string()); + LOGTRACEMOD(wbcache, "add to dirty list cp {} {}", cp_ctx->id(), idx_node->m_idx_buf->to_string()); } node->set_checksum(this->m_bt_cfg); return btree_status_t::success; @@ -129,17 +129,28 @@ class IndexTable : public IndexTableBase, public Btree< K, V > { auto& left_child_buf = left_child_idx_node->m_idx_buf; auto& parent_buf = parent_idx_node->m_idx_buf; - LOGTRACEMOD(wbcache, "left {} parent {} ", left_child_buf->to_string(), parent_buf->to_string()); - // Write new nodes in the list as standalone outside transacted pairs. // Write the new right child nodes, left node and parent in order. + // Create the relationship of right child to the left node via prepend_to_chain below. + // Parent and left node are linked in the prepare_node_txn for (const auto& right_child_node : new_nodes) { auto right_child = IndexBtreeNode::convert(right_child_node.get()); write_node_impl(right_child_node, context); - wb_cache().prepend_to_chain(right_child->m_idx_buf, left_child_buf); - LOGTRACEMOD(wbcache, "right {} left {} ", right_child->m_idx_buf->to_string(), left_child_buf->to_string()); + wb_cache().prepend_to_chain(right_child->m_idx_buf, left_child_buf, cp_ctx); } + auto trace_index_bufs = [&]() { + std::string str; + str = fmt::format("cp {} left {} parent {}", cp_ctx->id(), left_child_buf->to_string(), + parent_buf->to_string()); + for (const auto& right_child_node : new_nodes) { + auto right_child = IndexBtreeNode::convert(right_child_node.get()); + fmt::format_to(std::back_inserter(str), " right {}", right_child->m_idx_buf->to_string()); + } + return str; + }; + + LOGTRACEMOD(wbcache, "{}", trace_index_bufs()); write_node_impl(left_child_node, context); write_node_impl(parent_node, context); @@ -181,18 +192,17 @@ class IndexTable : public IndexTableBase, public Btree< K, V > { // realloc_node(node); // } - // If the backing buffer is already in a clean state, we don't need to make a copy of it - if (idx_node->m_idx_buf->is_clean()) { return btree_status_t::success; } - - // Make a new btree buffer and copy the contents and swap it to make it the current node's buffer. The + // We create IndexBuffer for each CP. But if the backing buffer is already in a clean state + // we dont copy the node buffer. Copy buffer will handle it. If the node buffer is dirty, + // make a new btree buffer and copy the contents and swap it to make it the current node's buffer. The // buffer prior to this copy, would have been written and already added into the dirty buffer list. 
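Both to_string() above and the trace_index_bufs lambda below build one log line incrementally with fmt::format_to into a std::string. A tiny compilable illustration of that pattern, using plain integers in place of the buffer descriptions:

#include <fmt/format.h>
#include <iostream>
#include <iterator>
#include <string>
#include <vector>

int main() {
    std::vector<int> right_child_ids{7, 8};
    std::string str = fmt::format("cp {} left {} parent {}", 3, 5, 6);
    for (int id : right_child_ids) {
        fmt::format_to(std::back_inserter(str), " right {}", id);
    }
    std::cout << str << '\n';  // cp 3 left 5 parent 6 right 7 right 8
}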
- idx_node->m_idx_buf = wb_cache().copy_buffer(idx_node->m_idx_buf); + idx_node->m_idx_buf = wb_cache().copy_buffer(idx_node->m_idx_buf, cp_ctx); idx_node->m_last_mod_cp_id = -1; node->m_phys_node_buf = idx_node->m_idx_buf->raw_buffer(); node->set_checksum(this->m_bt_cfg); - LOGTRACEMOD(wbcache, "buf {} ", idx_node->m_idx_buf->to_string()); + LOGTRACEMOD(wbcache, "cp {} {} ", cp_ctx->id(), idx_node->m_idx_buf->to_string()); #ifndef NO_CHECKSUM if (!node->verify_node(this->m_bt_cfg)) { @@ -221,6 +231,8 @@ class IndexTable : public IndexTableBase, public Btree< K, V > { auto& child_buf = child_idx_node->m_idx_buf; auto& parent_buf = parent_idx_node->m_idx_buf; + LOGTRACEMOD(wbcache, "cp {} left {} parent {} ", cp_ctx->id(), child_buf->to_string(), parent_buf->to_string()); + auto [child_copied, parent_copied] = wb_cache().create_chain(child_buf, parent_buf, cp_ctx); if (child_copied) { child_node->m_phys_node_buf = child_buf->raw_buffer(); @@ -231,7 +243,6 @@ class IndexTable : public IndexTableBase, public Btree< K, V > { parent_idx_node->m_last_mod_cp_id = -1; } - LOGTRACEMOD(wbcache, "child {} parent {} ", child_buf->to_string(), parent_buf->to_string()); return btree_status_t::success; } diff --git a/src/include/homestore/index/wb_cache_base.hpp b/src/include/homestore/index/wb_cache_base.hpp index f59d4b1ce..d53f3124f 100644 --- a/src/include/homestore/index/wb_cache_base.hpp +++ b/src/include/homestore/index/wb_cache_base.hpp @@ -63,7 +63,7 @@ class IndexWBCacheBase { /// @brief Prepend to the chain that was already created with second /// @param first /// @param second - virtual void prepend_to_chain(const IndexBufferPtr& first, const IndexBufferPtr& second) = 0; + virtual void prepend_to_chain(const IndexBufferPtr& first, const IndexBufferPtr& second, CPContext* cp_ctx) = 0; /// @brief Free the buffer allocated and remove it from wb cache /// @param buf @@ -73,7 +73,7 @@ class IndexWBCacheBase { /// @brief Copy buffer /// @param cur_buf /// @return - virtual IndexBufferPtr copy_buffer(const IndexBufferPtr& cur_buf) const = 0; + virtual IndexBufferPtr copy_buffer(const IndexBufferPtr& cur_buf, CPContext* context) const = 0; }; } // namespace homestore diff --git a/src/include/homestore/index_service.hpp b/src/include/homestore/index_service.hpp index 9952e3852..09543a5fb 100644 --- a/src/include/homestore/index_service.hpp +++ b/src/include/homestore/index_service.hpp @@ -22,7 +22,7 @@ #include #include #include - +#include namespace homestore { class IndexWBCacheBase; diff --git a/src/lib/checkpoint/cp.hpp b/src/lib/checkpoint/cp.hpp index 5644adac7..05bda5be7 100644 --- a/src/lib/checkpoint/cp.hpp +++ b/src/lib/checkpoint/cp.hpp @@ -21,7 +21,7 @@ #include #include - +#include #include "common/homestore_assert.hpp" /* @@ -73,7 +73,7 @@ struct CP { bool m_cp_waiting_to_trigger{false}; // it is waiting for previous cp to complete cp_id_t m_cp_id; std::array< std::unique_ptr< CPContext >, (size_t)cp_consumer_t::SENTINEL > m_contexts; - folly::Promise< bool > m_comp_promise; + folly::SharedPromise< bool > m_comp_promise; public: CP(CPManager* mgr) : m_cp_mgr{mgr} {} diff --git a/src/lib/checkpoint/cp_mgr.cpp b/src/lib/checkpoint/cp_mgr.cpp index 944c90222..c5838b966 100644 --- a/src/lib/checkpoint/cp_mgr.cpp +++ b/src/lib/checkpoint/cp_mgr.cpp @@ -142,8 +142,11 @@ folly::Future< bool > CPManager::trigger_cp_flush(bool force) { std::unique_lock< std::mutex > lk(trigger_cp_mtx); auto cur_cp = cp_guard(); HS_DBG_ASSERT_NE(cur_cp->m_cp_status, cp_status_t::cp_flush_prepare); - 
cur_cp->m_comp_promise = std::move(folly::Promise< bool >{}); - cur_cp->m_cp_waiting_to_trigger = true; + // If multiple threads call trigger, they all get the future from the same promise. + if (!cur_cp->m_cp_waiting_to_trigger) { + cur_cp->m_comp_promise = std::move(folly::SharedPromise< bool >{}); + cur_cp->m_cp_waiting_to_trigger = true; + } return cur_cp->m_comp_promise.getFuture(); } else { return folly::makeFuture< bool >(false); @@ -177,7 +180,7 @@ folly::Future< bool > CPManager::trigger_cp_flush(bool force) { // originally by the caller will be untouched and completed upto CP completion/ ret_fut = folly::makeFuture< bool >(true); } else { - cur_cp->m_comp_promise = std::move(folly::Promise< bool >{}); + cur_cp->m_comp_promise = std::move(folly::SharedPromise< bool >{}); ret_fut = cur_cp->m_comp_promise.getFuture(); } cur_cp->m_cp_status = cp_status_t::cp_flush_prepare; diff --git a/src/lib/index/index_cp.hpp b/src/lib/index/index_cp.hpp index 077c9824b..3e24966d2 100644 --- a/src/lib/index/index_cp.hpp +++ b/src/lib/index/index_cp.hpp @@ -18,6 +18,7 @@ #include #include #include +#include #include #include "checkpoint/cp.hpp" @@ -37,7 +38,6 @@ struct IndexCPContext : public CPContext { sisl::ThreadVector< IndexBufferPtr >* m_dirty_buf_list{nullptr}; sisl::ThreadVector< BlkId >* m_free_node_blkid_list{nullptr}; sisl::atomic_counter< int64_t > m_dirty_buf_count{0}; - IndexBufferPtr m_last_in_chain; std::mutex m_flush_buffer_mtx; flush_buffer_iterator m_buf_it; @@ -47,11 +47,6 @@ struct IndexCPContext : public CPContext { CPContext(cp_id), m_dirty_buf_list{dirty_list}, m_free_node_blkid_list{free_blkid_list} {} virtual ~IndexCPContext() { - auto it = m_dirty_buf_list->begin(true /* latest */); - IndexBufferPtr *tmp = nullptr; - while((tmp = m_dirty_buf_list->next(it)) != nullptr) { - tmp->reset(); - } m_dirty_buf_list->clear(); m_free_node_blkid_list->clear(); } @@ -62,11 +57,9 @@ struct IndexCPContext : public CPContext { } void add_to_dirty_list(const IndexBufferPtr& buf) { - buf->m_buf_state = index_buf_state_t::DIRTY; + buf->m_node_buf->m_state = index_buf_state_t::DIRTY; m_dirty_buf_list->push_back(buf); m_dirty_buf_count.increment(1); - m_last_in_chain = buf; - LOGTRACEMOD(wbcache, "{}", buf->to_string()); } void add_to_free_node_list(BlkId blkid) { m_free_node_blkid_list->push_back(blkid); } @@ -77,13 +70,91 @@ struct IndexCPContext : public CPContext { BlkId* next_blkid() { return m_free_node_blkid_list->next(m_buf_it.free_node_list_it); } std::string to_string() const { std::string str{ - fmt::format("IndexCPContext cpid={} dirty_buf_count={} dirty_buf_list_size={} blkid_list_size={}", id(), + fmt::format("IndexCPContext cpid={} dirty_buf_count={} dirty_buf_list_size={} blkid_list_size={}\n", id(), m_dirty_buf_count.get(), m_dirty_buf_list->size(), m_free_node_blkid_list->size())}; - // TODO dump all index buffers. + // Mapping from a node to all its parents in the graph. + // Display all buffers and its dependencies and state. + std::unordered_map< IndexBuffer*, std::vector< IndexBuffer* > > parents; + IndexBufferPtr* buf; + auto it = m_dirty_buf_list->begin(true /* latest */); + while ((buf = m_dirty_buf_list->next(it)) != nullptr) { + // Add this buf to his children. 
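The switch to folly::SharedPromise plus the m_cp_waiting_to_trigger guard means only the first caller re-arms the promise; every later trigger on the same pending CP just takes another future from it, and CP completion fulfils them all. A minimal sketch of that arm-once pattern, assuming folly's SharedPromise as used in the hunk and with illustrative names:

#include <cassert>
#include <folly/futures/Future.h>
#include <folly/futures/SharedPromise.h>
#include <mutex>

struct cp_slot {
    std::mutex mtx;
    bool waiting_to_trigger{false};
    folly::SharedPromise<bool> comp_promise;
};

folly::Future<bool> trigger(cp_slot& cp) {
    std::unique_lock lk(cp.mtx);
    if (!cp.waiting_to_trigger) {               // only the first trigger re-arms
        cp.comp_promise = folly::SharedPromise<bool>{};
        cp.waiting_to_trigger = true;
    }
    return cp.comp_promise.getFuture();         // every caller gets its own future
}

int main() {
    cp_slot cp;
    auto f1 = trigger(cp);
    auto f2 = trigger(cp);                      // does not clobber the first promise
    cp.comp_promise.setValue(true);             // CP completion
    assert(std::move(f1).get() && std::move(f2).get());
}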
+ parents[(*buf)->m_next_buffer.lock().get()].emplace_back(buf->get()); + } + + it = m_dirty_buf_list->begin(true /* latest */); + while ((buf = m_dirty_buf_list->next(it)) != nullptr) { + fmt::format_to(std::back_inserter(str), "{}", (*buf)->to_string()); + auto first = true; + for (const auto& p : parents[buf->get()]) { + if (first) { + fmt::format_to(std::back_inserter(str), "\nDepends:"); + first = false; + } + fmt::format_to(std::back_inserter(str), " {}({})", r_cast< void* >(p), s_cast< int >(p->state())); + } + fmt::format_to(std::back_inserter(str), "\n"); + } + return str; } -}; + + void check_cycle() { + // Use dfs to find if the graph is cycle + IndexBufferPtr* buf; + auto it = m_dirty_buf_list->begin(true /* latest */); + while ((buf = m_dirty_buf_list->next(it)) != nullptr) { + std::set< IndexBuffer* > visited; + check_cycle_recurse(*buf, visited); + } + } + + void check_cycle_recurse(IndexBufferPtr buf, std::set< IndexBuffer* >& visited) const { + if (visited.count(buf.get()) != 0) { + LOGERROR("Cycle found for {}", buf->to_string()); + for (auto& x : visited) { + LOGERROR("Path : {}", x->to_string()); + } + return; + } + + visited.insert(buf.get()); + if (buf->m_next_buffer.lock()) { check_cycle_recurse(buf->m_next_buffer.lock(), visited); } + } + + void check_wait_for_leaders() { + // Use the next buffer as indegree to find if wait_for_leaders is invalid. + std::unordered_map< IndexBuffer*, int > wait_for_leaders; + IndexBufferPtr* buf; + + // Store the wait for leader count for each buffer. + auto it = m_dirty_buf_list->begin(true /* latest */); + while ((buf = m_dirty_buf_list->next(it)) != nullptr) { + wait_for_leaders[(*buf).get()] = (*buf)->m_wait_for_leaders.get(); + } + + // Decrement the count using the next buffer. + it = m_dirty_buf_list->begin(true /* latest */); + while ((buf = m_dirty_buf_list->next(it)) != nullptr) { + auto next_buf = (*buf)->m_next_buffer.lock(); + if (next_buf.get() == nullptr) continue; + wait_for_leaders[next_buf.get()]--; + } + + bool issue = false; + for (const auto& [buf, waits] : wait_for_leaders) { + // Any value other than zero means the dependency graph is invalid. 
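check_cycle() and check_wait_for_leaders() above are debug-only validations of the dependency graph: the first walks next-buffer links looking for a revisit, the second recomputes each buffer's indegree and compares it with wait_for_leaders. A standalone version with simplified stand-in types:

#include <cassert>
#include <memory>
#include <set>
#include <unordered_map>
#include <vector>

struct buf {
    int wait_for_leaders{0};
    std::weak_ptr<buf> next;
};

bool has_cycle(const std::shared_ptr<buf>& start) {
    std::set<const buf*> visited;
    for (auto cur = start; cur; cur = cur->next.lock()) {
        if (!visited.insert(cur.get()).second) { return true; }   // revisited a node
    }
    return false;
}

bool leaders_consistent(const std::vector<std::shared_ptr<buf>>& dirty_list) {
    std::unordered_map<const buf*, int> expected;                 // indegree via next links
    for (const auto& b : dirty_list) { expected[b.get()]; }
    for (const auto& b : dirty_list) {
        if (auto nxt = b->next.lock()) { ++expected[nxt.get()]; }
    }
    for (const auto& b : dirty_list) {
        if (expected[b.get()] != b->wait_for_leaders) { return false; }
    }
    return true;
}

int main() {
    auto a = std::make_shared<buf>(), b = std::make_shared<buf>();
    a->next = b;
    b->wait_for_leaders = 1;
    std::vector<std::shared_ptr<buf>> dirty{a, b};
    assert(!has_cycle(a) && leaders_consistent(dirty));
}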
+ if (waits != 0) { + issue = true; + LOGERROR("Leaders wait not zero cp {} buf {} waits {}", id(), buf->to_string(), waits); + } + } + + assert(issue == false); + } + +}; // namespace homestore class IndexWBCache; class IndexCPCallbacks : public CPCallbacks { diff --git a/src/lib/index/index_service.cpp b/src/lib/index/index_service.cpp index d3d0984b5..bfa96f8bc 100644 --- a/src/lib/index/index_service.cpp +++ b/src/lib/index/index_service.cpp @@ -82,7 +82,9 @@ void IndexService::stop() { HS_REL_ASSERT_EQ(success, true, "CP Flush failed"); LOGINFO("CP Flush completed"); - for (auto [id, tbl] : m_index_map) { tbl->destroy(); } + for (auto [id, tbl] : m_index_map) { + tbl->destroy(); + } } void IndexService::add_index_table(const std::shared_ptr< IndexTableBase >& tbl) { std::unique_lock lg(m_index_map_mtx); @@ -107,9 +109,16 @@ uint64_t IndexService::used_size() const { return size; } +NodeBuffer::NodeBuffer(uint32_t buf_size, uint32_t align_size) : + m_bytes{hs_utils::iobuf_alloc(buf_size, sisl::buftag::btree_node, align_size)} {} + +NodeBuffer::~NodeBuffer() { hs_utils::iobuf_free(m_bytes, sisl::buftag::btree_node); } + IndexBuffer::IndexBuffer(BlkId blkid, uint32_t buf_size, uint32_t align_size) : - m_node_buf{hs_utils::iobuf_alloc(buf_size, sisl::buftag::btree_node, align_size)}, m_blkid{blkid} {} + m_node_buf{std::make_shared< NodeBuffer >(buf_size, align_size)}, m_blkid{blkid} {} + +IndexBuffer::IndexBuffer(NodeBufferPtr node_buf, BlkId blkid) : m_node_buf(node_buf), m_blkid(blkid) {} -IndexBuffer::~IndexBuffer() { hs_utils::iobuf_free(m_node_buf, sisl::buftag::btree_node); } +IndexBuffer::~IndexBuffer() { m_node_buf.reset(); } } // namespace homestore diff --git a/src/lib/index/wb_cache.cpp b/src/lib/index/wb_cache.cpp index 21ad44345..4b7b4bb5a 100644 --- a/src/lib/index/wb_cache.cpp +++ b/src/lib/index/wb_cache.cpp @@ -88,7 +88,6 @@ BtreeNodePtr IndexWBCache::alloc_buf(node_initializer_t&& node_initializer) { // Alloc buffer and initialize the node auto idx_buf = std::make_shared< IndexBuffer >(blkid, m_node_size, m_vdev->align_size()); auto node = node_initializer(idx_buf); - LOGTRACEMOD(wbcache, "idx_buf {} blkid {}", static_cast< void* >(idx_buf.get()), blkid.to_integer()); // Add the node to the cache bool done = m_cache.insert(node); @@ -102,16 +101,31 @@ void IndexWBCache::realloc_buf(const IndexBufferPtr& buf) { } void IndexWBCache::write_buf(const BtreeNodePtr& node, const IndexBufferPtr& buf, CPContext* cp_ctx) { + // TODO upsert always returns false even if it succeeds. m_cache.upsert(node); r_cast< IndexCPContext* >(cp_ctx)->add_to_dirty_list(buf); resource_mgr().inc_dirty_buf_size(m_node_size); } -IndexBufferPtr IndexWBCache::copy_buffer(const IndexBufferPtr& cur_buf) const { - auto new_buf = std::make_shared< IndexBuffer >(cur_buf->m_blkid, m_node_size, m_vdev->align_size()); - std::memcpy(new_buf->raw_buffer(), cur_buf->raw_buffer(), m_node_size); - LOGTRACEMOD(wbcache, "new_buf {} cur_buf {} cur_buf_blkid {}", static_cast< void* >(new_buf.get()), - static_cast< void* >(cur_buf.get()), cur_buf->m_blkid.to_integer()); +IndexBufferPtr IndexWBCache::copy_buffer(const IndexBufferPtr& cur_buf, CPContext* cp_ctx) const { + IndexBufferPtr new_buf = nullptr; + bool copied = false; + + // When we copy the buffer we check if the node buffer is clean or not. If its clean + // we could reuse it otherwise create a copy. + if (cur_buf->is_clean()) { + // Refer to the same node buffer. 
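With the NodeBuffer constructor/destructor now owning the raw aligned allocation, the bytes live exactly as long as the last IndexBuffer that shares them. A rough RAII sketch of that lifetime, using std::aligned_alloc/std::free as stand-ins for hs_utils::iobuf_alloc/iobuf_free:

#include <cstdint>
#include <cstdlib>
#include <memory>

struct node_buffer {
    node_buffer(size_t buf_size, size_t align)
        : bytes{static_cast<uint8_t*>(std::aligned_alloc(align, buf_size))} {}
    ~node_buffer() { std::free(bytes); }
    node_buffer(const node_buffer&) = delete;
    node_buffer& operator=(const node_buffer&) = delete;
    uint8_t* bytes{nullptr};
};

int main() {
    auto shared_bytes = std::make_shared<node_buffer>(4096, 512);
    auto view_cp1 = shared_bytes;   // IndexBuffer in one CP
    auto view_cp2 = shared_bytes;   // IndexBuffer in the next CP (clean, so shared)
    shared_bytes.reset();
    view_cp1.reset();
    // The allocation is still valid through view_cp2 and is freed with it.
}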
+ new_buf = std::make_shared< IndexBuffer >(cur_buf->m_node_buf, cur_buf->m_blkid); + } else { + // If its not clean, we do deep copy. + new_buf = std::make_shared< IndexBuffer >(cur_buf->m_blkid, m_node_size, m_vdev->align_size()); + std::memcpy(new_buf->raw_buffer(), cur_buf->raw_buffer(), m_node_size); + copied = true; + } + + LOGTRACEMOD(wbcache, "cp {} new_buf {} cur_buf {} cur_buf_blkid {} copied {}", cp_ctx->id(), + static_cast< void* >(new_buf.get()), static_cast< void* >(cur_buf.get()), cur_buf->m_blkid.to_integer(), + copied); return new_buf; } @@ -141,41 +155,43 @@ void IndexWBCache::read_buf(bnodeid_t id, BtreeNodePtr& node, node_initializer_t std::tuple< bool, bool > IndexWBCache::create_chain(IndexBufferPtr& second, IndexBufferPtr& third, CPContext* cp_ctx) { bool second_copied{false}, third_copied{false}; - + auto chain = second; + auto old_third = third; if (!second->is_clean()) { - auto new_second = copy_buffer(second); - LOGTRACEMOD(wbcache, "second copied blkid {} {} new_second {}", second->m_blkid.to_integer(), - static_cast< void* >(second.get()), static_cast< void* >(new_second.get())); + auto new_second = copy_buffer(second, cp_ctx); second = new_second; second_copied = true; } + if (!third->is_clean()) { - auto new_third = copy_buffer(third); - LOGTRACEMOD(wbcache, "third copied blkid {} {} new_third {}", third->m_blkid.to_integer(), - static_cast< void* >(third.get()), static_cast< void* >(new_third.get())); + auto new_third = copy_buffer(third, cp_ctx); third = new_third; third_copied = true; } // Append parent(third) to the left child(second). - prepend_to_chain(second, third); + second->m_next_buffer = third; + third->m_wait_for_leaders.increment(1); + if (chain != second) { + // We want buffers to be append to the end of the chain which are related. + // If we split a node multiple times in same or different CP's, each dirty buffer will be + // added to the end of that chain. + while (chain->m_next_buffer.lock() != nullptr) { + chain = chain->m_next_buffer.lock(); + } - // TODO the index buffer are added to end of the chain, instead add to the dependency. - auto& last_in_chain = r_cast< IndexCPContext* >(cp_ctx)->m_last_in_chain; - if (last_in_chain) { - // Add this to the end of the chain. 
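The while loop below in create_chain() walks m_next_buffer to the end of an existing chain so that a freshly copied buffer from a repeated split is appended after the tail, not after the stale head. A compact sketch of that append-at-tail step with stand-in types:

#include <cassert>
#include <memory>

struct buf {
    int wait_for_leaders{0};
    std::weak_ptr<buf> next;
};

void append_to_tail(std::shared_ptr<buf> chain_head, const std::shared_ptr<buf>& fresh) {
    auto tail = chain_head;
    while (auto nxt = tail->next.lock()) { tail = nxt; }   // find end of chain
    tail->next = fresh;
    ++fresh->wait_for_leaders;
}

int main() {
    auto a = std::make_shared<buf>(), b = std::make_shared<buf>(), c = std::make_shared<buf>();
    a->next = b;               // existing chain a -> b
    ++b->wait_for_leaders;
    append_to_tail(a, c);      // becomes a -> b -> c
    assert(b->next.lock() == c && c->wait_for_leaders == 1);
}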
- last_in_chain->m_next_buffer = second; + chain->m_next_buffer = second; second->m_wait_for_leaders.increment(1); } return {second_copied, third_copied}; } -void IndexWBCache::prepend_to_chain(const IndexBufferPtr& first, const IndexBufferPtr& second) { +void IndexWBCache::prepend_to_chain(const IndexBufferPtr& first, const IndexBufferPtr& second, CPContext* cp_ctx) { + assert(first->m_next_buffer.lock() != second); assert(first->m_next_buffer.lock() == nullptr); first->m_next_buffer = second; second->m_wait_for_leaders.increment(1); - LOGTRACEMOD(wbcache, "first {} second {}", first->to_string(), second->to_string()); } void IndexWBCache::free_buf(const IndexBufferPtr& buf, CPContext* cp_ctx) { @@ -190,12 +206,19 @@ void IndexWBCache::free_buf(const IndexBufferPtr& buf, CPContext* cp_ctx) { //////////////////// CP Related API section ///////////////////////////////// folly::Future< bool > IndexWBCache::async_cp_flush(CPContext* context) { IndexCPContext* cp_ctx = s_cast< IndexCPContext* >(context); - LOGTRACEMOD(wbcache, "cp_ctx {}", cp_ctx->to_string()); + LOGTRACEMOD(wbcache, "wb_cache cp_ctx {}", cp_ctx->to_string()); if (!cp_ctx->any_dirty_buffers()) { CP_PERIODIC_LOG(DEBUG, cp_ctx->id(), "Btree does not have any dirty buffers to flush"); return folly::makeFuture< bool >(true); // nothing to flush } +#ifndef NDEBUG + // Check no cycles or invalid wait_for_leader count in the dirty buffer + // dependency graph. + cp_ctx->check_wait_for_leaders(); + cp_ctx->check_cycle(); +#endif + cp_ctx->prepare_flush_iteration(); for (auto& fiber : m_cp_flush_fibers) { @@ -219,31 +242,33 @@ std::unique_ptr< CPContext > IndexWBCache::create_cp_context(cp_id_t cp_id) { m_free_blkid_list[cp_id_slot].get()); } -void IndexWBCache::do_flush_one_buf(IndexCPContext* cp_ctx, const IndexBufferPtr& buf, bool part_of_batch) { - LOGTRACEMOD(wbcache, "buf {}", buf->to_string()); - buf->m_buf_state = index_buf_state_t::FLUSHING; +void IndexWBCache::do_flush_one_buf(IndexCPContext* cp_ctx, IndexBufferPtr buf, bool part_of_batch) { + LOGTRACEMOD(wbcache, "cp {} buf {}", cp_ctx->id(), buf->to_string()); + { buf->set_state(index_buf_state_t::FLUSHING); } m_vdev->async_write(r_cast< const char* >(buf->raw_buffer()), m_node_size, buf->m_blkid, part_of_batch) - .thenValue([pbuf = buf.get(), cp_ctx](auto) { + .thenValue([buf, cp_ctx](auto) { auto& pthis = s_cast< IndexWBCache& >(wb_cache()); // Avoiding more than 16 bytes capture - pthis.process_write_completion(cp_ctx, pbuf); + pthis.process_write_completion(cp_ctx, buf); }); if (!part_of_batch) { m_vdev->submit_batch(); } } -void IndexWBCache::process_write_completion(IndexCPContext* cp_ctx, IndexBuffer* pbuf) { - LOGTRACEMOD(wbcache, "buf {}", pbuf->to_string()); +void IndexWBCache::process_write_completion(IndexCPContext* cp_ctx, IndexBufferPtr buf) { + LOGTRACEMOD(wbcache, "cp {} buf {}", cp_ctx->id(), buf->to_string()); resource_mgr().dec_dirty_buf_size(m_node_size); - auto [next_buf, has_more] = on_buf_flush_done(cp_ctx, pbuf); + auto [next_buf, has_more] = on_buf_flush_done(cp_ctx, buf); if (next_buf) { do_flush_one_buf(cp_ctx, next_buf, false); } else if (!has_more) { // We are done flushing the buffers, lets free the btree blocks and then flush the bitmap free_btree_blks_and_flush(cp_ctx); + } else { + } } -std::pair< IndexBufferPtr, bool > IndexWBCache::on_buf_flush_done(IndexCPContext* cp_ctx, IndexBuffer* buf) { +std::pair< IndexBufferPtr, bool > IndexWBCache::on_buf_flush_done(IndexCPContext* cp_ctx, IndexBufferPtr buf) { if (m_cp_flush_fibers.size() > 1) { 
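In do_flush_one_buf() below, the completion continuation now captures the IndexBufferPtr by value instead of a raw IndexBuffer*, so the buffer stays alive until the async write completes even if other references drop first. A small illustration of why the shared_ptr capture matters, with a callback queue standing in for the io engine:

#include <functional>
#include <iostream>
#include <memory>
#include <vector>

struct buffer {
    ~buffer() { std::cout << "buffer destroyed\n"; }
    int id{42};
};

int main() {
    std::vector<std::function<void()>> completions;   // stands in for the async write path
    {
        auto buf = std::make_shared<buffer>();
        completions.push_back([buf] { std::cout << "write done for " << buf->id << '\n'; });
        // buf goes out of scope here; the capture still owns the buffer.
    }
    for (auto& cb : completions) { cb(); }             // completion runs before destruction
}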
std::unique_lock lg(m_flush_mtx); return on_buf_flush_done_internal(cp_ctx, buf); @@ -252,9 +277,9 @@ std::pair< IndexBufferPtr, bool > IndexWBCache::on_buf_flush_done(IndexCPContext } } -std::pair< IndexBufferPtr, bool > IndexWBCache::on_buf_flush_done_internal(IndexCPContext* cp_ctx, IndexBuffer* buf) { +std::pair< IndexBufferPtr, bool > IndexWBCache::on_buf_flush_done_internal(IndexCPContext* cp_ctx, IndexBufferPtr buf) { static thread_local std::vector< IndexBufferPtr > t_buf_list; - buf->m_buf_state = index_buf_state_t::CLEAN; + { buf->set_state(index_buf_state_t::CLEAN); } t_buf_list.clear(); @@ -275,10 +300,9 @@ void IndexWBCache::get_next_bufs(IndexCPContext* cp_ctx, uint32_t max_count, std } } -void IndexWBCache::get_next_bufs_internal(IndexCPContext* cp_ctx, uint32_t max_count, IndexBuffer* prev_flushed_buf, +void IndexWBCache::get_next_bufs_internal(IndexCPContext* cp_ctx, uint32_t max_count, IndexBufferPtr prev_flushed_buf, std::vector< IndexBufferPtr >& bufs) { uint32_t count{0}; - // First attempt to execute any follower buffer flush if (prev_flushed_buf) { auto next_buffer = prev_flushed_buf->m_next_buffer.lock(); @@ -310,7 +334,7 @@ void IndexWBCache::free_btree_blks_and_flush(IndexCPContext* cp_ctx) { // Pick a CP Manager blocking IO fiber to execute the cp flush of vdev iomanager.run_on_forget(hs()->cp_mgr().pick_blocking_io_fiber(), [this, cp_ctx]() { - LOGTRACEMOD(wbcache, "Initiating CP flush"); + LOGTRACEMOD(wbcache, "Initiating vdev cp flush"); m_vdev->cp_flush(nullptr); // This is a blocking io call cp_ctx->complete(true); }); diff --git a/src/lib/index/wb_cache.hpp b/src/lib/index/wb_cache.hpp index ad5255be9..a2db3f99a 100644 --- a/src/lib/index/wb_cache.hpp +++ b/src/lib/index/wb_cache.hpp @@ -53,23 +53,23 @@ class IndexWBCache : public IndexWBCacheBase { void write_buf(const BtreeNodePtr& node, const IndexBufferPtr& buf, CPContext* cp_ctx) override; void read_buf(bnodeid_t id, BtreeNodePtr& node, node_initializer_t&& node_initializer) override; std::tuple< bool, bool > create_chain(IndexBufferPtr& second, IndexBufferPtr& third, CPContext* cp_ctx) override; - void prepend_to_chain(const IndexBufferPtr& first, const IndexBufferPtr& second) override; + void prepend_to_chain(const IndexBufferPtr& first, const IndexBufferPtr& second, CPContext* cp_ctx) override; void free_buf(const IndexBufferPtr& buf, CPContext* cp_ctx) override; //////////////////// CP Related API section ///////////////////////////////// folly::Future< bool > async_cp_flush(CPContext* context); std::unique_ptr< CPContext > create_cp_context(cp_id_t cp_id); - IndexBufferPtr copy_buffer(const IndexBufferPtr& cur_buf) const; + IndexBufferPtr copy_buffer(const IndexBufferPtr& cur_buf, CPContext *cp_ctx) const; private: void start_flush_threads(); - void process_write_completion(IndexCPContext* cp_ctx, IndexBuffer* pbuf); - void do_flush_one_buf(IndexCPContext* cp_ctx, const IndexBufferPtr& buf, bool part_of_batch); - std::pair< IndexBufferPtr, bool > on_buf_flush_done(IndexCPContext* cp_ctx, IndexBuffer* buf); - std::pair< IndexBufferPtr, bool > on_buf_flush_done_internal(IndexCPContext* cp_ctx, IndexBuffer* buf); + void process_write_completion(IndexCPContext* cp_ctx, IndexBufferPtr pbuf); + void do_flush_one_buf(IndexCPContext* cp_ctx, const IndexBufferPtr buf, bool part_of_batch); + std::pair< IndexBufferPtr, bool > on_buf_flush_done(IndexCPContext* cp_ctx, IndexBufferPtr buf); + std::pair< IndexBufferPtr, bool > on_buf_flush_done_internal(IndexCPContext* cp_ctx, IndexBufferPtr buf); void 
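on_buf_flush_done() and get_next_bufs() take m_flush_mtx only when more than one CP flush fiber is configured; with a single fiber the shared bookkeeping is touched lock-free. A minimal sketch of that conditional-locking shape, with illustrative names:

#include <mutex>

struct flusher {
    int num_fibers{1};
    std::mutex flush_mtx;
    int completed{0};

    void on_flush_done() {
        if (num_fibers > 1) {
            std::unique_lock lg(flush_mtx);   // shared state, serialize
            on_flush_done_internal();
        } else {
            on_flush_done_internal();         // single fiber, no lock needed
        }
    }
    void on_flush_done_internal() { ++completed; }
};

int main() {
    flusher f;
    f.on_flush_done();
    f.num_fibers = 2;
    f.on_flush_done();
    return f.completed == 2 ? 0 : 1;
}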
get_next_bufs(IndexCPContext* cp_ctx, uint32_t max_count, std::vector< IndexBufferPtr >& bufs); - void get_next_bufs_internal(IndexCPContext* cp_ctx, uint32_t max_count, IndexBuffer* prev_flushed_buf, + void get_next_bufs_internal(IndexCPContext* cp_ctx, uint32_t max_count, IndexBufferPtr prev_flushed_buf, std::vector< IndexBufferPtr >& bufs); void free_btree_blks_and_flush(IndexCPContext* cp_ctx); diff --git a/src/tests/btree_helpers/btree_test_helper.hpp b/src/tests/btree_helpers/btree_test_helper.hpp index 3bc943fc0..62cf24fa2 100644 --- a/src/tests/btree_helpers/btree_test_helper.hpp +++ b/src/tests/btree_helpers/btree_test_helper.hpp @@ -70,6 +70,7 @@ struct BtreeTestHelper : public testing::Test { protected: std::shared_ptr< typename T::BtreeType > m_bt; ShadowMap< K, V > m_shadow_map; + std::mutex m_shadow_map_mutex; BtreeConfig m_cfg{g_node_size}; RangeScheduler m_range_scheduler; uint32_t m_max_range_input{1000}; @@ -130,6 +131,7 @@ struct BtreeTestHelper : public testing::Test { preq.enable_route_tracing(); ASSERT_EQ(m_bt->put(preq), btree_status_t::success) << "range_put failed for " << start_k << "-" << end_k; + std::lock_guard lock{m_shadow_map_mutex}; if (update) { m_shadow_map.range_update(start_key, nkeys, value); m_range_scheduler.remove_keys_from_working(start_k, end_k); @@ -160,6 +162,7 @@ struct BtreeTestHelper : public testing::Test { auto rreq = BtreeSingleRemoveRequest{pk.get(), existing_v.get()}; bool removed = (m_bt->remove(rreq) == btree_status_t::success); + std::lock_guard lock{m_shadow_map_mutex}; ASSERT_EQ(removed, m_shadow_map.exists(*pk)) << "Removal of key " << pk->key() << " status doesn't match with shadow"; @@ -179,6 +182,7 @@ struct BtreeTestHelper : public testing::Test { } void range_remove_existing(uint32_t start_k, uint32_t count) { + std::lock_guard lock{m_shadow_map_mutex}; auto [start_key, end_key] = m_shadow_map.pick_existing_range(K{start_k}, count); do_range_remove(start_k, end_key.key(), true /* removing_all_existing */); } @@ -187,10 +191,12 @@ struct BtreeTestHelper : public testing::Test { static std::uniform_int_distribution< uint32_t > s_rand_range_generator{2, 5}; auto const [start_k, end_k] = m_range_scheduler.pick_random_existing_keys(s_rand_range_generator(m_re)); + std::lock_guard lock{m_shadow_map_mutex}; do_range_remove(start_k, end_k, true /* only_existing */); } void range_remove_any(uint32_t start_k, uint32_t end_k) { + std::lock_guard lock{m_shadow_map_mutex}; do_range_remove(start_k, end_k, false /* removing_all_existing */); } @@ -202,6 +208,7 @@ struct BtreeTestHelper : public testing::Test { } void do_query(uint32_t start_k, uint32_t end_k, uint32_t batch_size) { + std::lock_guard lock{m_shadow_map_mutex}; std::vector< std::pair< K, V > > out_vector; uint32_t remaining = m_shadow_map.num_elems_in_range(start_k, end_k); auto it = m_shadow_map.map_const().lower_bound(K{start_k}); @@ -248,6 +255,7 @@ struct BtreeTestHelper : public testing::Test { ////////////////////// All get operation variants /////////////////////////////// void get_all() const { + std::lock_guard lock{m_shadow_map_mutex}; for (const auto& [key, value] : m_shadow_map.map_const()) { auto copy_key = std::make_unique< K >(); *copy_key = key; @@ -267,6 +275,7 @@ struct BtreeTestHelper : public testing::Test { auto req = BtreeSingleGetRequest{pk.get(), out_v.get()}; const auto status = m_bt->get(req); + std::lock_guard lock{m_shadow_map_mutex}; if (status == btree_status_t::success) { m_shadow_map.validate_data(req.key(), (const V&)req.value()); } else { @@ 
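The test helper now serializes every shadow-map access with m_shadow_map_mutex because the threaded CP test mutates the map from an insert thread and a remove thread concurrently. A minimal sketch of the same std::lock_guard discipline on a plain map:

#include <cassert>
#include <map>
#include <mutex>
#include <thread>

struct shadow_map {
    std::mutex mtx;
    std::map<int, int> kv;

    void put(int k, int v) {
        std::lock_guard lock{mtx};
        kv[k] = v;
    }
    void remove(int k) {
        std::lock_guard lock{mtx};
        kv.erase(k);
    }
};

int main() {
    shadow_map m;
    std::thread writer([&] { for (int i = 0; i < 1000; ++i) m.put(i, i); });
    std::thread remover([&] { for (int i = 0; i < 1000; i += 10) m.remove(i); });
    writer.join();
    remover.join();
    assert(m.kv.size() >= 900);   // at most the 100 removed keys are missing
}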
-280,6 +289,8 @@ struct BtreeTestHelper : public testing::Test { auto req = BtreeGetAnyRequest< K >{BtreeKeyRange< K >{K{start_k}, true, K{end_k}, true}, out_k.get(), out_v.get()}; const auto status = m_bt->get(req); + + std::lock_guard lock{m_shadow_map_mutex}; if (status == btree_status_t::success) { ASSERT_EQ(m_shadow_map.exists_in_range(*(K*)req.m_outkey, start_k, end_k), true) << "Get Any returned key=" << *(K*)req.m_outkey << " which is not in range " << start_k << "-" << end_k @@ -303,14 +314,32 @@ struct BtreeTestHelper : public testing::Test { void print_keys() const { m_bt->print_tree_keys(); } void compare_files(const std::string& before, const std::string& after) { - std::ifstream b(before); - std::ifstream a(after); - std::ostringstream ss_before, ss_after; - ss_before << b.rdbuf(); - ss_after << a.rdbuf(); - std::string s1 = ss_before.str(); - std::string s2 = ss_after.str(); - ASSERT_EQ(s1, s2) << "Mismatch in btree structure"; + std::ifstream b(before, std::ifstream::ate); + std::ifstream a(after, std::ifstream::ate); + if (a.fail() || b.fail()) { + LOGINFO("Failed to open file"); + assert(false); + } + if (a.tellg() != b.tellg()) { + LOGINFO("Mismatch in btree files"); + assert(false); + } + + int64_t pending = a.tellg(); + const int64_t batch_size = 4096; + a.seekg(0, ifstream::beg); + b.seekg(0, ifstream::beg); + char a_buffer[batch_size], b_buffer[batch_size]; + while (pending > 0) { + auto count = std::min(pending, batch_size); + a.read(a_buffer, count); + b.read(b_buffer, count); + if (std::memcmp(a_buffer, b_buffer, count) != 0) { + LOGINFO("Mismatch in btree files"); + assert(false); + } + pending -= count; + } } private: @@ -320,6 +349,7 @@ struct BtreeTestHelper : public testing::Test { auto sreq = BtreeSinglePutRequest{&key, &value, put_type, existing_v.get()}; bool done = (m_bt->put(sreq) == btree_status_t::success); + std::lock_guard lock{m_shadow_map_mutex}; if (put_type == btree_put_type::INSERT) { ASSERT_EQ(done, !m_shadow_map.exists(key)); } else { @@ -336,6 +366,8 @@ struct BtreeTestHelper : public testing::Test { auto rreq = BtreeRangeRemoveRequest< K >{BtreeKeyRange< K >{start_key, true, end_key, true}}; auto const ret = m_bt->remove(rreq); + + std::lock_guard lock{m_shadow_map_mutex}; m_shadow_map.range_erase(start_key, end_key); if (all_existing) { @@ -380,4 +412,4 @@ struct BtreeTestHelper : public testing::Test { } LOGINFO("ALL parallel jobs joined"); } -}; \ No newline at end of file +}; diff --git a/src/tests/test_index_btree.cpp b/src/tests/test_index_btree.cpp index 14062cd1f..4a73e629f 100644 --- a/src/tests/test_index_btree.cpp +++ b/src/tests/test_index_btree.cpp @@ -42,13 +42,15 @@ SISL_OPTIONS_ENABLE(logging, test_index_btree, iomgr, test_common_setup) SISL_LOGGING_DECL(test_index_btree) std::vector< std::string > test_common::HSTestHelper::s_dev_names; -// TODO increase num_entries to 65k as io mgr page size is 512 and its slow. + +// TODO Add tests to do write,remove after recovery. +// TODO Test with var len key with io mgr page size is 512. 
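compare_files() above now checks file sizes first and then compares fixed 4 KiB batches instead of slurping both files into strings. Roughly the same logic as a standalone helper that returns a bool rather than asserting:

#include <algorithm>
#include <cstdint>
#include <cstring>
#include <fstream>
#include <iostream>
#include <string>

bool files_equal(const std::string& a_path, const std::string& b_path) {
    std::ifstream a(a_path, std::ifstream::ate | std::ifstream::binary);
    std::ifstream b(b_path, std::ifstream::ate | std::ifstream::binary);
    if (a.fail() || b.fail() || a.tellg() != b.tellg()) { return false; }

    int64_t pending = a.tellg();
    a.seekg(0, std::ifstream::beg);
    b.seekg(0, std::ifstream::beg);
    constexpr int64_t batch = 4096;
    char abuf[batch], bbuf[batch];
    while (pending > 0) {
        auto count = std::min(pending, batch);
        a.read(abuf, count);
        b.read(bbuf, count);
        if (std::memcmp(abuf, bbuf, count) != 0) { return false; }
        pending -= count;
    }
    return true;
}

int main(int argc, char** argv) {
    if (argc != 3) { return 2; }
    std::cout << (files_equal(argv[1], argv[2]) ? "same" : "different") << '\n';
}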
SISL_OPTION_GROUP(test_index_btree, (num_iters, "", "num_iters", "number of iterations for rand ops", ::cxxopts::value< uint32_t >()->default_value("1500"), "number"), (num_entries, "", "num_entries", "number of entries to test with", - ::cxxopts::value< uint32_t >()->default_value("15000"), "number"), + ::cxxopts::value< uint32_t >()->default_value("65000"), "number"), (seed, "", "seed", "random engine seed, use random if not defined", ::cxxopts::value< uint64_t >()->default_value("0"), "number")) @@ -159,12 +161,9 @@ struct BtreeTest : public BtreeTestHelper< TestType > { } }; -// TODO sanal fix the varkey issue. -// using BtreeTypes = testing::Types< FixedLenBtreeTest, VarKeySizeBtreeTest, VarValueSizeBtreeTest, -// VarObjSizeBtreeTest -// >; - -using BtreeTypes = testing::Types< FixedLenBtreeTest >; +using BtreeTypes = testing::Types< FixedLenBtreeTest, VarKeySizeBtreeTest, VarValueSizeBtreeTest, +VarObjSizeBtreeTest +>; TYPED_TEST_SUITE(BtreeTest, BtreeTypes); @@ -225,7 +224,6 @@ TYPED_TEST(BtreeTest, RandomInsert) { this->get_all(); } -#if 0 TYPED_TEST(BtreeTest, SequentialRemove) { LOGINFO("SequentialRemove test start"); // Forward sequential insert @@ -280,7 +278,6 @@ TYPED_TEST(BtreeTest, RandomRemove) { } this->get_all(); } -#endif TYPED_TEST(BtreeTest, RangeUpdate) { LOGINFO("RangeUpdate test start"); @@ -309,23 +306,29 @@ TYPED_TEST(BtreeTest, CpFlush) { for (uint32_t i = 0; i < num_entries; ++i) { this->put(i, btree_put_type::INSERT); } + + // Remove some of the entries. + for (uint32_t i = 0; i < num_entries; i += 10) { + this->remove_one(i); + } + LOGINFO("Query {} entries and validate with pagination of 75 entries", num_entries / 2); this->do_query(0, num_entries / 2 - 1, 75); - this->print(std::string("before.txt")); - LOGINFO("Trigger checkpoint flush."); test_common::HSTestHelper::trigger_cp(true /* wait */); LOGINFO("Query {} entries and validate with pagination of 75 entries", num_entries); this->do_query(0, num_entries - 1, 75); + this->print(std::string("before.txt")); + this->destroy_btree(); // Restart homestore. m_bt is updated by the TestIndexServiceCallback. this->restart_homestore(); - std::this_thread::sleep_for(std::chrono::seconds{3}); + std::this_thread::sleep_for(std::chrono::seconds{1}); LOGINFO("Restarted homestore with index recovered"); this->print(std::string("after.txt")); @@ -373,7 +376,7 @@ TYPED_TEST(BtreeTest, MultipleCpFlush) { // Restart homestore. m_bt is updated by the TestIndexServiceCallback. 
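Re-enabling VarKeySizeBtreeTest, VarValueSizeBtreeTest and VarObjSizeBtreeTest means the whole fixture runs once per node layout through GoogleTest's typed-test machinery. A tiny self-contained reminder of how testing::Types drives one fixture across several types:

#include <gtest/gtest.h>

template <typename T>
struct SmallTypedTest : public ::testing::Test {};

using TestedTypes = ::testing::Types<int, long, double>;
TYPED_TEST_SUITE(SmallTypedTest, TestedTypes);

TYPED_TEST(SmallTypedTest, DefaultIsZero) {
    TypeParam v{};
    EXPECT_EQ(v, TypeParam{});   // runs once for int, long and double
}

int main(int argc, char** argv) {
    ::testing::InitGoogleTest(&argc, argv);
    return RUN_ALL_TESTS();
}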
this->restart_homestore(); - std::this_thread::sleep_for(std::chrono::seconds{3}); + std::this_thread::sleep_for(std::chrono::seconds{1}); LOGINFO(" Restarted homestore with index recovered"); this->print(std::string("after.txt")); @@ -388,24 +391,41 @@ TYPED_TEST(BtreeTest, ThreadedCpFlush) { LOGINFO("ThreadedCpFlush test start"); const auto num_entries = SISL_OPTIONS["num_entries"].as< uint32_t >(); - bool stop_cp_flush = false; - auto io_thread = std::thread([this, num_entries] { + bool stop = false; + std::atomic< uint32_t > last_index{0}; + auto insert_io_thread = std::thread([this, num_entries, &last_index] { LOGINFO("Do Forward sequential insert for {} entries", num_entries); + uint32_t j = 0; for (uint32_t i = 0; i < num_entries; ++i) { this->put(i, btree_put_type::INSERT); + last_index = i; } }); - auto cp_flush_thread = std::thread([this, &stop_cp_flush] { - while (!stop_cp_flush) { - LOGINFO("Trigger checkpoint flush wait=false."); - test_common::HSTestHelper::trigger_cp(false /* wait */); + auto remove_io_thread = std::thread([this, &stop, num_entries, &last_index] { + LOGINFO("Do random removes for {} entries", num_entries); + while (!stop) { + std::this_thread::sleep_for(std::chrono::milliseconds{10}); + // Remove a random entry. + std::uniform_int_distribution< uint32_t > rand{0, last_index.load()}; + auto rm_idx = rand(g_re); + LOGINFO("Removing entry {}", rm_idx); + this->remove_one(rm_idx); + } + }); + + auto cp_flush_thread = std::thread([this, &stop] { + while (!stop) { std::this_thread::sleep_for(std::chrono::seconds{1}); + LOGINFO("Trigger checkpoint flush wait=true."); + test_common::HSTestHelper::trigger_cp(true /* wait */); + LOGINFO("Trigger checkpoint flush wait=true done."); } }); - io_thread.join(); - stop_cp_flush = true; + insert_io_thread.join(); + stop = true; + remove_io_thread.join(); cp_flush_thread.join(); LOGINFO("Trigger checkpoint flush wait=true."); @@ -420,7 +440,7 @@ TYPED_TEST(BtreeTest, ThreadedCpFlush) { // Restart homestore. m_bt is updated by the TestIndexServiceCallback. this->restart_homestore(); - std::this_thread::sleep_for(std::chrono::seconds{3}); + std::this_thread::sleep_for(std::chrono::seconds{1}); LOGINFO(" Restarted homestore with index recovered"); this->print(std::string("after.txt"));
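The reworked ThreadedCpFlush test above runs three cooperating threads: an inserter that publishes the highest inserted key, a remover that deletes random keys at or below that index, and a checkpoint trigger, all stopping once inserts finish. A stripped-down sketch of that coordination via an atomic index and a stop flag, with the btree calls left as placeholder comments:

#include <atomic>
#include <chrono>
#include <cstdint>
#include <iostream>
#include <random>
#include <thread>

int main() {
    std::atomic<bool> stop{false};
    std::atomic<uint32_t> last_index{0};
    std::mt19937 re{std::random_device{}()};

    std::thread inserter([&] {
        for (uint32_t i = 0; i < 1000; ++i) { last_index = i; /* put(i) */ }
    });
    std::thread remover([&] {
        while (!stop) {
            std::this_thread::sleep_for(std::chrono::milliseconds{1});
            std::uniform_int_distribution<uint32_t> rand{0, last_index.load()};
            auto idx = rand(re);   /* remove_one(idx) */
            (void)idx;
        }
    });
    std::thread cp([&] {
        while (!stop) {
            std::this_thread::sleep_for(std::chrono::milliseconds{10});
            /* trigger_cp(true /\* wait *\/) */
        }
    });

    inserter.join();
    stop = true;
    remover.join();
    cp.join();
    std::cout << "done\n";
}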