Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

File based implementation without Replication. #76

Closed
wants to merge 24 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,10 +100,13 @@ def package_info(self):
self.cpp_info.components["homestore"].requires = ["homestore::homestore"]
self.cpp_info.components["memory"].libs = ["homeobject_memory"]
self.cpp_info.components["memory"].requires = ["homestore::homestore"]
self.cpp_info.components["file"].libs = ["homeobject_file", "home_replication_mock"]
self.cpp_info.components["file"].requires = ["homestore::homestore"]

if self.settings.os == "Linux":
self.cpp_info.components["homestore"].system_libs.append("pthread")
self.cpp_info.components["memory"].system_libs.append("pthread")
self.cpp_info.components["file"].system_libs.append("pthread")
if self.options.sanitize:
self.cpp_info.components["memory"].sharedlinkflags.append("-fsanitize=address")
self.cpp_info.components["memory"].exelinkflags.append("-fsanitize=address")
Expand Down
1 change: 1 addition & 0 deletions src/lib/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,4 @@ endif()

add_subdirectory(homestore_backend)
add_subdirectory(memory_backend)
add_subdirectory(file_backend)
26 changes: 26 additions & 0 deletions src/lib/file_backend/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
cmake_minimum_required (VERSION 3.11)

add_library ("${PROJECT_NAME}_file")
target_sources("${PROJECT_NAME}_file" PRIVATE
    file_homeobject.cpp
    file_blob_manager.cpp
    file_shard_manager.cpp
    file_pg_manager.cpp
    $<TARGET_OBJECTS:${PROJECT_NAME}_core>
  )
target_link_libraries("${PROJECT_NAME}_file"
    ${COMMON_DEPS}
  )

if(BUILD_TESTING)
  add_executable (file_test)
  target_sources(file_test PRIVATE
      $<TARGET_OBJECTS:test_fixture>
    )
  target_link_libraries(file_test
      homeobject_file
      ${COMMON_TEST_DEPS}
    )
  # fix: these tests previously ran `memory_test` (copy/paste from memory_backend),
  # so the file backend was never exercised by CTest.
  add_test(NAME FileTestCPU COMMAND file_test -csv error --executor cpu --num_iters 10000)
  add_test(NAME FileTestIO COMMAND file_test -csv error --executor io --num_iters 10000)
endif()
99 changes: 99 additions & 0 deletions src/lib/file_backend/file_blob_manager.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
#include <fcntl.h>
#include <unistd.h>

#include "file_homeobject.hpp"

namespace homeobject {

using std::filesystem::path;

// Look up the ShardIndex for _shard.id; declares `shard` (a ShardIndex&) in the
// enclosing scope and asserts the index entry exists.
#define WITH_SHARD                                                                                                     \
    auto index_it = index_.find(_shard.id);                                                                            \
    RELEASE_ASSERT(index_.end() != index_it, "Missing BTree!!");                                                       \
    auto& shard = *index_it->second;

// Build (and trace-log) the BlobRoute for (blob) within _shard; declares `route`.
#define WITH_ROUTE(blob)                                                                                               \
    auto route = BlobRoute{_shard.id, (blob)};                                                                         \
    LOGTRACEMOD(homeobject, "[route={}]", route);

// Guard: execute the trailing block only when `route` is present in the shard's
// btree_ AND its value is true (not tombstoned by _del_blob); declares `blob_it`.
// Otherwise logs and returns BlobError::UNKNOWN_BLOB from the enclosing function.
#define IF_BLOB_ALIVE                                                                                                  \
    if (auto blob_it = shard.btree_.find(route); shard.btree_.end() == blob_it || !blob_it->second) {                  \
        LOGWARNMOD(homeobject, "[route={}] missing", route);                                                           \
        return folly::makeUnexpected(BlobError::UNKNOWN_BLOB);                                                         \
    } else

// Write (move) Blob to FILE.
// On-disk record layout: [h_size : size_t][JSON header of h_size bytes][body].
// The blob_id handed back to the caller is the record's starting offset within
// the Shard file.
BlobManager::AsyncResult< blob_id_t > FileHomeObject::_put_blob(ShardInfo const& _shard, Blob&& _blob) {
    WITH_SHARD

    // Serialize the Blob metadata (everything except the body) as JSON.
    nlohmann::json j;
    j["user_key"] = _blob.user_key;
    j["object_off"] = _blob.object_off;
    j["body_size"] = _blob.body.size;
    auto serialize = j.dump();
    auto const h_size = serialize.size();
    auto const t_size = sizeof(size_t) + h_size + _blob.body.size;

    // Atomically reserve this record's region; the reserved offset becomes the blob_id.
    WITH_ROUTE(shard.shard_offset_.fetch_add(t_size, std::memory_order_relaxed))
    if (route.blob + t_size > max_shard_size()) return folly::makeUnexpected(BlobError::INVALID_ARG);

    // fix: the assert format strings were "{route=}", a malformed fmt placeholder;
    // use the "[route={}]" pattern every other message in this file uses.
    auto err = pwrite(shard.fd_, &h_size, sizeof(h_size), route.blob);
    RELEASE_ASSERT(0 < err, "failed to write to: [route={}]", route);
    err = pwrite(shard.fd_, serialize.c_str(), h_size, sizeof(h_size) + route.blob);
    RELEASE_ASSERT(0 < err, "failed to write to: [route={}]", route);
    err = pwrite(shard.fd_, _blob.body.bytes, _blob.body.size, sizeof(h_size) + h_size + route.blob);
    RELEASE_ASSERT(0 < err, "failed to write to: [route={}]", route);
    // Mark the blob alive in the in-memory index.
    auto [_, happened] = shard.btree_.try_emplace(route, true);
    RELEASE_ASSERT(happened, "Generated duplicate BlobRoute!");
    return route.blob;
}

// Read and parse a blob's JSON header from shard_fd at offset blob_id.
// `blob_id` is an in/out cursor: it is ALWAYS advanced past the [h_size][json]
// prefix (sizeof(h_size) + h_size bytes), even when the read or parse fails,
// leaving it at the start of the blob body. Returns an empty (null) json when
// the bytes at the cursor are not a valid header — callers use this as the
// end-of-shard sentinel during recovery.
nlohmann::json FileHomeObject::_read_blob_header(int shard_fd, blob_id_t& blob_id) {
    size_t h_size = 0ull;
    auto err = pread(shard_fd, &h_size, sizeof(h_size), blob_id);
    RELEASE_ASSERT(0 < err, "failed to read from shard");
    blob_id += sizeof(h_size);

    // NOTE(review): a garbage h_size drives the size of this read; presumably
    // bounded by the preallocated shard file size — TODO confirm.
    auto j_str = std::string(h_size, '\0');
    err = pread(shard_fd, const_cast< char* >(j_str.c_str()), h_size, blob_id);
    blob_id += h_size;
    try {
        if (0 <= err) return nlohmann::json::parse(j_str);
        LOGE("failed to read: {}", strerror(errno));
    } catch (nlohmann::json::exception const&) { LOGT("no blob @ [blob_id={}]", blob_id); }
    // Unreadable or unparsable header: report "no blob here".
    return nlohmann::json{};
}

// Lookup and duplicate the underlying Blob for the user; only *safe* because we defer GC.
// Returns up to `len` bytes of the body starting at `off`; the returned Blob's
// body is empty when the requested window falls outside the stored body.
BlobManager::AsyncResult< Blob > FileHomeObject::_get_blob(ShardInfo const& _shard, blob_id_t _blob, uint64_t off,
                                                           uint64_t len) const {
    auto index_it = index_.find(_shard.id);
    RELEASE_ASSERT(index_.end() != index_it, "Missing BTree!!");
    auto& shard = *index_it->second;

    auto route = BlobRoute{_shard.id, _blob};
    LOGTRACEMOD(homeobject, "[route={}]", route);

    // Reject routes that are unknown or tombstoned.
    auto blob_it = shard.btree_.find(route);
    if (shard.btree_.end() == blob_it || !blob_it->second) {
        LOGWARNMOD(homeobject, "[route={}] missing", route);
        return folly::makeUnexpected(BlobError::UNKNOWN_BLOB);
    }

    // Advances route.blob past the on-disk header, to the body's offset.
    auto blob_json = _read_blob_header(shard.fd_, route.blob);
    auto const body_size = blob_json["body_size"].get< uint64_t >();

    // Clamp the requested window [off, off+len) to the stored body.
    if ((off >= body_size) || (0 == len)) {
        len = 0;
    } else if ((off + len) > body_size) {
        len = body_size - off;
    }

    auto result = Blob{sisl::io_blob_safe(len), blob_json["user_key"].get< std::string >(),
                       blob_json["object_off"].get< uint64_t >()};
    if (0 < len) {
        auto nread = pread(shard.fd_, result.body.bytes, len, route.blob + off);
        RELEASE_ASSERT(0 < nread, "Failed to read from: [route={}]", route);
    }
    return result;
}

// Tombstone entry: flip the blob's "alive" flag to false. The on-disk record
// is left in place (GC is deferred); only the in-memory index changes.
BlobManager::NullAsyncResult FileHomeObject::_del_blob(ShardInfo const& _shard, blob_id_t _blob) {
    auto index_it = index_.find(_shard.id);
    RELEASE_ASSERT(index_.end() != index_it, "Missing BTree!!");
    auto& shard = *index_it->second;

    auto route = BlobRoute{_shard.id, _blob};
    LOGTRACEMOD(homeobject, "[route={}]", route);

    // Already-deleted or never-written blobs are reported as unknown.
    auto blob_it = shard.btree_.find(route);
    if (shard.btree_.end() == blob_it || !blob_it->second) {
        LOGWARNMOD(homeobject, "[route={}] missing", route);
        return folly::makeUnexpected(BlobError::UNKNOWN_BLOB);
    }

    shard.btree_.assign_if_equal(route, blob_it->second, false);
    return folly::Unit();
}

} // namespace homeobject
107 changes: 107 additions & 0 deletions src/lib/file_backend/file_homeobject.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
#include "file_homeobject.hpp"

#include <fcntl.h>
#include <unistd.h>
#include <filesystem>
#include <system_error>

#include <boost/uuid/random_generator.hpp>

SISL_OPTION_GROUP(homeobject_file,
(max_filesize, "", "max_filesize", "Maximum File (Shard) size",
cxxopts::value< uint32_t >()->default_value("1024"), "mb"))

namespace homeobject {

/// NOTE: We give ourselves the option to provide a different HR instance here than libhomeobject.a
extern std::shared_ptr< HomeObject > init_homeobject(std::weak_ptr< HomeObjectApplication >&& application) {
    // The file backend stores everything under the first device path the
    // application advertises.
    auto app = application.lock();
    auto const& device_list = app->devices();
    return std::make_shared< FileHomeObject >(std::move(application), *device_list.begin());
}
/// Rebuild the in-memory maps (PGs, Shards, per-Shard blob indices) by scanning
/// the on-disk file store. Layout: <file_store_>/<pg_id as %04x>/<shard files>;
/// each shard file begins with a JSON ShardInfo header followed by a sequence
/// of blob records.
void FileHomeObject::_recover() {
    for (auto const& pg_dir_e : std::filesystem::directory_iterator{file_store_}) {
        auto pg_dir = pg_dir_e.path();
        if (pg_dir.filename().string() == "svc_id") continue;
        LOGI("discovered [pg_dir={}]", pg_dir.string());
        // fix: PG directories are created as "{:04x}" (hex, see _create_pg) but were
        // parsed here as base-10, silently corrupting any id containing [a-f].
        auto pgid = std::stoul(pg_dir.filename().string(), nullptr, 16);
        LOGI("discovered [pg_dir={}] [pg_id={}]", pg_dir.string(), pgid);
        auto pg = PGInfo(pgid);
        // No replication in this backend; fabricate a fresh replica-set uuid each boot.
        pg.replica_set_uuid = boost::uuids::random_generator()();
        auto [pg_it, happened] = _pg_map.try_emplace(pgid, std::make_unique< PG >(std::move(pg)));
        auto& s_list = pg_it->second->shards_;
        RELEASE_ASSERT(happened, "Unknown map insert error!");
        for (auto const& shard_file_e : std::filesystem::directory_iterator{pg_dir_e}) {
            auto shard_file = shard_file_e.path().string();
            LOGI("discovered [shard_file={}]", shard_file);
            auto shard_fd = open(shard_file.c_str(), O_RDONLY);
            RELEASE_ASSERT(shard_fd >= 0, "Failed to open Shard {}", shard_file);

            // Shard header: [h_size : size_t][JSON-serialized ShardInfo of h_size bytes]
            size_t h_size = 0ull;
            auto err = pread(shard_fd, &h_size, sizeof(h_size), 0ull);
            RELEASE_ASSERT(0 < err, "Failed to read from: {}", shard_file);

            auto j_str = std::string(h_size, '\0');
            err = pread(shard_fd, const_cast< char* >(j_str.c_str()), h_size, sizeof(h_size));
            RELEASE_ASSERT(0 < err, "Failed to read from: {}", shard_file);
            auto shard_json = nlohmann::json::parse(j_str);

            // Reconstitute the ShardInfo
            auto info = ShardInfo();
            info.id = shard_json["shard_id"].get< shard_id_t >();
            info.placement_group = shard_json["pg_id"].get< pg_id_t >();
            info.state = shard_json["state"].get< ShardInfo::State >();
            info.created_time = shard_json["created_time"].get< uint64_t >();
            info.last_modified_time = shard_json["modified_time"].get< uint64_t >();
            info.available_capacity_bytes = shard_json["available_capacity"].get< uint64_t >();
            info.total_capacity_bytes = shard_json["total_capacity"].get< uint64_t >();
            info.deleted_capacity_bytes = shard_json["deleted_capacity"].get< uint64_t >();

            auto [idx_it, idx_happened] = index_.try_emplace(info.id, std::make_unique< ShardIndex >());
            RELEASE_ASSERT(idx_happened, "duplicate Shard recovery!");

            // Scan the Shard for blob records to rediscover shard_offset_ (the next blob_id).
            auto blob_id = sizeof(h_size) + h_size;
            while (max_shard_size() > blob_id) {
                auto route = BlobRoute{info.id, blob_id};
                auto cursor = blob_id; // _read_blob_header advances its cursor past the header
                auto blob_hdr = _read_blob_header(shard_fd, cursor);
                // fix: on a null header, leave blob_id at the record start rather than
                // advanced past a garbage-sized header, so new puts land correctly.
                if (blob_hdr.is_null()) break;
                auto blob_size = blob_hdr["body_size"].get< uint64_t >();
                LOGT("found [blob={}], [user_key={}], [size={}]", route, blob_hdr["user_key"].get< std::string >(),
                     blob_size);
                // fix: re-register the blob so _get_blob can find it post-recovery; without
                // this every recovered blob reported UNKNOWN_BLOB.
                // NOTE(review): deletes are in-memory only, so tombstoned blobs come back
                // alive here — confirm this is acceptable for the non-replicated backend.
                idx_it->second->btree_.try_emplace(route, true);
                blob_id = cursor + blob_size;
            }
            LOGI("[shard={}] reconstituted to: [offset={}]", info.id, blob_id);
            idx_it->second->fd_ = shard_fd;
            idx_it->second->shard_offset_.store(blob_id);

            auto iter = s_list.emplace(s_list.end(), std::make_unique< Shard >(info));
            auto [_, s_happened] = _shard_map.emplace(info.id, iter);
            RELEASE_ASSERT(s_happened, "duplicate Shard recovery!");
        }
    }
}

/// Open (or create) the file store rooted at `root`. If the store already
/// exists, read back our service id from <root>/svc_id and replay the on-disk
/// state via _recover(); then (re)write the svc_id file with the id the
/// application resolves for us.
FileHomeObject::FileHomeObject(std::weak_ptr< HomeObjectApplication >&& application,
                               std::filesystem::path const& root) :
        HomeObjectImpl::HomeObjectImpl(std::move(application)), file_store_(root) {
    auto const id_file = file_store_ / "svc_id";
    if (std::filesystem::exists(file_store_)) {
        auto id_fd = open(id_file.string().c_str(), O_RDONLY);
        // fix: the open was previously unchecked; a missing/unreadable svc_id
        // would have fed fd -1 to pread.
        RELEASE_ASSERT(0 <= id_fd, "Failed to open: {}", id_file.string());
        auto err = pread(id_fd, &_our_id, sizeof(_our_id), 0ull);
        close(id_fd);
        // fix: message said "write" on the read path.
        RELEASE_ASSERT(0 < err, "Failed to read from: {}", id_file.string());
        LOGI("recovering: {}", to_string(_our_id));
        _recover();
    }
    _our_id = _application.lock()->discover_svcid(_our_id);
    std::filesystem::create_directories(file_store_);
    // Truncate/create the id file, size it to one page, then persist our id.
    std::ofstream ofs{id_file, std::ios::binary | std::ios::out | std::ios::trunc};
    std::filesystem::resize_file(id_file, 4 * Ki);
    auto id_fd = open(id_file.string().c_str(), O_WRONLY);
    RELEASE_ASSERT(0 < id_fd, "Failed to open: {}", id_file.string());
    auto err = pwrite(id_fd, &_our_id, sizeof(_our_id), 0ull);
    close(id_fd); // fix: descriptor was previously leaked
    RELEASE_ASSERT(0 < err, "Failed to write to: {}", id_file.string());
}

} // namespace homeobject
55 changes: 55 additions & 0 deletions src/lib/file_backend/file_homeobject.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
#pragma once

#include <atomic>
#include <filesystem>
#include <utility>

#include <folly/concurrency/ConcurrentHashMap.h>

#include "lib/homeobject_impl.hpp"
#include "lib/blob_route.hpp"

namespace homeobject {

// Per-Shard in-memory state: the blob liveness map, the Shard file's open
// descriptor, and the offset at which the next blob record will be written.
struct ShardIndex {
    // BlobRoute -> alive? (false == tombstoned by _del_blob; data stays on disk)
    folly::ConcurrentHashMap< BlobRoute, bool > btree_;
    int fd_{-1};                                  // descriptor of the backing Shard file
    std::atomic< blob_id_t > shard_offset_{0ull}; // next write offset (doubles as next blob_id)
    ~ShardIndex(); // out-of-line; presumably closes fd_ — TODO confirm at definition
};

///
/// HomeObject backend that persists PGs/Shards/Blobs as plain files under a
/// single root directory — no replication. Layout: <root>/svc_id plus one
/// directory per PG containing one file per Shard.
///
class FileHomeObject : public HomeObjectImpl {
    // Root directory of the on-disk store; fixed at construction.
    std::filesystem::path const file_store_;

    /// Simulates the Shard=>Chunk mapping in IndexSvc
    using index_svc = folly::ConcurrentHashMap< shard_id_t, std::unique_ptr< ShardIndex > >;
    index_svc index_;
    ///

    /// Helpers
    // ShardManager
    ShardManager::AsyncResult< ShardInfo > _create_shard(pg_id_t, uint64_t size_bytes) override;
    ShardManager::AsyncResult< ShardInfo > _seal_shard(ShardInfo const&) override;

    // BlobManager
    BlobManager::AsyncResult< blob_id_t > _put_blob(ShardInfo const&, Blob&&) override;
    BlobManager::AsyncResult< Blob > _get_blob(ShardInfo const&, blob_id_t, uint64_t off = 0,
                                               uint64_t len = 0) const override;
    BlobManager::NullAsyncResult _del_blob(ShardInfo const&, blob_id_t) override;
    ///

    // PGManager
    PGManager::NullAsyncResult _create_pg(PGInfo&& pg_info, std::set< std::string, std::less<> > peers) override;
    PGManager::NullAsyncResult _replace_member(pg_id_t id, peer_id_t const& old_member,
                                               PGMember const& new_member) override;
    ///

    // Reads a blob's JSON header at *blob_id* and advances the cursor past it.
    static nlohmann::json _read_blob_header(int shard_fd, blob_id_t& blob_id);
    // Rebuilds all in-memory state from the file store at startup.
    void _recover();

public:
    FileHomeObject(std::weak_ptr< HomeObjectApplication >&& application, std::filesystem::path const& root);
    ~FileHomeObject() override = default;
};

} // namespace homeobject
19 changes: 19 additions & 0 deletions src/lib/file_backend/file_pg_manager.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
#include "file_homeobject.hpp"

namespace homeobject {
/// Create a PG: make its backing directory (hex-encoded id under the store
/// root) and register it in _pg_map. The peer set is ignored — this backend
/// has no replication.
PGManager::NullAsyncResult FileHomeObject::_create_pg(PGInfo&& pg_info, std::set< std::string, std::less<> >) {
    auto lg = std::scoped_lock(_pg_lock);

    // Directory name must stay "{:04x}" — recovery parses it back as hex.
    auto const pg_path = file_store_ / std::filesystem::path(fmt::format("{:04x}", pg_info.id));
    std::filesystem::create_directories(pg_path);

    // fix: pg_info is an rvalue parameter but was copied into PG; move it
    // (as _recover already does). pg_info.id was consumed above, before the move.
    auto [it1, _] = _pg_map.try_emplace(pg_info.id, std::make_unique< PG >(std::move(pg_info)));
    RELEASE_ASSERT(_pg_map.end() != it1, "Unknown map insert error!");
    return folly::makeSemiFuture< PGManager::NullResult >(folly::Unit());
}

/// Membership changes require replication, which the file backend does not
/// provide; always reports UNSUPPORTED_OP.
PGManager::NullAsyncResult FileHomeObject::_replace_member(pg_id_t id, peer_id_t const& old_member,
                                                           PGMember const& new_member) {
    return folly::makeSemiFuture< PGManager::NullResult >(folly::makeUnexpected(PGError::UNSUPPORTED_OP));
}
} // namespace homeobject
Loading
Loading