diff --git a/src/lib/memory/blob_manager.cpp b/src/lib/memory/blob_manager.cpp index a8c5b68f..2758e3c4 100644 --- a/src/lib/memory/blob_manager.cpp +++ b/src/lib/memory/blob_manager.cpp @@ -2,61 +2,58 @@ namespace homeobject { -ShardIndex& MemoryHomeObject::_find_index(shard_id shard) { - auto lg = std::shared_lock(_index_lock); +ShardIndex& MemoryHomeObject::_find_index(shard_id shard) const { auto index_it = _in_memory_index.find(shard); RELEASE_ASSERT(_in_memory_index.end() != index_it, "Missing BTree!!"); - return index_it->second; + return *index_it->second; } BlobManager::Result< blob_id > MemoryHomeObject::_put_blob(ShardInfo const& shard, Blob&& blob) { // Lookup Shard Index (BTree and SSN) - auto& shard_index = _find_index(shard.id); + auto& our_shard = _find_index(shard.id); - // Lock the BTree - shard_index._btree_lock.lock(); - // TODO: Generate BlobID (with RAFT this will happen implicitly) and Route - auto route = BlobRoute{shard.id, shard_index._shard_seq_num++}; - auto [new_it, happened] = shard_index._btree.try_emplace(route, std::move(blob)); - shard_index._btree_lock.unlock(); + // Generate BlobID (with RAFT this will happen implicitly) and Route + auto const route = BlobRoute{shard.id, our_shard._shard_seq_num++}; + if ((UINT64_MAX >> 16) < route.blob) { return folly::makeUnexpected(BlobError::UNKNOWN); } + LOGTRACEMOD(homeobject, "Writing Blob {}", route); + // Write (move) Blob to Heap + auto new_blob = BlobExt(); + new_blob._state = BlobState::ALIVE; + new_blob._blob = new Blob(std::move(blob)); + LOGDEBUGMOD(homeobject, "Wrote BLOB {} to: BlkId:[{}]", route, fmt::ptr(new_blob._blob)); + + // Insert BlobExt to Index + auto [_, happened] = our_shard._btree.try_emplace(route, std::move(new_blob)); RELEASE_ASSERT(happened, "Generated duplicate BlobRoute!"); - LOGDEBUGMOD(homeobject, "Wrote BLOB {} to: BlkId:[{}]", route.blob, fmt::ptr(&new_it->second)); return route.blob; } BlobManager::Result< Blob > MemoryHomeObject::_get_blob(ShardInfo const& shard, blob_id blob) const { // Lookup Shard Index (BTree and SSN) - auto index_it = _in_memory_index.cend(); - { - auto lg = std::shared_lock(_index_lock); - if (index_it = _in_memory_index.find(shard.id); _in_memory_index.cend() == index_it) - return folly::makeUnexpected(BlobError::UNKNOWN_SHARD); - } - auto& shard_index = index_it->second; - - // Calculate BlobRoute from ShardInfo (use ordinal?) - auto route = BlobRoute(shard.id, blob); - LOGDEBUGMOD(homeobject, "Looking up Blob {}", route.blob); + auto& our_shard = _find_index(shard.id); - // Lock the BTree *SHARED* and find the BLOB location in memory, read the BLOB into our - // return value except body which we'll do outside the lock (may be HUGE and guaranteed to not move) - // This is only *safe* because we defer GC to shutdown currently. - Blob user_blob; - auto unsafe_ptr = decltype(Blob::body)::pointer{nullptr}; - shard_index._btree_lock.lock_shared(); - if (auto it = shard_index._btree.find(route); shard_index._btree.end() != it) { - user_blob.object_off = it->second.object_off; - user_blob.user_key = it->second.user_key; - unsafe_ptr = it->second.body.get(); - } - shard_index._btree_lock.unlock_shared(); + // Find BlobExt by BlobRoute + auto const route = BlobRoute(shard.id, blob); - if (!unsafe_ptr) { - LOGWARNMOD(homeobject, "Blob missing {} during get", route.blob); + // Calculate BlobRoute from ShardInfo (use ordinal?) + LOGTRACEMOD(homeobject, "Looking up Blob {}", route); + auto blob_it = our_shard._btree.find(route); + if (our_shard._btree.end() == blob_it || !blob_it->second) { + LOGWARNMOD(homeobject, "Blob [{}] missing during get", route); return folly::makeUnexpected(BlobError::UNKNOWN_BLOB); } - /// Make copy outside lock + + // Duplicate underyling Blob for user + // This is only *safe* because we defer GC. + auto& ext_blob = blob_it->second; + RELEASE_ASSERT(ext_blob._blob != nullptr, "Blob Deleted!"); + RELEASE_ASSERT(!!ext_blob._blob->body, "BlobBody Deleted!"); + auto unsafe_ptr = ext_blob._blob->body.get(); + + Blob user_blob; + user_blob.object_off = ext_blob._blob->object_off; + user_blob.user_key = ext_blob._blob->user_key; user_blob.body = std::make_unique< sisl::byte_array_impl >(unsafe_ptr->size); std::memcpy(user_blob.body->bytes, unsafe_ptr->bytes, user_blob.body->size); return user_blob; @@ -64,25 +61,21 @@ BlobManager::Result< Blob > MemoryHomeObject::_get_blob(ShardInfo const& shard, BlobManager::NullResult MemoryHomeObject::_del_blob(ShardInfo const& shard, blob_id id) { // Lookup Shard Index (BTree and SSN) - auto& shard_index = _find_index(shard.id); + auto& our_shard = _find_index(shard.id); // Calculate BlobRoute from ShardInfo (use ordinal?) - auto route = BlobRoute(shard.id, id); + auto const route = BlobRoute(shard.id, id); - // Lock the BTree *OWNED* and find the BLOB location. Move the index value to *garbage*, to be dealt - // with later and remove route from Index. - auto result = BlobManager::NullResult(folly::makeUnexpected(BlobError::UNKNOWN_BLOB)); - shard_index._btree_lock.lock(); - auto& our_btree = shard_index._btree; - if (auto r_it = our_btree.find(route); our_btree.end() != r_it) { - result = folly::Unit(); - _garbage.push_back(std::move(r_it->second)); - our_btree.erase(r_it); + // Tombstone BlobExt entry + LOGTRACEMOD(homeobject, "Tombstoning Blob {}", route); + if (auto blob_it = our_shard._btree.find(route); our_shard._btree.end() != blob_it && blob_it->second) { + auto del_blob = BlobExt(); + del_blob._blob = blob_it->second._blob; + our_shard._btree.assign_if_equal(route, blob_it->second, std::move(del_blob)); + return folly::Unit(); } - shard_index._btree_lock.unlock(); - - if (!result) LOGWARNMOD(homeobject, "Blob missing {} during delete", route.blob); - return result; + LOGWARNMOD(homeobject, "Blob [{}] missing during del", route); + return folly::makeUnexpected(BlobError::UNKNOWN_BLOB); } } // namespace homeobject diff --git a/src/lib/memory/homeobject.cpp b/src/lib/memory/homeobject.cpp index c72f5c57..e71cded2 100644 --- a/src/lib/memory/homeobject.cpp +++ b/src/lib/memory/homeobject.cpp @@ -21,4 +21,10 @@ void HomeObjectImpl::init_repl_svc() { } } +ShardIndex::~ShardIndex() { + for (auto it = _btree.begin(); it != _btree.end(); ++it) { + delete it->second._blob; + } +} + } // namespace homeobject diff --git a/src/lib/memory/homeobject.hpp b/src/lib/memory/homeobject.hpp index 25435973..b2bdfcd3 100644 --- a/src/lib/memory/homeobject.hpp +++ b/src/lib/memory/homeobject.hpp @@ -1,12 +1,9 @@ #pragma once -#include -#include -#include -#include +#include #include -#include +#include #include "mocks/repl_service.h" #include "lib/homeobject_impl.hpp" @@ -16,37 +13,63 @@ struct BlobRoute { shard_id shard; blob_id blob; }; +} // namespace homeobject -inline std::string toString(BlobRoute const& r) { return fmt::format("{}:{}", r.shard, r.blob); } - -inline bool operator<(BlobRoute const& lhs, BlobRoute const& rhs) { return toString(lhs) < toString(rhs); } +namespace fmt { +template <> +struct formatter< homeobject::BlobRoute > { + template < typename ParseContext > + constexpr auto parse(ParseContext& ctx) { + return ctx.begin(); + } -inline bool operator==(BlobRoute const& lhs, BlobRoute const& rhs) { return toString(lhs) == toString(rhs); } -} // namespace homeobject + template < typename FormatContext > + auto format(const homeobject::BlobRoute& r, FormatContext& ctx) { + return format_to(ctx.out(), "{}:{:012x}", r.shard, r.blob); + } +}; +} // namespace fmt template <> struct std::hash< homeobject::BlobRoute > { std::size_t operator()(homeobject::BlobRoute const& r) const noexcept { - return std::hash< std::string >()(homeobject::toString(r)); + return std::hash< std::string >()(fmt::format("{}", r)); } }; namespace homeobject { -using btree = std::unordered_map< BlobRoute, Blob >; +inline bool operator<(BlobRoute const& lhs, BlobRoute const& rhs) { + return fmt::format("{}", lhs) < fmt::format("{}", rhs); +} +inline bool operator==(BlobRoute const& lhs, BlobRoute const& rhs) { + return fmt::format("{}", lhs) == fmt::format("{}", rhs); +} + +/// +// Used to TombStone Blob's in the Index to defer for GC. +ENUM(BlobState, uint8_t, ALIVE = 0, DELETED); + +struct BlobExt { + BlobState _state{BlobState::DELETED}; + Blob* _blob; + + explicit operator bool() const { return _state == BlobState::ALIVE; } +}; +inline bool operator==(BlobExt const& lhs, BlobExt const& rhs) { return lhs._blob == rhs._blob; } + +using btree = folly::ConcurrentHashMap< BlobRoute, BlobExt >; struct ShardIndex { - mutable folly::RWSpinLock _btree_lock; btree _btree; - blob_id _shard_seq_num{0ull}; + std::atomic< blob_id > _shard_seq_num{0ull}; + ~ShardIndex(); }; class MemoryHomeObject : public HomeObjectImpl { /// Simulates the Shard=>Chunk mapping in IndexSvc - mutable std::shared_mutex _index_lock; - using index_svc = std::unordered_map< shard_id, ShardIndex >; + using index_svc = folly::ConcurrentHashMap< shard_id, std::unique_ptr< ShardIndex > >; index_svc _in_memory_index; - std::list< Blob > _garbage; /// /// Helpers @@ -60,7 +83,7 @@ class MemoryHomeObject : public HomeObjectImpl { BlobManager::NullResult _del_blob(ShardInfo const&, blob_id) override; /// - ShardIndex& _find_index(shard_id); + ShardIndex& _find_index(shard_id) const; public: using HomeObjectImpl::HomeObjectImpl; diff --git a/src/lib/memory/shard_manager.cpp b/src/lib/memory/shard_manager.cpp index 2b072ed4..9f389532 100644 --- a/src/lib/memory/shard_manager.cpp +++ b/src/lib/memory/shard_manager.cpp @@ -21,8 +21,7 @@ ShardManager::Result< ShardInfo > MemoryHomeObject::_create_shard(pg_id pg_owner auto [_, s_happened] = _shard_map.emplace(info.id, iter); RELEASE_ASSERT(s_happened, "Duplicate Shard insertion!"); } - auto lg = std::scoped_lock(_index_lock); - auto [_, happened] = _in_memory_index.try_emplace(info.id); + auto [it, happened] = _in_memory_index.try_emplace(info.id, std::make_unique< ShardIndex >()); RELEASE_ASSERT(happened, "Could not create BTree!"); return info; } diff --git a/src/lib/memory/tests/BlobManagerTest.cpp b/src/lib/memory/tests/BlobManagerTest.cpp index c41d7851..0ce8aad0 100644 --- a/src/lib/memory/tests/BlobManagerTest.cpp +++ b/src/lib/memory/tests/BlobManagerTest.cpp @@ -103,9 +103,11 @@ TEST_F(BlobManagerFixture, BasicTests) { our_calls.push_back( m_memory_homeobj->blob_manager()->get(i, _blob_id).deferValue([](auto const& e) {})); our_calls.push_back( - m_memory_homeobj->blob_manager()->get(_shard_1.id, i).deferValue([](auto const&) {})); + m_memory_homeobj->blob_manager()->get(_shard_1.id, (i - _shard_2.id)).deferValue([](auto const&) { + })); our_calls.push_back( - m_memory_homeobj->blob_manager()->get(_shard_2.id, i).deferValue([](auto const&) {})); + m_memory_homeobj->blob_manager()->get(_shard_2.id, (i - _shard_2.id)).deferValue([](auto const&) { + })); our_calls.push_back(m_memory_homeobj->blob_manager()->put(i, Blob()).deferValue( [](auto const& e) { EXPECT_EQ(BlobError::UNKNOWN_SHARD, e.error()); })); our_calls.push_back( @@ -120,9 +122,10 @@ TEST_F(BlobManagerFixture, BasicTests) { .deferValue([](auto const& e) { EXPECT_TRUE(!!e); })); our_calls.push_back( m_memory_homeobj->blob_manager()->del(i, _blob_id).deferValue([](auto const& e) {})); - our_calls.push_back(m_memory_homeobj->blob_manager()->del(_shard_1.id, i).deferValue([](auto const& e) { - EXPECT_EQ(BlobError::UNKNOWN_BLOB, e.error()); - })); + our_calls.push_back( + m_memory_homeobj->blob_manager()->del(_shard_1.id, (i - _shard_2.id)).deferValue([](auto const& e) { + EXPECT_EQ(BlobError::UNKNOWN_BLOB, e.error()); + })); } auto lg = std::scoped_lock(call_lock);