Skip to content

Commit

Permalink
Review comments.
Browse files Browse the repository at this point in the history
  • Loading branch information
sanebay committed Nov 10, 2023
1 parent 1ed246b commit e2a29a6
Show file tree
Hide file tree
Showing 7 changed files with 54 additions and 48 deletions.
12 changes: 5 additions & 7 deletions src/include/homestore/index/index_internal.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,35 +87,33 @@ struct IndexBuffer {
std::weak_ptr< IndexBuffer > m_next_buffer; // Next buffer in the chain
// Number of leader buffers we are waiting for before we write this buffer
sisl::atomic_counter< int > m_wait_for_leaders{0};
std::mutex m_mutex;

IndexBuffer(BlkId blkid, uint32_t buf_size, uint32_t align_size);
IndexBuffer(NodeBufferPtr node_buf, BlkId blkid);
~IndexBuffer();

BlkId blkid() const { return m_blkid; }
uint8_t* raw_buffer() {
RELEASE_ASSERT(m_node_buf, "Node buffer null");
RELEASE_ASSERT(m_node_buf, "Node buffer null blkid {}", m_blkid.to_integer());
return m_node_buf->m_bytes;
}

bool is_clean() const {
RELEASE_ASSERT(m_node_buf, "Node buffer null");
RELEASE_ASSERT(m_node_buf, "Node buffer null blkid {}", m_blkid.to_integer());
return (m_node_buf->m_state.load() == index_buf_state_t::CLEAN);
}

index_buf_state_t state() {
RELEASE_ASSERT(m_node_buf, "Node buffer null");
index_buf_state_t state() const {
RELEASE_ASSERT(m_node_buf, "Node buffer null blkid {}", m_blkid.to_integer());
return m_node_buf->m_state;
}

void set_state(index_buf_state_t state) {
RELEASE_ASSERT(m_node_buf, "Node buffer null");
RELEASE_ASSERT(m_node_buf, "Node buffer null blkid {}", m_blkid.to_integer());
m_node_buf->m_state = state;
}

std::string to_string() const {
std::lock_guard lock{m_mutex};
auto str = fmt::format("IndexBuffer {} blkid={}", reinterpret_cast< void* >(const_cast< IndexBuffer* >(this)),
m_blkid.to_integer());
if (m_node_buf == nullptr) {
Expand Down
2 changes: 1 addition & 1 deletion src/include/homestore/index/index_table.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ class IndexTable : public IndexTableBase, public Btree< K, V > {
for (const auto& right_child_node : new_nodes) {
auto right_child = IndexBtreeNode::convert(right_child_node.get());
write_node_impl(right_child_node, context);
wb_cache().prepend_to_chain(right_child->m_idx_buf, left_child_buf, cp_ctx);
wb_cache().prepend_to_chain(right_child->m_idx_buf, left_child_buf);
}

auto trace_index_bufs = [&]() {
Expand Down
4 changes: 2 additions & 2 deletions src/include/homestore/index/wb_cache_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ class IndexWBCacheBase {
/// @brief Prepend first to the chain that was already created starting at second
/// @param first
/// @param second
virtual void prepend_to_chain(const IndexBufferPtr& first, const IndexBufferPtr& second, CPContext* cp_ctx) = 0;
virtual void prepend_to_chain(const IndexBufferPtr& first, const IndexBufferPtr& second) = 0;

/// @brief Free the buffer allocated and remove it from wb cache
/// @param buf
Expand All @@ -73,7 +73,7 @@ class IndexWBCacheBase {
/// @brief Copy buffer
/// @param cur_buf
/// @return
virtual IndexBufferPtr copy_buffer(const IndexBufferPtr& cur_buf, CPContext* context) const = 0;
virtual IndexBufferPtr copy_buffer(const IndexBufferPtr& cur_buf, const CPContext* context) const = 0;
};

} // namespace homestore
53 changes: 31 additions & 22 deletions src/lib/index/index_cp.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ struct IndexCPContext : public VDevCPContext {

void add_to_dirty_list(const IndexBufferPtr& buf) {
m_dirty_buf_list.push_back(buf);
buf->m_node_buf->m_state = index_buf_state_t::DIRTY;
buf->set_state(index_buf_state_t::DIRTY);
m_dirty_buf_count.increment(1);
}

Expand All @@ -58,44 +58,49 @@ struct IndexCPContext : public VDevCPContext {
return ret;
}

std::string to_string() const {
std::string to_string() {
std::string str{fmt::format("IndexCPContext cpid={} dirty_buf_count={} dirty_buf_list_size={}", m_cp->id(),
m_dirty_buf_count.get(), m_dirty_buf_list.size())};

// Mapping from a node to all its parents in the graph.
// Display all buffers along with their dependencies and state.
std::unordered_map< IndexBuffer*, std::vector< IndexBuffer* > > parents;
IndexBufferPtr* buf;
auto it = m_dirty_buf_list->begin(true /* latest */);
while ((buf = m_dirty_buf_list->next(it)) != nullptr) {

auto it = m_dirty_buf_list.begin();
while (it != m_dirty_buf_list.end()) {
// Record this buf as a parent of its next buffer.
parents[(*buf)->m_next_buffer.lock().get()].emplace_back(buf->get());
IndexBufferPtr buf = *it;
parents[buf->m_next_buffer.lock().get()].emplace_back(buf.get());
++it;
}

it = m_dirty_buf_list->begin(true /* latest */);
while ((buf = m_dirty_buf_list->next(it)) != nullptr) {
fmt::format_to(std::back_inserter(str), "{}", (*buf)->to_string());
it = m_dirty_buf_list.begin();
while (it != m_dirty_buf_list.end()) {
IndexBufferPtr buf = *it;
fmt::format_to(std::back_inserter(str), "{}", buf->to_string());
auto first = true;
for (const auto& p : parents[buf->get()]) {
for (const auto& p : parents[buf.get()]) {
if (first) {
fmt::format_to(std::back_inserter(str), "\nDepends:");
first = false;
}
fmt::format_to(std::back_inserter(str), " {}({})", r_cast< void* >(p), s_cast< int >(p->state()));
}
fmt::format_to(std::back_inserter(str), "\n");
++it;
}

return str;
}

void check_cycle() {
// Use DFS to check whether the graph contains a cycle.
IndexBufferPtr* buf;
auto it = m_dirty_buf_list->begin(true /* latest */);
while ((buf = m_dirty_buf_list->next(it)) != nullptr) {
auto it = m_dirty_buf_list.begin();
while (it != m_dirty_buf_list.end()) {
IndexBufferPtr buf = *it;;
std::set< IndexBuffer* > visited;
check_cycle_recurse(*buf, visited);
check_cycle_recurse(buf, visited);
++it;
}
}

Expand All @@ -115,20 +120,24 @@ struct IndexCPContext : public VDevCPContext {
void check_wait_for_leaders() {
// Use the next buffer as indegree to find if wait_for_leaders is invalid.
std::unordered_map< IndexBuffer*, int > wait_for_leaders;
IndexBufferPtr* buf;
IndexBufferPtr buf;

// Store the wait for leader count for each buffer.
auto it = m_dirty_buf_list->begin(true /* latest */);
while ((buf = m_dirty_buf_list->next(it)) != nullptr) {
wait_for_leaders[(*buf).get()] = (*buf)->m_wait_for_leaders.get();
auto it = m_dirty_buf_list.begin();
while (it != m_dirty_buf_list.end()) {
buf = *it;
wait_for_leaders[buf.get()] = buf->m_wait_for_leaders.get();
++it;
}

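// Decrement the count using the next buffer.
// NOTE(review): in the iterator-based version of this loop, the `continue` taken when there is no next
// buffer skips `++it`, so the loop would never advance past such a buffer -- confirm and fix.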
it = m_dirty_buf_list->begin(true /* latest */);
while ((buf = m_dirty_buf_list->next(it)) != nullptr) {
auto next_buf = (*buf)->m_next_buffer.lock();
it = m_dirty_buf_list.begin();
while (it != m_dirty_buf_list.end()) {
buf = *it;
auto next_buf = buf->m_next_buffer.lock();
if (next_buf.get() == nullptr) continue;
wait_for_leaders[next_buf.get()]--;
++it;
}

bool issue = false;
Expand All @@ -140,7 +149,7 @@ struct IndexCPContext : public VDevCPContext {
}
}

assert(issue == false);
RELEASE_ASSERT_EQ(issue, false, "Found issue with wait_for_leaders");
}
};

Expand Down
17 changes: 9 additions & 8 deletions src/lib/index/wb_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ void IndexWBCache::write_buf(const BtreeNodePtr& node, const IndexBufferPtr& buf
resource_mgr().inc_dirty_buf_size(m_node_size);
}

IndexBufferPtr IndexWBCache::copy_buffer(const IndexBufferPtr& cur_buf, CPContext* cp_ctx) const {
IndexBufferPtr IndexWBCache::copy_buffer(const IndexBufferPtr& cur_buf, const CPContext* cp_ctx) const {
IndexBufferPtr new_buf = nullptr;
bool copied = false;

Expand Down Expand Up @@ -186,7 +186,7 @@ std::tuple< bool, bool > IndexWBCache::create_chain(IndexBufferPtr& second, Inde
return {second_copied, third_copied};
}

void IndexWBCache::prepend_to_chain(const IndexBufferPtr& first, const IndexBufferPtr& second, CPContext* cp_ctx) {
void IndexWBCache::prepend_to_chain(const IndexBufferPtr& first, const IndexBufferPtr& second) {
assert(first->m_next_buffer.lock() != second);
assert(first->m_next_buffer.lock() == nullptr);
first->m_next_buffer = second;
Expand Down Expand Up @@ -214,8 +214,8 @@ folly::Future< bool > IndexWBCache::async_cp_flush(IndexCPContext* cp_ctx) {
#ifndef NDEBUG
// Check that there are no cycles and no invalid wait_for_leaders counts in the
// dirty buffer dependency graph.
cp_ctx->check_wait_for_leaders();
cp_ctx->check_cycle();
// cp_ctx->check_wait_for_leaders();
// cp_ctx->check_cycle();
#endif

cp_ctx->prepare_flush_iteration();
Expand All @@ -237,7 +237,7 @@ folly::Future< bool > IndexWBCache::async_cp_flush(IndexCPContext* cp_ctx) {

void IndexWBCache::do_flush_one_buf(IndexCPContext* cp_ctx, IndexBufferPtr buf, bool part_of_batch) {
LOGTRACEMOD(wbcache, "cp {} buf {}", cp_ctx->id(), buf->to_string());
{ buf->set_state(index_buf_state_t::FLUSHING); }
buf->set_state(index_buf_state_t::FLUSHING);
m_vdev->async_write(r_cast< const char* >(buf->raw_buffer()), m_node_size, buf->m_blkid, part_of_batch)
.thenValue([buf, cp_ctx](auto) {
auto& pthis = s_cast< IndexWBCache& >(wb_cache()); // Avoiding more than 16 bytes capture
Expand All @@ -264,7 +264,7 @@ void IndexWBCache::process_write_completion(IndexCPContext* cp_ctx, IndexBufferP
}
}

std::pair< IndexBufferPtr, bool > IndexWBCache::on_buf_flush_done(IndexCPContext* cp_ctx, IndexBufferPtr buf) {
std::pair< IndexBufferPtr, bool > IndexWBCache::on_buf_flush_done(IndexCPContext* cp_ctx, IndexBufferPtr& buf) {
if (m_cp_flush_fibers.size() > 1) {
std::unique_lock lg(m_flush_mtx);
return on_buf_flush_done_internal(cp_ctx, buf);
Expand All @@ -273,9 +273,10 @@ std::pair< IndexBufferPtr, bool > IndexWBCache::on_buf_flush_done(IndexCPContext
}
}

std::pair< IndexBufferPtr, bool > IndexWBCache::on_buf_flush_done_internal(IndexCPContext* cp_ctx, IndexBufferPtr buf) {
std::pair< IndexBufferPtr, bool > IndexWBCache::on_buf_flush_done_internal(IndexCPContext* cp_ctx,
IndexBufferPtr& buf) {
static thread_local std::vector< IndexBufferPtr > t_buf_list;
{ buf->set_state(index_buf_state_t::CLEAN); }
buf->set_state(index_buf_state_t::CLEAN);

t_buf_list.clear();

Expand Down
8 changes: 4 additions & 4 deletions src/lib/index/wb_cache.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,19 +50,19 @@ class IndexWBCache : public IndexWBCacheBase {
void write_buf(const BtreeNodePtr& node, const IndexBufferPtr& buf, CPContext* cp_ctx) override;
void read_buf(bnodeid_t id, BtreeNodePtr& node, node_initializer_t&& node_initializer) override;
std::tuple< bool, bool > create_chain(IndexBufferPtr& second, IndexBufferPtr& third, CPContext* cp_ctx) override;
void prepend_to_chain(const IndexBufferPtr& first, const IndexBufferPtr& second, CPContext* cp_ctx) override;
void prepend_to_chain(const IndexBufferPtr& first, const IndexBufferPtr& second) override;
void free_buf(const IndexBufferPtr& buf, CPContext* cp_ctx) override;

//////////////////// CP Related API section /////////////////////////////////
folly::Future< bool > async_cp_flush(IndexCPContext* context);
IndexBufferPtr copy_buffer(const IndexBufferPtr& cur_buf, CPContext *cp_ctx) const;
IndexBufferPtr copy_buffer(const IndexBufferPtr& cur_buf, const CPContext *cp_ctx) const;

private:
void start_flush_threads();
void process_write_completion(IndexCPContext* cp_ctx, IndexBufferPtr pbuf);
void do_flush_one_buf(IndexCPContext* cp_ctx, const IndexBufferPtr buf, bool part_of_batch);
std::pair< IndexBufferPtr, bool > on_buf_flush_done(IndexCPContext* cp_ctx, IndexBufferPtr buf);
std::pair< IndexBufferPtr, bool > on_buf_flush_done_internal(IndexCPContext* cp_ctx, IndexBufferPtr buf);
std::pair< IndexBufferPtr, bool > on_buf_flush_done(IndexCPContext* cp_ctx, IndexBufferPtr& buf);
std::pair< IndexBufferPtr, bool > on_buf_flush_done_internal(IndexCPContext* cp_ctx, IndexBufferPtr& buf);

void get_next_bufs(IndexCPContext* cp_ctx, uint32_t max_count, std::vector< IndexBufferPtr >& bufs);
void get_next_bufs_internal(IndexCPContext* cp_ctx, uint32_t max_count, IndexBufferPtr prev_flushed_buf,
Expand Down
6 changes: 2 additions & 4 deletions src/tests/test_index_btree.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ SISL_OPTION_GROUP(test_index_btree,
(num_iters, "", "num_iters", "number of iterations for rand ops",
::cxxopts::value< uint32_t >()->default_value("1500"), "number"),
(num_entries, "", "num_entries", "number of entries to test with",
::cxxopts::value< uint32_t >()->default_value("65000"), "number"),
::cxxopts::value< uint32_t >()->default_value("15000"), "number"),
(seed, "", "seed", "random engine seed, use random if not defined",
::cxxopts::value< uint64_t >()->default_value("0"), "number"))

Expand Down Expand Up @@ -161,9 +161,7 @@ struct BtreeTest : public BtreeTestHelper< TestType > {
}
};

using BtreeTypes = testing::Types< FixedLenBtreeTest, VarKeySizeBtreeTest, VarValueSizeBtreeTest,
VarObjSizeBtreeTest
>;
using BtreeTypes = testing::Types< FixedLenBtreeTest, VarKeySizeBtreeTest, VarValueSizeBtreeTest, VarObjSizeBtreeTest >;

TYPED_TEST_SUITE(BtreeTest, BtreeTypes);

Expand Down

0 comments on commit e2a29a6

Please sign in to comment.