Skip to content

Commit

Permalink
Persistent HomeObject.
Browse files Browse the repository at this point in the history
  • Loading branch information
szmyd committed Sep 26, 2023
1 parent 3fb4869 commit dbe76b0
Show file tree
Hide file tree
Showing 5 changed files with 85 additions and 46 deletions.
62 changes: 52 additions & 10 deletions src/lib/file/blob_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

namespace homeobject {

using std::filesystem::path;

#define WITH_SHARD \
auto index_it = index_.find(_shard.id); \
RELEASE_ASSERT(index_.end() != index_it, "Missing BTree!!"); \
Expand All @@ -17,32 +19,72 @@ namespace homeobject {
return folly::makeUnexpected(BlobError::UNKNOWN_BLOB); \
} else

// Write (move) Blob to new BlobExt on heap and Insert BlobExt to Index
#define WITH_SHARD_FILE(mode) \
auto const shard_file = file_store_ / path(fmt::format("{:04x}", (_shard.id >> homeobject::shard_width))) / \
path(fmt::format("{:012x}", (_shard.id & homeobject::shard_mask))); \
auto shard_fd = open(shard_file.string().c_str(), (mode)); \
RELEASE_ASSERT(shard_fd >= 0, "Failed to open Shard {}", shard_file.string());

// Write (move) Blob to FILE
BlobManager::Result< blob_id > FileHomeObject::_put_blob(ShardInfo const& _shard, Blob&& _blob) {
WITH_SHARD
WITH_ROUTE(shard.shard_seq_num_++)

auto [_, happened] =
shard.btree_.try_emplace(route, BlobExt{.state_ = BlobState::ALIVE, .blob_ = new Blob(std::move(_blob))});
nlohmann::json j;
j["user_key"] = _blob.user_key;
j["object_off"] = _blob.object_off;
j["body_size"] = _blob.body.size;
auto serialize = j.dump();
auto const h_size = serialize.size();
auto const t_size = sizeof(size_t) + h_size + _blob.body.size;

WITH_ROUTE(shard.shard_offset_.fetch_add(t_size, std::memory_order_relaxed))
WITH_SHARD_FILE(O_WRONLY)

auto err = pwrite(shard_fd, &h_size, sizeof(h_size), route.blob);
RELEASE_ASSERT(0 < err, "Failed to write to: {}", shard_file.string());
err = err || pwrite(shard_fd, serialize.c_str(), h_size, sizeof(h_size) + route.blob);
RELEASE_ASSERT(0 < err, "Failed to write to: {}", shard_file.string());
err = err || pwrite(shard_fd, _blob.body.bytes, _blob.body.size, sizeof(h_size) + h_size + route.blob);
RELEASE_ASSERT(0 < err, "Failed to write to: {}", shard_file.string());
auto [_, happened] = shard.btree_.try_emplace(route, true);
RELEASE_ASSERT(happened, "Generated duplicate BlobRoute!");
close(shard_fd);
return route.blob;
}

// Lookup BlobExt and duplicate underyling Blob for user; only *safe* because we defer GC.
// Lookup and duplicate underyling Blob for user; only *safe* because we defer GC.
BlobManager::Result< Blob > FileHomeObject::_get_blob(ShardInfo const& _shard, blob_id _blob) const {
WITH_SHARD
WITH_ROUTE(_blob)
IF_BLOB_ALIVE { return blob_it->second.blob_->clone(); }
IF_BLOB_ALIVE {
WITH_SHARD_FILE(O_RDONLY)
size_t h_size = 0ull;
auto err = pread(shard_fd, &h_size, sizeof(h_size), route.blob);
RELEASE_ASSERT(0 < err, "Failed to read from: {}", shard_file.string());

auto j_str = std::string(h_size, '\0');
err = pread(shard_fd, const_cast< char* >(j_str.c_str()), h_size, sizeof(h_size) + route.blob);
RELEASE_ASSERT(0 < err, "Failed to read from: {}", shard_file.string());
auto shard_json = nlohmann::json::parse(j_str);

auto const body_size = shard_json["body_size"].get< uint64_t >();
auto b = Blob{sisl::io_blob_safe(body_size), "", 0};
err = pread(shard_fd, b.body.bytes, body_size, sizeof(h_size) + h_size + route.blob);
RELEASE_ASSERT(0 < err, "Failed to read from: {}", shard_file.string());

b.user_key = shard_json["user_key"].get< std::string >();
b.object_off = shard_json["object_off"].get< uint64_t >();
close(shard_fd);
return b;
}
}

// Tombstone BlobExt entry
// Tombstone entry
BlobManager::NullResult FileHomeObject::_del_blob(ShardInfo const& _shard, blob_id _blob) {
WITH_SHARD
WITH_ROUTE(_blob)
IF_BLOB_ALIVE {
auto del_blob = BlobExt();
del_blob.blob_ = blob_it->second.blob_;
shard.btree_.assign_if_equal(route, blob_it->second, std::move(del_blob));
shard.btree_.assign_if_equal(route, blob_it->second, false);
return folly::Unit();
}
}
Expand Down
9 changes: 2 additions & 7 deletions src/lib/file/homeobject.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ namespace homeobject {

/// NOTE: We give ourselves the option to provide a different HR instance here than libhomeobject.a
extern std::shared_ptr< HomeObject > init_homeobject(std::weak_ptr< HomeObjectApplication >&& application) {
auto instance = std::make_shared< FileHomeObject >(std::move(application));
auto devices = application.lock()->devices();
auto instance = std::make_shared< FileHomeObject >(std::move(application), *devices.begin());
instance->init_repl_svc();
return instance;
}
Expand All @@ -21,10 +22,4 @@ void HomeObjectImpl::init_repl_svc() {
}
}

ShardIndex::~ShardIndex() {
for (auto it = btree_.begin(); it != btree_.end(); ++it) {
delete it->second.blob_;
}
}

} // namespace homeobject
22 changes: 5 additions & 17 deletions src/lib/file/homeobject.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,26 +12,13 @@

namespace homeobject {

///
// Used to TombStone Blob's in the Index to defer for GC.
ENUM(BlobState, uint8_t, ALIVE = 0, DELETED);

struct BlobExt {
BlobState state_{BlobState::DELETED};
Blob* blob_;

explicit operator bool() const { return state_ == BlobState::ALIVE; }
bool operator==(const BlobExt& rhs) const { return blob_ == rhs.blob_; }
};

struct ShardIndex {
folly::ConcurrentHashMap< BlobRoute, BlobExt > btree_;
std::atomic< blob_id > shard_seq_num_{0ull};
~ShardIndex();
folly::ConcurrentHashMap< BlobRoute, bool > btree_;
std::atomic< blob_id > shard_offset_{0ull};
};

class FileHomeObject : public HomeObjectImpl {
std::filesystem::path const file_store_ = "file_store";
std::filesystem::path const file_store_;

/// Simulates the Shard=>Chunk mapping in IndexSvc
using index_svc = folly::ConcurrentHashMap< shard_id, std::unique_ptr< ShardIndex > >;
Expand All @@ -50,7 +37,8 @@ class FileHomeObject : public HomeObjectImpl {
///

public:
using HomeObjectImpl::HomeObjectImpl;
FileHomeObject(std::weak_ptr< HomeObjectApplication >&& application, std::filesystem::path const& root) :
HomeObjectImpl::HomeObjectImpl(std::move(application)), file_store_(root) {}
~FileHomeObject() override = default;
};

Expand Down
26 changes: 21 additions & 5 deletions src/lib/file/shard_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,33 @@ ShardManager::Result< ShardInfo > FileHomeObject::_create_shard(pg_id pg_owner,
}

auto const shard_path = file_store_ / path(fmt::format("{:04x}", (info.id >> homeobject::shard_width)));
RELEASE_ASSERT(std::filesystem::create_directories(shard_path), "Could not create directory: {}",
shard_path.string());
std::filesystem::create_directories(shard_path);

auto const shard_file = shard_path / path(fmt::format("{:012x}", (info.id & homeobject::shard_mask)));
RELEASE_ASSERT(!std::filesystem::exists(shard_file), "Shard Path Exists! [path={}]", shard_file.string());
std::ofstream ofs{shard_path, std::ios::binary | std::ios::out | std::ios::trunc};
std::filesystem::resize_file(shard_path, max_shard_size());
std::ofstream ofs{shard_file, std::ios::binary | std::ios::out | std::ios::trunc};
std::filesystem::resize_file(shard_file, max_shard_size());
RELEASE_ASSERT(std::filesystem::exists(shard_file), "Shard Path Failed Creation! [path={}]", shard_file.string());

auto [_, happened] = index_.try_emplace(info.id, std::make_unique< ShardIndex >());
auto shard_fd = open(shard_file.string().c_str(), O_WRONLY);
RELEASE_ASSERT(shard_fd >= 0, "Failed to open Shard {}", shard_file.string());

nlohmann::json j;
j["shard_id"] = info.id;
j["pg_id"] = info.placement_group;
j["state"] = info.state;
j["created_time"] = info.created_time;
j["modified_time"] = info.last_modified_time;
j["total_capacity"] = info.total_capacity_bytes;
j["available_capacity"] = info.available_capacity_bytes;
j["deleted_capacity"] = info.deleted_capacity_bytes;
auto serialize = j.dump();
auto err = pwrite(shard_fd, serialize.c_str(), serialize.size(), 0ull);
RELEASE_ASSERT(0 < err, "Failed to write to: {}", shard_file.string());

auto [it, happened] = index_.try_emplace(info.id, std::make_unique< ShardIndex >());
RELEASE_ASSERT(happened, "Could not create BTree!");
it->second->shard_offset_.store(serialize.size());
return info;
}

Expand Down
12 changes: 5 additions & 7 deletions src/lib/tests/fixture_app.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@ class FixtureApp : public homeobject::HomeObjectApplication {
FixtureApp() {
clean();
LOGINFO("creating device {} file with size {} ", path_, homestore::in_bytes(2 * Gi));
std::ofstream ofs{path_, std::ios::binary | std::ios::out | std::ios::trunc};
std::filesystem::resize_file(path_, 2 * Gi);
}

~FixtureApp() override { clean(); }
Expand All @@ -40,12 +38,12 @@ class FixtureApp : public homeobject::HomeObjectApplication {
uint32_t threads() const override { return 2; }

void clean() {
if (std::filesystem::exists(path_)) { std::filesystem::remove(path_); }
if (std::filesystem::exists(path_)) { std::filesystem::remove_all(path_); }
}

std::list< std::filesystem::path > devices() const override {
auto device_info = std::list< std::filesystem::path >();
device_info.emplace_back(std::filesystem::canonical(path_));
device_info.emplace_back(std::filesystem::weakly_canonical(path_));
return device_info;
}

Expand Down Expand Up @@ -94,9 +92,9 @@ class TestFixture : public ::testing::Test {
EXPECT_EQ(BlobError::UNKNOWN_BLOB, g_e.error());

LOGDEBUG("Insert Blob to: {}", _shard_1.id);
auto o_e = homeobj_->blob_manager()
->put(_shard_1.id, Blob{sisl::io_blob_safe(4 * Ki, 512u), "test_blob", 4 * Mi})
.get();
auto blob_data = sisl::io_blob_safe(4 * Ki, 512u);
sprintf((char*)blob_data.bytes, "HELLO, WORLD!");
auto o_e = homeobj_->blob_manager()->put(_shard_1.id, Blob{std::move(blob_data), "test_blob", 4 * Mi}).get();
EXPECT_TRUE(!!o_e);
o_e.then([this](auto&& b) mutable { _blob_id = std::move(b); });
}
Expand Down

0 comments on commit dbe76b0

Please sign in to comment.