Index write back cache fixes.
Add locks to IndexBuffer to avoid concurrency issues between the
CP flush and insert threads. Add locks in the test for the index btree
shadow map, as it receives concurrent requests. Use shared_ptrs in the
wb cache.
sanebay committed Oct 25, 2023
1 parent bd20078 commit 9a9af75
Showing 14 changed files with 281 additions and 111 deletions.
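The gist of the change, before the per-file diffs: the buffer bytes move behind a reference-counted NodeBuffer so the CP flush thread and insert threads cannot free or repoint them under each other, and each IndexBuffer carries a mutex for its chain metadata. A minimal standalone sketch of that pattern (simplified names, not the HomeStore types themselves):

#include <atomic>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <mutex>

enum class buf_state : uint8_t { CLEAN, DIRTY };

// The bytes get their own ref-counted holder: a flush thread finishing an
// old checkpoint and an insert thread dirtying the node each hold a
// reference, and the allocation is freed only when the last one drops.
struct node_buffer {
    explicit node_buffer(std::size_t sz) : bytes{new uint8_t[sz]{}} {}
    std::unique_ptr<uint8_t[]> bytes;
    std::atomic<buf_state> state{buf_state::CLEAN};
};

struct index_buffer {
    std::shared_ptr<node_buffer> node;  // shared with any in-flight user
    std::mutex mtx;                     // serializes chain/metadata updates

    bool is_clean() const { return node->state.load() == buf_state::CLEAN; }
};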
2 changes: 1 addition & 1 deletion src/include/homestore/btree/detail/btree_node_mgr.ipp
@@ -307,7 +307,7 @@ BtreeNode* Btree< K, V >::init_node(uint8_t* node_buf, uint32_t node_ctx_size, b
/* Note:- This function assumes that access of this node is thread safe. */
template < typename K, typename V >
void Btree< K, V >::free_node(const BtreeNodePtr& node, locktype_t cur_lock, void* context) {
- BT_NODE_LOG(DEBUG, node, "Freeing node");
+ BT_NODE_LOG(TRACE, node, "Freeing node");

COUNTER_DECREMENT_IF_ELSE(m_metrics, node->is_leaf(), btree_leaf_node_count, btree_int_node_count, 1);
if (cur_lock != locktype_t::NONE) {
2 changes: 1 addition & 1 deletion src/include/homestore/btree/detail/prefix_node.hpp
@@ -687,7 +687,7 @@ class FixedPrefixNode : public VariantNode< K, V > {
--phdr->used_slots;
prefix_bitset_.reset_bit(slot_num);
if ((slot_num != 0) && (slot_num == phdr->tail_slot)) {
- uint16_t prev_slot = prefix_bitset_.get_prev_set_bit(slot_num);
+ uint16_t prev_slot = prefix_bitset_.get_next_set_bit(slot_num);
if (prev_slot != std::numeric_limits< uint16_t >::max()) { phdr->tail_slot = prev_slot; }
}
}
54 changes: 42 additions & 12 deletions src/include/homestore/index/index_internal.hpp
@@ -64,29 +64,60 @@ enum class index_buf_state_t : uint8_t {
};

///////////////////////// Btree Node and Buffer Portion //////////////////////////

+ struct NodeBuffer;
+ typedef std::shared_ptr< NodeBuffer > NodeBufferPtr;
+ struct NodeBuffer {
+ uint8_t* m_bytes{nullptr}; // Actual buffer
+ std::atomic< index_buf_state_t > m_state{index_buf_state_t::CLEAN}; // Is buffer yet to persist?
+ NodeBuffer(uint32_t buf_size, uint32_t align_size);
+ ~NodeBuffer();
+ };

struct IndexBuffer;
typedef std::shared_ptr< IndexBuffer > IndexBufferPtr;

struct IndexBuffer {
- uint8_t* m_node_buf{nullptr}; // Actual buffer
- index_buf_state_t m_buf_state{index_buf_state_t::CLEAN}; // Is buffer yet to persist?
- BlkId m_blkid; // BlkId where this needs to be persisted
- std::weak_ptr< IndexBuffer > m_next_buffer; // Next buffer in the chain
+ NodeBufferPtr m_node_buf;
+ BlkId m_blkid; // BlkId where this needs to be persisted
+ std::weak_ptr< IndexBuffer > m_next_buffer; // Next buffer in the chain
// Number of leader buffers we are waiting for before we write this buffer
sisl::atomic_counter< int > m_wait_for_leaders{0};
+ std::mutex m_mutex;

IndexBuffer(BlkId blkid, uint32_t buf_size, uint32_t align_size);
+ IndexBuffer(NodeBufferPtr node_buf, BlkId blkid);
~IndexBuffer();

BlkId blkid() const { return m_blkid; }
- uint8_t* raw_buffer() { return m_node_buf; }
+ uint8_t* raw_buffer() {
+ RELEASE_ASSERT(m_node_buf, "Node buffer null");
+ return m_node_buf->m_bytes;
+ }

+ bool is_clean() const {
+ RELEASE_ASSERT(m_node_buf, "Node buffer null");
+ return (m_node_buf->m_state.load() == index_buf_state_t::CLEAN);
+ }
+
+ void set_state(index_buf_state_t state) {
+ RELEASE_ASSERT(m_node_buf, "Node buffer null");
+ m_node_buf->m_state = state;
+ }

- bool is_clean() const { return (m_buf_state == index_buf_state_t::CLEAN); }
std::string to_string() const {
- return fmt::format("IndexBuffer {} blkid={} state={} node_buf={} next_buffer={} wait_for={}",
- reinterpret_cast< void* >(const_cast< IndexBuffer* >(this)), m_blkid.to_integer(),
- static_cast< int >(m_buf_state), static_cast< void* >(m_node_buf),
- voidptr_cast(m_next_buffer.lock().get()), m_wait_for_leaders.get());
+ std::lock_guard lock{m_mutex};
+ auto str = fmt::format("IndexBuffer {} blkid={}", reinterpret_cast< void* >(const_cast< IndexBuffer* >(this)),
+ m_blkid.to_integer());
+ if (m_node_buf == nullptr) {
+ fmt::format_to(std::back_inserter(str), " node_buf=nullptr");
+ } else {
+ fmt::format_to(std::back_inserter(str), " state={} node_buf={}",
+ static_cast< int >(m_node_buf->m_state.load()), static_cast< void* >(m_node_buf->m_bytes));
+ }
+ fmt::format_to(std::back_inserter(str), " next_buffer={} wait_for={}",
+ m_next_buffer.lock() ? reinterpret_cast< void* >(m_next_buffer.lock().get()) : 0,
+ m_wait_for_leaders.get());
+ return str;
}
};

@@ -97,7 +128,6 @@ struct IndexBtreeNode {
public:
IndexBufferPtr m_idx_buf; // Buffer backing this node
cp_id_t m_last_mod_cp_id{-1}; // This node is previously modified by the cp id;
-
public:
IndexBtreeNode(const IndexBufferPtr& buf) : m_idx_buf{buf} {}
~IndexBtreeNode() { m_idx_buf.reset(); }
33 changes: 21 additions & 12 deletions src/include/homestore/index/index_table.hpp
@@ -114,7 +114,7 @@ class IndexTable : public IndexTableBase, public Btree< K, V > {
// Need to put it in wb cache
wb_cache().write_buf(node, idx_node->m_idx_buf, cp_ctx);
idx_node->m_last_mod_cp_id = cp_ctx->id();
- LOGTRACEMOD(wbcache, "{}", idx_node->m_idx_buf->to_string());
+ LOGTRACEMOD(wbcache, "add to dirty list cp {} {}", cp_ctx->id(), idx_node->m_idx_buf->to_string());
}
node->set_checksum(this->m_bt_cfg);
return btree_status_t::success;
@@ -129,17 +129,26 @@ class IndexTable : public IndexTableBase, public Btree< K, V > {
auto& left_child_buf = left_child_idx_node->m_idx_buf;
auto& parent_buf = parent_idx_node->m_idx_buf;

- LOGTRACEMOD(wbcache, "left {} parent {} ", left_child_buf->to_string(), parent_buf->to_string());

- // Write new nodes in the list as standalone outside transacted pairs.
+ // Write the new right child nodes, left node and parent in order.
for (const auto& right_child_node : new_nodes) {
auto right_child = IndexBtreeNode::convert(right_child_node.get());
write_node_impl(right_child_node, context);
- wb_cache().prepend_to_chain(right_child->m_idx_buf, left_child_buf);
- LOGTRACEMOD(wbcache, "right {} left {} ", right_child->m_idx_buf->to_string(), left_child_buf->to_string());
+ wb_cache().prepend_to_chain(right_child->m_idx_buf, left_child_buf, cp_ctx);
}

+ auto trace_index_bufs = [&]() {
+ std::string str;
+ str = fmt::format("cp {} left {} parent {}", cp_ctx->id(), left_child_buf->to_string(),
+ parent_buf->to_string());
+ for (const auto& right_child_node : new_nodes) {
+ auto right_child = IndexBtreeNode::convert(right_child_node.get());
+ fmt::format_to(std::back_inserter(str), " right {}", right_child->m_idx_buf->to_string());
+ }
+ return str;
+ };
+
+ LOGTRACEMOD(wbcache, "{}", trace_index_bufs());
write_node_impl(left_child_node, context);
write_node_impl(parent_node, context);

@@ -181,18 +190,17 @@ class IndexTable : public IndexTableBase, public Btree< K, V > {
// realloc_node(node);
// }

- // If the backing buffer is already in a clean state, we don't need to make a copy of it
- if (idx_node->m_idx_buf->is_clean()) { return btree_status_t::success; }
-
- // Make a new btree buffer and copy the contents and swap it to make it the current node's buffer. The
+ // We create IndexBuffer for each CP. But if the backing buffer is already in a clean state
+ // we dont copy the node buffer. Copy buffer will handle it. If the node buffer is dirty,
+ // make a new btree buffer and copy the contents and swap it to make it the current node's buffer. The
// buffer prior to this copy, would have been written and already added into the dirty buffer list.
- idx_node->m_idx_buf = wb_cache().copy_buffer(idx_node->m_idx_buf);
+ idx_node->m_idx_buf = wb_cache().copy_buffer(idx_node->m_idx_buf, cp_ctx);
idx_node->m_last_mod_cp_id = -1;

node->m_phys_node_buf = idx_node->m_idx_buf->raw_buffer();
node->set_checksum(this->m_bt_cfg);

- LOGTRACEMOD(wbcache, "buf {} ", idx_node->m_idx_buf->to_string());
+ LOGTRACEMOD(wbcache, "cp {} {} ", cp_ctx->id(), idx_node->m_idx_buf->to_string());

#ifndef NO_CHECKSUM
if (!node->verify_node(this->m_bt_cfg)) {
@@ -221,6 +229,8 @@ class IndexTable : public IndexTableBase, public Btree< K, V > {
auto& child_buf = child_idx_node->m_idx_buf;
auto& parent_buf = parent_idx_node->m_idx_buf;

+ LOGTRACEMOD(wbcache, "cp {} left {} parent {} ", cp_ctx->id(), child_buf->to_string(), parent_buf->to_string());
+
auto [child_copied, parent_copied] = wb_cache().create_chain(child_buf, parent_buf, cp_ctx);
if (child_copied) {
child_node->m_phys_node_buf = child_buf->raw_buffer();
@@ -231,7 +241,6 @@ class IndexTable : public IndexTableBase, public Btree< K, V > {
parent_idx_node->m_last_mod_cp_id = -1;
}

- LOGTRACEMOD(wbcache, "child {} parent {} ", child_buf->to_string(), parent_buf->to_string());
return btree_status_t::success;
}

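Threading cp_ctx through prepend_to_chain()/copy_buffer() (see wb_cache_base.hpp below) ties every chain edit to a specific checkpoint, and m_wait_for_leaders encodes the ordering the chain implies: a buffer is written only after all of its leader buffers are on disk. A small sketch of that counting scheme, assuming decrement-and-test semantics like sisl::atomic_counter's:

#include <atomic>
#include <functional>
#include <memory>

struct buf {
    std::atomic<int> wait_for_leaders{0};  // leaders still unwritten
    std::weak_ptr<buf> next_buffer;        // follower released after us
};

// Chain `leader` in front of `follower`: the follower now has one more
// write to wait for before it may be issued.
void chain(const std::shared_ptr<buf>& leader, const std::shared_ptr<buf>& follower) {
    leader->next_buffer = follower;
    follower->wait_for_leaders.fetch_add(1);
}

// On write completion, release the follower once its last leader is done.
void on_write_complete(const std::shared_ptr<buf>& done,
                       const std::function<void(std::shared_ptr<buf>)>& flush) {
    if (auto f = done->next_buffer.lock(); f && f->wait_for_leaders.fetch_sub(1) == 1) {
        flush(f);  // all leaders written; the follower may now be flushed
    }
}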
4 changes: 2 additions & 2 deletions src/include/homestore/index/wb_cache_base.hpp
@@ -63,7 +63,7 @@ class IndexWBCacheBase {
/// @brief Prepend to the chain that was already created with second
/// @param first
/// @param second
- virtual void prepend_to_chain(const IndexBufferPtr& first, const IndexBufferPtr& second) = 0;
+ virtual void prepend_to_chain(const IndexBufferPtr& first, const IndexBufferPtr& second, CPContext* cp_ctx) = 0;

/// @brief Free the buffer allocated and remove it from wb cache
/// @param buf
@@ -73,7 +73,7 @@
/// @brief Copy buffer
/// @param cur_buf
/// @return
- virtual IndexBufferPtr copy_buffer(const IndexBufferPtr& cur_buf) const = 0;
+ virtual IndexBufferPtr copy_buffer(const IndexBufferPtr& cur_buf, CPContext* context) const = 0;
};

} // namespace homestore
2 changes: 1 addition & 1 deletion src/include/homestore/index_service.hpp
@@ -22,7 +22,7 @@
#include <homestore/homestore_decl.hpp>
#include <homestore/index/index_internal.hpp>
#include <homestore/superblk_handler.hpp>
-
+ #include <homestore/index/wb_cache_base.hpp>
namespace homestore {

class IndexWBCacheBase;
4 changes: 2 additions & 2 deletions src/lib/checkpoint/cp.hpp
@@ -21,7 +21,7 @@

#include <sisl/logging/logging.h>
#include <iomgr/iomgr.hpp>
-
+ #include <folly/futures/SharedPromise.h>
#include "common/homestore_assert.hpp"

/*
@@ -73,7 +73,7 @@ struct CP {
bool m_cp_waiting_to_trigger{false}; // it is waiting for previous cp to complete
cp_id_t m_cp_id;
std::array< std::unique_ptr< CPContext >, (size_t)cp_consumer_t::SENTINEL > m_contexts;
- folly::Promise< bool > m_comp_promise;
+ folly::SharedPromise< bool > m_comp_promise;

public:
CP(CPManager* mgr) : m_cp_mgr{mgr} {}
8 changes: 5 additions & 3 deletions src/lib/checkpoint/cp_mgr.cpp
@@ -142,8 +142,10 @@ folly::Future< bool > CPManager::trigger_cp_flush(bool force) {
std::unique_lock< std::mutex > lk(trigger_cp_mtx);
auto cur_cp = cp_guard();
HS_DBG_ASSERT_NE(cur_cp->m_cp_status, cp_status_t::cp_flush_prepare);
- cur_cp->m_comp_promise = std::move(folly::Promise< bool >{});
- cur_cp->m_cp_waiting_to_trigger = true;
+ if (!cur_cp->m_cp_waiting_to_trigger) {
+ cur_cp->m_comp_promise = std::move(folly::SharedPromise< bool >{});
+ cur_cp->m_cp_waiting_to_trigger = true;
+ }
return cur_cp->m_comp_promise.getFuture();
} else {
return folly::makeFuture< bool >(false);
@@ -177,7 +179,7 @@ folly::Future< bool > CPManager::trigger_cp_flush(bool force) {
// originally by the caller will be untouched and completed upto CP completion/
ret_fut = folly::makeFuture< bool >(true);
} else {
- cur_cp->m_comp_promise = std::move(folly::Promise< bool >{});
+ cur_cp->m_comp_promise = std::move(folly::SharedPromise< bool >{});
ret_fut = cur_cp->m_comp_promise.getFuture();
}
cur_cp->m_cp_status = cp_status_t::cp_flush_prepare;
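The promise switch above is what the new include in cp.hpp is for: folly::Promise allows getFuture() exactly once, so when a second caller of trigger_cp_flush() attaches to a CP that is already waiting, a plain promise cannot hand out another future. folly::SharedPromise issues any number of futures, all completed by the single setValue(). A standalone sketch of just that behavior (assumes a working folly setup):

#include <folly/futures/Future.h>
#include <folly/futures/SharedPromise.h>
#include <iostream>

int main() {
    folly::SharedPromise<bool> promise;

    // Two independent waiters, one promise. With folly::Promise the second
    // getFuture() call would throw folly::FutureAlreadyRetrieved.
    auto f1 = promise.getFuture();
    auto f2 = promise.getFuture();

    promise.setValue(true);  // completes every outstanding future

    std::cout << std::move(f1).get() << " " << std::move(f2).get() << "\n";
    return 0;
}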
74 changes: 63 additions & 11 deletions src/lib/index/index_cp.hpp
@@ -18,6 +18,7 @@
#include <sisl/fds/thread_vector.hpp>
#include <homestore/blk.h>
#include <homestore/index/index_internal.hpp>
+ #include <homestore/index_service.hpp>
#include <homestore/checkpoint/cp_mgr.hpp>

#include "checkpoint/cp.hpp"
@@ -37,7 +38,6 @@ struct IndexCPContext : public CPContext {
sisl::ThreadVector< IndexBufferPtr >* m_dirty_buf_list{nullptr};
sisl::ThreadVector< BlkId >* m_free_node_blkid_list{nullptr};
sisl::atomic_counter< int64_t > m_dirty_buf_count{0};
- IndexBufferPtr m_last_in_chain;
std::mutex m_flush_buffer_mtx;
flush_buffer_iterator m_buf_it;

@@ -47,11 +47,6 @@
CPContext(cp_id), m_dirty_buf_list{dirty_list}, m_free_node_blkid_list{free_blkid_list} {}

virtual ~IndexCPContext() {
- auto it = m_dirty_buf_list->begin(true /* latest */);
- IndexBufferPtr *tmp = nullptr;
- while((tmp = m_dirty_buf_list->next(it)) != nullptr) {
- tmp->reset();
- }
m_dirty_buf_list->clear();
m_free_node_blkid_list->clear();
}
@@ -62,11 +57,9 @@
}

void add_to_dirty_list(const IndexBufferPtr& buf) {
- buf->m_buf_state = index_buf_state_t::DIRTY;
+ buf->m_node_buf->m_state = index_buf_state_t::DIRTY;
m_dirty_buf_list->push_back(buf);
m_dirty_buf_count.increment(1);
- m_last_in_chain = buf;
- LOGTRACEMOD(wbcache, "{}", buf->to_string());
}

void add_to_free_node_list(BlkId blkid) { m_free_node_blkid_list->push_back(blkid); }
@@ -80,10 +73,69 @@
fmt::format("IndexCPContext cpid={} dirty_buf_count={} dirty_buf_list_size={} blkid_list_size={}", id(),
m_dirty_buf_count.get(), m_dirty_buf_list->size(), m_free_node_blkid_list->size())};

- // TODO dump all index buffers.
+ auto it = m_dirty_buf_list->begin(true /* latest */);
+ IndexBufferPtr* buf;
+ while ((buf = m_dirty_buf_list->next(it)) != nullptr) {
+ fmt::format_to(std::back_inserter(str), "{}\n", (*buf)->to_string());
+ }
return str;
}
- };
+
+ void check_cycle() {
+ // Use dfs to find any cyclic graph.
+ IndexBufferPtr* buf;
+ auto it = m_dirty_buf_list->begin(true /* latest */);
+ while ((buf = m_dirty_buf_list->next(it)) != nullptr) {
+ std::set< IndexBuffer* > visited;
+ check_cycle_recurse(*buf, visited);
+ }
+ }
+
+ void check_cycle_recurse(IndexBufferPtr buf, std::set< IndexBuffer* >& visited) const {
+ if (visited.count(buf.get()) != 0) {
+ LOGERROR("Cycle found for {}", buf->to_string());
+ for (auto& x : visited) {
+ LOGERROR("Path : {}", x->to_string());
+ }
+ return;
+ }
+
+ visited.insert(buf.get());
+ if (buf->m_next_buffer.lock()) { check_cycle_recurse(buf->m_next_buffer.lock(), visited); }
+ }
+
+ void check_wait_for_leaders() {
+ // Use the next buffer to find if wait_for_leaders is invalid.
+ std::unordered_map< IndexBuffer*, int > wait_for_leaders;
+ IndexBufferPtr* buf;
+
+ // Store the wait for leader count for each buffer.
+ auto it = m_dirty_buf_list->begin(true /* latest */);
+ while ((buf = m_dirty_buf_list->next(it)) != nullptr) {
+ wait_for_leaders[(*buf).get()] = (*buf)->m_wait_for_leaders.get();
+ }
+
+ // Decrement the count using the next buffer.
+ it = m_dirty_buf_list->begin(true /* latest */);
+ while ((buf = m_dirty_buf_list->next(it)) != nullptr) {
+ auto next_buf = (*buf)->m_next_buffer.lock();
+ if (next_buf.get() == nullptr) continue;
+ wait_for_leaders[next_buf.get()]--;
+ }
+
+ bool issue = false;
+ for (const auto& [buf, waits] : wait_for_leaders) {
+ // Any value other than zero means the dependency graph is invalid.
+ if (waits != 0) {
+ issue = true;
+ LOGERROR("Leaders wait not zero cp {} buf {} waits {}", id(), buf->to_string(), waits);
+ }
+ }
+
+ assert(issue == false);
+ }
+ };
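Both helpers are debug validators over the dirty-buffer graph: check_cycle() walks the next_buffer links (each buffer has at most one follower, so revisiting a node on a walk means a loop), and check_wait_for_leaders() recomputes every follower's leader count from the edges and flags any mismatch. The commit does not show where they are invoked; a plausible hook, with a hypothetical name, might look like:

// Hypothetical debug hook (name and placement assumed, not from the commit):
// validate an IndexCPContext's dependency graph before its flush is scheduled.
#ifndef NDEBUG
static void validate_dirty_buffer_graph(homestore::IndexCPContext& cp_ctx) {
    cp_ctx.check_cycle();             // no loops via m_next_buffer
    cp_ctx.check_wait_for_leaders();  // leader counts match their in-edges
}
#endif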

class IndexWBCache;
class IndexCPCallbacks : public CPCallbacks {
Expand Down
15 changes: 12 additions & 3 deletions src/lib/index/index_service.cpp
@@ -82,7 +82,9 @@ void IndexService::stop() {
HS_REL_ASSERT_EQ(success, true, "CP Flush failed");
LOGINFO("CP Flush completed");

- for (auto [id, tbl] : m_index_map) { tbl->destroy(); }
+ for (auto [id, tbl] : m_index_map) {
+ tbl->destroy();
+ }
}
void IndexService::add_index_table(const std::shared_ptr< IndexTableBase >& tbl) {
std::unique_lock lg(m_index_map_mtx);
@@ -107,9 +109,16 @@ uint64_t IndexService::used_size() const {
return size;
}

+ NodeBuffer::NodeBuffer(uint32_t buf_size, uint32_t align_size) :
+ m_bytes{hs_utils::iobuf_alloc(buf_size, sisl::buftag::btree_node, align_size)} {}
+
+ NodeBuffer::~NodeBuffer() { hs_utils::iobuf_free(m_bytes, sisl::buftag::btree_node); }

IndexBuffer::IndexBuffer(BlkId blkid, uint32_t buf_size, uint32_t align_size) :
- m_node_buf{hs_utils::iobuf_alloc(buf_size, sisl::buftag::btree_node, align_size)}, m_blkid{blkid} {}
+ m_node_buf{std::make_shared< NodeBuffer >(buf_size, align_size)}, m_blkid{blkid} {}

+ IndexBuffer::IndexBuffer(NodeBufferPtr node_buf, BlkId blkid) : m_node_buf(node_buf), m_blkid(blkid) {}

- IndexBuffer::~IndexBuffer() { hs_utils::iobuf_free(m_node_buf, sisl::buftag::btree_node); }
+ IndexBuffer::~IndexBuffer() { m_node_buf.reset(); }

} // namespace homestore
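With the destructor reduced to dropping its NodeBuffer reference, buffer lifetime now follows plain shared_ptr rules: the aligned iobuf is released by ~NodeBuffer() only when the last referent goes away. A hedged usage sketch against the types in this diff (some_blkid is a placeholder, not from the commit):

#include <cassert>
#include <memory>
#include <homestore/index/index_internal.hpp>  // NodeBuffer / IndexBuffer, as above

void lifetime_sketch(homestore::BlkId some_blkid) {
    auto node = std::make_shared< homestore::NodeBuffer >(4096u /* buf_size */, 512u /* align_size */);
    auto a = std::make_shared< homestore::IndexBuffer >(node, some_blkid);
    auto b = std::make_shared< homestore::IndexBuffer >(node, some_blkid);
    a.reset();                           // drops one reference; no iobuf_free yet
    assert(b->raw_buffer() != nullptr);  // bytes stay valid until the last owner drops
}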