diff --git a/conanfile.py b/conanfile.py index 6f4e8d36c..9a2532415 100644 --- a/conanfile.py +++ b/conanfile.py @@ -9,7 +9,7 @@ class HomestoreConan(ConanFile): name = "homestore" - version = "6.5.23" + version = "6.5.24" homepage = "https://github.com/eBay/Homestore" description = "HomeStore Storage Engine" diff --git a/src/include/homestore/index/index_internal.hpp b/src/include/homestore/index/index_internal.hpp index fea20dbd6..989e650c4 100644 --- a/src/include/homestore/index/index_internal.hpp +++ b/src/include/homestore/index/index_internal.hpp @@ -73,6 +73,7 @@ class IndexTableBase { virtual uint64_t used_size() const = 0; virtual void destroy() = 0; virtual void repair_node(IndexBufferPtr const& buf) = 0; + virtual void repair_root_node(IndexBufferPtr const& buf) = 0; }; enum class index_buf_state_t : uint8_t { @@ -97,7 +98,7 @@ struct IndexBuffer : public sisl::ObjLifeCounter< IndexBuffer > { sisl::atomic_counter< int > m_wait_for_down_buffers{0}; // Number of children need to wait for before persisting #ifndef NDEBUG // Down buffers are not mandatory members, but only to keep track of any bugs and asserts - std::vector > m_down_buffers; + std::vector< std::weak_ptr< IndexBuffer > > m_down_buffers; std::mutex m_down_buffers_mtx; std::shared_ptr< IndexBuffer > m_prev_up_buffer; // Keep a copy for debugging #endif @@ -125,11 +126,11 @@ struct IndexBuffer : public sisl::ObjLifeCounter< IndexBuffer > { std::string to_string() const; std::string to_string_dot() const; - void add_down_buffer(const IndexBufferPtr &buf); + void add_down_buffer(const IndexBufferPtr& buf); - void remove_down_buffer(const IndexBufferPtr &buf); + void remove_down_buffer(const IndexBufferPtr& buf); #ifndef NDEBUG - bool is_in_down_buffers(const IndexBufferPtr &buf); + bool is_in_down_buffers(const IndexBufferPtr& buf); #endif }; diff --git a/src/include/homestore/index/index_table.hpp b/src/include/homestore/index/index_table.hpp index 94b8685a3..83411b5c0 100644 --- a/src/include/homestore/index/index_table.hpp +++ b/src/include/homestore/index/index_table.hpp @@ -79,7 +79,7 @@ class IndexTable : public IndexTableBase, public Btree< K, V > { void destroy() override { auto cpg = cp_mgr().cp_guard(); - Btree::destroy_btree(cpg.context(cp_consumer_t::INDEX_SVC)); + Btree< K, V >::destroy_btree(cpg.context(cp_consumer_t::INDEX_SVC)); m_sb.destroy(); } @@ -114,11 +114,40 @@ class IndexTable : public IndexTableBase, public Btree< K, V > { return ret; } + void repair_root_node(IndexBufferPtr const& idx_buf) override { + LOGTRACEMOD(wbcache, "check if this was the previous root node {} for buf {} ", m_sb->root_node, + idx_buf->to_string()); + if (m_sb->root_node == idx_buf->blkid().to_integer()) { + // This is the root node, we need to update the root node in superblk + LOGTRACEMOD(wbcache, "{} is old root so we need to update the meta node ", idx_buf->to_string()); + BtreeNode* n = this->init_node(idx_buf->raw_buffer(), idx_buf->blkid().to_integer(), false /* init_buf */, + BtreeNode::identify_leaf_node(idx_buf->raw_buffer())); + static_cast< IndexBtreeNode* >(n)->attach_buf(idx_buf); + auto edge_id = n->next_bnode(); + + BT_DBG_ASSERT(!n->has_valid_edge(), + "root {} already has a valid edge {}, so we should have found the new root node", + n->to_string(), n->get_edge_value().bnode_id()); + n->set_next_bnode(empty_bnodeid); + n->set_edge_value(BtreeLinkInfo{edge_id, 0}); + LOGTRACEMOD(wbcache, "change root node {}: edge updated to {} and invalidate the next node! ", n->node_id(), + edge_id); + auto cpg = cp_mgr().cp_guard(); + write_node_impl(n, (void*)cpg.context(cp_consumer_t::INDEX_SVC)); + + } else { + LOGTRACEMOD(wbcache, "This is not the root node, so we can ignore this repair call for buf {}", + idx_buf->to_string()); + } + } + void repair_node(IndexBufferPtr const& idx_buf) override { if (idx_buf->is_meta_buf()) { // We cannot repair the meta buf on its own, we need to repair the root node which modifies the // meta_buf. It is ok to ignore this call, because repair will be done from root before meta_buf is // attempted to repair, which would have updated the meta_buf already. + LOGTRACEMOD(wbcache, "Ignoring repair on meta buf {} root id {} ", idx_buf->to_string(), + this->root_node_id()); return; } BtreeNode* n = this->init_node(idx_buf->raw_buffer(), idx_buf->blkid().to_integer(), false /* init_buf */, @@ -134,13 +163,14 @@ class IndexTable : public IndexTableBase, public Btree< K, V > { // Only for interior nodes we need to repair its links if (!bn->is_leaf()) { LOGTRACEMOD(wbcache, "repair_node cp={} buf={}", cpg->id(), idx_buf->to_string()); - repair_links(bn, (void *) cpg.context(cp_consumer_t::INDEX_SVC)); + repair_links(bn, (void*)cpg.context(cp_consumer_t::INDEX_SVC)); } if (idx_buf->m_up_buffer && idx_buf->m_up_buffer->is_meta_buf()) { // Our up buffer is a meta buffer, which means that we are the new root node, we need to update the // meta_buf with new root as well - on_root_changed(bn, (void *) cpg.context(cp_consumer_t::INDEX_SVC)); + LOGTRACEMOD(wbcache, "root change for after repairing {}\n\n", idx_buf->to_string()); + on_root_changed(bn, (void*)cpg.context(cp_consumer_t::INDEX_SVC)); } } @@ -227,10 +257,11 @@ class IndexTable : public IndexTableBase, public Btree< K, V > { wb_cache().free_buf(n->m_idx_buf, r_cast< CPContext* >(context)); } - btree_status_t - on_root_changed(BtreeNodePtr const &new_root, void *context) override { + btree_status_t on_root_changed(BtreeNodePtr const& new_root, void* context) override { // todo: if(m_sb->root_node == new_root->node_id() && m_sb->root_link_version == new_root->link_version()){ // return btree_status_t::success;} + LOGTRACEMOD(wbcache, "root changed for index old_root={} new_root={}", m_sb->root_node, + new_root->node_id()); m_sb->root_node = new_root->node_id(); m_sb->root_link_version = new_root->link_version(); @@ -240,7 +271,7 @@ class IndexTable : public IndexTableBase, public Btree< K, V > { } auto& root_buf = static_cast< IndexBtreeNode* >(new_root.get())->m_idx_buf; - wb_cache().transact_bufs(ordinal(), m_sb_buffer, root_buf, {}, {}, r_cast(context)); + wb_cache().transact_bufs(ordinal(), m_sb_buffer, root_buf, {}, {}, r_cast< CPContext* >(context)); return btree_status_t::success; } @@ -257,7 +288,7 @@ class IndexTable : public IndexTableBase, public Btree< K, V > { } // Get all original child ids as a support to check if we are beyond the last child node - std::set orig_child_ids; + std::set< bnodeid_t > orig_child_ids; for (uint32_t i = 0; i < parent_node->total_entries(); ++i) { BtreeLinkInfo link_info; parent_node->get_nth_value(i, &link_info, true); @@ -391,9 +422,7 @@ class IndexTable : public IndexTableBase, public Btree< K, V > { } } while (true); - if (child_node) { - this->unlock_node(child_node, locktype_t::READ); - } + if (child_node) { this->unlock_node(child_node, locktype_t::READ); } if (parent_node->total_entries() == 0 && !parent_node->has_valid_edge()) { // We shouldn't have an empty interior node in the tree, let's delete it. diff --git a/src/include/homestore/index_service.hpp b/src/include/homestore/index_service.hpp index c8801c9d2..87ad63672 100644 --- a/src/include/homestore/index_service.hpp +++ b/src/include/homestore/index_service.hpp @@ -82,6 +82,7 @@ class IndexService { uint64_t used_size() const; uint32_t node_size() const; void repair_index_node(uint32_t ordinal, IndexBufferPtr const& node_buf); + void update_root(uint32_t ordinal, IndexBufferPtr const& node_buf); IndexWBCacheBase& wb_cache() { if (!m_wb_cache) { throw std::runtime_error("Attempted to access a null pointer wb_cache"); } diff --git a/src/lib/index/index_cp.cpp b/src/lib/index/index_cp.cpp index 578fae997..122667726 100644 --- a/src/lib/index/index_cp.cpp +++ b/src/lib/index/index_cp.cpp @@ -145,7 +145,7 @@ void IndexCPContext::to_string_dot(const std::string& filename) { LOGINFO("cp dag is stored in file {}", filename); } -uint16_t IndexCPContext::num_dags() { +uint16_t IndexCPContext::num_dags() { // count number of buffers whose up_buffers are nullptr uint16_t count = 0; std::unique_lock lg{m_flush_buffer_mtx}; @@ -190,15 +190,18 @@ std::string IndexCPContext::to_string_with_dags() { // Now walk through the list of graphs and prepare formatted string std::string str{fmt::format("IndexCPContext cpid={} dirty_buf_count={} dirty_buf_list_size={} #_of_dags={}\n", m_cp->id(), m_dirty_buf_count.get(), m_dirty_buf_list.size(), group_roots.size())}; + int cnt = 1; for (const auto& root : group_roots) { - std::vector< std::pair< std::shared_ptr< DagNode >, int > > stack; - stack.emplace_back(root, 0); + std::vector< std::tuple< std::shared_ptr< DagNode >, int, int > > stack; + stack.emplace_back(root, 0, cnt++); while (!stack.empty()) { - auto [node, level] = stack.back(); + auto [node, level, index] = stack.back(); stack.pop_back(); - fmt::format_to(std::back_inserter(str), "{}{} \n", std::string(level * 4, ' '), node->buf->to_string()); + fmt::format_to(std::back_inserter(str), "{}{}-{} \n", std::string(level * 4, ' '), index, + node->buf->to_string()); + int c = node->down_nodes.size(); for (const auto& d : node->down_nodes) { - stack.emplace_back(d, level + 1); + stack.emplace_back(d, level + 1, c--); } } } @@ -266,15 +269,11 @@ void IndexCPContext::process_txn_record(txn_record const* rec, std::map< BlkId, #ifndef NDEBUG // if (!is_sibling_link || (buf->m_up_buffer == real_up_buf)) { return buf;} // Already linked with same buf or its not a sibling link to override - if (real_up_buf->is_in_down_buffers(buf)) { - return buf; - } + if (real_up_buf->is_in_down_buffers(buf)) { return buf; } #endif if (buf->m_up_buffer != real_up_buf) { - if (buf->m_up_buffer) { - buf->m_up_buffer->remove_down_buffer(buf); - } + if (buf->m_up_buffer) { buf->m_up_buffer->remove_down_buffer(buf); } real_up_buf->add_down_buffer(buf); buf->m_up_buffer = real_up_buf; } diff --git a/src/lib/index/index_service.cpp b/src/lib/index/index_service.cpp index 49755a4ef..73b96b064 100644 --- a/src/lib/index/index_service.cpp +++ b/src/lib/index/index_service.cpp @@ -132,6 +132,15 @@ void IndexService::repair_index_node(uint32_t ordinal, IndexBufferPtr const& nod } } +void IndexService::update_root(uint32_t ordinal, IndexBufferPtr const& node_buf) { + auto tbl = get_index_table(ordinal); + if (tbl) { + tbl->repair_root_node(node_buf); + } else { + HS_DBG_ASSERT(false, "Index corresponding to ordinal={} has not been loaded yet, unexpected", ordinal); + } +} + uint32_t IndexService::node_size() const { return m_vdev->atomic_page_size(); } uint64_t IndexService::used_size() const { @@ -154,31 +163,39 @@ IndexBuffer::~IndexBuffer() { } std::string IndexBuffer::to_string() const { - if (m_is_meta_buf) { - return fmt::format("Buf={} [Meta] index={} state={} create/dirty_cp={}/{} down_wait#={} freed={}", - voidptr_cast(const_cast< IndexBuffer* >(this)), m_index_ordinal, int_cast(state()), - m_created_cp_id, m_dirtied_cp_id, m_wait_for_down_buffers.get(), m_node_freed); - } else { - // store m_down_buffers in a string - std::string down_bufs = ""; + static std::vector< std::string > state_str = {"CLEAN", "DIRTY", "FLUSHING"}; + // store m_down_buffers in a string + std::string down_bufs = ""; #ifndef NDEBUG - { - std::lock_guard lg(m_down_buffers_mtx); - for (auto const &down_buf: m_down_buffers) { + { + std::lock_guard lg(m_down_buffers_mtx); + if (m_down_buffers.empty()) { + fmt::format_to(std::back_inserter(down_bufs), "EMPTY"); + } else { + for (auto const& down_buf : m_down_buffers) { if (auto ptr = down_buf.lock()) { fmt::format_to(std::back_inserter(down_bufs), "[{}]", voidptr_cast(ptr.get())); } } + fmt::format_to(std::back_inserter(down_bufs), " #down bufs={}", m_down_buffers.size()); } + } #endif - return fmt::format("Buf={} index={} state={} create/dirty_cp={}/{} down_wait#={}{} up={} node=[{}] down=[{}]", - voidptr_cast(const_cast< IndexBuffer* >(this)), m_index_ordinal, int_cast(state()), - m_created_cp_id, m_dirtied_cp_id, m_wait_for_down_buffers.get(), - m_node_freed ? " Freed" : "", voidptr_cast(const_cast< IndexBuffer* >(m_up_buffer.get())), - (m_bytes == nullptr) ? "not attached yet" - : r_cast< persistent_hdr_t const* >(m_bytes)->to_compact_string(), - down_bufs); + if (m_is_meta_buf) { + return fmt::format("[Meta] Buf={} index={} state={} create/dirty_cp={}/{} down_wait#={}{} down={{{}}}", + voidptr_cast(const_cast< IndexBuffer* >(this)), m_index_ordinal, + state_str[int_cast(state())], m_created_cp_id, m_dirtied_cp_id, + m_wait_for_down_buffers.get(), m_node_freed ? " Freed" : "", down_bufs); + } else { + + return fmt::format( + "Buf={} index={} state={} create/dirty_cp={}/{} down_wait#={}{} up={} node=[{}] down={{{}}}", + voidptr_cast(const_cast< IndexBuffer* >(this)), m_index_ordinal, state_str[int_cast(state())], + m_created_cp_id, m_dirtied_cp_id, m_wait_for_down_buffers.get(), m_node_freed ? " Freed" : "", + voidptr_cast(const_cast< IndexBuffer* >(m_up_buffer.get())), + (m_bytes == nullptr) ? "not attached yet" : r_cast< persistent_hdr_t const* >(m_bytes)->to_compact_string(), + down_bufs); } } @@ -194,7 +211,7 @@ std::string IndexBuffer::to_string_dot() const { return str; } -void IndexBuffer::add_down_buffer(const IndexBufferPtr &buf) { +void IndexBuffer::add_down_buffer(const IndexBufferPtr& buf) { m_wait_for_down_buffers.increment(); #ifndef NDEBUG { @@ -204,10 +221,11 @@ void IndexBuffer::add_down_buffer(const IndexBufferPtr &buf) { #endif } -void IndexBuffer::remove_down_buffer(const IndexBufferPtr &buf) { +void IndexBuffer::remove_down_buffer(const IndexBufferPtr& buf) { m_wait_for_down_buffers.decrement(); #ifndef NDEBUG - bool found{false}; { + bool found{false}; + { std::lock_guard lg(m_down_buffers_mtx); for (auto it = buf->m_up_buffer->m_down_buffers.begin(); it != buf->m_up_buffer->m_down_buffers.end(); ++it) { if (it->lock() == buf) { @@ -222,12 +240,10 @@ void IndexBuffer::remove_down_buffer(const IndexBufferPtr &buf) { } #ifndef NDEBUG -bool IndexBuffer::is_in_down_buffers(const IndexBufferPtr &buf) { - std::lock_guard lg(m_down_buffers_mtx); - for (auto const &dbuf: m_down_buffers) { - if (dbuf.lock() == buf) { - return true; - } +bool IndexBuffer::is_in_down_buffers(const IndexBufferPtr& buf) { + std::lock_guard< std::mutex > lg(m_down_buffers_mtx); + for (auto const& dbuf : m_down_buffers) { + if (dbuf.lock() == buf) { return true; } } return false; } diff --git a/src/lib/index/wb_cache.cpp b/src/lib/index/wb_cache.cpp index 04383d8ac..cc45d18de 100644 --- a/src/lib/index/wb_cache.cpp +++ b/src/lib/index/wb_cache.cpp @@ -420,11 +420,11 @@ void IndexWBCache::free_buf(const IndexBufferPtr& buf, CPContext* cp_ctx) { } buf->m_node_freed = true; resource_mgr().inc_free_blk(m_node_size); - m_vdev->free_blk(buf->m_blkid, s_cast(cp_ctx)); + m_vdev->free_blk(buf->m_blkid, s_cast< VDevCPContext* >(cp_ctx)); } //////////////////// Recovery Related section ///////////////////////////////// -void IndexWBCache::load_buf(IndexBufferPtr const &buf) { +void IndexWBCache::load_buf(IndexBufferPtr const& buf) { if (buf->m_bytes == nullptr) { buf->m_bytes = hs_utils::iobuf_alloc(m_node_size, sisl::buftag::btree_node, m_vdev->align_size()); m_vdev->sync_read(r_cast< char* >(buf->m_bytes), m_node_size, buf->blkid()); @@ -432,6 +432,78 @@ void IndexWBCache::load_buf(IndexBufferPtr const &buf) { } } +struct DagNode { + IndexBufferPtr buffer; + std::vector< shared< DagNode > > children; +}; + +using DagPtr = std::shared_ptr< DagNode >; +using DagMap = std::map< IndexBufferPtr, DagPtr >; + +static DagMap generate_dag_buffers(std::map< BlkId, IndexBufferPtr >& bufmap) { + std::vector< IndexBufferPtr > bufs; + std::ranges::transform(bufmap, std::back_inserter(bufs), [](const auto& pair) { return pair.second; }); + + auto buildReverseMapping = [](const std::vector< IndexBufferPtr >& buffers) { + std::unordered_map< IndexBufferPtr, std::vector< IndexBufferPtr > > parentToChildren; + for (const auto& buffer : buffers) { + if (buffer->m_up_buffer) { parentToChildren[buffer->m_up_buffer].push_back(buffer); } + } + return parentToChildren; + }; + + std::function< DagPtr(IndexBufferPtr, std::unordered_map< IndexBufferPtr, std::vector< IndexBufferPtr > >&) > + buildDag; + buildDag = + [&buildDag](IndexBufferPtr buffer, + std::unordered_map< IndexBufferPtr, std::vector< IndexBufferPtr > >& parentToChildren) -> DagPtr { + auto dagNode = std::make_shared< DagNode >(); + dagNode->buffer = buffer; + if (parentToChildren.count(buffer)) { + for (const auto& child : parentToChildren[buffer]) { + dagNode->children.push_back(buildDag(child, parentToChildren)); + } + } + return dagNode; + }; + + auto generateDagMap = [&](const std::vector< IndexBufferPtr >& buffers) { + DagMap dagMap; + auto parentToChildren = buildReverseMapping(buffers); + for (const auto& buffer : buffers) { + if (!buffer->m_up_buffer) { // This is a root buffer + auto dagRoot = buildDag(buffer, parentToChildren); + dagMap[buffer] = dagRoot; + } + } + return dagMap; + }; + + return generateDagMap(bufs); +} + +static std::string to_string_dag_bufs(DagMap& dags, cp_id_t cp_id = 0) { + std::string str{fmt::format("#_of_dags={}\n", dags.size())}; + int cnt = 1; + for (const auto& [_, dag] : dags) { + std::vector< std::tuple< std::shared_ptr< DagNode >, int, int > > stack; + stack.emplace_back(dag, 0, cnt++); + while (!stack.empty()) { + auto [node, level, index] = stack.back(); + stack.pop_back(); + auto snew = node->buffer->m_created_cp_id == cp_id ? "NEW" : ""; + auto sfree = node->buffer->m_node_freed ? "FREED" : ""; + fmt::format_to(std::back_inserter(str), "{}{}-{} {} {}\n", std::string(level * 4, ' '), index, + node->buffer->to_string(), snew, sfree); + int c = node->children.size(); + for (const auto& d : node->children) { + stack.emplace_back(d, level + 1, c--); + } + } + } + return str; +} + void IndexWBCache::recover(sisl::byte_view sb) { // If sb is empty, its possible a first time boot. if ((sb.bytes() == nullptr) || (sb.size() == 0)) { @@ -452,9 +524,9 @@ void IndexWBCache::recover(sisl::byte_view sb) { #ifdef _PRERELEASE auto detailed_log = [this](std::map< BlkId, IndexBufferPtr > const& bufs, - std::vector const &pending_bufs) { + std::vector< IndexBufferPtr > const& pending_bufs) { std::string log = fmt::format("\trecovered bufs (#of bufs = {})\n", bufs.size()); - for (auto const &[_, buf]: bufs) { + for (auto const& [_, buf] : bufs) { load_buf(buf); fmt::format_to(std::back_inserter(log), "{}\n", buf->to_string()); } @@ -462,7 +534,7 @@ void IndexWBCache::recover(sisl::byte_view sb) { // list of new_bufs if (!pending_bufs.empty()) { fmt::format_to(std::back_inserter(log), "\n\tpending_bufs (#of bufs = {})\n", pending_bufs.size()); - for (auto const &buf: pending_bufs) { + for (auto const& buf : pending_bufs) { fmt::format_to(std::back_inserter(log), "{}\n", buf->to_string()); } } @@ -471,6 +543,8 @@ void IndexWBCache::recover(sisl::byte_view sb) { std::string log = fmt::format("Recovering bufs (#of bufs = {}) before processing them\n", bufs.size()); LOGTRACEMOD(wbcache, "{}\n{}", log, detailed_log(bufs, {})); + auto dags = generate_dag_buffers(bufs); + LOGTRACEMOD(wbcache, "Before recovery: {}", to_string_dag_bufs(dags, icp_ctx->id())); #endif // At this point, we have the DAG structure (up/down dependency graph), exactly the same as prior to crash, with one @@ -484,15 +558,15 @@ void IndexWBCache::recover(sisl::byte_view sb) { // the same blkid which could clash with the blkid next in the buf list. // // On the second pass, we only take part of the parents/siblings and then repair them, if needed. - std::vector pending_bufs; - std::vector deleted_bufs; - for (auto const &[_, buf]: bufs) { + std::vector< IndexBufferPtr > pending_bufs; + std::vector< IndexBufferPtr > deleted_bufs; + for (auto const& [_, buf] : bufs) { if (buf->m_node_freed) { // Freed node load_buf(buf); if (was_node_committed(buf)) { // Mark this buffer as deleted, so that we can avoid using it anymore when repairing its parent's link - r_cast(buf->m_bytes)->node_deleted = true; + r_cast< persistent_hdr_t* >(buf->m_bytes)->node_deleted = true; write_buf(nullptr, buf, icp_ctx); deleted_bufs.push_back(buf); pending_bufs.push_back(buf->m_up_buffer); @@ -508,14 +582,27 @@ void IndexWBCache::recover(sisl::byte_view sb) { } } else if (buf->m_created_cp_id == icp_ctx->id()) { // New node - if (was_node_committed(buf) && was_node_committed(buf->m_up_buffer)) { - // Both current and up buffer is commited, we can safely commit the current block - m_vdev->commit_blk(buf->m_blkid); - pending_bufs.push_back(buf->m_up_buffer); - } else { - // Just ignore it - buf->m_up_buffer->remove_down_buffer(buf); - buf->m_up_buffer = nullptr; + if (was_node_committed(buf)) { + if (was_node_committed(buf->m_up_buffer)) { + // Both current and up buffer is commited, we can safely commit the current block + m_vdev->commit_blk(buf->m_blkid); + pending_bufs.push_back(buf->m_up_buffer); + } else { + // Up buffer is not committed, we need to repair it first + buf->m_up_buffer->remove_down_buffer(buf); + // buf->m_up_buffer = nullptr; + if (buf->m_up_buffer->m_wait_for_down_buffers.testz()) { + // if up buffer has upbuffer, then we need to decrement its wait_for_down_buffers + auto grand_buf = buf->m_up_buffer->m_up_buffer; + if (grand_buf) { + HS_DBG_ASSERT(!grand_buf->m_wait_for_down_buffers.testz(), + "upbuffer of upbuffer is already zero"); + grand_buf->remove_down_buffer(buf->m_up_buffer); + LOGINFOMOD(wbcache, "Decrementing wait_for_down_buffers for up buffer of up buffer {}", + grand_buf->to_string()); + } + } + } } } } @@ -524,25 +611,48 @@ void IndexWBCache::recover(sisl::byte_view sb) { LOGINFOMOD(wbcache, "Index Recovery detected {} nodes out of {} as new/freed nodes to be recovered in prev cp={}", pending_bufs.size(), bufs.size(), icp_ctx->id()); LOGTRACEMOD(wbcache, "All unclean bufs list\n{}", detailed_log(bufs, pending_bufs)); + LOGTRACEMOD(wbcache, "After recovery: {}", to_string_dag_bufs(dags, icp_ctx->id())); #endif - for (auto const &buf: pending_bufs) { + for (auto const& buf : pending_bufs) { recover_buf(buf); - if (buf->m_bytes != nullptr && r_cast(buf->m_bytes)->node_deleted) { + if (buf->m_bytes != nullptr && r_cast< persistent_hdr_t* >(buf->m_bytes)->node_deleted) { // This buffer was marked as deleted during repair, so we also need to free it deleted_bufs.push_back(buf); } } - for (auto const &buf: deleted_bufs) { - m_vdev->free_blk(buf->m_blkid, s_cast(icp_ctx)); + for (auto const& buf : deleted_bufs) { + m_vdev->free_blk(buf->m_blkid, s_cast< VDevCPContext* >(icp_ctx)); } m_in_recovery = false; m_vdev->recovery_completed(); } -void IndexWBCache::recover_buf(IndexBufferPtr const &buf) { +void IndexWBCache::updateUpBufferCounters(std::vector< IndexBufferPtr >& l0_bufs) { + std::unordered_set< IndexBufferPtr > allBuffers; + + // First, collect all unique buffers and reset their counters + for (auto& leaf : l0_bufs) { + auto currentBuffer = leaf; + while (currentBuffer) { + if (allBuffers.insert(currentBuffer).second) { currentBuffer->m_wait_for_down_buffers.set(0); } + currentBuffer = currentBuffer->m_up_buffer; + } + } + + // Now, iterate over each leaf buffer and update the count for each parent up the chain + for (auto& leaf : l0_bufs) { + auto currentBuffer = leaf; + while (currentBuffer) { + if (currentBuffer->m_up_buffer) { currentBuffer->m_up_buffer->m_wait_for_down_buffers.increment(1); } + currentBuffer = currentBuffer->m_up_buffer; + } + } +} + +void IndexWBCache::recover_buf(IndexBufferPtr const& buf) { if (!buf->m_wait_for_down_buffers.decrement_testz()) { // TODO: remove the buf_>m_up_buffer from down_buffers list of buf->m_up_buffer return; @@ -557,6 +667,12 @@ void IndexWBCache::recover_buf(IndexBufferPtr const &buf) { } else { LOGTRACEMOD(wbcache, "Index Recovery detected up node [{}] as committed no need to repair that", buf->to_string()); + if (buf->m_up_buffer && buf->m_up_buffer->is_meta_buf()) { + // Our up buffer is a meta buffer, which means old root is dirtied and may need no repair but possible of + // new root on upper level so needs to be retore the edge + LOGTRACEMOD(wbcache, "check root change for without repairing {}\n\n", buf->to_string()); + index_service().update_root(buf->m_index_ordinal, buf); + } } if (buf->m_up_buffer) { recover_buf(buf->m_up_buffer); } @@ -656,10 +772,8 @@ void IndexWBCache::do_flush_one_buf(IndexCPContext* cp_ctx, IndexBufferPtr const if (buf->is_meta_buf()) { LOGTRACEMOD(wbcache, "Flushing cp {} meta buf {} possibly because of root split", cp_ctx->id(), buf->to_string()); - auto const &sb = r_cast(buf.get())->m_sb; - if (!sb.is_empty()) { - meta_service().update_sub_sb(buf->m_bytes, sb.size(), sb.meta_blk()); - } + auto const& sb = r_cast< MetaIndexBuffer* >(buf.get())->m_sb; + if (!sb.is_empty()) { meta_service().update_sub_sb(buf->m_bytes, sb.size(), sb.meta_blk()); } process_write_completion(cp_ctx, buf); } else if (buf->m_node_freed) { LOGTRACEMOD(wbcache, "Not flushing buf {} as it was freed, its here for merely dependency", cp_ctx->id(), @@ -667,15 +781,13 @@ void IndexWBCache::do_flush_one_buf(IndexCPContext* cp_ctx, IndexBufferPtr const process_write_completion(cp_ctx, buf); } else { LOGTRACEMOD(wbcache, "Flushing cp {} buf {}", cp_ctx->id(), buf->to_string()); - m_vdev->async_write(r_cast(buf->raw_buffer()), m_node_size, buf->m_blkid, part_of_batch) - .thenValue([buf, cp_ctx](auto) { - try { - auto &pthis = s_cast(wb_cache()); - pthis.process_write_completion(cp_ctx, buf); - } catch (const std::runtime_error &e) { - LOGERROR("Failed to access write-back cache: {}", e.what()); - } - }); + m_vdev->async_write(r_cast< const char* >(buf->raw_buffer()), m_node_size, buf->m_blkid, part_of_batch) + .thenValue([buf, cp_ctx](auto) { + try { + auto& pthis = s_cast< IndexWBCache& >(wb_cache()); + pthis.process_write_completion(cp_ctx, buf); + } catch (const std::runtime_error& e) { LOGERROR("Failed to access write-back cache: {}", e.what()); } + }); if (!part_of_batch) { m_vdev->submit_batch(); } } diff --git a/src/lib/index/wb_cache.hpp b/src/lib/index/wb_cache.hpp index 25a4c8201..f129d11ef 100644 --- a/src/lib/index/wb_cache.hpp +++ b/src/lib/index/wb_cache.hpp @@ -41,6 +41,7 @@ class IndexWBCache : public IndexWBCacheBase { std::mutex m_flush_mtx; void* m_meta_blk; bool m_in_recovery{false}; + public: IndexWBCache(const std::shared_ptr< VirtualDev >& vdev, std::pair< meta_blk*, sisl::byte_view > sb, const std::shared_ptr< sisl::Evictor >& evictor, uint32_t node_size); @@ -78,5 +79,6 @@ class IndexWBCache : public IndexWBCacheBase { void recover_buf(IndexBufferPtr const& buf); bool was_node_committed(IndexBufferPtr const& buf); void load_buf(IndexBufferPtr const& buf); + void updateUpBufferCounters(std::vector< IndexBufferPtr >& pending_bufs); }; } // namespace homestore diff --git a/src/tests/test_index_crash_recovery.cpp b/src/tests/test_index_crash_recovery.cpp index 560bf0f83..650b35955 100644 --- a/src/tests/test_index_crash_recovery.cpp +++ b/src/tests/test_index_crash_recovery.cpp @@ -37,29 +37,29 @@ SISL_LOGGING_DECL(test_index_crash_recovery) SISL_OPTION_GROUP( test_index_crash_recovery, (num_iters, "", "num_iters", "number of iterations for rand ops", - ::cxxopts::value()->default_value("500"), "number"), + ::cxxopts::value< uint32_t >()->default_value("500"), "number"), (num_entries, "", "num_entries", "number of entries to test with", - ::cxxopts::value()->default_value("5000"), "number"), - (run_time, "", "run_time", "run time for io", ::cxxopts::value()->default_value("360000"), "seconds"), + ::cxxopts::value< uint32_t >()->default_value("5000"), "number"), + (run_time, "", "run_time", "run time for io", ::cxxopts::value< uint32_t >()->default_value("360000"), "seconds"), (num_rounds, "", "num_rounds", "number of rounds to test with", - ::cxxopts::value()->default_value("100"), "number"), + ::cxxopts::value< uint32_t >()->default_value("100"), "number"), (num_entries_per_rounds, "", "num_entries_per_rounds", "number of entries per rounds", - ::cxxopts::value()->default_value("40"), "number"), - (max_keys_in_node, "", "max_keys_in_node", "max_keys_in_node", - ::cxxopts::value()->default_value("20"), ""), - (min_keys_in_node, "", "min_keys_in_node", "min_keys_in_node", - ::cxxopts::value()->default_value("6"), ""), + ::cxxopts::value< uint32_t >()->default_value("40"), "number"), + (max_keys_in_node, "", "max_keys_in_node", "max_keys_in_node", ::cxxopts::value< uint32_t >()->default_value("20"), + ""), + (min_keys_in_node, "", "min_keys_in_node", "min_keys_in_node", ::cxxopts::value< uint32_t >()->default_value("6"), + ""), (operation_list, "", "operation_list", "operation list instead of default created following by percentage", - ::cxxopts::value< std::vector< std::string > >(), "operations [...]"), + ::cxxopts::value< std::vector< std::string > >(), "operations [...]"), (preload_size, "", "preload_size", "number of entries to preload tree with", - ::cxxopts::value()->default_value("1000"), "number"), + ::cxxopts::value< uint32_t >()->default_value("1000"), "number"), (init_device, "", "init_device", "init device", ::cxxopts::value< bool >()->default_value("1"), ""), (load_from_file, "", "load_from_file", "load from file", ::cxxopts::value< bool >()->default_value("0"), ""), (save_to_file, "", "save_to_file", "save to file", ::cxxopts::value< bool >()->default_value("0"), ""), (cleanup_after_shutdown, "", "cleanup_after_shutdown", "cleanup after shutdown", - ::cxxopts::value< bool >()->default_value("1"), ""), + ::cxxopts::value< bool >()->default_value("1"), ""), (seed, "", "seed", "random engine seed, use random if not defined", - ::cxxopts::value< uint64_t >()->default_value("0"), "number")) + ::cxxopts::value< uint64_t >()->default_value("0"), "number")) void log_obj_life_counter() { std::string str; @@ -249,7 +249,7 @@ struct IndexCrashTest : public test_common::HSTestHelper, BtreeTestHelper< TestT m_test->m_cfg.m_leaf_node_type = T::leaf_node_type; m_test->m_cfg.m_int_node_type = T::interior_node_type; m_test->m_cfg.m_max_keys_in_node = SISL_OPTIONS["max_keys_in_node"].as< uint32_t >(); - m_test->m_cfg.m_min_keys_in_node = SISL_OPTIONS["min_keys_in_node"].as(); + m_test->m_cfg.m_min_keys_in_node = SISL_OPTIONS["min_keys_in_node"].as< uint32_t >(); m_test->m_bt = std::make_shared< typename T::BtreeType >(std::move(sb), m_test->m_cfg); return m_test->m_bt; } @@ -277,7 +277,7 @@ struct IndexCrashTest : public test_common::HSTestHelper, BtreeTestHelper< TestT this->m_cfg = BtreeConfig(hs()->index_service().node_size()); this->m_cfg.m_max_keys_in_node = SISL_OPTIONS["max_keys_in_node"].as< uint32_t >(); - this->m_cfg.m_min_keys_in_node = SISL_OPTIONS["min_keys_in_node"].as(); + this->m_cfg.m_min_keys_in_node = SISL_OPTIONS["min_keys_in_node"].as< uint32_t >(); LOGINFO("Node size {}, max_keys_in_node {}, min_keys_in_node {}", this->m_cfg.node_size(), this->m_cfg.m_max_keys_in_node, this->m_cfg.m_min_keys_in_node); auto uuid = boost::uuids::random_generator()(); @@ -338,7 +338,7 @@ struct IndexCrashTest : public test_common::HSTestHelper, BtreeTestHelper< TestT void reapply_after_crash() { ShadowMap< K, V > snapshot_map{this->m_shadow_map.max_keys()}; snapshot_map.load(m_shadow_filename); - LOGINFO("\tSnapshot before crash\n{}", snapshot_map.to_string()); + // LOGINFO("\tSnapshot before crash\n{}", snapshot_map.to_string()); auto diff = this->m_shadow_map.diff(snapshot_map); // visualize tree after crash @@ -346,13 +346,14 @@ struct IndexCrashTest : public test_common::HSTestHelper, BtreeTestHelper< TestT // this->visualize_keys(recovered_tree_filename); // LOGINFO(" tree after recovered stored in {}", recovered_tree_filename); - std::string dif_str = "KEY \tADDITION\n"; - for (const auto& [k, addition] : diff) { - dif_str += fmt::format(" {} \t{}\n", k.key(), addition); + std::string dif_str = "Keys["; + for (const auto& [k, _] : diff) { + dif_str += fmt::format("{} ", k.key()); } + dif_str += "]"; LOGINFO("Diff between shadow map and snapshot map\n{}\n", dif_str); - for (const auto &[k, addition]: diff) { + for (const auto& [k, addition] : diff) { // this->print_keys(fmt::format("reapply: before inserting key {}", k.key())); // this->visualize_keys(recovered_tree_filename); if (addition) { @@ -401,15 +402,15 @@ struct IndexCrashTest : public test_common::HSTestHelper, BtreeTestHelper< TestT } void crash_and_recover(uint32_t s_key, uint32_t e_key) { - this->print_keys("Btree prior to CP and susbsequent simulated crash: "); + // this->print_keys("Btree prior to CP and susbsequent simulated crash: "); trigger_cp(false); this->wait_for_crash_recovery(); // this->visualize_keys("tree_after_crash_" + std::to_string(s_key) + "_" + std::to_string(e_key) + ".dot"); - this->print_keys("Post crash and recovery, btree structure: "); + // this->print_keys("Post crash and recovery, btree structure: "); this->reapply_after_crash(); - this->print_keys("Post reapply, btree structure: "); + // this->print_keys("Post reapply, btree structure: "); this->get_all(); LOGINFO("Expect to have [{},{}) in tree and it is actually{} ", s_key, e_key, tree_key_count()); @@ -420,24 +421,28 @@ struct IndexCrashTest : public test_common::HSTestHelper, BtreeTestHelper< TestT std::set< uint64_t > new_keys; std::transform(operations.begin(), operations.end(), std::inserter(new_keys, new_keys.end()), [](const Operation& operation) { return operation.first; }); - uint32_t count = 1; + uint32_t count = 0; this->m_shadow_map.foreach ([this, new_keys, &count](K key, V value) { // discard the new keys to check if (new_keys.find(key.key()) != new_keys.end()) { return; } + count++; auto copy_key = std::make_unique< K >(); *copy_key = key; auto out_v = std::make_unique< V >(); auto req = BtreeSingleGetRequest{copy_key.get(), out_v.get()}; req.enable_route_tracing(); const auto ret = this->m_bt->get(req); + if (ret != btree_status_t::success) { + this->print_keys(fmt::format("Sanity check: key {}", key.key())); + this->dump_to_file("sanity_fail.txt"); + } ASSERT_EQ(ret, btree_status_t::success) << "Missing key " << key << " in btree but present in shadow map"; }); LOGINFO("Sanity check passed for {} keys!", count); - } void crash_and_recover(OperationList& operations, std::string filename = "") { - this->print_keys("Btree prior to CP and susbsequent simulated crash: "); + // this->print_keys("Btree prior to CP and susbsequent simulated crash: "); LOGINFO("Before Crash: {} keys in shadow map and it is actually {} keys in tree - operations size {}", this->m_shadow_map.size(), tree_key_count(), operations.size()); @@ -456,7 +461,7 @@ struct IndexCrashTest : public test_common::HSTestHelper, BtreeTestHelper< TestT LOGINFO("Visualize the tree file after recovery : {}", rec_filename); this->visualize_keys(rec_filename); } - this->print_keys("Post crash and recovery, btree structure: "); + // this->print_keys("Post crash and recovery, btree structure: "); sanity_check(operations); // Added to the index service right after recovery. Not needed here // test_common::HSTestHelper::trigger_cp(true); @@ -468,7 +473,7 @@ struct IndexCrashTest : public test_common::HSTestHelper, BtreeTestHelper< TestT LOGINFO("Visualize the tree after reapply {}", re_filename); this->visualize_keys(re_filename); } - this->print_keys("Post reapply, btree structure: "); + // this->print_keys("Post reapply, btree structure: "); this->get_all(); LOGINFO("After reapply: {} keys in shadow map and actually {} in tress", this->m_shadow_map.size(), @@ -629,7 +634,7 @@ TYPED_TEST(IndexCrashTest, long_running_put_crash) { test_common::HSTestHelper::trigger_cp(true); this->get_all(); this->m_shadow_map.save(this->m_shadow_filename); - this->print_keys("reapply: after preload"); + // this->print_keys("reapply: after preload"); this->visualize_keys("tree_after_preload.dot"); for (uint32_t round = 1; @@ -716,28 +721,27 @@ TYPED_TEST(IndexCrashTest, long_running_put_crash) { elapsed_time * 100.0 / this->m_run_time, this->tree_key_count(), num_entries, this->tree_key_count() * 100.0 / num_entries); } - this->print_keys(fmt::format("reapply: after round {}", round)); + // this->print_keys(fmt::format("reapply: after round {}", round)); if (renew_btree_after_crash) { this->reset_btree(); }; } } // Basic reverse and forward order remove with different flip points TYPED_TEST(IndexCrashTest, MergeRemoveBasic) { - vector flip_points = { - "crash_flush_on_merge_at_parent", - "crash_flush_on_merge_at_left_child", + vector< std::string > flip_points = { + "crash_flush_on_merge_at_parent", "crash_flush_on_merge_at_left_child", // "crash_flush_on_freed_child", }; for (size_t i = 0; i < flip_points.size(); ++i) { this->reset_btree(); - auto &flip_point = flip_points[i]; + auto& flip_point = flip_points[i]; LOGINFO("=== Testing flip point: {} - {} ===", i + 1, flip_point); // Populate some keys [1,num_entries) and trigger cp to persist - LOGINFO("Step {}-1: Populate some keys and flush", i+1); - auto const num_entries = SISL_OPTIONS["num_entries"].as(); + LOGINFO("Step {}-1: Populate some keys and flush", i + 1); + auto const num_entries = SISL_OPTIONS["num_entries"].as< uint32_t >(); for (auto k = 0u; k < num_entries; ++k) { this->put(k, btree_put_type::INSERT, true /* expect_success */); } @@ -748,7 +752,8 @@ TYPED_TEST(IndexCrashTest, MergeRemoveBasic) { // Split keys into batches and remove the last one in reverse order LOGINFO("Step {}-2: Set crash flag, remove some keys in reverse order", i + 1); - int batch_num = 4; { + int batch_num = 4; + { int n = batch_num; auto r = num_entries * n / batch_num - 1; auto l = num_entries * (n - 1) / batch_num; @@ -759,8 +764,7 @@ TYPED_TEST(IndexCrashTest, MergeRemoveBasic) { LOGINFO("Step {}-2-1: Remove keys in batch {}/{} ({} to {})", i + 1, n, batch_num, r, l); this->set_basic_flip(flip_point); - for (auto [k, _]: ops) { - LOGINFO("Removing key {}", k); + for (auto [k, _] : ops) { this->remove_one(k, true); } this->visualize_keys("tree_merge_before_first_crash.dot"); @@ -781,8 +785,7 @@ TYPED_TEST(IndexCrashTest, MergeRemoveBasic) { LOGINFO("Step {}-3-1: Remove keys in batch {}/{} ({} to {})", i + 1, n, batch_num, l, r); this->set_basic_flip(flip_point); - for (auto [k, _]: ops) { - LOGINFO("Removing key {}", k); + for (auto [k, _] : ops) { this->remove_one(k, true); } this->visualize_keys("tree_merge_before_second_crash.dot"); @@ -803,8 +806,7 @@ TYPED_TEST(IndexCrashTest, MergeRemoveBasic) { LOGINFO("Step {}-4-1: Remove keys in batch {}/{} ({} to {})", i + 1, n, batch_num, l, r); this->set_basic_flip(flip_point); - for (auto [k, _]: ops) { - LOGINFO("Removing key {}", k); + for (auto [k, _] : ops) { this->remove_one(k, true); } this->visualize_keys("tree_merge_before_third_crash.dot"); @@ -828,9 +830,8 @@ TYPED_TEST(IndexCrashTest, MergeRemoveBasic) { // vector flips = { // "crash_flush_on_merge_at_parent", "crash_flush_on_merge_at_left_child", // }; -// SequenceGenerator generator(0 /*putFreq*/, 100 /* removeFreq*/, 0 /*start_range*/, num_entries - 1 /*end_range*/); -// OperationList operations; -// for (size_t i = 0; i < flips.size(); ++i) { +// SequenceGenerator generator(0 /*putFreq*/, 100 /* removeFreq*/, 0 /*start_range*/, num_entries - 1 +// /*end_range*/); OperationList operations; for (size_t i = 0; i < flips.size(); ++i) { // this->reset_btree(); // LOGINFO("Step {}-1: Init btree", i + 1); // for (auto k = 0u; k < num_entries; ++k) { diff --git a/src/tests/test_scripts/index_test.py b/src/tests/test_scripts/index_test.py index dd2f8f010..d4734ac82 100755 --- a/src/tests/test_scripts/index_test.py +++ b/src/tests/test_scripts/index_test.py @@ -51,10 +51,10 @@ def parse_arguments(): parser.add_argument('--dev_list', help='Device list', default='') parser.add_argument('--cleanup_after_shutdown', help='Cleanup after shutdown', type=bool, default=False) parser.add_argument('--init_device', help='Initialize device', type=bool, default=True) - parser.add_argument('--max_keys_in_node', help='Maximum num of keys in btree nodes', type=int, default=5) + parser.add_argument('--max_keys_in_node', help='Maximum num of keys in btree nodes', type=int, default=10) parser.add_argument('--min_keys_in_node', help='Minimum num of keys in btree nodes', type=int, default=2) - parser.add_argument('--num_rounds', help='number of rounds for crash test', type=int, default=10000) - parser.add_argument('--num_entries_per_rounds', help='number of rounds for crash test', type=int, default=60) + parser.add_argument('--num_rounds', help='number of rounds for crash test', type=int, default=1000) + parser.add_argument('--num_entries_per_rounds', help='number of rounds for crash test', type=int, default=100) # Parse the known arguments and ignore any unknown arguments args, unknown = parser.parse_known_args() @@ -94,10 +94,10 @@ def long_running_clean_shutdown(options, type=0): def long_running_crash_put(options): print("Long running crash put started") - options['num_entries'] = 131072 # 128K + options['num_entries'] = 1310720 # 1280K options['init_device'] = True options['run_time'] = 14400 # 4 hours - options['preload_size'] = 100 + options['preload_size'] = 1024 print(f"options: {options}") run_crash_test(options) print("Long running crash put completed")