Skip to content

Commit

Permalink
Index write back cache fixes.
Browse files Browse the repository at this point in the history
Make the IndexBuffer state atomic to avoid concurrency issues between the
CP flush and insert threads. Create one IndexBuffer per CP.
Create NodeBuffer, which points to the actual data buffer.
Several IndexBuffers can point to the same NodeBuffer.
Add locks in the test for the index btree shadow map, as there are
concurrent requests. Use shared_ptrs in the wb cache.
  • Loading branch information
sanebay committed Nov 10, 2023
1 parent c979057 commit 4036ffb
Show file tree
Hide file tree
Showing 15 changed files with 405 additions and 136 deletions.
2 changes: 1 addition & 1 deletion src/include/homestore/btree/detail/btree_node_mgr.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ BtreeNode* Btree< K, V >::init_node(uint8_t* node_buf, uint32_t node_ctx_size, b
/* Note:- This function assumes that access of this node is thread safe. */
template < typename K, typename V >
void Btree< K, V >::free_node(const BtreeNodePtr& node, locktype_t cur_lock, void* context) {
BT_NODE_LOG(DEBUG, node, "Freeing node");
BT_NODE_LOG(TRACE, node, "Freeing node");

COUNTER_DECREMENT_IF_ELSE(m_metrics, node->is_leaf(), btree_leaf_node_count, btree_int_node_count, 1);
if (cur_lock != locktype_t::NONE) {
Expand Down
61 changes: 50 additions & 11 deletions src/include/homestore/index/index_internal.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,29 +64,68 @@ enum class index_buf_state_t : uint8_t {
};

///////////////////////// Btree Node and Buffer Portion //////////////////////////


// Multiple IndexBuffers could point to the same NodeBuffer if it is clean.
struct NodeBuffer;
using NodeBufferPtr = std::shared_ptr< NodeBuffer >;

// Pairs a raw data buffer with its persistence state. The constructor and destructor are
// declared here and defined out of line.
struct NodeBuffer {
    // Actual data buffer.
    uint8_t* m_bytes{nullptr};
    // Whether the buffer is yet to be persisted; starts out CLEAN.
    std::atomic< index_buf_state_t > m_state{index_buf_state_t::CLEAN};

    NodeBuffer(uint32_t buf_size, uint32_t align_size);
    ~NodeBuffer();
};

// There is one IndexBuffer per CP. Dependent index buffers are chained using
// m_next_buffer, and each buffer is flushed only when its wait_for_leaders reaches 0,
// which means all the buffers it depends on have been flushed.
struct IndexBuffer;
typedef std::shared_ptr< IndexBuffer > IndexBufferPtr;

struct IndexBuffer {
uint8_t* m_node_buf{nullptr}; // Actual buffer
index_buf_state_t m_buf_state{index_buf_state_t::CLEAN}; // Is buffer yet to persist?
BlkId m_blkid; // BlkId where this needs to be persisted
std::weak_ptr< IndexBuffer > m_next_buffer; // Next buffer in the chain
NodeBufferPtr m_node_buf;
BlkId m_blkid; // BlkId where this needs to be persisted
std::weak_ptr< IndexBuffer > m_next_buffer; // Next buffer in the chain
// Number of leader buffers we are waiting for before we write this buffer
sisl::atomic_counter< int > m_wait_for_leaders{0};

IndexBuffer(BlkId blkid, uint32_t buf_size, uint32_t align_size);
IndexBuffer(NodeBufferPtr node_buf, BlkId blkid);
~IndexBuffer();

BlkId blkid() const { return m_blkid; }
uint8_t* raw_buffer() { return m_node_buf; }
// Returns the byte pointer held by the attached NodeBuffer; asserts that one is attached.
uint8_t* raw_buffer() {
    RELEASE_ASSERT(m_node_buf, "Node buffer null blkid {}", m_blkid.to_integer());
    uint8_t* const bytes = m_node_buf->m_bytes;
    return bytes;
}

bool is_clean() const {
RELEASE_ASSERT(m_node_buf, "Node buffer null blkid {}", m_blkid.to_integer());
return (m_node_buf->m_state.load() == index_buf_state_t::CLEAN);
}

// Reads the current state of the attached NodeBuffer; asserts that a NodeBuffer is attached.
index_buf_state_t state() const {
    RELEASE_ASSERT(m_node_buf, "Node buffer null blkid {}", m_blkid.to_integer());
    // Explicit atomic load; equivalent to the implicit conversion from std::atomic.
    return m_node_buf->m_state.load();
}

// Stores a new state into the attached NodeBuffer; asserts that a NodeBuffer is attached.
void set_state(index_buf_state_t state) {
    RELEASE_ASSERT(m_node_buf, "Node buffer null blkid {}", m_blkid.to_integer());
    // Explicit atomic store; equivalent to assigning to the std::atomic.
    m_node_buf->m_state.store(state);
}

bool is_clean() const { return (m_buf_state == index_buf_state_t::CLEAN); }
std::string to_string() const {
return fmt::format("IndexBuffer {} blkid={} state={} node_buf={} next_buffer={} wait_for={}",
reinterpret_cast< void* >(const_cast< IndexBuffer* >(this)), m_blkid.to_integer(),
static_cast< int >(m_buf_state), static_cast< void* >(m_node_buf),
voidptr_cast(m_next_buffer.lock().get()), m_wait_for_leaders.get());
auto str = fmt::format("IndexBuffer {} blkid={}", reinterpret_cast< void* >(const_cast< IndexBuffer* >(this)),
m_blkid.to_integer());
if (m_node_buf == nullptr) {
fmt::format_to(std::back_inserter(str), " node_buf=nullptr");
} else {
fmt::format_to(std::back_inserter(str), " state={} node_buf={}",
static_cast< int >(m_node_buf->m_state.load()), static_cast< void* >(m_node_buf->m_bytes));
}
fmt::format_to(std::back_inserter(str), " next_buffer={} wait_for={}",
m_next_buffer.lock() ? reinterpret_cast< void* >(m_next_buffer.lock().get()) : 0,
m_wait_for_leaders.get());
return str;
}
};

Expand Down
33 changes: 22 additions & 11 deletions src/include/homestore/index/index_table.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ class IndexTable : public IndexTableBase, public Btree< K, V > {
// Need to put it in wb cache
wb_cache().write_buf(node, idx_node->m_idx_buf, cp_ctx);
idx_node->m_last_mod_cp_id = cp_ctx->id();
LOGTRACEMOD(wbcache, "{}", idx_node->m_idx_buf->to_string());
LOGTRACEMOD(wbcache, "add to dirty list cp {} {}", cp_ctx->id(), idx_node->m_idx_buf->to_string());
}
node->set_checksum(this->m_bt_cfg);
return btree_status_t::success;
Expand All @@ -129,17 +129,28 @@ class IndexTable : public IndexTableBase, public Btree< K, V > {
auto& left_child_buf = left_child_idx_node->m_idx_buf;
auto& parent_buf = parent_idx_node->m_idx_buf;

LOGTRACEMOD(wbcache, "left {} parent {} ", left_child_buf->to_string(), parent_buf->to_string());

// Write new nodes in the list as standalone outside transacted pairs.
// Write the new right child nodes, left node and parent in order.
// Create the relationship of right child to the left node via prepend_to_chain below.
// Parent and left node are linked in the prepare_node_txn
for (const auto& right_child_node : new_nodes) {
auto right_child = IndexBtreeNode::convert(right_child_node.get());
write_node_impl(right_child_node, context);
wb_cache().prepend_to_chain(right_child->m_idx_buf, left_child_buf);
LOGTRACEMOD(wbcache, "right {} left {} ", right_child->m_idx_buf->to_string(), left_child_buf->to_string());
}

auto trace_index_bufs = [&]() {
std::string str;
str = fmt::format("cp {} left {} parent {}", cp_ctx->id(), left_child_buf->to_string(),
parent_buf->to_string());
for (const auto& right_child_node : new_nodes) {
auto right_child = IndexBtreeNode::convert(right_child_node.get());
fmt::format_to(std::back_inserter(str), " right {}", right_child->m_idx_buf->to_string());
}
return str;
};

LOGTRACEMOD(wbcache, "{}", trace_index_bufs());
write_node_impl(left_child_node, context);
write_node_impl(parent_node, context);

Expand Down Expand Up @@ -181,18 +192,17 @@ class IndexTable : public IndexTableBase, public Btree< K, V > {
// realloc_node(node);
// }

// If the backing buffer is already in a clean state, we don't need to make a copy of it
if (idx_node->m_idx_buf->is_clean()) { return btree_status_t::success; }

// Make a new btree buffer and copy the contents and swap it to make it the current node's buffer. The
// We create an IndexBuffer for each CP. If the backing buffer is already in a clean state,
// we don't copy the node buffer; copy_buffer() handles that case. If the node buffer is dirty,
// make a new btree buffer and copy the contents and swap it to make it the current node's buffer. The
// buffer prior to this copy, would have been written and already added into the dirty buffer list.
idx_node->m_idx_buf = wb_cache().copy_buffer(idx_node->m_idx_buf);
idx_node->m_idx_buf = wb_cache().copy_buffer(idx_node->m_idx_buf, cp_ctx);
idx_node->m_last_mod_cp_id = -1;

node->m_phys_node_buf = idx_node->m_idx_buf->raw_buffer();
node->set_checksum(this->m_bt_cfg);

LOGTRACEMOD(wbcache, "buf {} ", idx_node->m_idx_buf->to_string());
LOGTRACEMOD(wbcache, "cp {} {} ", cp_ctx->id(), idx_node->m_idx_buf->to_string());

#ifndef NO_CHECKSUM
if (!node->verify_node(this->m_bt_cfg)) {
Expand Down Expand Up @@ -221,6 +231,8 @@ class IndexTable : public IndexTableBase, public Btree< K, V > {
auto& child_buf = child_idx_node->m_idx_buf;
auto& parent_buf = parent_idx_node->m_idx_buf;

LOGTRACEMOD(wbcache, "cp {} left {} parent {} ", cp_ctx->id(), child_buf->to_string(), parent_buf->to_string());

auto [child_copied, parent_copied] = wb_cache().create_chain(child_buf, parent_buf, cp_ctx);
if (child_copied) {
child_node->m_phys_node_buf = child_buf->raw_buffer();
Expand All @@ -231,7 +243,6 @@ class IndexTable : public IndexTableBase, public Btree< K, V > {
parent_idx_node->m_last_mod_cp_id = -1;
}

LOGTRACEMOD(wbcache, "child {} parent {} ", child_buf->to_string(), parent_buf->to_string());
return btree_status_t::success;
}

Expand Down
2 changes: 1 addition & 1 deletion src/include/homestore/index/wb_cache_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ class IndexWBCacheBase {
/// @brief Copy buffer
/// @param cur_buf
/// @return
virtual IndexBufferPtr copy_buffer(const IndexBufferPtr& cur_buf) const = 0;
virtual IndexBufferPtr copy_buffer(const IndexBufferPtr& cur_buf, const CPContext* context) const = 0;
};

} // namespace homestore
2 changes: 1 addition & 1 deletion src/include/homestore/index_service.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
#include <homestore/homestore_decl.hpp>
#include <homestore/index/index_internal.hpp>
#include <homestore/superblk_handler.hpp>

#include <homestore/index/wb_cache_base.hpp>
namespace homestore {

class IndexWBCacheBase;
Expand Down
4 changes: 2 additions & 2 deletions src/lib/checkpoint/cp.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

#include <sisl/logging/logging.h>
#include <iomgr/iomgr.hpp>

#include <folly/futures/SharedPromise.h>
#include "common/homestore_assert.hpp"

/*
Expand Down Expand Up @@ -73,7 +73,7 @@ struct CP {
bool m_cp_waiting_to_trigger{false}; // it is waiting for previous cp to complete
cp_id_t m_cp_id;
std::array< std::unique_ptr< CPContext >, (size_t)cp_consumer_t::SENTINEL > m_contexts;
folly::Promise< bool > m_comp_promise;
folly::SharedPromise< bool > m_comp_promise;

public:
CP(CPManager* mgr) : m_cp_mgr{mgr} {}
Expand Down
9 changes: 6 additions & 3 deletions src/lib/checkpoint/cp_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,11 @@ folly::Future< bool > CPManager::trigger_cp_flush(bool force) {
std::unique_lock< std::mutex > lk(trigger_cp_mtx);
auto cur_cp = cp_guard();
HS_DBG_ASSERT_NE(cur_cp->m_cp_status, cp_status_t::cp_flush_prepare);
cur_cp->m_comp_promise = std::move(folly::Promise< bool >{});
cur_cp->m_cp_waiting_to_trigger = true;
// If multiple threads call trigger, they all get the future from the same promise.
if (!cur_cp->m_cp_waiting_to_trigger) {
cur_cp->m_comp_promise = std::move(folly::SharedPromise< bool >{});
cur_cp->m_cp_waiting_to_trigger = true;
}
return cur_cp->m_comp_promise.getFuture();
} else {
return folly::makeFuture< bool >(false);
Expand Down Expand Up @@ -177,7 +180,7 @@ folly::Future< bool > CPManager::trigger_cp_flush(bool force) {
// originally by the caller will be untouched and completed upto CP completion/
ret_fut = folly::makeFuture< bool >(true);
} else {
cur_cp->m_comp_promise = std::move(folly::Promise< bool >{});
cur_cp->m_comp_promise = std::move(folly::SharedPromise< bool >{});
ret_fut = cur_cp->m_comp_promise.getFuture();
}
cur_cp->m_cp_status = cp_status_t::cp_flush_prepare;
Expand Down
98 changes: 92 additions & 6 deletions src/lib/index/index_cp.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <sisl/fds/concurrent_insert_vector.hpp>
#include <homestore/blk.h>
#include <homestore/index/index_internal.hpp>
#include <homestore/index_service.hpp>
#include <homestore/checkpoint/cp_mgr.hpp>

#include "checkpoint/cp.hpp"
Expand All @@ -32,20 +33,18 @@ struct IndexCPContext : public VDevCPContext {
std::atomic< uint64_t > m_num_nodes_removed{0};
sisl::ConcurrentInsertVector< IndexBufferPtr > m_dirty_buf_list;
sisl::atomic_counter< int64_t > m_dirty_buf_count{0};
IndexBufferPtr m_last_in_chain;
std::mutex m_flush_buffer_mtx;
sisl::ConcurrentInsertVector< IndexBufferPtr >::iterator m_dirty_buf_it;

public:

IndexCPContext(CP* cp) : VDevCPContext(cp) {}
virtual ~IndexCPContext() = default;

void add_to_dirty_list(const IndexBufferPtr& buf) {
buf->m_buf_state = index_buf_state_t::DIRTY;
m_dirty_buf_list.push_back(buf);
buf->set_state(index_buf_state_t::DIRTY);
m_dirty_buf_count.increment(1);
m_last_in_chain = buf;
LOGTRACEMOD(wbcache, "{}", buf->to_string());
}

bool any_dirty_buffers() const { return !m_dirty_buf_count.testz(); }
Expand All @@ -59,15 +58,102 @@ struct IndexCPContext : public VDevCPContext {
return ret;
}

std::string to_string() const {
// Produces a textual dump of this CP context: a header line with the cp id and dirty-buffer
// counters, followed by every buffer in the dirty list. For each buffer, the buffers whose
// m_next_buffer points at it are listed after "Depends:" together with their state.
std::string to_string() {
    std::string str{fmt::format("IndexCPContext cpid={} dirty_buf_count={} dirty_buf_list_size={}", m_cp->id(),
                                m_dirty_buf_count.get(), m_dirty_buf_list.size())};

    // Pass 1: build the reverse mapping, keyed by a buffer's next buffer, of all buffers that point to it.
    std::unordered_map< IndexBuffer*, std::vector< IndexBuffer* > > parents;
    for (auto it = m_dirty_buf_list.begin(); it != m_dirty_buf_list.end(); ++it) {
        IndexBufferPtr cur = *it;
        IndexBuffer* const next = cur->m_next_buffer.lock().get();
        parents[next].emplace_back(cur.get());
    }

    // Pass 2: print each buffer followed by the buffers recorded against it in pass 1.
    for (auto it = m_dirty_buf_list.begin(); it != m_dirty_buf_list.end(); ++it) {
        IndexBufferPtr cur = *it;
        fmt::format_to(std::back_inserter(str), "{}", cur->to_string());

        const auto& dependents = parents[cur.get()];
        if (!dependents.empty()) { fmt::format_to(std::back_inserter(str), "\nDepends:"); }
        for (const auto& p : dependents) {
            fmt::format_to(std::back_inserter(str), " {}({})", r_cast< void* >(p), s_cast< int >(p->state()));
        }
        fmt::format_to(std::back_inserter(str), "\n");
    }

    return str;
}

// Walks the m_next_buffer chain starting from every buffer in the dirty list and lets
// check_cycle_recurse report any chain that revisits a buffer.
void check_cycle() {
    for (auto it = m_dirty_buf_list.begin(); it != m_dirty_buf_list.end(); ++it) {
        // A fresh visited set per starting buffer: it holds exactly the buffers on the current walk.
        std::set< IndexBuffer* > visited;
        check_cycle_recurse(*it, visited);
    }
}

// Follows the m_next_buffer chain from `buf`, recording each buffer in `visited`.
// If a buffer is reached a second time, the cycle and every buffer seen so far are logged
// and the walk stops.
//
// @param buf      buffer to visit next
// @param visited  buffers already seen on this walk; updated in place
void check_cycle_recurse(IndexBufferPtr buf, std::set< IndexBuffer* >& visited) const {
    if (visited.count(buf.get()) != 0) {
        LOGERROR("Cycle found for {}", buf->to_string());
        for (auto& x : visited) {
            LOGERROR("Path : {}", x->to_string());
        }
        return;
    }

    visited.insert(buf.get());

    // Lock the weak_ptr exactly once. Locking it a second time for the recursive call could
    // yield a null pointer if the next buffer was released in between, which would then be
    // dereferenced by the callee.
    if (auto next_buf = buf->m_next_buffer.lock()) { check_cycle_recurse(next_buf, visited); }
}

// Validates the m_wait_for_leaders counters of the buffers in the dirty list.
// Each buffer's counter is recorded, then decremented once for every buffer whose
// m_next_buffer points to it. Any non-zero remainder is logged and the function
// release-asserts that no such mismatch was found.
void check_wait_for_leaders() {
    // Use the next buffer as indegree to find if wait_for_leaders is invalid.
    std::unordered_map< IndexBuffer*, int > wait_for_leaders;

    // Store the wait for leader count for each buffer.
    for (auto it = m_dirty_buf_list.begin(); it != m_dirty_buf_list.end(); ++it) {
        IndexBufferPtr buf = *it;
        wait_for_leaders[buf.get()] = buf->m_wait_for_leaders.get();
    }

    // Decrement the count using the next buffer. The iterator is advanced in the loop header so
    // that skipping a buffer without a next buffer still moves on to the following entry
    // (a `continue` inside a while-loop that increments at the bottom would spin forever).
    for (auto it = m_dirty_buf_list.begin(); it != m_dirty_buf_list.end(); ++it) {
        IndexBufferPtr buf = *it;
        auto next_buf = buf->m_next_buffer.lock();
        if (next_buf.get() == nullptr) { continue; }
        wait_for_leaders[next_buf.get()]--;
    }

    bool issue = false;
    for (const auto& [buf, waits] : wait_for_leaders) {
        // Any value other than zero means the dependency graph is invalid.
        if (waits != 0) {
            issue = true;
            LOGERROR("Leaders wait not zero cp {} buf {} waits {}", id(), buf->to_string(), waits);
        }
    }

    RELEASE_ASSERT_EQ(issue, false, "Found issue with wait_for_leaders");
}
};


class IndexWBCache;
class IndexCPCallbacks : public CPCallbacks {
public:
Expand Down
15 changes: 12 additions & 3 deletions src/lib/index/index_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,9 @@ void IndexService::stop() {
HS_REL_ASSERT_EQ(success, true, "CP Flush failed");
LOGINFO("CP Flush completed");

for (auto [id, tbl] : m_index_map) { tbl->destroy(); }
for (auto [id, tbl] : m_index_map) {
tbl->destroy();
}
}
void IndexService::add_index_table(const std::shared_ptr< IndexTableBase >& tbl) {
std::unique_lock lg(m_index_map_mtx);
Expand All @@ -107,9 +109,16 @@ uint64_t IndexService::used_size() const {
return size;
}

// Obtains the data buffer from hs_utils::iobuf_alloc using the btree_node buffer tag.
NodeBuffer::NodeBuffer(uint32_t buf_size, uint32_t align_size) :
        m_bytes(hs_utils::iobuf_alloc(buf_size, sisl::buftag::btree_node, align_size)) {}

// Hands the data buffer back through hs_utils::iobuf_free with the same btree_node tag.
NodeBuffer::~NodeBuffer() {
    hs_utils::iobuf_free(m_bytes, sisl::buftag::btree_node);
}

IndexBuffer::IndexBuffer(BlkId blkid, uint32_t buf_size, uint32_t align_size) :
m_node_buf{hs_utils::iobuf_alloc(buf_size, sisl::buftag::btree_node, align_size)}, m_blkid{blkid} {}
m_node_buf{std::make_shared< NodeBuffer >(buf_size, align_size)}, m_blkid{blkid} {}

IndexBuffer::IndexBuffer(NodeBufferPtr node_buf, BlkId blkid) : m_node_buf(node_buf), m_blkid(blkid) {}

IndexBuffer::~IndexBuffer() { hs_utils::iobuf_free(m_node_buf, sisl::buftag::btree_node); }
IndexBuffer::~IndexBuffer() { m_node_buf.reset(); }

} // namespace homestore
Loading

0 comments on commit 4036ffb

Please sign in to comment.