diff --git a/src/include/homestore/index/index_internal.hpp b/src/include/homestore/index/index_internal.hpp index cd914d315..eef5dad61 100644 --- a/src/include/homestore/index/index_internal.hpp +++ b/src/include/homestore/index/index_internal.hpp @@ -87,7 +87,6 @@ struct IndexBuffer { std::weak_ptr< IndexBuffer > m_next_buffer; // Next buffer in the chain // Number of leader buffers we are waiting for before we write this buffer sisl::atomic_counter< int > m_wait_for_leaders{0}; - std::mutex m_mutex; IndexBuffer(BlkId blkid, uint32_t buf_size, uint32_t align_size); IndexBuffer(NodeBufferPtr node_buf, BlkId blkid); @@ -95,27 +94,26 @@ struct IndexBuffer { BlkId blkid() const { return m_blkid; } uint8_t* raw_buffer() { - RELEASE_ASSERT(m_node_buf, "Node buffer null"); + RELEASE_ASSERT(m_node_buf, "Node buffer null blkid {}", m_blkid.to_integer()); return m_node_buf->m_bytes; } bool is_clean() const { - RELEASE_ASSERT(m_node_buf, "Node buffer null"); + RELEASE_ASSERT(m_node_buf, "Node buffer null blkid {}", m_blkid.to_integer()); return (m_node_buf->m_state.load() == index_buf_state_t::CLEAN); } - index_buf_state_t state() { - RELEASE_ASSERT(m_node_buf, "Node buffer null"); + index_buf_state_t state() const { + RELEASE_ASSERT(m_node_buf, "Node buffer null blkid {}", m_blkid.to_integer()); return m_node_buf->m_state; } void set_state(index_buf_state_t state) { - RELEASE_ASSERT(m_node_buf, "Node buffer null"); + RELEASE_ASSERT(m_node_buf, "Node buffer null blkid {}", m_blkid.to_integer()); m_node_buf->m_state = state; } std::string to_string() const { - std::lock_guard lock{m_mutex}; auto str = fmt::format("IndexBuffer {} blkid={}", reinterpret_cast< void* >(const_cast< IndexBuffer* >(this)), m_blkid.to_integer()); if (m_node_buf == nullptr) { diff --git a/src/include/homestore/index/index_table.hpp b/src/include/homestore/index/index_table.hpp index 0e59e5b70..bce7e8f36 100644 --- a/src/include/homestore/index/index_table.hpp +++ b/src/include/homestore/index/index_table.hpp @@ -136,7 +136,7 @@ class IndexTable : public IndexTableBase, public Btree< K, V > { for (const auto& right_child_node : new_nodes) { auto right_child = IndexBtreeNode::convert(right_child_node.get()); write_node_impl(right_child_node, context); - wb_cache().prepend_to_chain(right_child->m_idx_buf, left_child_buf, cp_ctx); + wb_cache().prepend_to_chain(right_child->m_idx_buf, left_child_buf); } auto trace_index_bufs = [&]() { diff --git a/src/include/homestore/index/wb_cache_base.hpp b/src/include/homestore/index/wb_cache_base.hpp index d53f3124f..d55ef26a3 100644 --- a/src/include/homestore/index/wb_cache_base.hpp +++ b/src/include/homestore/index/wb_cache_base.hpp @@ -63,7 +63,7 @@ class IndexWBCacheBase { /// @brief Prepend to the chain that was already created with second /// @param first /// @param second - virtual void prepend_to_chain(const IndexBufferPtr& first, const IndexBufferPtr& second, CPContext* cp_ctx) = 0; + virtual void prepend_to_chain(const IndexBufferPtr& first, const IndexBufferPtr& second) = 0; /// @brief Free the buffer allocated and remove it from wb cache /// @param buf @@ -73,7 +73,7 @@ class IndexWBCacheBase { /// @brief Copy buffer /// @param cur_buf /// @return - virtual IndexBufferPtr copy_buffer(const IndexBufferPtr& cur_buf, CPContext* context) const = 0; + virtual IndexBufferPtr copy_buffer(const IndexBufferPtr& cur_buf, const CPContext* context) const = 0; }; } // namespace homestore diff --git a/src/lib/index/index_cp.hpp b/src/lib/index/index_cp.hpp index 3e24966d2..4eb83bc89 100644 --- a/src/lib/index/index_cp.hpp +++ b/src/lib/index/index_cp.hpp @@ -57,7 +57,7 @@ struct IndexCPContext : public CPContext { } void add_to_dirty_list(const IndexBufferPtr& buf) { - buf->m_node_buf->m_state = index_buf_state_t::DIRTY; + buf->set_state(index_buf_state_t::DIRTY); m_dirty_buf_list->push_back(buf); m_dirty_buf_count.increment(1); } @@ -151,7 +151,7 @@ struct IndexCPContext : public CPContext { } } - assert(issue == false); + RELEASE_ASSERT_EQ(issue, false, "Found issue with wait_for_leaders"); } }; // namespace homestore diff --git a/src/lib/index/wb_cache.cpp b/src/lib/index/wb_cache.cpp index 4b7b4bb5a..38cc28a58 100644 --- a/src/lib/index/wb_cache.cpp +++ b/src/lib/index/wb_cache.cpp @@ -107,7 +107,7 @@ void IndexWBCache::write_buf(const BtreeNodePtr& node, const IndexBufferPtr& buf resource_mgr().inc_dirty_buf_size(m_node_size); } -IndexBufferPtr IndexWBCache::copy_buffer(const IndexBufferPtr& cur_buf, CPContext* cp_ctx) const { +IndexBufferPtr IndexWBCache::copy_buffer(const IndexBufferPtr& cur_buf, const CPContext* cp_ctx) const { IndexBufferPtr new_buf = nullptr; bool copied = false; @@ -187,7 +187,7 @@ std::tuple< bool, bool > IndexWBCache::create_chain(IndexBufferPtr& second, Inde return {second_copied, third_copied}; } -void IndexWBCache::prepend_to_chain(const IndexBufferPtr& first, const IndexBufferPtr& second, CPContext* cp_ctx) { +void IndexWBCache::prepend_to_chain(const IndexBufferPtr& first, const IndexBufferPtr& second) { assert(first->m_next_buffer.lock() != second); assert(first->m_next_buffer.lock() == nullptr); first->m_next_buffer = second; @@ -244,7 +244,7 @@ std::unique_ptr< CPContext > IndexWBCache::create_cp_context(cp_id_t cp_id) { void IndexWBCache::do_flush_one_buf(IndexCPContext* cp_ctx, IndexBufferPtr buf, bool part_of_batch) { LOGTRACEMOD(wbcache, "cp {} buf {}", cp_ctx->id(), buf->to_string()); - { buf->set_state(index_buf_state_t::FLUSHING); } + buf->set_state(index_buf_state_t::FLUSHING); m_vdev->async_write(r_cast< const char* >(buf->raw_buffer()), m_node_size, buf->m_blkid, part_of_batch) .thenValue([buf, cp_ctx](auto) { auto& pthis = s_cast< IndexWBCache& >(wb_cache()); // Avoiding more than 16 bytes capture @@ -264,11 +264,10 @@ void IndexWBCache::process_write_completion(IndexCPContext* cp_ctx, IndexBufferP // We are done flushing the buffers, lets free the btree blocks and then flush the bitmap free_btree_blks_and_flush(cp_ctx); } else { - } } -std::pair< IndexBufferPtr, bool > IndexWBCache::on_buf_flush_done(IndexCPContext* cp_ctx, IndexBufferPtr buf) { +std::pair< IndexBufferPtr, bool > IndexWBCache::on_buf_flush_done(IndexCPContext* cp_ctx, IndexBufferPtr& buf) { if (m_cp_flush_fibers.size() > 1) { std::unique_lock lg(m_flush_mtx); return on_buf_flush_done_internal(cp_ctx, buf); @@ -277,9 +276,10 @@ std::pair< IndexBufferPtr, bool > IndexWBCache::on_buf_flush_done(IndexCPContext } } -std::pair< IndexBufferPtr, bool > IndexWBCache::on_buf_flush_done_internal(IndexCPContext* cp_ctx, IndexBufferPtr buf) { +std::pair< IndexBufferPtr, bool > IndexWBCache::on_buf_flush_done_internal(IndexCPContext* cp_ctx, + IndexBufferPtr& buf) { static thread_local std::vector< IndexBufferPtr > t_buf_list; - { buf->set_state(index_buf_state_t::CLEAN); } + buf->set_state(index_buf_state_t::CLEAN); t_buf_list.clear(); diff --git a/src/lib/index/wb_cache.hpp b/src/lib/index/wb_cache.hpp index a2db3f99a..4e002b39c 100644 --- a/src/lib/index/wb_cache.hpp +++ b/src/lib/index/wb_cache.hpp @@ -53,25 +53,24 @@ class IndexWBCache : public IndexWBCacheBase { void write_buf(const BtreeNodePtr& node, const IndexBufferPtr& buf, CPContext* cp_ctx) override; void read_buf(bnodeid_t id, BtreeNodePtr& node, node_initializer_t&& node_initializer) override; std::tuple< bool, bool > create_chain(IndexBufferPtr& second, IndexBufferPtr& third, CPContext* cp_ctx) override; - void prepend_to_chain(const IndexBufferPtr& first, const IndexBufferPtr& second, CPContext* cp_ctx) override; + void prepend_to_chain(const IndexBufferPtr& first, const IndexBufferPtr& second) override; void free_buf(const IndexBufferPtr& buf, CPContext* cp_ctx) override; //////////////////// CP Related API section ///////////////////////////////// folly::Future< bool > async_cp_flush(CPContext* context); std::unique_ptr< CPContext > create_cp_context(cp_id_t cp_id); - IndexBufferPtr copy_buffer(const IndexBufferPtr& cur_buf, CPContext *cp_ctx) const; + IndexBufferPtr copy_buffer(const IndexBufferPtr& cur_buf, const CPContext* cp_ctx) const; private: void start_flush_threads(); void process_write_completion(IndexCPContext* cp_ctx, IndexBufferPtr pbuf); void do_flush_one_buf(IndexCPContext* cp_ctx, const IndexBufferPtr buf, bool part_of_batch); - std::pair< IndexBufferPtr, bool > on_buf_flush_done(IndexCPContext* cp_ctx, IndexBufferPtr buf); - std::pair< IndexBufferPtr, bool > on_buf_flush_done_internal(IndexCPContext* cp_ctx, IndexBufferPtr buf); + std::pair< IndexBufferPtr, bool > on_buf_flush_done(IndexCPContext* cp_ctx, IndexBufferPtr& buf); + std::pair< IndexBufferPtr, bool > on_buf_flush_done_internal(IndexCPContext* cp_ctx, IndexBufferPtr& buf); void get_next_bufs(IndexCPContext* cp_ctx, uint32_t max_count, std::vector< IndexBufferPtr >& bufs); void get_next_bufs_internal(IndexCPContext* cp_ctx, uint32_t max_count, IndexBufferPtr prev_flushed_buf, std::vector< IndexBufferPtr >& bufs); void free_btree_blks_and_flush(IndexCPContext* cp_ctx); - }; } // namespace homestore