Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add debugging log for index cp. #592

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

class HomestoreConan(ConanFile):
name = "homestore"
version = "6.5.17"
version = "6.5.18"

homepage = "https://github.com/eBay/Homestore"
description = "HomeStore Storage Engine"
Expand Down
21 changes: 10 additions & 11 deletions src/include/homestore/index/index_table.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ class IndexTable : public IndexTableBase, public Btree< K, V > {

void destroy() override {
auto cpg = cp_mgr().cp_guard();
Btree<K, V>::destroy_btree(cpg.context(cp_consumer_t::INDEX_SVC));
Btree< K, V >::destroy_btree(cpg.context(cp_consumer_t::INDEX_SVC));
m_sb.destroy();
}

Expand All @@ -97,7 +97,9 @@ class IndexTable : public IndexTableBase, public Btree< K, V > {
auto cpg = cp_mgr().cp_guard();
put_req.m_op_context = (void*)cpg.context(cp_consumer_t::INDEX_SVC);
ret = Btree< K, V >::put(put_req);
if (ret == btree_status_t::cp_mismatch) { LOGTRACEMOD(wbcache, "CP Mismatch, retrying put"); }
if (ret == btree_status_t::cp_mismatch) {
LOGTRACEMOD(wbcache, "CP Mismatch, cur cp {}, retrying put", ((CPContext*)put_req.m_op_context)->id());
}
} while (ret == btree_status_t::cp_mismatch);
return ret;
}
Expand Down Expand Up @@ -134,13 +136,13 @@ class IndexTable : public IndexTableBase, public Btree< K, V > {
// Only for interior nodes we need to repair its links
if (!bn->is_leaf()) {
LOGTRACEMOD(wbcache, "repair_node cp={} buf={}", cpg->id(), idx_buf->to_string());
repair_links(bn, (void *) cpg.context(cp_consumer_t::INDEX_SVC));
repair_links(bn, (void*)cpg.context(cp_consumer_t::INDEX_SVC));
}

if (idx_buf->m_up_buffer && idx_buf->m_up_buffer->is_meta_buf()) {
// Our up buffer is a meta buffer, which means that we are the new root node, we need to update the
// meta_buf with new root as well
on_root_changed(bn, (void *) cpg.context(cp_consumer_t::INDEX_SVC));
on_root_changed(bn, (void*)cpg.context(cp_consumer_t::INDEX_SVC));
}
}

Expand Down Expand Up @@ -227,8 +229,7 @@ class IndexTable : public IndexTableBase, public Btree< K, V > {
wb_cache().free_buf(n->m_idx_buf, r_cast< CPContext* >(context));
}

btree_status_t
on_root_changed(BtreeNodePtr const &new_root, void *context) override {
btree_status_t on_root_changed(BtreeNodePtr const& new_root, void* context) override {
// todo: if(m_sb->root_node == new_root->node_id() && m_sb->root_link_version == new_root->link_version()){
// return btree_status_t::success;}
m_sb->root_node = new_root->node_id();
Expand All @@ -240,7 +241,7 @@ class IndexTable : public IndexTableBase, public Btree< K, V > {
}

auto& root_buf = static_cast< IndexBtreeNode* >(new_root.get())->m_idx_buf;
wb_cache().transact_bufs(ordinal(), m_sb_buffer, root_buf, {}, {}, r_cast<CPContext *>(context));
wb_cache().transact_bufs(ordinal(), m_sb_buffer, root_buf, {}, {}, r_cast< CPContext* >(context));
return btree_status_t::success;
}

Expand All @@ -257,7 +258,7 @@ class IndexTable : public IndexTableBase, public Btree< K, V > {
}

// Get all original child ids as a support to check if we are beyond the last child node
std::set<bnodeid_t> orig_child_ids;
std::set< bnodeid_t > orig_child_ids;
for (uint32_t i = 0; i < parent_node->total_entries(); ++i) {
BtreeLinkInfo link_info;
parent_node->get_nth_value(i, &link_info, true);
Expand Down Expand Up @@ -391,9 +392,7 @@ class IndexTable : public IndexTableBase, public Btree< K, V > {
}
} while (true);

if (child_node) {
this->unlock_node(child_node, locktype_t::READ);
}
if (child_node) { this->unlock_node(child_node, locktype_t::READ); }

if (parent_node->total_entries() == 0 && !parent_node->has_valid_edge()) {
// We shouldn't have an empty interior node in the tree, let's delete it.
Expand Down
2 changes: 1 addition & 1 deletion src/lib/index/index_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ void IndexService::start() {
}
// Force taking cp after recovery done. This makes sure that the index table is in consistent state and dirty buffer
// after recovery can be added to dirty list for flushing in the new cp
hs()->cp_mgr().trigger_cp_flush(true /* force */);
hs()->cp_mgr().trigger_cp_flush(true /* force */).wait();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

trigger_cp_flush(true) waits until the CP flush is done. Did you test it without .wait()?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right now all the tests are done without the wait()... I don't know if we do need a wait(), but it seems to make things easier...

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wait() will guarantee the CP superblock is written to disk, which further means the last_cp_id is persisted. If this does not do any harm to performance, I think we can wait here, or even on all the trigger_cp_flush calls.

}

void IndexService::stop() { m_wb_cache.reset(); }
Expand Down
52 changes: 25 additions & 27 deletions src/lib/index/wb_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,8 @@ bool IndexWBCache::get_writable_buf(const BtreeNodePtr& node, CPContext* context
if (idx_buf->m_dirtied_cp_id == icp_ctx->id()) {
return true; // For same cp, we don't need a copy, we can rewrite on the same buffer
} else if (idx_buf->m_dirtied_cp_id > icp_ctx->id()) {
LOGINFOMOD(wbcache, "cp_id = {} < buffer dirtied_cp_id {}, rejecting, node hdr {}", icp_ctx->id(),
idx_buf->m_dirtied_cp_id, ((persistent_hdr_t*)idx_buf->m_bytes)->to_string());
return false; // We are asked to provide the buffer of an older CP, which is not possible
}

Expand Down Expand Up @@ -420,11 +422,11 @@ void IndexWBCache::free_buf(const IndexBufferPtr& buf, CPContext* cp_ctx) {
}
buf->m_node_freed = true;
resource_mgr().inc_free_blk(m_node_size);
m_vdev->free_blk(buf->m_blkid, s_cast<VDevCPContext *>(cp_ctx));
m_vdev->free_blk(buf->m_blkid, s_cast< VDevCPContext* >(cp_ctx));
}

//////////////////// Recovery Related section /////////////////////////////////
void IndexWBCache::load_buf(IndexBufferPtr const &buf) {
void IndexWBCache::load_buf(IndexBufferPtr const& buf) {
if (buf->m_bytes == nullptr) {
buf->m_bytes = hs_utils::iobuf_alloc(m_node_size, sisl::buftag::btree_node, m_vdev->align_size());
m_vdev->sync_read(r_cast< char* >(buf->m_bytes), m_node_size, buf->blkid());
Expand Down Expand Up @@ -452,17 +454,17 @@ void IndexWBCache::recover(sisl::byte_view sb) {

#ifdef _PRERELEASE
auto detailed_log = [this](std::map< BlkId, IndexBufferPtr > const& bufs,
std::vector<IndexBufferPtr> const &pending_bufs) {
std::vector< IndexBufferPtr > const& pending_bufs) {
std::string log = fmt::format("\trecovered bufs (#of bufs = {})\n", bufs.size());
for (auto const &[_, buf]: bufs) {
for (auto const& [_, buf] : bufs) {
load_buf(buf);
fmt::format_to(std::back_inserter(log), "{}\n", buf->to_string());
}

// list of new_bufs
if (!pending_bufs.empty()) {
fmt::format_to(std::back_inserter(log), "\n\tpending_bufs (#of bufs = {})\n", pending_bufs.size());
for (auto const &buf: pending_bufs) {
for (auto const& buf : pending_bufs) {
fmt::format_to(std::back_inserter(log), "{}\n", buf->to_string());
}
}
Expand All @@ -484,15 +486,15 @@ void IndexWBCache::recover(sisl::byte_view sb) {
// the same blkid which could clash with the blkid next in the buf list.
//
// On the second pass, we only take part of the parents/siblings and then repair them, if needed.
std::vector<IndexBufferPtr> pending_bufs;
std::vector<IndexBufferPtr> deleted_bufs;
for (auto const &[_, buf]: bufs) {
std::vector< IndexBufferPtr > pending_bufs;
std::vector< IndexBufferPtr > deleted_bufs;
for (auto const& [_, buf] : bufs) {
if (buf->m_node_freed) {
// Freed node
load_buf(buf);
if (was_node_committed(buf)) {
// Mark this buffer as deleted, so that we can avoid using it anymore when repairing its parent's link
r_cast<persistent_hdr_t *>(buf->m_bytes)->node_deleted = true;
r_cast< persistent_hdr_t* >(buf->m_bytes)->node_deleted = true;
write_buf(nullptr, buf, icp_ctx);
deleted_bufs.push_back(buf);
pending_bufs.push_back(buf->m_up_buffer);
Expand Down Expand Up @@ -526,23 +528,23 @@ void IndexWBCache::recover(sisl::byte_view sb) {
LOGTRACEMOD(wbcache, "All unclean bufs list\n{}", detailed_log(bufs, pending_bufs));
#endif

for (auto const &buf: pending_bufs) {
for (auto const& buf : pending_bufs) {
recover_buf(buf);
if (buf->m_bytes != nullptr && r_cast<persistent_hdr_t *>(buf->m_bytes)->node_deleted) {
if (buf->m_bytes != nullptr && r_cast< persistent_hdr_t* >(buf->m_bytes)->node_deleted) {
// This buffer was marked as deleted during repair, so we also need to free it
deleted_bufs.push_back(buf);
}
}

for (auto const &buf: deleted_bufs) {
m_vdev->free_blk(buf->m_blkid, s_cast<VDevCPContext *>(icp_ctx));
for (auto const& buf : deleted_bufs) {
m_vdev->free_blk(buf->m_blkid, s_cast< VDevCPContext* >(icp_ctx));
}

m_in_recovery = false;
m_vdev->recovery_completed();
}

void IndexWBCache::recover_buf(IndexBufferPtr const &buf) {
void IndexWBCache::recover_buf(IndexBufferPtr const& buf) {
if (!buf->m_wait_for_down_buffers.decrement_testz()) {
// TODO: remove the buf_>m_up_buffer from down_buffers list of buf->m_up_buffer
return;
Expand Down Expand Up @@ -656,26 +658,22 @@ void IndexWBCache::do_flush_one_buf(IndexCPContext* cp_ctx, IndexBufferPtr const
if (buf->is_meta_buf()) {
LOGTRACEMOD(wbcache, "Flushing cp {} meta buf {} possibly because of root split", cp_ctx->id(),
buf->to_string());
auto const &sb = r_cast<MetaIndexBuffer *>(buf.get())->m_sb;
if (!sb.is_empty()) {
meta_service().update_sub_sb(buf->m_bytes, sb.size(), sb.meta_blk());
}
auto const& sb = r_cast< MetaIndexBuffer* >(buf.get())->m_sb;
if (!sb.is_empty()) { meta_service().update_sub_sb(buf->m_bytes, sb.size(), sb.meta_blk()); }
process_write_completion(cp_ctx, buf);
} else if (buf->m_node_freed) {
LOGTRACEMOD(wbcache, "Not flushing buf {} as it was freed, its here for merely dependency", cp_ctx->id(),
buf->to_string());
process_write_completion(cp_ctx, buf);
} else {
LOGTRACEMOD(wbcache, "Flushing cp {} buf {}", cp_ctx->id(), buf->to_string());
m_vdev->async_write(r_cast<const char *>(buf->raw_buffer()), m_node_size, buf->m_blkid, part_of_batch)
.thenValue([buf, cp_ctx](auto) {
try {
auto &pthis = s_cast<IndexWBCache &>(wb_cache());
pthis.process_write_completion(cp_ctx, buf);
} catch (const std::runtime_error &e) {
LOGERROR("Failed to access write-back cache: {}", e.what());
}
});
m_vdev->async_write(r_cast< const char* >(buf->raw_buffer()), m_node_size, buf->m_blkid, part_of_batch)
.thenValue([buf, cp_ctx](auto) {
try {
auto& pthis = s_cast< IndexWBCache& >(wb_cache());
pthis.process_write_completion(cp_ctx, buf);
} catch (const std::runtime_error& e) { LOGERROR("Failed to access write-back cache: {}", e.what()); }
});

if (!part_of_batch) { m_vdev->submit_batch(); }
}
Expand Down
Loading