Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Index recovery path for split(put) #609

Merged
merged 3 commits into from
Dec 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

class HomestoreConan(ConanFile):
name = "homestore"
version = "6.6.1"
version = "6.6.2"

homepage = "https://github.com/eBay/Homestore"
description = "HomeStore Storage Engine"
Expand Down
9 changes: 5 additions & 4 deletions src/include/homestore/index/index_internal.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ class IndexTableBase {
virtual uint64_t used_size() const = 0;
virtual void destroy() = 0;
virtual void repair_node(IndexBufferPtr const& buf) = 0;
virtual void repair_root_node(IndexBufferPtr const& buf) = 0;
};

enum class index_buf_state_t : uint8_t {
Expand All @@ -97,7 +98,7 @@ struct IndexBuffer : public sisl::ObjLifeCounter< IndexBuffer > {
sisl::atomic_counter< int > m_wait_for_down_buffers{0}; // Number of children need to wait for before persisting
#ifndef NDEBUG
// Down buffers are not mandatory members, but only to keep track of any bugs and asserts
std::vector<std::weak_ptr<IndexBuffer> > m_down_buffers;
std::vector< std::weak_ptr< IndexBuffer > > m_down_buffers;
std::mutex m_down_buffers_mtx;
std::shared_ptr< IndexBuffer > m_prev_up_buffer; // Keep a copy for debugging
#endif
Expand Down Expand Up @@ -125,11 +126,11 @@ struct IndexBuffer : public sisl::ObjLifeCounter< IndexBuffer > {
std::string to_string() const;
std::string to_string_dot() const;

void add_down_buffer(const IndexBufferPtr &buf);
void add_down_buffer(const IndexBufferPtr& buf);

void remove_down_buffer(const IndexBufferPtr &buf);
void remove_down_buffer(const IndexBufferPtr& buf);
#ifndef NDEBUG
bool is_in_down_buffers(const IndexBufferPtr &buf);
bool is_in_down_buffers(const IndexBufferPtr& buf);
#endif
};

Expand Down
49 changes: 39 additions & 10 deletions src/include/homestore/index/index_table.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ class IndexTable : public IndexTableBase, public Btree< K, V > {

void destroy() override {
auto cpg = cp_mgr().cp_guard();
Btree<K, V>::destroy_btree(cpg.context(cp_consumer_t::INDEX_SVC));
Btree< K, V >::destroy_btree(cpg.context(cp_consumer_t::INDEX_SVC));
m_sb.destroy();
}

Expand Down Expand Up @@ -114,11 +114,40 @@ class IndexTable : public IndexTableBase, public Btree< K, V > {
return ret;
}

void repair_root_node(IndexBufferPtr const& idx_buf) override {
LOGTRACEMOD(wbcache, "check if this was the previous root node {} for buf {} ", m_sb->root_node,
idx_buf->to_string());
if (m_sb->root_node == idx_buf->blkid().to_integer()) {
// This is the root node, we need to update the root node in superblk
LOGTRACEMOD(wbcache, "{} is old root so we need to update the meta node ", idx_buf->to_string());
BtreeNode* n = this->init_node(idx_buf->raw_buffer(), idx_buf->blkid().to_integer(), false /* init_buf */,
BtreeNode::identify_leaf_node(idx_buf->raw_buffer()));
static_cast< IndexBtreeNode* >(n)->attach_buf(idx_buf);
auto edge_id = n->next_bnode();

BT_DBG_ASSERT(!n->has_valid_edge(),
"root {} already has a valid edge {}, so we should have found the new root node",
n->to_string(), n->get_edge_value().bnode_id());
n->set_next_bnode(empty_bnodeid);
n->set_edge_value(BtreeLinkInfo{edge_id, 0});
LOGTRACEMOD(wbcache, "change root node {}: edge updated to {} and invalidate the next node! ", n->node_id(),
edge_id);
auto cpg = cp_mgr().cp_guard();
write_node_impl(n, (void*)cpg.context(cp_consumer_t::INDEX_SVC));

} else {
LOGTRACEMOD(wbcache, "This is not the root node, so we can ignore this repair call for buf {}",
idx_buf->to_string());
}
}

void repair_node(IndexBufferPtr const& idx_buf) override {
if (idx_buf->is_meta_buf()) {
// We cannot repair the meta buf on its own, we need to repair the root node which modifies the
// meta_buf. It is ok to ignore this call, because repair will be done from root before meta_buf is
// attempted to repair, which would have updated the meta_buf already.
LOGTRACEMOD(wbcache, "Ignoring repair on meta buf {} root id {} ", idx_buf->to_string(),
this->root_node_id());
return;
}
BtreeNode* n = this->init_node(idx_buf->raw_buffer(), idx_buf->blkid().to_integer(), false /* init_buf */,
Expand All @@ -134,13 +163,14 @@ class IndexTable : public IndexTableBase, public Btree< K, V > {
// Only for interior nodes we need to repair its links
if (!bn->is_leaf()) {
LOGTRACEMOD(wbcache, "repair_node cp={} buf={}", cpg->id(), idx_buf->to_string());
repair_links(bn, (void *) cpg.context(cp_consumer_t::INDEX_SVC));
repair_links(bn, (void*)cpg.context(cp_consumer_t::INDEX_SVC));
}

if (idx_buf->m_up_buffer && idx_buf->m_up_buffer->is_meta_buf()) {
// Our up buffer is a meta buffer, which means that we are the new root node, we need to update the
// meta_buf with new root as well
on_root_changed(bn, (void *) cpg.context(cp_consumer_t::INDEX_SVC));
LOGTRACEMOD(wbcache, "root change for after repairing {}\n\n", idx_buf->to_string());
on_root_changed(bn, (void*)cpg.context(cp_consumer_t::INDEX_SVC));
}
}

Expand Down Expand Up @@ -227,10 +257,11 @@ class IndexTable : public IndexTableBase, public Btree< K, V > {
wb_cache().free_buf(n->m_idx_buf, r_cast< CPContext* >(context));
}

btree_status_t
on_root_changed(BtreeNodePtr const &new_root, void *context) override {
btree_status_t on_root_changed(BtreeNodePtr const& new_root, void* context) override {
// todo: if(m_sb->root_node == new_root->node_id() && m_sb->root_link_version == new_root->link_version()){
// return btree_status_t::success;}
LOGTRACEMOD(wbcache, "root changed for index old_root={} new_root={}", m_sb->root_node,
new_root->node_id());
m_sb->root_node = new_root->node_id();
m_sb->root_link_version = new_root->link_version();

Expand All @@ -240,7 +271,7 @@ class IndexTable : public IndexTableBase, public Btree< K, V > {
}

auto& root_buf = static_cast< IndexBtreeNode* >(new_root.get())->m_idx_buf;
wb_cache().transact_bufs(ordinal(), m_sb_buffer, root_buf, {}, {}, r_cast<CPContext *>(context));
wb_cache().transact_bufs(ordinal(), m_sb_buffer, root_buf, {}, {}, r_cast< CPContext* >(context));
return btree_status_t::success;
}

Expand All @@ -257,7 +288,7 @@ class IndexTable : public IndexTableBase, public Btree< K, V > {
}

// Get all original child ids as a support to check if we are beyond the last child node
std::set<bnodeid_t> orig_child_ids;
std::set< bnodeid_t > orig_child_ids;
for (uint32_t i = 0; i < parent_node->total_entries(); ++i) {
BtreeLinkInfo link_info;
parent_node->get_nth_value(i, &link_info, true);
Expand Down Expand Up @@ -391,9 +422,7 @@ class IndexTable : public IndexTableBase, public Btree< K, V > {
}
} while (true);

if (child_node) {
this->unlock_node(child_node, locktype_t::READ);
}
if (child_node) { this->unlock_node(child_node, locktype_t::READ); }

if (parent_node->total_entries() == 0 && !parent_node->has_valid_edge()) {
// We shouldn't have an empty interior node in the tree, let's delete it.
Expand Down
1 change: 1 addition & 0 deletions src/include/homestore/index_service.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ class IndexService {
uint64_t used_size() const;
uint32_t node_size() const;
void repair_index_node(uint32_t ordinal, IndexBufferPtr const& node_buf);
void update_root(uint32_t ordinal, IndexBufferPtr const& node_buf);

IndexWBCacheBase& wb_cache() {
if (!m_wb_cache) { throw std::runtime_error("Attempted to access a null pointer wb_cache"); }
Expand Down
23 changes: 11 additions & 12 deletions src/lib/index/index_cp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ void IndexCPContext::to_string_dot(const std::string& filename) {
LOGINFO("cp dag is stored in file {}", filename);
}

uint16_t IndexCPContext::num_dags() {
uint16_t IndexCPContext::num_dags() {
// count number of buffers whose up_buffers are nullptr
uint16_t count = 0;
std::unique_lock lg{m_flush_buffer_mtx};
Expand Down Expand Up @@ -190,15 +190,18 @@ std::string IndexCPContext::to_string_with_dags() {
// Now walk through the list of graphs and prepare formatted string
std::string str{fmt::format("IndexCPContext cpid={} dirty_buf_count={} dirty_buf_list_size={} #_of_dags={}\n",
m_cp->id(), m_dirty_buf_count.get(), m_dirty_buf_list.size(), group_roots.size())};
int cnt = 1;
for (const auto& root : group_roots) {
std::vector< std::pair< std::shared_ptr< DagNode >, int > > stack;
stack.emplace_back(root, 0);
std::vector< std::tuple< std::shared_ptr< DagNode >, int, int > > stack;
stack.emplace_back(root, 0, cnt++);
while (!stack.empty()) {
auto [node, level] = stack.back();
auto [node, level, index] = stack.back();
stack.pop_back();
fmt::format_to(std::back_inserter(str), "{}{} \n", std::string(level * 4, ' '), node->buf->to_string());
fmt::format_to(std::back_inserter(str), "{}{}-{} \n", std::string(level * 4, ' '), index,
node->buf->to_string());
int c = node->down_nodes.size();
for (const auto& d : node->down_nodes) {
stack.emplace_back(d, level + 1);
stack.emplace_back(d, level + 1, c--);
}
}
}
Expand Down Expand Up @@ -266,15 +269,11 @@ void IndexCPContext::process_txn_record(txn_record const* rec, std::map< BlkId,
#ifndef NDEBUG
// if (!is_sibling_link || (buf->m_up_buffer == real_up_buf)) { return buf;}
// Already linked with same buf or its not a sibling link to override
if (real_up_buf->is_in_down_buffers(buf)) {
return buf;
}
if (real_up_buf->is_in_down_buffers(buf)) { return buf; }
#endif

if (buf->m_up_buffer != real_up_buf) {
if (buf->m_up_buffer) {
buf->m_up_buffer->remove_down_buffer(buf);
}
if (buf->m_up_buffer) { buf->m_up_buffer->remove_down_buffer(buf); }
real_up_buf->add_down_buffer(buf);
buf->m_up_buffer = real_up_buf;
}
Expand Down
68 changes: 42 additions & 26 deletions src/lib/index/index_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,15 @@ void IndexService::repair_index_node(uint32_t ordinal, IndexBufferPtr const& nod
}
}

void IndexService::update_root(uint32_t ordinal, IndexBufferPtr const& node_buf) {
auto tbl = get_index_table(ordinal);
if (tbl) {
tbl->repair_root_node(node_buf);
} else {
HS_DBG_ASSERT(false, "Index corresponding to ordinal={} has not been loaded yet, unexpected", ordinal);
}
}

uint32_t IndexService::node_size() const { return m_vdev->atomic_page_size(); }

uint64_t IndexService::used_size() const {
Expand All @@ -154,31 +163,39 @@ IndexBuffer::~IndexBuffer() {
}

std::string IndexBuffer::to_string() const {
if (m_is_meta_buf) {
return fmt::format("Buf={} [Meta] index={} state={} create/dirty_cp={}/{} down_wait#={} freed={}",
voidptr_cast(const_cast< IndexBuffer* >(this)), m_index_ordinal, int_cast(state()),
m_created_cp_id, m_dirtied_cp_id, m_wait_for_down_buffers.get(), m_node_freed);
} else {
// store m_down_buffers in a string
std::string down_bufs = "";
static std::vector< std::string > state_str = {"CLEAN", "DIRTY", "FLUSHING"};
// store m_down_buffers in a string
std::string down_bufs = "";
#ifndef NDEBUG
{
std::lock_guard lg(m_down_buffers_mtx);
for (auto const &down_buf: m_down_buffers) {
{
std::lock_guard lg(m_down_buffers_mtx);
if (m_down_buffers.empty()) {
fmt::format_to(std::back_inserter(down_bufs), "EMPTY");
} else {
for (auto const& down_buf : m_down_buffers) {
if (auto ptr = down_buf.lock()) {
fmt::format_to(std::back_inserter(down_bufs), "[{}]", voidptr_cast(ptr.get()));
}
}
fmt::format_to(std::back_inserter(down_bufs), " #down bufs={}", m_down_buffers.size());
}
}
#endif

return fmt::format("Buf={} index={} state={} create/dirty_cp={}/{} down_wait#={}{} up={} node=[{}] down=[{}]",
voidptr_cast(const_cast< IndexBuffer* >(this)), m_index_ordinal, int_cast(state()),
m_created_cp_id, m_dirtied_cp_id, m_wait_for_down_buffers.get(),
m_node_freed ? " Freed" : "", voidptr_cast(const_cast< IndexBuffer* >(m_up_buffer.get())),
(m_bytes == nullptr) ? "not attached yet"
: r_cast< persistent_hdr_t const* >(m_bytes)->to_compact_string(),
down_bufs);
if (m_is_meta_buf) {
return fmt::format("[Meta] Buf={} index={} state={} create/dirty_cp={}/{} down_wait#={}{} down={{{}}}",
voidptr_cast(const_cast< IndexBuffer* >(this)), m_index_ordinal,
state_str[int_cast(state())], m_created_cp_id, m_dirtied_cp_id,
m_wait_for_down_buffers.get(), m_node_freed ? " Freed" : "", down_bufs);
} else {

return fmt::format(
"Buf={} index={} state={} create/dirty_cp={}/{} down_wait#={}{} up={} node=[{}] down={{{}}}",
voidptr_cast(const_cast< IndexBuffer* >(this)), m_index_ordinal, state_str[int_cast(state())],
m_created_cp_id, m_dirtied_cp_id, m_wait_for_down_buffers.get(), m_node_freed ? " Freed" : "",
voidptr_cast(const_cast< IndexBuffer* >(m_up_buffer.get())),
(m_bytes == nullptr) ? "not attached yet" : r_cast< persistent_hdr_t const* >(m_bytes)->to_compact_string(),
down_bufs);
}
}

Expand All @@ -194,7 +211,7 @@ std::string IndexBuffer::to_string_dot() const {
return str;
}

void IndexBuffer::add_down_buffer(const IndexBufferPtr &buf) {
void IndexBuffer::add_down_buffer(const IndexBufferPtr& buf) {
m_wait_for_down_buffers.increment();
#ifndef NDEBUG
{
Expand All @@ -204,10 +221,11 @@ void IndexBuffer::add_down_buffer(const IndexBufferPtr &buf) {
#endif
}

void IndexBuffer::remove_down_buffer(const IndexBufferPtr &buf) {
void IndexBuffer::remove_down_buffer(const IndexBufferPtr& buf) {
m_wait_for_down_buffers.decrement();
#ifndef NDEBUG
bool found{false}; {
bool found{false};
{
std::lock_guard lg(m_down_buffers_mtx);
for (auto it = buf->m_up_buffer->m_down_buffers.begin(); it != buf->m_up_buffer->m_down_buffers.end(); ++it) {
if (it->lock() == buf) {
Expand All @@ -222,12 +240,10 @@ void IndexBuffer::remove_down_buffer(const IndexBufferPtr &buf) {
}

#ifndef NDEBUG
bool IndexBuffer::is_in_down_buffers(const IndexBufferPtr &buf) {
std::lock_guard<std::mutex> lg(m_down_buffers_mtx);
for (auto const &dbuf: m_down_buffers) {
if (dbuf.lock() == buf) {
return true;
}
bool IndexBuffer::is_in_down_buffers(const IndexBufferPtr& buf) {
std::lock_guard< std::mutex > lg(m_down_buffers_mtx);
for (auto const& dbuf : m_down_buffers) {
if (dbuf.lock() == buf) { return true; }
}
return false;
}
Expand Down
Loading
Loading