Skip to content

Commit

Permalink
Add debugging log for index cp.
Browse files Browse the repository at this point in the history
Signed-off-by: Xiaoxi Chen <[email protected]>
  • Loading branch information
xiaoxichen committed Nov 26, 2024
1 parent 1a87f71 commit ac79b6c
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 40 deletions.
2 changes: 1 addition & 1 deletion conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

class HomestoreConan(ConanFile):
name = "homestore"
version = "6.5.16"
version = "6.5.18"

homepage = "https://github.com/eBay/Homestore"
description = "HomeStore Storage Engine"
Expand Down
21 changes: 10 additions & 11 deletions src/include/homestore/index/index_table.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ class IndexTable : public IndexTableBase, public Btree< K, V > {

void destroy() override {
auto cpg = cp_mgr().cp_guard();
Btree<K, V>::destroy_btree(cpg.context(cp_consumer_t::INDEX_SVC));
Btree< K, V >::destroy_btree(cpg.context(cp_consumer_t::INDEX_SVC));
m_sb.destroy();
}

Expand All @@ -97,7 +97,9 @@ class IndexTable : public IndexTableBase, public Btree< K, V > {
auto cpg = cp_mgr().cp_guard();
put_req.m_op_context = (void*)cpg.context(cp_consumer_t::INDEX_SVC);
ret = Btree< K, V >::put(put_req);
if (ret == btree_status_t::cp_mismatch) { LOGTRACEMOD(wbcache, "CP Mismatch, retrying put"); }
if (ret == btree_status_t::cp_mismatch) {
LOGTRACEMOD(wbcache, "CP Mismatch, cur cp {}, retrying put", ((CPContext*)put_req.m_op_context)->id());
}
} while (ret == btree_status_t::cp_mismatch);
return ret;
}
Expand Down Expand Up @@ -134,13 +136,13 @@ class IndexTable : public IndexTableBase, public Btree< K, V > {
// Only for interior nodes we need to repair its links
if (!bn->is_leaf()) {
LOGTRACEMOD(wbcache, "repair_node cp={} buf={}", cpg->id(), idx_buf->to_string());
repair_links(bn, (void *) cpg.context(cp_consumer_t::INDEX_SVC));
repair_links(bn, (void*)cpg.context(cp_consumer_t::INDEX_SVC));
}

if (idx_buf->m_up_buffer && idx_buf->m_up_buffer->is_meta_buf()) {
// Our up buffer is a meta buffer, which means that we are the new root node, we need to update the
// meta_buf with new root as well
on_root_changed(bn, (void *) cpg.context(cp_consumer_t::INDEX_SVC));
on_root_changed(bn, (void*)cpg.context(cp_consumer_t::INDEX_SVC));
}
}

Expand Down Expand Up @@ -227,8 +229,7 @@ class IndexTable : public IndexTableBase, public Btree< K, V > {
wb_cache().free_buf(n->m_idx_buf, r_cast< CPContext* >(context));
}

btree_status_t
on_root_changed(BtreeNodePtr const &new_root, void *context) override {
btree_status_t on_root_changed(BtreeNodePtr const& new_root, void* context) override {
// todo: if(m_sb->root_node == new_root->node_id() && m_sb->root_link_version == new_root->link_version()){
// return btree_status_t::success;}
m_sb->root_node = new_root->node_id();
Expand All @@ -240,7 +241,7 @@ class IndexTable : public IndexTableBase, public Btree< K, V > {
}

auto& root_buf = static_cast< IndexBtreeNode* >(new_root.get())->m_idx_buf;
wb_cache().transact_bufs(ordinal(), m_sb_buffer, root_buf, {}, {}, r_cast<CPContext *>(context));
wb_cache().transact_bufs(ordinal(), m_sb_buffer, root_buf, {}, {}, r_cast< CPContext* >(context));
return btree_status_t::success;
}

Expand All @@ -257,7 +258,7 @@ class IndexTable : public IndexTableBase, public Btree< K, V > {
}

// Get all original child ids as a support to check if we are beyond the last child node
std::set<bnodeid_t> orig_child_ids;
std::set< bnodeid_t > orig_child_ids;
for (uint32_t i = 0; i < parent_node->total_entries(); ++i) {
BtreeLinkInfo link_info;
parent_node->get_nth_value(i, &link_info, true);
Expand Down Expand Up @@ -391,9 +392,7 @@ class IndexTable : public IndexTableBase, public Btree< K, V > {
}
} while (true);

if (child_node) {
this->unlock_node(child_node, locktype_t::READ);
}
if (child_node) { this->unlock_node(child_node, locktype_t::READ); }

if (parent_node->total_entries() == 0 && !parent_node->has_valid_edge()) {
// We shouldn't have an empty interior node in the tree, let's delete it.
Expand Down
2 changes: 1 addition & 1 deletion src/lib/index/index_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ void IndexService::start() {
}
// Force taking cp after recovery done. This makes sure that the index table is in consistent state and dirty buffer
// after recovery can be added to dirty list for flushing in the new cp
hs()->cp_mgr().trigger_cp_flush(true /* force */);
hs()->cp_mgr().trigger_cp_flush(true /* force */).wait();
}

void IndexService::stop() { m_wb_cache.reset(); }
Expand Down
52 changes: 25 additions & 27 deletions src/lib/index/wb_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,8 @@ bool IndexWBCache::get_writable_buf(const BtreeNodePtr& node, CPContext* context
if (idx_buf->m_dirtied_cp_id == icp_ctx->id()) {
return true; // For same cp, we don't need a copy, we can rewrite on the same buffer
} else if (idx_buf->m_dirtied_cp_id > icp_ctx->id()) {
LOGINFOMOD(wbcache, "cp_id = {} < buffer dirtied_cp_id {}, rejecting, node hdr {}", icp_ctx->id(),
idx_buf->m_dirtied_cp_id, ((persistent_hdr_t*)idx_buf->m_bytes)->to_string());
return false; // We are asked to provide the buffer of an older CP, which is not possible
}

Expand Down Expand Up @@ -420,11 +422,11 @@ void IndexWBCache::free_buf(const IndexBufferPtr& buf, CPContext* cp_ctx) {
}
buf->m_node_freed = true;
resource_mgr().inc_free_blk(m_node_size);
m_vdev->free_blk(buf->m_blkid, s_cast<VDevCPContext *>(cp_ctx));
m_vdev->free_blk(buf->m_blkid, s_cast< VDevCPContext* >(cp_ctx));
}

//////////////////// Recovery Related section /////////////////////////////////
void IndexWBCache::load_buf(IndexBufferPtr const &buf) {
void IndexWBCache::load_buf(IndexBufferPtr const& buf) {
if (buf->m_bytes == nullptr) {
buf->m_bytes = hs_utils::iobuf_alloc(m_node_size, sisl::buftag::btree_node, m_vdev->align_size());
m_vdev->sync_read(r_cast< char* >(buf->m_bytes), m_node_size, buf->blkid());
Expand Down Expand Up @@ -452,17 +454,17 @@ void IndexWBCache::recover(sisl::byte_view sb) {

#ifdef _PRERELEASE
auto detailed_log = [this](std::map< BlkId, IndexBufferPtr > const& bufs,
std::vector<IndexBufferPtr> const &pending_bufs) {
std::vector< IndexBufferPtr > const& pending_bufs) {
std::string log = fmt::format("\trecovered bufs (#of bufs = {})\n", bufs.size());
for (auto const &[_, buf]: bufs) {
for (auto const& [_, buf] : bufs) {
load_buf(buf);
fmt::format_to(std::back_inserter(log), "{}\n", buf->to_string());
}

// list of new_bufs
if (!pending_bufs.empty()) {
fmt::format_to(std::back_inserter(log), "\n\tpending_bufs (#of bufs = {})\n", pending_bufs.size());
for (auto const &buf: pending_bufs) {
for (auto const& buf : pending_bufs) {
fmt::format_to(std::back_inserter(log), "{}\n", buf->to_string());
}
}
Expand All @@ -484,15 +486,15 @@ void IndexWBCache::recover(sisl::byte_view sb) {
// the same blkid which could clash with the blkid next in the buf list.
//
// On the second pass, we only take part of the parents/siblings and then repair them, if needed.
std::vector<IndexBufferPtr> pending_bufs;
std::vector<IndexBufferPtr> deleted_bufs;
for (auto const &[_, buf]: bufs) {
std::vector< IndexBufferPtr > pending_bufs;
std::vector< IndexBufferPtr > deleted_bufs;
for (auto const& [_, buf] : bufs) {
if (buf->m_node_freed) {
// Freed node
load_buf(buf);
if (was_node_committed(buf)) {
// Mark this buffer as deleted, so that we can avoid using it anymore when repairing its parent's link
r_cast<persistent_hdr_t *>(buf->m_bytes)->node_deleted = true;
r_cast< persistent_hdr_t* >(buf->m_bytes)->node_deleted = true;
write_buf(nullptr, buf, icp_ctx);
deleted_bufs.push_back(buf);
pending_bufs.push_back(buf->m_up_buffer);
Expand Down Expand Up @@ -526,23 +528,23 @@ void IndexWBCache::recover(sisl::byte_view sb) {
LOGTRACEMOD(wbcache, "All unclean bufs list\n{}", detailed_log(bufs, pending_bufs));
#endif

for (auto const &buf: pending_bufs) {
for (auto const& buf : pending_bufs) {
recover_buf(buf);
if (buf->m_bytes != nullptr && r_cast<persistent_hdr_t *>(buf->m_bytes)->node_deleted) {
if (buf->m_bytes != nullptr && r_cast< persistent_hdr_t* >(buf->m_bytes)->node_deleted) {
// This buffer was marked as deleted during repair, so we also need to free it
deleted_bufs.push_back(buf);
}
}

for (auto const &buf: deleted_bufs) {
m_vdev->free_blk(buf->m_blkid, s_cast<VDevCPContext *>(icp_ctx));
for (auto const& buf : deleted_bufs) {
m_vdev->free_blk(buf->m_blkid, s_cast< VDevCPContext* >(icp_ctx));
}

m_in_recovery = false;
m_vdev->recovery_completed();
}

void IndexWBCache::recover_buf(IndexBufferPtr const &buf) {
void IndexWBCache::recover_buf(IndexBufferPtr const& buf) {
if (!buf->m_wait_for_down_buffers.decrement_testz()) {
// TODO: remove the buf_>m_up_buffer from down_buffers list of buf->m_up_buffer
return;
Expand Down Expand Up @@ -656,26 +658,22 @@ void IndexWBCache::do_flush_one_buf(IndexCPContext* cp_ctx, IndexBufferPtr const
if (buf->is_meta_buf()) {
LOGTRACEMOD(wbcache, "Flushing cp {} meta buf {} possibly because of root split", cp_ctx->id(),
buf->to_string());
auto const &sb = r_cast<MetaIndexBuffer *>(buf.get())->m_sb;
if (!sb.is_empty()) {
meta_service().update_sub_sb(buf->m_bytes, sb.size(), sb.meta_blk());
}
auto const& sb = r_cast< MetaIndexBuffer* >(buf.get())->m_sb;
if (!sb.is_empty()) { meta_service().update_sub_sb(buf->m_bytes, sb.size(), sb.meta_blk()); }
process_write_completion(cp_ctx, buf);
} else if (buf->m_node_freed) {
LOGTRACEMOD(wbcache, "Not flushing buf {} as it was freed, its here for merely dependency", cp_ctx->id(),
buf->to_string());
process_write_completion(cp_ctx, buf);
} else {
LOGTRACEMOD(wbcache, "Flushing cp {} buf {}", cp_ctx->id(), buf->to_string());
m_vdev->async_write(r_cast<const char *>(buf->raw_buffer()), m_node_size, buf->m_blkid, part_of_batch)
.thenValue([buf, cp_ctx](auto) {
try {
auto &pthis = s_cast<IndexWBCache &>(wb_cache());
pthis.process_write_completion(cp_ctx, buf);
} catch (const std::runtime_error &e) {
LOGERROR("Failed to access write-back cache: {}", e.what());
}
});
m_vdev->async_write(r_cast< const char* >(buf->raw_buffer()), m_node_size, buf->m_blkid, part_of_batch)
.thenValue([buf, cp_ctx](auto) {
try {
auto& pthis = s_cast< IndexWBCache& >(wb_cache());
pthis.process_write_completion(cp_ctx, buf);
} catch (const std::runtime_error& e) { LOGERROR("Failed to access write-back cache: {}", e.what()); }
});

if (!part_of_batch) { m_vdev->submit_batch(); }
}
Expand Down

0 comments on commit ac79b6c

Please sign in to comment.