Skip to content

Commit

Permalink
Merge branch 'origin/main'
Browse files Browse the repository at this point in the history
  • Loading branch information
zichanglai committed Sep 15, 2023
2 parents f995734 + 57fe3e6 commit ad3860c
Show file tree
Hide file tree
Showing 5 changed files with 101 additions and 77 deletions.
97 changes: 45 additions & 52 deletions src/lib/memory/blob_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,87 +2,80 @@

namespace homeobject {

ShardIndex& MemoryHomeObject::_find_index(shard_id shard) {
auto lg = std::shared_lock(_index_lock);
ShardIndex& MemoryHomeObject::_find_index(shard_id shard) const {
auto index_it = _in_memory_index.find(shard);
RELEASE_ASSERT(_in_memory_index.end() != index_it, "Missing BTree!!");
return index_it->second;
return *index_it->second;
}

BlobManager::Result< blob_id > MemoryHomeObject::_put_blob(ShardInfo const& shard, Blob&& blob) {
// Lookup Shard Index (BTree and SSN)
auto& shard_index = _find_index(shard.id);
auto& our_shard = _find_index(shard.id);

// Lock the BTree
shard_index._btree_lock.lock();
// TODO: Generate BlobID (with RAFT this will happen implicitly) and Route
auto route = BlobRoute{shard.id, shard_index._shard_seq_num++};
auto [new_it, happened] = shard_index._btree.try_emplace(route, std::move(blob));
shard_index._btree_lock.unlock();
// Generate BlobID (with RAFT this will happen implicitly) and Route
auto const route = BlobRoute{shard.id, our_shard._shard_seq_num++};
if ((UINT64_MAX >> 16) < route.blob) { return folly::makeUnexpected(BlobError::UNKNOWN); }
LOGTRACEMOD(homeobject, "Writing Blob {}", route);

// Write (move) Blob to Heap
auto new_blob = BlobExt();
new_blob._state = BlobState::ALIVE;
new_blob._blob = new Blob(std::move(blob));
LOGDEBUGMOD(homeobject, "Wrote BLOB {} to: BlkId:[{}]", route, fmt::ptr(new_blob._blob));

// Insert BlobExt to Index
auto [_, happened] = our_shard._btree.try_emplace(route, std::move(new_blob));
RELEASE_ASSERT(happened, "Generated duplicate BlobRoute!");
LOGDEBUGMOD(homeobject, "Wrote BLOB {} to: BlkId:[{}]", route.blob, fmt::ptr(&new_it->second));
return route.blob;
}

BlobManager::Result< Blob > MemoryHomeObject::_get_blob(ShardInfo const& shard, blob_id blob) const {
// Lookup Shard Index (BTree and SSN)
auto index_it = _in_memory_index.cend();
{
auto lg = std::shared_lock(_index_lock);
if (index_it = _in_memory_index.find(shard.id); _in_memory_index.cend() == index_it)
return folly::makeUnexpected(BlobError::UNKNOWN_SHARD);
}
auto& shard_index = index_it->second;

// Calculate BlobRoute from ShardInfo (use ordinal?)
auto route = BlobRoute(shard.id, blob);
LOGDEBUGMOD(homeobject, "Looking up Blob {}", route.blob);
auto& our_shard = _find_index(shard.id);

// Lock the BTree *SHARED* and find the BLOB location in memory, read the BLOB into our
// return value except body which we'll do outside the lock (may be HUGE and guaranteed to not move)
// This is only *safe* because we defer GC to shutdown currently.
Blob user_blob;
auto unsafe_ptr = decltype(Blob::body)::pointer{nullptr};
shard_index._btree_lock.lock_shared();
if (auto it = shard_index._btree.find(route); shard_index._btree.end() != it) {
user_blob.object_off = it->second.object_off;
user_blob.user_key = it->second.user_key;
unsafe_ptr = it->second.body.get();
}
shard_index._btree_lock.unlock_shared();
// Find BlobExt by BlobRoute
auto const route = BlobRoute(shard.id, blob);

if (!unsafe_ptr) {
LOGWARNMOD(homeobject, "Blob missing {} during get", route.blob);
// Calculate BlobRoute from ShardInfo (use ordinal?)
LOGTRACEMOD(homeobject, "Looking up Blob {}", route);
auto blob_it = our_shard._btree.find(route);
if (our_shard._btree.end() == blob_it || !blob_it->second) {
LOGWARNMOD(homeobject, "Blob [{}] missing during get", route);
return folly::makeUnexpected(BlobError::UNKNOWN_BLOB);
}
/// Make copy outside lock

// Duplicate underyling Blob for user
// This is only *safe* because we defer GC.
auto& ext_blob = blob_it->second;
RELEASE_ASSERT(ext_blob._blob != nullptr, "Blob Deleted!");
RELEASE_ASSERT(!!ext_blob._blob->body, "BlobBody Deleted!");
auto unsafe_ptr = ext_blob._blob->body.get();

Blob user_blob;
user_blob.object_off = ext_blob._blob->object_off;
user_blob.user_key = ext_blob._blob->user_key;
user_blob.body = std::make_unique< sisl::byte_array_impl >(unsafe_ptr->size);
std::memcpy(user_blob.body->bytes, unsafe_ptr->bytes, user_blob.body->size);
return user_blob;
}

BlobManager::NullResult MemoryHomeObject::_del_blob(ShardInfo const& shard, blob_id id) {
// Lookup Shard Index (BTree and SSN)
auto& shard_index = _find_index(shard.id);
auto& our_shard = _find_index(shard.id);

// Calculate BlobRoute from ShardInfo (use ordinal?)
auto route = BlobRoute(shard.id, id);
auto const route = BlobRoute(shard.id, id);

// Lock the BTree *OWNED* and find the BLOB location. Move the index value to *garbage*, to be dealt
// with later and remove route from Index.
auto result = BlobManager::NullResult(folly::makeUnexpected(BlobError::UNKNOWN_BLOB));
shard_index._btree_lock.lock();
auto& our_btree = shard_index._btree;
if (auto r_it = our_btree.find(route); our_btree.end() != r_it) {
result = folly::Unit();
_garbage.push_back(std::move(r_it->second));
our_btree.erase(r_it);
// Tombstone BlobExt entry
LOGTRACEMOD(homeobject, "Tombstoning Blob {}", route);
if (auto blob_it = our_shard._btree.find(route); our_shard._btree.end() != blob_it && blob_it->second) {
auto del_blob = BlobExt();
del_blob._blob = blob_it->second._blob;
our_shard._btree.assign_if_equal(route, blob_it->second, std::move(del_blob));
return folly::Unit();
}
shard_index._btree_lock.unlock();

if (!result) LOGWARNMOD(homeobject, "Blob missing {} during delete", route.blob);
return result;
LOGWARNMOD(homeobject, "Blob [{}] missing during del", route);
return folly::makeUnexpected(BlobError::UNKNOWN_BLOB);
}

} // namespace homeobject
6 changes: 6 additions & 0 deletions src/lib/memory/homeobject.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,10 @@ void HomeObjectImpl::init_repl_svc() {
}
}

ShardIndex::~ShardIndex() {
for (auto it = _btree.begin(); it != _btree.end(); ++it) {
delete it->second._blob;
}
}

} // namespace homeobject
59 changes: 41 additions & 18 deletions src/lib/memory/homeobject.hpp
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
#pragma once

#include <memory>
#include <mutex>
#include <shared_mutex>
#include <set>
#include <atomic>
#include <utility>

#include <folly/synchronization/RWSpinLock.h>
#include <folly/concurrency/ConcurrentHashMap.h>
#include "mocks/repl_service.h"

#include "lib/homeobject_impl.hpp"
Expand All @@ -16,37 +13,63 @@ struct BlobRoute {
shard_id shard;
blob_id blob;
};
} // namespace homeobject

inline std::string toString(BlobRoute const& r) { return fmt::format("{}:{}", r.shard, r.blob); }

inline bool operator<(BlobRoute const& lhs, BlobRoute const& rhs) { return toString(lhs) < toString(rhs); }
namespace fmt {
template <>
struct formatter< homeobject::BlobRoute > {
template < typename ParseContext >
constexpr auto parse(ParseContext& ctx) {
return ctx.begin();
}

inline bool operator==(BlobRoute const& lhs, BlobRoute const& rhs) { return toString(lhs) == toString(rhs); }
} // namespace homeobject
template < typename FormatContext >
auto format(const homeobject::BlobRoute& r, FormatContext& ctx) {
return format_to(ctx.out(), "{}:{:012x}", r.shard, r.blob);
}
};
} // namespace fmt

template <>
struct std::hash< homeobject::BlobRoute > {
std::size_t operator()(homeobject::BlobRoute const& r) const noexcept {
return std::hash< std::string >()(homeobject::toString(r));
return std::hash< std::string >()(fmt::format("{}", r));
}
};

namespace homeobject {

using btree = std::unordered_map< BlobRoute, Blob >;
inline bool operator<(BlobRoute const& lhs, BlobRoute const& rhs) {
return fmt::format("{}", lhs) < fmt::format("{}", rhs);
}
inline bool operator==(BlobRoute const& lhs, BlobRoute const& rhs) {
return fmt::format("{}", lhs) == fmt::format("{}", rhs);
}

///
// Used to TombStone Blob's in the Index to defer for GC.
ENUM(BlobState, uint8_t, ALIVE = 0, DELETED);

struct BlobExt {
BlobState _state{BlobState::DELETED};
Blob* _blob;

explicit operator bool() const { return _state == BlobState::ALIVE; }
};
inline bool operator==(BlobExt const& lhs, BlobExt const& rhs) { return lhs._blob == rhs._blob; }

using btree = folly::ConcurrentHashMap< BlobRoute, BlobExt >;

struct ShardIndex {
mutable folly::RWSpinLock _btree_lock;
btree _btree;
blob_id _shard_seq_num{0ull};
std::atomic< blob_id > _shard_seq_num{0ull};
~ShardIndex();
};

class MemoryHomeObject : public HomeObjectImpl {
/// Simulates the Shard=>Chunk mapping in IndexSvc
mutable std::shared_mutex _index_lock;
using index_svc = std::unordered_map< shard_id, ShardIndex >;
using index_svc = folly::ConcurrentHashMap< shard_id, std::unique_ptr< ShardIndex > >;
index_svc _in_memory_index;
std::list< Blob > _garbage;
///

/// Helpers
Expand All @@ -60,7 +83,7 @@ class MemoryHomeObject : public HomeObjectImpl {
BlobManager::NullResult _del_blob(ShardInfo const&, blob_id) override;
///

ShardIndex& _find_index(shard_id);
ShardIndex& _find_index(shard_id) const;

public:
using HomeObjectImpl::HomeObjectImpl;
Expand Down
3 changes: 1 addition & 2 deletions src/lib/memory/shard_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@ ShardManager::Result< ShardInfo > MemoryHomeObject::_create_shard(pg_id pg_owner
auto [_, s_happened] = _shard_map.emplace(info.id, iter);
RELEASE_ASSERT(s_happened, "Duplicate Shard insertion!");
}
auto lg = std::scoped_lock(_index_lock);
auto [_, happened] = _in_memory_index.try_emplace(info.id);
auto [it, happened] = _in_memory_index.try_emplace(info.id, std::make_unique< ShardIndex >());
RELEASE_ASSERT(happened, "Could not create BTree!");
return info;
}
Expand Down
13 changes: 8 additions & 5 deletions src/lib/memory/tests/BlobManagerTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,11 @@ TEST_F(BlobManagerFixture, BasicTests) {
our_calls.push_back(
m_memory_homeobj->blob_manager()->get(i, _blob_id).deferValue([](auto const& e) {}));
our_calls.push_back(
m_memory_homeobj->blob_manager()->get(_shard_1.id, i).deferValue([](auto const&) {}));
m_memory_homeobj->blob_manager()->get(_shard_1.id, (i - _shard_2.id)).deferValue([](auto const&) {
}));
our_calls.push_back(
m_memory_homeobj->blob_manager()->get(_shard_2.id, i).deferValue([](auto const&) {}));
m_memory_homeobj->blob_manager()->get(_shard_2.id, (i - _shard_2.id)).deferValue([](auto const&) {
}));
our_calls.push_back(m_memory_homeobj->blob_manager()->put(i, Blob()).deferValue(
[](auto const& e) { EXPECT_EQ(BlobError::UNKNOWN_SHARD, e.error()); }));
our_calls.push_back(
Expand All @@ -120,9 +122,10 @@ TEST_F(BlobManagerFixture, BasicTests) {
.deferValue([](auto const& e) { EXPECT_TRUE(!!e); }));
our_calls.push_back(
m_memory_homeobj->blob_manager()->del(i, _blob_id).deferValue([](auto const& e) {}));
our_calls.push_back(m_memory_homeobj->blob_manager()->del(_shard_1.id, i).deferValue([](auto const& e) {
EXPECT_EQ(BlobError::UNKNOWN_BLOB, e.error());
}));
our_calls.push_back(
m_memory_homeobj->blob_manager()->del(_shard_1.id, (i - _shard_2.id)).deferValue([](auto const& e) {
EXPECT_EQ(BlobError::UNKNOWN_BLOB, e.error());
}));
}

auto lg = std::scoped_lock(call_lock);
Expand Down

0 comments on commit ad3860c

Please sign in to comment.