Skip to content

Commit

Permalink
Review comments.
Browse files Browse the repository at this point in the history
  • Loading branch information
sanebay committed Oct 30, 2023
1 parent 9d92f18 commit 5b5e1a0
Show file tree
Hide file tree
Showing 6 changed files with 21 additions and 24 deletions.
12 changes: 5 additions & 7 deletions src/include/homestore/index/index_internal.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,35 +87,33 @@ struct IndexBuffer {
std::weak_ptr< IndexBuffer > m_next_buffer; // Next buffer in the chain
// Number of leader buffers we are waiting for before we write this buffer
sisl::atomic_counter< int > m_wait_for_leaders{0};
std::mutex m_mutex;

IndexBuffer(BlkId blkid, uint32_t buf_size, uint32_t align_size);
IndexBuffer(NodeBufferPtr node_buf, BlkId blkid);
~IndexBuffer();

BlkId blkid() const { return m_blkid; }
uint8_t* raw_buffer() {
RELEASE_ASSERT(m_node_buf, "Node buffer null");
RELEASE_ASSERT(m_node_buf, "Node buffer null blkid {}", m_blkid.to_integer());
return m_node_buf->m_bytes;
}

bool is_clean() const {
RELEASE_ASSERT(m_node_buf, "Node buffer null");
RELEASE_ASSERT(m_node_buf, "Node buffer null blkid {}", m_blkid.to_integer());
return (m_node_buf->m_state.load() == index_buf_state_t::CLEAN);
}

index_buf_state_t state() {
RELEASE_ASSERT(m_node_buf, "Node buffer null");
index_buf_state_t state() const {
RELEASE_ASSERT(m_node_buf, "Node buffer null blkid {}", m_blkid.to_integer());
return m_node_buf->m_state;
}

void set_state(index_buf_state_t state) {
RELEASE_ASSERT(m_node_buf, "Node buffer null");
RELEASE_ASSERT(m_node_buf, "Node buffer null blkid {}", m_blkid.to_integer());
m_node_buf->m_state = state;
}

std::string to_string() const {
std::lock_guard lock{m_mutex};
auto str = fmt::format("IndexBuffer {} blkid={}", reinterpret_cast< void* >(const_cast< IndexBuffer* >(this)),
m_blkid.to_integer());
if (m_node_buf == nullptr) {
Expand Down
2 changes: 1 addition & 1 deletion src/include/homestore/index/index_table.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ class IndexTable : public IndexTableBase, public Btree< K, V > {
for (const auto& right_child_node : new_nodes) {
auto right_child = IndexBtreeNode::convert(right_child_node.get());
write_node_impl(right_child_node, context);
wb_cache().prepend_to_chain(right_child->m_idx_buf, left_child_buf, cp_ctx);
wb_cache().prepend_to_chain(right_child->m_idx_buf, left_child_buf);
}

auto trace_index_bufs = [&]() {
Expand Down
4 changes: 2 additions & 2 deletions src/include/homestore/index/wb_cache_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ class IndexWBCacheBase {
/// @brief Prepend to the chain that was already created with second
/// @param first
/// @param second
virtual void prepend_to_chain(const IndexBufferPtr& first, const IndexBufferPtr& second, CPContext* cp_ctx) = 0;
virtual void prepend_to_chain(const IndexBufferPtr& first, const IndexBufferPtr& second) = 0;

/// @brief Free the buffer allocated and remove it from wb cache
/// @param buf
Expand All @@ -73,7 +73,7 @@ class IndexWBCacheBase {
/// @brief Copy buffer
/// @param cur_buf
/// @return
virtual IndexBufferPtr copy_buffer(const IndexBufferPtr& cur_buf, CPContext* context) const = 0;
virtual IndexBufferPtr copy_buffer(const IndexBufferPtr& cur_buf, const CPContext* context) const = 0;
};

} // namespace homestore
4 changes: 2 additions & 2 deletions src/lib/index/index_cp.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ struct IndexCPContext : public CPContext {
}

void add_to_dirty_list(const IndexBufferPtr& buf) {
buf->m_node_buf->m_state = index_buf_state_t::DIRTY;
buf->set_state(index_buf_state_t::DIRTY);
m_dirty_buf_list->push_back(buf);
m_dirty_buf_count.increment(1);
}
Expand Down Expand Up @@ -151,7 +151,7 @@ struct IndexCPContext : public CPContext {
}
}

assert(issue == false);
RELEASE_ASSERT_EQ(issue, false, "Found issue with wait_for_leaders");
}

}; // namespace homestore
Expand Down
14 changes: 7 additions & 7 deletions src/lib/index/wb_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ void IndexWBCache::write_buf(const BtreeNodePtr& node, const IndexBufferPtr& buf
resource_mgr().inc_dirty_buf_size(m_node_size);
}

IndexBufferPtr IndexWBCache::copy_buffer(const IndexBufferPtr& cur_buf, CPContext* cp_ctx) const {
IndexBufferPtr IndexWBCache::copy_buffer(const IndexBufferPtr& cur_buf, const CPContext* cp_ctx) const {
IndexBufferPtr new_buf = nullptr;
bool copied = false;

Expand Down Expand Up @@ -187,7 +187,7 @@ std::tuple< bool, bool > IndexWBCache::create_chain(IndexBufferPtr& second, Inde
return {second_copied, third_copied};
}

void IndexWBCache::prepend_to_chain(const IndexBufferPtr& first, const IndexBufferPtr& second, CPContext* cp_ctx) {
void IndexWBCache::prepend_to_chain(const IndexBufferPtr& first, const IndexBufferPtr& second) {
assert(first->m_next_buffer.lock() != second);
assert(first->m_next_buffer.lock() == nullptr);
first->m_next_buffer = second;
Expand Down Expand Up @@ -244,7 +244,7 @@ std::unique_ptr< CPContext > IndexWBCache::create_cp_context(cp_id_t cp_id) {

void IndexWBCache::do_flush_one_buf(IndexCPContext* cp_ctx, IndexBufferPtr buf, bool part_of_batch) {
LOGTRACEMOD(wbcache, "cp {} buf {}", cp_ctx->id(), buf->to_string());
{ buf->set_state(index_buf_state_t::FLUSHING); }
buf->set_state(index_buf_state_t::FLUSHING);
m_vdev->async_write(r_cast< const char* >(buf->raw_buffer()), m_node_size, buf->m_blkid, part_of_batch)
.thenValue([buf, cp_ctx](auto) {
auto& pthis = s_cast< IndexWBCache& >(wb_cache()); // Avoiding more than 16 bytes capture
Expand All @@ -264,11 +264,10 @@ void IndexWBCache::process_write_completion(IndexCPContext* cp_ctx, IndexBufferP
// We are done flushing the buffers, lets free the btree blocks and then flush the bitmap
free_btree_blks_and_flush(cp_ctx);
} else {

}
}

std::pair< IndexBufferPtr, bool > IndexWBCache::on_buf_flush_done(IndexCPContext* cp_ctx, IndexBufferPtr buf) {
std::pair< IndexBufferPtr, bool > IndexWBCache::on_buf_flush_done(IndexCPContext* cp_ctx, IndexBufferPtr& buf) {
if (m_cp_flush_fibers.size() > 1) {
std::unique_lock lg(m_flush_mtx);
return on_buf_flush_done_internal(cp_ctx, buf);
Expand All @@ -277,9 +276,10 @@ std::pair< IndexBufferPtr, bool > IndexWBCache::on_buf_flush_done(IndexCPContext
}
}

std::pair< IndexBufferPtr, bool > IndexWBCache::on_buf_flush_done_internal(IndexCPContext* cp_ctx, IndexBufferPtr buf) {
std::pair< IndexBufferPtr, bool > IndexWBCache::on_buf_flush_done_internal(IndexCPContext* cp_ctx,
IndexBufferPtr& buf) {
static thread_local std::vector< IndexBufferPtr > t_buf_list;
{ buf->set_state(index_buf_state_t::CLEAN); }
buf->set_state(index_buf_state_t::CLEAN);

t_buf_list.clear();

Expand Down
9 changes: 4 additions & 5 deletions src/lib/index/wb_cache.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,25 +53,24 @@ class IndexWBCache : public IndexWBCacheBase {
void write_buf(const BtreeNodePtr& node, const IndexBufferPtr& buf, CPContext* cp_ctx) override;
void read_buf(bnodeid_t id, BtreeNodePtr& node, node_initializer_t&& node_initializer) override;
std::tuple< bool, bool > create_chain(IndexBufferPtr& second, IndexBufferPtr& third, CPContext* cp_ctx) override;
void prepend_to_chain(const IndexBufferPtr& first, const IndexBufferPtr& second, CPContext* cp_ctx) override;
void prepend_to_chain(const IndexBufferPtr& first, const IndexBufferPtr& second) override;
void free_buf(const IndexBufferPtr& buf, CPContext* cp_ctx) override;

//////////////////// CP Related API section /////////////////////////////////
folly::Future< bool > async_cp_flush(CPContext* context);
std::unique_ptr< CPContext > create_cp_context(cp_id_t cp_id);
IndexBufferPtr copy_buffer(const IndexBufferPtr& cur_buf, CPContext *cp_ctx) const;
IndexBufferPtr copy_buffer(const IndexBufferPtr& cur_buf, const CPContext* cp_ctx) const;

private:
void start_flush_threads();
void process_write_completion(IndexCPContext* cp_ctx, IndexBufferPtr pbuf);
void do_flush_one_buf(IndexCPContext* cp_ctx, const IndexBufferPtr buf, bool part_of_batch);
std::pair< IndexBufferPtr, bool > on_buf_flush_done(IndexCPContext* cp_ctx, IndexBufferPtr buf);
std::pair< IndexBufferPtr, bool > on_buf_flush_done_internal(IndexCPContext* cp_ctx, IndexBufferPtr buf);
std::pair< IndexBufferPtr, bool > on_buf_flush_done(IndexCPContext* cp_ctx, IndexBufferPtr& buf);
std::pair< IndexBufferPtr, bool > on_buf_flush_done_internal(IndexCPContext* cp_ctx, IndexBufferPtr& buf);

void get_next_bufs(IndexCPContext* cp_ctx, uint32_t max_count, std::vector< IndexBufferPtr >& bufs);
void get_next_bufs_internal(IndexCPContext* cp_ctx, uint32_t max_count, IndexBufferPtr prev_flushed_buf,
std::vector< IndexBufferPtr >& bufs);
void free_btree_blks_and_flush(IndexCPContext* cp_ctx);

};
} // namespace homestore

0 comments on commit 5b5e1a0

Please sign in to comment.