Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merging master to repl #231

Merged
merged 3 commits into from
Nov 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

class HomestoreConan(ConanFile):
name = "homestore"
version = "4.8.1"
version = "4.8.3"

homepage = "https://github.com/eBay/Homestore"
description = "HomeStore Storage Engine"
Expand Down
2 changes: 1 addition & 1 deletion src/include/homestore/btree/detail/btree_node_mgr.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ BtreeNode* Btree< K, V >::init_node(uint8_t* node_buf, uint32_t node_ctx_size, b
/* Note:- This function assumes that access of this node is thread safe. */
template < typename K, typename V >
void Btree< K, V >::free_node(const BtreeNodePtr& node, locktype_t cur_lock, void* context) {
BT_NODE_LOG(DEBUG, node, "Freeing node");
BT_NODE_LOG(TRACE, node, "Freeing node");

COUNTER_DECREMENT_IF_ELSE(m_metrics, node->is_leaf(), btree_leaf_node_count, btree_int_node_count, 1);
if (cur_lock != locktype_t::NONE) {
Expand Down
2 changes: 1 addition & 1 deletion src/include/homestore/btree/detail/prefix_node.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -771,7 +771,7 @@ class FixedPrefixNode : public VariantNode< K, V > {
phdr->tail_slot = phdr->used_slots;
}

#ifdef _DEBUG
#ifndef NDEBUG
void validate_sanity() {
uint32_t i{0};
// validate if keys are in ascending order
Expand Down
61 changes: 50 additions & 11 deletions src/include/homestore/index/index_internal.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,29 +64,68 @@ enum class index_buf_state_t : uint8_t {
};

///////////////////////// Btree Node and Buffer Portion //////////////////////////


// Multiple IndexBuffer could point to the same NodeBuffer if its clean.
struct NodeBuffer;
typedef std::shared_ptr< NodeBuffer > NodeBufferPtr;
struct NodeBuffer {
uint8_t* m_bytes{nullptr}; // Actual data buffer
std::atomic< index_buf_state_t > m_state{index_buf_state_t::CLEAN}; // Is buffer yet to persist?
NodeBuffer(uint32_t buf_size, uint32_t align_size);
~NodeBuffer();
};

// IndexBuffer is for each CP. The dependent index buffers are chained using
// m_next_buffer and each buffer is flushed only its wait_for_leaders reaches 0
// which means all its dependent buffers are flushed.
struct IndexBuffer;
typedef std::shared_ptr< IndexBuffer > IndexBufferPtr;

struct IndexBuffer {
uint8_t* m_node_buf{nullptr}; // Actual buffer
index_buf_state_t m_buf_state{index_buf_state_t::CLEAN}; // Is buffer yet to persist?
BlkId m_blkid; // BlkId where this needs to be persisted
std::weak_ptr< IndexBuffer > m_next_buffer; // Next buffer in the chain
NodeBufferPtr m_node_buf;
BlkId m_blkid; // BlkId where this needs to be persisted
std::weak_ptr< IndexBuffer > m_next_buffer; // Next buffer in the chain
// Number of leader buffers we are waiting for before we write this buffer
sisl::atomic_counter< int > m_wait_for_leaders{0};

IndexBuffer(BlkId blkid, uint32_t buf_size, uint32_t align_size);
IndexBuffer(NodeBufferPtr node_buf, BlkId blkid);
~IndexBuffer();

BlkId blkid() const { return m_blkid; }
uint8_t* raw_buffer() { return m_node_buf; }
uint8_t* raw_buffer() {
RELEASE_ASSERT(m_node_buf, "Node buffer null blkid {}", m_blkid.to_integer());
return m_node_buf->m_bytes;
}

bool is_clean() const {
RELEASE_ASSERT(m_node_buf, "Node buffer null blkid {}", m_blkid.to_integer());
return (m_node_buf->m_state.load() == index_buf_state_t::CLEAN);
}

index_buf_state_t state() const {
RELEASE_ASSERT(m_node_buf, "Node buffer null blkid {}", m_blkid.to_integer());
return m_node_buf->m_state;
}

void set_state(index_buf_state_t state) {
RELEASE_ASSERT(m_node_buf, "Node buffer null blkid {}", m_blkid.to_integer());
m_node_buf->m_state = state;
}

bool is_clean() const { return (m_buf_state == index_buf_state_t::CLEAN); }
std::string to_string() const {
return fmt::format("IndexBuffer {} blkid={} state={} node_buf={} next_buffer={} wait_for={}",
reinterpret_cast< void* >(const_cast< IndexBuffer* >(this)), m_blkid.to_integer(),
static_cast< int >(m_buf_state), static_cast< void* >(m_node_buf),
voidptr_cast(m_next_buffer.lock().get()), m_wait_for_leaders.get());
auto str = fmt::format("IndexBuffer {} blkid={}", reinterpret_cast< void* >(const_cast< IndexBuffer* >(this)),
m_blkid.to_integer());
if (m_node_buf == nullptr) {
fmt::format_to(std::back_inserter(str), " node_buf=nullptr");
} else {
fmt::format_to(std::back_inserter(str), " state={} node_buf={}",
static_cast< int >(m_node_buf->m_state.load()), static_cast< void* >(m_node_buf->m_bytes));
}
fmt::format_to(std::back_inserter(str), " next_buffer={} wait_for={}",
m_next_buffer.lock() ? reinterpret_cast< void* >(m_next_buffer.lock().get()) : 0,
m_wait_for_leaders.get());
return str;
}
};

Expand Down
33 changes: 22 additions & 11 deletions src/include/homestore/index/index_table.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ class IndexTable : public IndexTableBase, public Btree< K, V > {
// Need to put it in wb cache
wb_cache().write_buf(node, idx_node->m_idx_buf, cp_ctx);
idx_node->m_last_mod_cp_id = cp_ctx->id();
LOGTRACEMOD(wbcache, "{}", idx_node->m_idx_buf->to_string());
LOGTRACEMOD(wbcache, "add to dirty list cp {} {}", cp_ctx->id(), idx_node->m_idx_buf->to_string());
}
node->set_checksum(this->m_bt_cfg);
return btree_status_t::success;
Expand All @@ -129,17 +129,28 @@ class IndexTable : public IndexTableBase, public Btree< K, V > {
auto& left_child_buf = left_child_idx_node->m_idx_buf;
auto& parent_buf = parent_idx_node->m_idx_buf;

LOGTRACEMOD(wbcache, "left {} parent {} ", left_child_buf->to_string(), parent_buf->to_string());

// Write new nodes in the list as standalone outside transacted pairs.
// Write the new right child nodes, left node and parent in order.
// Create the relationship of right child to the left node via prepend_to_chain below.
// Parent and left node are linked in the prepare_node_txn
for (const auto& right_child_node : new_nodes) {
auto right_child = IndexBtreeNode::convert(right_child_node.get());
write_node_impl(right_child_node, context);
wb_cache().prepend_to_chain(right_child->m_idx_buf, left_child_buf);
LOGTRACEMOD(wbcache, "right {} left {} ", right_child->m_idx_buf->to_string(), left_child_buf->to_string());
}

auto trace_index_bufs = [&]() {
std::string str;
str = fmt::format("cp {} left {} parent {}", cp_ctx->id(), left_child_buf->to_string(),
parent_buf->to_string());
for (const auto& right_child_node : new_nodes) {
auto right_child = IndexBtreeNode::convert(right_child_node.get());
fmt::format_to(std::back_inserter(str), " right {}", right_child->m_idx_buf->to_string());
}
return str;
};

LOGTRACEMOD(wbcache, "{}", trace_index_bufs());
write_node_impl(left_child_node, context);
write_node_impl(parent_node, context);

Expand Down Expand Up @@ -181,18 +192,17 @@ class IndexTable : public IndexTableBase, public Btree< K, V > {
// realloc_node(node);
// }

// If the backing buffer is already in a clean state, we don't need to make a copy of it
if (idx_node->m_idx_buf->is_clean()) { return btree_status_t::success; }

// Make a new btree buffer and copy the contents and swap it to make it the current node's buffer. The
// We create IndexBuffer for each CP. But if the backing buffer is already in a clean state
// we dont copy the node buffer. Copy buffer will handle it. If the node buffer is dirty,
// make a new btree buffer and copy the contents and swap it to make it the current node's buffer. The
// buffer prior to this copy, would have been written and already added into the dirty buffer list.
idx_node->m_idx_buf = wb_cache().copy_buffer(idx_node->m_idx_buf);
idx_node->m_idx_buf = wb_cache().copy_buffer(idx_node->m_idx_buf, cp_ctx);
idx_node->m_last_mod_cp_id = -1;

node->m_phys_node_buf = idx_node->m_idx_buf->raw_buffer();
node->set_checksum(this->m_bt_cfg);

LOGTRACEMOD(wbcache, "buf {} ", idx_node->m_idx_buf->to_string());
LOGTRACEMOD(wbcache, "cp {} {} ", cp_ctx->id(), idx_node->m_idx_buf->to_string());

#ifndef NO_CHECKSUM
if (!node->verify_node(this->m_bt_cfg)) {
Expand Down Expand Up @@ -221,6 +231,8 @@ class IndexTable : public IndexTableBase, public Btree< K, V > {
auto& child_buf = child_idx_node->m_idx_buf;
auto& parent_buf = parent_idx_node->m_idx_buf;

LOGTRACEMOD(wbcache, "cp {} left {} parent {} ", cp_ctx->id(), child_buf->to_string(), parent_buf->to_string());

auto [child_copied, parent_copied] = wb_cache().create_chain(child_buf, parent_buf, cp_ctx);
if (child_copied) {
child_node->m_phys_node_buf = child_buf->raw_buffer();
Expand All @@ -231,7 +243,6 @@ class IndexTable : public IndexTableBase, public Btree< K, V > {
parent_idx_node->m_last_mod_cp_id = -1;
}

LOGTRACEMOD(wbcache, "child {} parent {} ", child_buf->to_string(), parent_buf->to_string());
return btree_status_t::success;
}

Expand Down
6 changes: 3 additions & 3 deletions src/include/homestore/index/wb_cache_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,13 @@ class IndexWBCacheBase {

/// @brief Start a chain of related btree buffers. Typically a chain is creating from second and third pairs and
/// then first is prepended to the chain. In case the second buffer is already with the WB cache, it will create a
/// new buffer for both second and third.
/// new buffer for both second and third. We append the buffers to a list in dependency chain.
/// @param second Second btree buffer in the chain. It will be updated to copy of second buffer if buffer already
/// has dependencies.
/// @param third Thrid btree buffer in the chain. It will be updated to copy of third buffer if buffer already
/// has dependencies.
/// @return Returns if the buffer had to be copied
virtual std::tuple< bool, bool > create_chain(IndexBufferPtr& second, IndexBufferPtr& third, CPContext* cp_ctx) = 0;
virtual std::pair< bool, bool > create_chain(IndexBufferPtr& second, IndexBufferPtr& third, CPContext* cp_ctx) = 0;

/// @brief Prepend to the chain that was already created with second
/// @param first
Expand All @@ -73,7 +73,7 @@ class IndexWBCacheBase {
/// @brief Copy buffer
/// @param cur_buf
/// @return
virtual IndexBufferPtr copy_buffer(const IndexBufferPtr& cur_buf) const = 0;
virtual IndexBufferPtr copy_buffer(const IndexBufferPtr& cur_buf, const CPContext* context) const = 0;
};

} // namespace homestore
2 changes: 1 addition & 1 deletion src/include/homestore/index_service.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
#include <homestore/homestore_decl.hpp>
#include <homestore/index/index_internal.hpp>
#include <homestore/superblk_handler.hpp>

#include <homestore/index/wb_cache_base.hpp>
namespace homestore {

class IndexWBCacheBase;
Expand Down
4 changes: 2 additions & 2 deletions src/lib/checkpoint/cp.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

#include <sisl/logging/logging.h>
#include <iomgr/iomgr.hpp>

#include <folly/futures/SharedPromise.h>
#include "common/homestore_assert.hpp"

/*
Expand Down Expand Up @@ -73,7 +73,7 @@ struct CP {
bool m_cp_waiting_to_trigger{false}; // it is waiting for previous cp to complete
cp_id_t m_cp_id;
std::array< std::unique_ptr< CPContext >, (size_t)cp_consumer_t::SENTINEL > m_contexts;
folly::Promise< bool > m_comp_promise;
folly::SharedPromise< bool > m_comp_promise;

public:
CP(CPManager* mgr) : m_cp_mgr{mgr} {}
Expand Down
9 changes: 6 additions & 3 deletions src/lib/checkpoint/cp_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,11 @@ folly::Future< bool > CPManager::trigger_cp_flush(bool force) {
std::unique_lock< std::mutex > lk(trigger_cp_mtx);
auto cur_cp = cp_guard();
HS_DBG_ASSERT_NE(cur_cp->m_cp_status, cp_status_t::cp_flush_prepare);
cur_cp->m_comp_promise = std::move(folly::Promise< bool >{});
cur_cp->m_cp_waiting_to_trigger = true;
// If multiple threads call trigger, they all get the future from the same promise.
if (!cur_cp->m_cp_waiting_to_trigger) {
cur_cp->m_comp_promise = std::move(folly::SharedPromise< bool >{});
cur_cp->m_cp_waiting_to_trigger = true;
}
return cur_cp->m_comp_promise.getFuture();
} else {
return folly::makeFuture< bool >(false);
Expand Down Expand Up @@ -177,7 +180,7 @@ folly::Future< bool > CPManager::trigger_cp_flush(bool force) {
// originally by the caller will be untouched and completed upto CP completion/
ret_fut = folly::makeFuture< bool >(true);
} else {
cur_cp->m_comp_promise = std::move(folly::Promise< bool >{});
cur_cp->m_comp_promise = std::move(folly::SharedPromise< bool >{});
ret_fut = cur_cp->m_comp_promise.getFuture();
}
cur_cp->m_cp_status = cp_status_t::cp_flush_prepare;
Expand Down
98 changes: 92 additions & 6 deletions src/lib/index/index_cp.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <sisl/fds/concurrent_insert_vector.hpp>
#include <homestore/blk.h>
#include <homestore/index/index_internal.hpp>
#include <homestore/index_service.hpp>
#include <homestore/checkpoint/cp_mgr.hpp>

#include "checkpoint/cp.hpp"
Expand All @@ -32,20 +33,18 @@ struct IndexCPContext : public VDevCPContext {
std::atomic< uint64_t > m_num_nodes_removed{0};
sisl::ConcurrentInsertVector< IndexBufferPtr > m_dirty_buf_list;
sisl::atomic_counter< int64_t > m_dirty_buf_count{0};
IndexBufferPtr m_last_in_chain;
std::mutex m_flush_buffer_mtx;
sisl::ConcurrentInsertVector< IndexBufferPtr >::iterator m_dirty_buf_it;

public:

IndexCPContext(CP* cp) : VDevCPContext(cp) {}
virtual ~IndexCPContext() = default;

void add_to_dirty_list(const IndexBufferPtr& buf) {
buf->m_buf_state = index_buf_state_t::DIRTY;
m_dirty_buf_list.push_back(buf);
buf->set_state(index_buf_state_t::DIRTY);
m_dirty_buf_count.increment(1);
m_last_in_chain = buf;
LOGTRACEMOD(wbcache, "{}", buf->to_string());
}

bool any_dirty_buffers() const { return !m_dirty_buf_count.testz(); }
Expand All @@ -59,15 +58,102 @@ struct IndexCPContext : public VDevCPContext {
return ret;
}

std::string to_string() const {
std::string to_string() {
std::string str{fmt::format("IndexCPContext cpid={} dirty_buf_count={} dirty_buf_list_size={}", m_cp->id(),
m_dirty_buf_count.get(), m_dirty_buf_list.size())};

// TODO dump all index buffers.
// Mapping from a node to all its parents in the graph.
// Display all buffers and its dependencies and state.
std::unordered_map< IndexBuffer*, std::vector< IndexBuffer* > > parents;

auto it = m_dirty_buf_list.begin();
while (it != m_dirty_buf_list.end()) {
// Add this buf to his children.
IndexBufferPtr buf = *it;
parents[buf->m_next_buffer.lock().get()].emplace_back(buf.get());
++it;
}

it = m_dirty_buf_list.begin();
while (it != m_dirty_buf_list.end()) {
IndexBufferPtr buf = *it;
fmt::format_to(std::back_inserter(str), "{}", buf->to_string());
auto first = true;
for (const auto& p : parents[buf.get()]) {
if (first) {
fmt::format_to(std::back_inserter(str), "\nDepends:");
first = false;
}
fmt::format_to(std::back_inserter(str), " {}({})", r_cast< void* >(p), s_cast< int >(p->state()));
}
fmt::format_to(std::back_inserter(str), "\n");
++it;
}

return str;
}

void check_cycle() {
// Use dfs to find if the graph is cycle
auto it = m_dirty_buf_list.begin();
while (it != m_dirty_buf_list.end()) {
IndexBufferPtr buf = *it;;
std::set< IndexBuffer* > visited;
check_cycle_recurse(buf, visited);
++it;
}
}

void check_cycle_recurse(IndexBufferPtr buf, std::set< IndexBuffer* >& visited) const {
if (visited.count(buf.get()) != 0) {
LOGERROR("Cycle found for {}", buf->to_string());
for (auto& x : visited) {
LOGERROR("Path : {}", x->to_string());
}
return;
}

visited.insert(buf.get());
if (buf->m_next_buffer.lock()) { check_cycle_recurse(buf->m_next_buffer.lock(), visited); }
}

void check_wait_for_leaders() {
// Use the next buffer as indegree to find if wait_for_leaders is invalid.
std::unordered_map< IndexBuffer*, int > wait_for_leaders;
IndexBufferPtr buf;

// Store the wait for leader count for each buffer.
auto it = m_dirty_buf_list.begin();
while (it != m_dirty_buf_list.end()) {
buf = *it;
wait_for_leaders[buf.get()] = buf->m_wait_for_leaders.get();
++it;
}

// Decrement the count using the next buffer.
it = m_dirty_buf_list.begin();
while (it != m_dirty_buf_list.end()) {
buf = *it;
auto next_buf = buf->m_next_buffer.lock();
if (next_buf.get() == nullptr) continue;
wait_for_leaders[next_buf.get()]--;
++it;
}

bool issue = false;
for (const auto& [buf, waits] : wait_for_leaders) {
// Any value other than zero means the dependency graph is invalid.
if (waits != 0) {
issue = true;
LOGERROR("Leaders wait not zero cp {} buf {} waits {}", id(), buf->to_string(), waits);
}
}

RELEASE_ASSERT_EQ(issue, false, "Found issue with wait_for_leaders");
}
};


class IndexWBCache;
class IndexCPCallbacks : public CPCallbacks {
public:
Expand Down
Loading
Loading