Index write back cache fixes.
Add locks for IndexBuffer to avoid concurrency issues between
CP flush and insert threads. Create a per-CP IndexBuffer.
Add locks in the test for the index btree shadow map, since there are
concurrent requests. Use shared_ptrs in the wb cache.
sanebay committed Oct 25, 2023
1 parent bd20078 commit 8d92ff4
Showing 14 changed files with 334 additions and 124 deletions.
2 changes: 1 addition & 1 deletion src/include/homestore/btree/detail/btree_node_mgr.ipp
@@ -307,7 +307,7 @@ BtreeNode* Btree< K, V >::init_node(uint8_t* node_buf, uint32_t node_ctx_size, b
/* Note:- This function assumes that access of this node is thread safe. */
template < typename K, typename V >
void Btree< K, V >::free_node(const BtreeNodePtr& node, locktype_t cur_lock, void* context) {
BT_NODE_LOG(DEBUG, node, "Freeing node");
BT_NODE_LOG(TRACE, node, "Freeing node");

COUNTER_DECREMENT_IF_ELSE(m_metrics, node->is_leaf(), btree_leaf_node_count, btree_int_node_count, 1);
if (cur_lock != locktype_t::NONE) {
2 changes: 1 addition & 1 deletion src/include/homestore/btree/detail/prefix_node.hpp
@@ -687,7 +687,7 @@ class FixedPrefixNode : public VariantNode< K, V > {
--phdr->used_slots;
prefix_bitset_.reset_bit(slot_num);
if ((slot_num != 0) && (slot_num == phdr->tail_slot)) {
uint16_t prev_slot = prefix_bitset_.get_prev_set_bit(slot_num);
uint16_t prev_slot = prefix_bitset_.get_next_set_bit(slot_num);
if (prev_slot != std::numeric_limits< uint16_t >::max()) { phdr->tail_slot = prev_slot; }
}
}
64 changes: 52 additions & 12 deletions src/include/homestore/index/index_internal.hpp
@@ -64,29 +64,70 @@ enum class index_buf_state_t : uint8_t {
};

///////////////////////// Btree Node and Buffer Portion //////////////////////////


// Multiple IndexBuffers can point to the same NodeBuffer if it is clean.
struct NodeBuffer;
typedef std::shared_ptr< NodeBuffer > NodeBufferPtr;
struct NodeBuffer {
uint8_t* m_bytes{nullptr}; // Actual data buffer
std::atomic< index_buf_state_t > m_state{index_buf_state_t::CLEAN}; // Is buffer yet to persist?
NodeBuffer(uint32_t buf_size, uint32_t align_size);
~NodeBuffer();
};

// An IndexBuffer is created per CP. Dependent index buffers are chained using
// m_next_buffer, and a buffer is flushed only once its wait_for_leaders reaches 0,
// which means all the leader buffers it waits on have been flushed.
struct IndexBuffer;
typedef std::shared_ptr< IndexBuffer > IndexBufferPtr;

struct IndexBuffer {
uint8_t* m_node_buf{nullptr}; // Actual buffer
index_buf_state_t m_buf_state{index_buf_state_t::CLEAN}; // Is buffer yet to persist?
BlkId m_blkid; // BlkId where this needs to be persisted
std::weak_ptr< IndexBuffer > m_next_buffer; // Next buffer in the chain
NodeBufferPtr m_node_buf;
BlkId m_blkid; // BlkId where this needs to be persisted
std::weak_ptr< IndexBuffer > m_next_buffer; // Next buffer in the chain
// Number of leader buffers we are waiting for before we write this buffer
sisl::atomic_counter< int > m_wait_for_leaders{0};
std::mutex m_mutex;

IndexBuffer(BlkId blkid, uint32_t buf_size, uint32_t align_size);
IndexBuffer(NodeBufferPtr node_buf, BlkId blkid);
~IndexBuffer();

BlkId blkid() const { return m_blkid; }
uint8_t* raw_buffer() { return m_node_buf; }
uint8_t* raw_buffer() {
RELEASE_ASSERT(m_node_buf, "Node buffer null");
return m_node_buf->m_bytes;
}

bool is_clean() const {
RELEASE_ASSERT(m_node_buf, "Node buffer null");
return (m_node_buf->m_state.load() == index_buf_state_t::CLEAN);
}

index_buf_state_t state() {
RELEASE_ASSERT(m_node_buf, "Node buffer null");
return m_node_buf->m_state;
}

void set_state(index_buf_state_t state) {
RELEASE_ASSERT(m_node_buf, "Node buffer null");
m_node_buf->m_state = state;
}

bool is_clean() const { return (m_buf_state == index_buf_state_t::CLEAN); }
std::string to_string() const {
return fmt::format("IndexBuffer {} blkid={} state={} node_buf={} next_buffer={} wait_for={}",
reinterpret_cast< void* >(const_cast< IndexBuffer* >(this)), m_blkid.to_integer(),
static_cast< int >(m_buf_state), static_cast< void* >(m_node_buf),
voidptr_cast(m_next_buffer.lock().get()), m_wait_for_leaders.get());
std::lock_guard lock{m_mutex};
auto str = fmt::format("IndexBuffer {} blkid={}", reinterpret_cast< void* >(const_cast< IndexBuffer* >(this)),
m_blkid.to_integer());
if (m_node_buf == nullptr) {
fmt::format_to(std::back_inserter(str), " node_buf=nullptr");
} else {
fmt::format_to(std::back_inserter(str), " state={} node_buf={}",
static_cast< int >(m_node_buf->m_state.load()), static_cast< void* >(m_node_buf->m_bytes));
}
fmt::format_to(std::back_inserter(str), " next_buffer={} wait_for={}",
m_next_buffer.lock() ? reinterpret_cast< void* >(m_next_buffer.lock().get()) : 0,
m_wait_for_leaders.get());
return str;
}
};

@@ -97,7 +138,6 @@ struct IndexBtreeNode {
public:
IndexBufferPtr m_idx_buf; // Buffer backing this node
cp_id_t m_last_mod_cp_id{-1}; // This node is previously modified by the cp id;

public:
IndexBtreeNode(const IndexBufferPtr& buf) : m_idx_buf{buf} {}
~IndexBtreeNode() { m_idx_buf.reset(); }
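The NodeBuffer/IndexBuffer split above is the heart of the change: per-CP IndexBuffers share one NodeBuffer while it is clean, and a dirtied node gets its own copy so the earlier CP still flushes an unmodified image. A standalone sketch of that lifecycle, using simplified stand-in types rather than the real homestore API:

#include <algorithm>
#include <atomic>
#include <cassert>
#include <cstdint>
#include <memory>

enum class buf_state : uint8_t { CLEAN, DIRTY, FLUSHING };

// Stand-in for NodeBuffer: the actual bytes plus persistence state.
struct NodeBuf {
    std::unique_ptr< uint8_t[] > bytes;
    std::atomic< buf_state > state{buf_state::CLEAN};
    explicit NodeBuf(size_t sz) : bytes(new uint8_t[sz]) {}
};
using NodeBufPtr = std::shared_ptr< NodeBuf >;

// Stand-in for IndexBuffer: one per CP, pointing at a (possibly shared) NodeBuf.
struct IdxBuf {
    NodeBufPtr node;
    explicit IdxBuf(NodeBufPtr n) : node(std::move(n)) {}
};

int main() {
    auto shared = std::make_shared< NodeBuf >(4096);
    IdxBuf cp10{shared}; // buffer as seen by CP 10
    IdxBuf cp11{shared}; // CP 11 reuses the same bytes while CLEAN
    assert(cp10.node.get() == cp11.node.get());

    // Once the node is modified in CP 11, a copy (what copy_buffer does in the
    // real code) gives CP 11 private bytes; CP 10 still flushes a CLEAN image.
    auto copy = std::make_shared< NodeBuf >(4096);
    std::copy_n(cp10.node->bytes.get(), 4096, copy->bytes.get());
    copy->state = buf_state::DIRTY;
    cp11.node = std::move(copy);
    assert(cp10.node->state == buf_state::CLEAN);
    return 0;
}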
35 changes: 23 additions & 12 deletions src/include/homestore/index/index_table.hpp
@@ -114,7 +114,7 @@ class IndexTable : public IndexTableBase, public Btree< K, V > {
// Need to put it in wb cache
wb_cache().write_buf(node, idx_node->m_idx_buf, cp_ctx);
idx_node->m_last_mod_cp_id = cp_ctx->id();
LOGTRACEMOD(wbcache, "{}", idx_node->m_idx_buf->to_string());
LOGTRACEMOD(wbcache, "add to dirty list cp {} {}", cp_ctx->id(), idx_node->m_idx_buf->to_string());
}
node->set_checksum(this->m_bt_cfg);
return btree_status_t::success;
@@ -129,17 +129,28 @@ class IndexTable : public IndexTableBase, public Btree< K, V > {
auto& left_child_buf = left_child_idx_node->m_idx_buf;
auto& parent_buf = parent_idx_node->m_idx_buf;

LOGTRACEMOD(wbcache, "left {} parent {} ", left_child_buf->to_string(), parent_buf->to_string());

// Write new nodes in the list as standalone outside transacted pairs.
// Write the new right child nodes, left node and parent in order.
// Create the relationship of right child to the left node via prepend_to_chain below.
// Parent and left node are linked in prepare_node_txn.
for (const auto& right_child_node : new_nodes) {
auto right_child = IndexBtreeNode::convert(right_child_node.get());
write_node_impl(right_child_node, context);
wb_cache().prepend_to_chain(right_child->m_idx_buf, left_child_buf);
LOGTRACEMOD(wbcache, "right {} left {} ", right_child->m_idx_buf->to_string(), left_child_buf->to_string());
wb_cache().prepend_to_chain(right_child->m_idx_buf, left_child_buf, cp_ctx);
}

auto trace_index_bufs = [&]() {
std::string str;
str = fmt::format("cp {} left {} parent {}", cp_ctx->id(), left_child_buf->to_string(),
parent_buf->to_string());
for (const auto& right_child_node : new_nodes) {
auto right_child = IndexBtreeNode::convert(right_child_node.get());
fmt::format_to(std::back_inserter(str), " right {}", right_child->m_idx_buf->to_string());
}
return str;
};

LOGTRACEMOD(wbcache, "{}", trace_index_bufs());
write_node_impl(left_child_node, context);
write_node_impl(parent_node, context);

@@ -181,18 +192,17 @@ class IndexTable : public IndexTableBase, public Btree< K, V > {
// realloc_node(node);
// }

// If the backing buffer is already in a clean state, we don't need to make a copy of it
if (idx_node->m_idx_buf->is_clean()) { return btree_status_t::success; }

// Make a new btree buffer and copy the contents and swap it to make it the current node's buffer. The
// We create an IndexBuffer for each CP. But if the backing buffer is already in a clean state
// we don't copy the node buffer; copy_buffer will handle it. If the node buffer is dirty,
// make a new btree buffer, copy the contents, and swap it in as the current node's buffer. The
// buffer prior to this copy, would have been written and already added into the dirty buffer list.
idx_node->m_idx_buf = wb_cache().copy_buffer(idx_node->m_idx_buf);
idx_node->m_idx_buf = wb_cache().copy_buffer(idx_node->m_idx_buf, cp_ctx);
idx_node->m_last_mod_cp_id = -1;

node->m_phys_node_buf = idx_node->m_idx_buf->raw_buffer();
node->set_checksum(this->m_bt_cfg);

LOGTRACEMOD(wbcache, "buf {} ", idx_node->m_idx_buf->to_string());
LOGTRACEMOD(wbcache, "cp {} {} ", cp_ctx->id(), idx_node->m_idx_buf->to_string());

#ifndef NO_CHECKSUM
if (!node->verify_node(this->m_bt_cfg)) {
Expand Down Expand Up @@ -221,6 +231,8 @@ class IndexTable : public IndexTableBase, public Btree< K, V > {
auto& child_buf = child_idx_node->m_idx_buf;
auto& parent_buf = parent_idx_node->m_idx_buf;

LOGTRACEMOD(wbcache, "cp {} left {} parent {} ", cp_ctx->id(), child_buf->to_string(), parent_buf->to_string());

auto [child_copied, parent_copied] = wb_cache().create_chain(child_buf, parent_buf, cp_ctx);
if (child_copied) {
child_node->m_phys_node_buf = child_buf->raw_buffer();
Expand All @@ -231,7 +243,6 @@ class IndexTable : public IndexTableBase, public Btree< K, V > {
parent_idx_node->m_last_mod_cp_id = -1;
}

LOGTRACEMOD(wbcache, "child {} parent {} ", child_buf->to_string(), parent_buf->to_string());
return btree_status_t::success;
}

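The flush ordering that transact_nodes sets up can be read off the chain: each new right child names the left child as its m_next_buffer, the left child names the parent, and a buffer flushes only when its wait_for_leaders counter drains to zero. A small single-threaded model of that rule, with simplified stand-ins rather than the IndexWBCache implementation:

#include <cstdio>

struct Buf {
    const char* name;
    int wait_for_leaders{0}; // leaders still unflushed
    Buf* next{nullptr};      // buffer that must flush after us
};

void flush(Buf* b) {
    std::printf("flushing %s\n", b->name);
    if (b->next && --b->next->wait_for_leaders == 0) {
        flush(b->next); // all leaders done; the follower may go
    }
}

int main() {
    // Split: right children -> left child -> parent, as in transact_nodes.
    Buf parent{"parent"}, left{"left"}, right1{"right1"}, right2{"right2"};
    right1.next = &left;
    right2.next = &left;
    left.next = &parent;
    left.wait_for_leaders = 2;   // two right siblings lead the left child
    parent.wait_for_leaders = 1; // the left child leads the parent

    flush(right1); // left still waits on right2, so nothing follows yet
    flush(right2); // releases left, which in turn releases parent
    return 0;
}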
4 changes: 2 additions & 2 deletions src/include/homestore/index/wb_cache_base.hpp
@@ -63,7 +63,7 @@ class IndexWBCacheBase {
/// @brief Prepend to the chain that was already created with second
/// @param first
/// @param second
virtual void prepend_to_chain(const IndexBufferPtr& first, const IndexBufferPtr& second) = 0;
virtual void prepend_to_chain(const IndexBufferPtr& first, const IndexBufferPtr& second, CPContext* cp_ctx) = 0;

/// @brief Free the buffer allocated and remove it from wb cache
/// @param buf
@@ -73,7 +73,7 @@ class IndexWBCacheBase {
/// @brief Copy buffer
/// @param cur_buf
/// @return
virtual IndexBufferPtr copy_buffer(const IndexBufferPtr& cur_buf) const = 0;
virtual IndexBufferPtr copy_buffer(const IndexBufferPtr& cur_buf, CPContext* context) const = 0;
};

} // namespace homestore
2 changes: 1 addition & 1 deletion src/include/homestore/index_service.hpp
@@ -22,7 +22,7 @@
#include <homestore/homestore_decl.hpp>
#include <homestore/index/index_internal.hpp>
#include <homestore/superblk_handler.hpp>

#include <homestore/index/wb_cache_base.hpp>
namespace homestore {

class IndexWBCacheBase;
4 changes: 2 additions & 2 deletions src/lib/checkpoint/cp.hpp
@@ -21,7 +21,7 @@

#include <sisl/logging/logging.h>
#include <iomgr/iomgr.hpp>

#include <folly/futures/SharedPromise.h>
#include "common/homestore_assert.hpp"

/*
@@ -73,7 +73,7 @@ struct CP {
bool m_cp_waiting_to_trigger{false}; // it is waiting for previous cp to complete
cp_id_t m_cp_id;
std::array< std::unique_ptr< CPContext >, (size_t)cp_consumer_t::SENTINEL > m_contexts;
folly::Promise< bool > m_comp_promise;
folly::SharedPromise< bool > m_comp_promise;

public:
CP(CPManager* mgr) : m_cp_mgr{mgr} {}
9 changes: 6 additions & 3 deletions src/lib/checkpoint/cp_mgr.cpp
@@ -142,8 +142,11 @@ folly::Future< bool > CPManager::trigger_cp_flush(bool force) {
std::unique_lock< std::mutex > lk(trigger_cp_mtx);
auto cur_cp = cp_guard();
HS_DBG_ASSERT_NE(cur_cp->m_cp_status, cp_status_t::cp_flush_prepare);
cur_cp->m_comp_promise = std::move(folly::Promise< bool >{});
cur_cp->m_cp_waiting_to_trigger = true;
// If multiple threads call trigger, they all get the future from the same promise.
if (!cur_cp->m_cp_waiting_to_trigger) {
cur_cp->m_comp_promise = std::move(folly::SharedPromise< bool >{});
cur_cp->m_cp_waiting_to_trigger = true;
}
return cur_cp->m_comp_promise.getFuture();
} else {
return folly::makeFuture< bool >(false);
@@ -177,7 +180,7 @@ folly::Future< bool > CPManager::trigger_cp_flush(bool force) {
// originally by the caller will be untouched and completed up to CP completion.
ret_fut = folly::makeFuture< bool >(true);
} else {
cur_cp->m_comp_promise = std::move(folly::Promise< bool >{});
cur_cp->m_comp_promise = std::move(folly::SharedPromise< bool >{});
ret_fut = cur_cp->m_comp_promise.getFuture();
}
cur_cp->m_cp_status = cp_status_t::cp_flush_prepare;
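The switch from folly::Promise to folly::SharedPromise matters because getFuture may be called only once on a plain Promise, while a SharedPromise hands an independent future to every caller; that is what lets concurrent trigger_cp_flush callers wait on the same CP. A minimal illustration, assuming folly is on the include path:

#include <folly/futures/Future.h>
#include <folly/futures/SharedPromise.h>
#include <utility>

int main() {
    folly::SharedPromise< bool > p;
    auto f1 = p.getFuture(); // first thread calling trigger_cp_flush
    auto f2 = p.getFuture(); // concurrent caller: legal on a SharedPromise
    p.setValue(true);        // the CP completes once...
    // ...and every waiter observes the completion.
    bool ok = std::move(f1).get() && std::move(f2).get();
    return ok ? 0 : 1;
}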
95 changes: 83 additions & 12 deletions src/lib/index/index_cp.hpp
@@ -18,6 +18,7 @@
#include <sisl/fds/thread_vector.hpp>
#include <homestore/blk.h>
#include <homestore/index/index_internal.hpp>
#include <homestore/index_service.hpp>
#include <homestore/checkpoint/cp_mgr.hpp>

#include "checkpoint/cp.hpp"
@@ -37,7 +38,6 @@ struct IndexCPContext : public CPContext {
sisl::ThreadVector< IndexBufferPtr >* m_dirty_buf_list{nullptr};
sisl::ThreadVector< BlkId >* m_free_node_blkid_list{nullptr};
sisl::atomic_counter< int64_t > m_dirty_buf_count{0};
IndexBufferPtr m_last_in_chain;
std::mutex m_flush_buffer_mtx;
flush_buffer_iterator m_buf_it;

@@ -47,11 +47,6 @@
CPContext(cp_id), m_dirty_buf_list{dirty_list}, m_free_node_blkid_list{free_blkid_list} {}

virtual ~IndexCPContext() {
auto it = m_dirty_buf_list->begin(true /* latest */);
IndexBufferPtr *tmp = nullptr;
while((tmp = m_dirty_buf_list->next(it)) != nullptr) {
tmp->reset();
}
m_dirty_buf_list->clear();
m_free_node_blkid_list->clear();
}
@@ -62,11 +57,9 @@ struct IndexCPContext : public CPContext {
}

void add_to_dirty_list(const IndexBufferPtr& buf) {
buf->m_buf_state = index_buf_state_t::DIRTY;
buf->m_node_buf->m_state = index_buf_state_t::DIRTY;
m_dirty_buf_list->push_back(buf);
m_dirty_buf_count.increment(1);
m_last_in_chain = buf;
LOGTRACEMOD(wbcache, "{}", buf->to_string());
}

void add_to_free_node_list(BlkId blkid) { m_free_node_blkid_list->push_back(blkid); }
@@ -77,13 +70,91 @@ struct IndexCPContext : public CPContext {
BlkId* next_blkid() { return m_free_node_blkid_list->next(m_buf_it.free_node_list_it); }
std::string to_string() const {
std::string str{
fmt::format("IndexCPContext cpid={} dirty_buf_count={} dirty_buf_list_size={} blkid_list_size={}", id(),
fmt::format("IndexCPContext cpid={} dirty_buf_count={} dirty_buf_list_size={} blkid_list_size={}\n", id(),
m_dirty_buf_count.get(), m_dirty_buf_list->size(), m_free_node_blkid_list->size())};

// TODO dump all index buffers.
// Mapping from a node to all its parents in the graph.
// Display all buffers with their dependencies and state.
std::unordered_map< IndexBuffer*, std::vector< IndexBuffer* > > parents;
IndexBufferPtr* buf;
auto it = m_dirty_buf_list->begin(true /* latest */);
while ((buf = m_dirty_buf_list->next(it)) != nullptr) {
// Record this buf as a dependent of its next buffer.
parents[(*buf)->m_next_buffer.lock().get()].emplace_back(buf->get());
}

it = m_dirty_buf_list->begin(true /* latest */);
while ((buf = m_dirty_buf_list->next(it)) != nullptr) {
fmt::format_to(std::back_inserter(str), "{}", (*buf)->to_string());
auto first = true;
for (const auto& p : parents[buf->get()]) {
if (first) {
fmt::format_to(std::back_inserter(str), "\nDepends:");
first = false;
}
fmt::format_to(std::back_inserter(str), " {}({})", r_cast< void* >(p), s_cast< int >(p->state()));
}
fmt::format_to(std::back_inserter(str), "\n");
}

return str;
}
};

void check_cycle() {
// Use DFS to detect whether the dependency graph contains a cycle.
IndexBufferPtr* buf;
auto it = m_dirty_buf_list->begin(true /* latest */);
while ((buf = m_dirty_buf_list->next(it)) != nullptr) {
std::set< IndexBuffer* > visited;
check_cycle_recurse(*buf, visited);
}
}

void check_cycle_recurse(IndexBufferPtr buf, std::set< IndexBuffer* >& visited) const {
if (visited.count(buf.get()) != 0) {
LOGERROR("Cycle found for {}", buf->to_string());
for (auto& x : visited) {
LOGERROR("Path : {}", x->to_string());
}
return;
}

visited.insert(buf.get());
if (buf->m_next_buffer.lock()) { check_cycle_recurse(buf->m_next_buffer.lock(), visited); }
}
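Because each buffer has at most one m_next_buffer, the dependency graph is a set of chains, and walking next pointers with a visited set is enough to catch a loop. A standalone model of that walk, with simplified stand-ins for the buffer type:

#include <cstdio>
#include <set>

struct Buf {
    const char* name;
    Buf* next{nullptr};
};

bool has_cycle_from(Buf* b, std::set< Buf* >& visited) {
    if (b == nullptr) { return false; }
    if (!visited.insert(b).second) {
        std::printf("cycle found at %s\n", b->name);
        return true;
    }
    return has_cycle_from(b->next, visited);
}

int main() {
    Buf a{"a"}, b{"b"}, c{"c"};
    a.next = &b;
    b.next = &c;
    c.next = &a; // deliberate loop to trip the check
    std::set< Buf* > visited;
    has_cycle_from(&a, visited); // prints "cycle found at a"
    return 0;
}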

void check_wait_for_leaders() {
// Recompute each buffer's indegree from its next buffer to validate wait_for_leaders.
std::unordered_map< IndexBuffer*, int > wait_for_leaders;
IndexBufferPtr* buf;

// Store the wait for leader count for each buffer.
auto it = m_dirty_buf_list->begin(true /* latest */);
while ((buf = m_dirty_buf_list->next(it)) != nullptr) {
wait_for_leaders[(*buf).get()] = (*buf)->m_wait_for_leaders.get();
}

// Decrement the count using the next buffer.
it = m_dirty_buf_list->begin(true /* latest */);
while ((buf = m_dirty_buf_list->next(it)) != nullptr) {
auto next_buf = (*buf)->m_next_buffer.lock();
if (next_buf.get() == nullptr) continue;
wait_for_leaders[next_buf.get()]--;
}

bool issue = false;
for (const auto& [buf, waits] : wait_for_leaders) {
// Any value other than zero means the dependency graph is invalid.
if (waits != 0) {
issue = true;
LOGERROR("Leaders wait not zero cp {} buf {} waits {}", id(), buf->to_string(), waits);
}
}

assert(issue == false);
}
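check_wait_for_leaders is an indegree audit: every buffer's claimed wait_for_leaders must equal the number of dirty buffers whose m_next_buffer points at it. The same invariant in a runnable standalone form (simplified stand-ins, not the homestore types):

#include <cassert>
#include <unordered_map>
#include <vector>

struct Buf {
    int wait_for_leaders{0};
    Buf* next{nullptr};
};

bool leaders_consistent(const std::vector< Buf* >& dirty) {
    std::unordered_map< Buf*, int > waits;
    for (auto* b : dirty) { waits[b] = b->wait_for_leaders; } // claimed counts
    for (auto* b : dirty) {
        if (b->next) { waits[b->next]--; } // subtract actual indegree
    }
    for (const auto& [buf, w] : waits) {
        if (w != 0) { return false; } // claimed != actual: bad dependency graph
    }
    return true;
}

int main() {
    Buf parent, child1, child2;
    child1.next = &parent;
    child2.next = &parent;
    parent.wait_for_leaders = 2; // two leaders must flush before the parent
    assert(leaders_consistent({&parent, &child1, &child2}));
    return 0;
}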

};

class IndexWBCache;
class IndexCPCallbacks : public CPCallbacks {
Expand Down