Skip to content

Commit

Permalink
Add protection for concurrent access to m_down_buffers
Browse files Browse the repository at this point in the history
Signed-off-by: Jilong Kou <[email protected]>
  • Loading branch information
koujl committed Nov 4, 2024
1 parent c4efe11 commit 6ee5e4d
Show file tree
Hide file tree
Showing 5 changed files with 70 additions and 67 deletions.
2 changes: 1 addition & 1 deletion conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

class HomestoreConan(ConanFile):
name = "homestore"
version = "6.5.5"
version = "6.5.6"

homepage = "https://github.com/eBay/Homestore"
description = "HomeStore Storage Engine"
Expand Down
10 changes: 9 additions & 1 deletion src/include/homestore/index/index_internal.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,8 @@ struct IndexBuffer : public sisl::ObjLifeCounter< IndexBuffer > {
sisl::atomic_counter< int > m_wait_for_down_buffers{0}; // Number of children need to wait for before persisting
#ifndef NDEBUG
// Down buffers are not mandatory members, but only to keep track of any bugs and asserts
std::vector< std::weak_ptr< IndexBuffer > > m_down_buffers;
std::vector<std::weak_ptr<IndexBuffer> > m_down_buffers;
std::mutex m_down_buffers_mtx;
std::shared_ptr< IndexBuffer > m_prev_up_buffer; // Keep a copy for debugging
#endif

Expand All @@ -123,6 +124,13 @@ struct IndexBuffer : public sisl::ObjLifeCounter< IndexBuffer > {

std::string to_string() const;
std::string to_string_dot() const;

void add_down_buffer(const IndexBufferPtr &buf);

void remove_down_buffer(const IndexBufferPtr &buf);
#ifndef NDEBUG
bool is_in_down_buffers(const IndexBufferPtr &buf);
#endif
};

// This is a special buffer which is used to write to the meta block
Expand Down
25 changes: 4 additions & 21 deletions src/lib/index/index_cp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -266,33 +266,16 @@ void IndexCPContext::process_txn_record(txn_record const* rec, std::map< BlkId,
#ifndef NDEBUG
// if (!is_sibling_link || (buf->m_up_buffer == real_up_buf)) { return buf;}
// Already linked with same buf or its not a sibling link to override
bool found{false};
for (auto const& dbuf : real_up_buf->m_down_buffers) {
if (dbuf.lock() == buf) {
found = true;
break;
}
if (real_up_buf->is_in_down_buffers(buf)) {
return buf;
}
if (found) { return buf; }
real_up_buf->m_down_buffers.emplace_back(buf);
#endif

if (buf->m_up_buffer != real_up_buf) {
if (buf->m_up_buffer) {
buf->m_up_buffer->m_wait_for_down_buffers.decrement(1);
#ifndef NDEBUG
bool found{false};
for (auto it = buf->m_up_buffer->m_down_buffers.begin(); it != buf->m_up_buffer->m_down_buffers.end(); ++it) {
if (it->lock() == buf) {
buf->m_up_buffer->m_down_buffers.erase(it);
found = true;
break;
}
}
HS_DBG_ASSERT(found, "Down buffer is linked to Up buf, but up_buf doesn't have down_buf in its list");
#endif
buf->m_up_buffer->remove_down_buffer(buf);
}
real_up_buf->m_wait_for_down_buffers.increment(1);
real_up_buf->add_down_buffer(buf);
buf->m_up_buffer = real_up_buf;
}
}
Expand Down
49 changes: 46 additions & 3 deletions src/lib/index/index_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -162,9 +162,12 @@ std::string IndexBuffer::to_string() const {
// store m_down_buffers in a string
std::string down_bufs = "";
#ifndef NDEBUG
for (auto const& down_buf : m_down_buffers) {
if (auto ptr = down_buf.lock()) {
fmt::format_to(std::back_inserter(down_bufs), "[{}]", voidptr_cast(ptr.get()));
{
std::lock_guard lg(m_down_buffers_mtx);
for (auto const &down_buf: m_down_buffers) {
if (auto ptr = down_buf.lock()) {
fmt::format_to(std::back_inserter(down_bufs), "[{}]", voidptr_cast(ptr.get()));
}
}
}
#endif
Expand All @@ -178,6 +181,7 @@ std::string IndexBuffer::to_string() const {
down_bufs);
}
}

std::string IndexBuffer::to_string_dot() const {
auto str = fmt::format("IndexBuffer {} ", reinterpret_cast< void* >(const_cast< IndexBuffer* >(this)));
if (m_bytes == nullptr) {
Expand All @@ -190,6 +194,45 @@ std::string IndexBuffer::to_string_dot() const {
return str;
}

void IndexBuffer::add_down_buffer(const IndexBufferPtr &buf) {
m_wait_for_down_buffers.increment();
#ifndef NDEBUG
{
std::lock_guard lg(m_down_buffers_mtx);
m_down_buffers.push_back(buf);
}
#endif
}

void IndexBuffer::remove_down_buffer(const IndexBufferPtr &buf) {
m_wait_for_down_buffers.decrement();
#ifndef NDEBUG
bool found{false}; {
std::lock_guard lg(m_down_buffers_mtx);
for (auto it = buf->m_up_buffer->m_down_buffers.begin(); it != buf->m_up_buffer->m_down_buffers.end(); ++it) {
if (it->lock() == buf) {
buf->m_up_buffer->m_down_buffers.erase(it);
found = true;
break;
}
}
}
HS_DBG_ASSERT(found, "Down buffer is linked to up_buf, but up_buf doesn't have down_buf in its list");
#endif
}

#ifndef NDEBUG
bool IndexBuffer::is_in_down_buffers(const IndexBufferPtr &buf) {
std::lock_guard<std::mutex> lg(m_down_buffers_mtx);
for (auto const &dbuf: m_down_buffers) {
if (dbuf.lock() == buf) {
return true;
}
}
return false;
}
#endif

MetaIndexBuffer::MetaIndexBuffer(superblk< index_table_sb >& sb) : IndexBuffer{nullptr, BlkId{}}, m_sb{sb} {
m_is_meta_buf = true;
}
Expand Down
51 changes: 10 additions & 41 deletions src/lib/index/wb_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -396,14 +396,8 @@ void IndexWBCache::link_buf(IndexBufferPtr const& up_buf, IndexBufferPtr const&
HS_DBG_ASSERT((real_up_buf->m_dirtied_cp_id == down_buf->m_dirtied_cp_id) || (real_up_buf->is_meta_buf()),
"Up buffer is not modified by current cp, but down buffer is linked to it");
#ifndef NDEBUG
bool found{false};
for (auto const& dbuf : real_up_buf->m_down_buffers) {
if (dbuf.lock() == down_buf) {
found = true;
break;
}
}
HS_DBG_ASSERT(found, "Down buffer is linked to Up buf, but up_buf doesn't have down_buf in its list");
HS_DBG_ASSERT(real_up_buf->is_in_down_buffers(down_buf),
"Down buffer is linked to Up buf, but up_buf doesn't have down_buf in its list");
#endif
return;
}
Expand All @@ -412,25 +406,10 @@ void IndexWBCache::link_buf(IndexBufferPtr const& up_buf, IndexBufferPtr const&
// Now we link the down_buffer to the real up_buffer
if (down_buf->m_up_buffer) {
// release existing up_buffer's wait count
down_buf->m_up_buffer->m_wait_for_down_buffers.decrement();
#ifndef NDEBUG
bool found{false};
for (auto it = down_buf->m_up_buffer->m_down_buffers.begin(); it != down_buf->m_up_buffer->m_down_buffers.end();
++it) {
if (it->lock() == down_buf) {
down_buf->m_up_buffer->m_down_buffers.erase(it);
found = true;
break;
}
}
HS_DBG_ASSERT(found, "Down buffer is linked to Up buf, but up_buf doesn't have down_buf in its list");
#endif
down_buf->m_up_buffer->remove_down_buffer(down_buf);
}
real_up_buf->m_wait_for_down_buffers.increment(1);
down_buf->m_up_buffer = real_up_buf;
#ifndef NDEBUG
real_up_buf->m_down_buffers.emplace_back(down_buf);
#endif
real_up_buf->add_down_buffer(down_buf);
}

void IndexWBCache::free_buf(const IndexBufferPtr& buf, CPContext* cp_ctx) {
Expand Down Expand Up @@ -535,21 +514,8 @@ void IndexWBCache::recover(sisl::byte_view sb) {
pending_bufs.push_back(buf->m_up_buffer);
} else {
// Just ignore it
buf->m_up_buffer->m_wait_for_down_buffers.decrement();
#ifndef NDEBUG
bool found{false};
for (auto it = buf->m_up_buffer->m_down_buffers.begin();
it != buf->m_up_buffer->m_down_buffers.end(); ++it) {
auto sp = it->lock();
if (sp && sp == buf) {
found = true;
buf->m_up_buffer->m_down_buffers.erase(it);
break;
}
}
HS_DBG_ASSERT(found,
"Down buffer is linked to Up buf, but up_buf doesn't have down_buf in its list");
#endif
buf->m_up_buffer->remove_down_buffer(buf);
buf->m_up_buffer = nullptr;
}
}
}
Expand Down Expand Up @@ -754,7 +720,10 @@ std::pair< IndexBufferPtr, bool > IndexWBCache::on_buf_flush_done_internal(Index
IndexBufferPtr const& buf) {
IndexBufferPtrList buf_list;
#ifndef NDEBUG
buf->m_down_buffers.clear();
{
std::lock_guard lg(buf->m_down_buffers_mtx);
buf->m_down_buffers.clear();
}
#endif
buf->set_state(index_buf_state_t::CLEAN);

Expand Down

0 comments on commit 6ee5e4d

Please sign in to comment.