Skip to content

Commit

Permalink
Reconstitute shards.
Browse files Browse the repository at this point in the history
  • Loading branch information
szmyd committed Sep 29, 2023
1 parent ad12570 commit da8686f
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 12 deletions.
29 changes: 19 additions & 10 deletions src/lib/file_backend/file_blob_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ using std::filesystem::path;
auto& shard = *index_it->second;

#define WITH_ROUTE(blob) \
auto const route = BlobRoute{_shard.id, (blob)}; \
auto route = BlobRoute{_shard.id, (blob)}; \
LOGTRACEMOD(homeobject, "[route={}]", route);

#define IF_BLOB_ALIVE \
Expand Down Expand Up @@ -55,24 +55,33 @@ BlobManager::Result< blob_id_t > FileHomeObject::_put_blob(ShardInfo const& _sha
return route.blob;
}

/// Read the serialized JSON blob header stored at offset `blob_id` in the shard file.
///
/// On-disk layout (established by _put_blob): a size_t length prefix, followed by
/// that many bytes of JSON, followed by the blob body.
///
/// @param shard_fd  open file descriptor for the shard file (read access required)
/// @param blob_id   in/out: offset of the header on entry; advanced past the
///                  length prefix and the JSON header on return, i.e. it points
///                  at the blob body afterwards (callers rely on this side effect)
/// @return the parsed header, or a null json when no valid header exists at this
///         offset — _recover() uses the null return as its end-of-shard sentinel.
nlohmann::json FileHomeObject::_read_blob_header(int shard_fd, blob_id_t& blob_id) {
    // Length prefix first. A failed read here is fatal: the caller handed us an
    // offset it believes is inside the shard.
    // NOTE(review): pread() past EOF returns 0 and would trip this assert —
    // presumably shard files are pre-sized to max_shard_size; confirm.
    size_t h_size = 0ull;
    auto err = pread(shard_fd, &h_size, sizeof(h_size), blob_id);
    RELEASE_ASSERT(0 < err, "failed to read from shard");
    blob_id += sizeof(h_size);

    // std::string::data() is non-const since C++17 — no const_cast needed.
    auto j_str = std::string(h_size, '\0');
    err = pread(shard_fd, j_str.data(), h_size, blob_id);
    blob_id += h_size;
    try {
        // A read of zeroed/garbage bytes makes parse() throw; that exception is
        // the deliberate "no blob here" path, not an error.
        if (0 <= err) return nlohmann::json::parse(j_str);
        LOGE("failed to read: {}", strerror(errno));
    } catch (nlohmann::exception const&) { LOGT("no blob @ [blob_id={}]", blob_id); }
    return nlohmann::json{};
}

// Lookup and duplicate underlying Blob for user; only *safe* because we defer GC.
BlobManager::Result< Blob > FileHomeObject::_get_blob(ShardInfo const& _shard, blob_id_t _blob) const {
WITH_SHARD
WITH_ROUTE(_blob)
IF_BLOB_ALIVE {
WITH_SHARD_FILE(O_RDONLY)
size_t h_size = 0ull;
auto err = pread(shard_fd, &h_size, sizeof(h_size), route.blob);
RELEASE_ASSERT(0 < err, "Failed to read from: {}", shard_file.string());

auto j_str = std::string(h_size, '\0');
err = pread(shard_fd, const_cast< char* >(j_str.c_str()), h_size, sizeof(h_size) + route.blob);
RELEASE_ASSERT(0 < err, "Failed to read from: {}", shard_file.string());
auto blob_json = nlohmann::json::parse(j_str);
auto blob_json = _read_blob_header(shard_fd, route.blob);

auto const body_size = blob_json["body_size"].get< uint64_t >();
auto b = Blob{sisl::io_blob_safe(body_size), "", 0};
err = pread(shard_fd, b.body.bytes, body_size, sizeof(h_size) + h_size + route.blob);
auto err = pread(shard_fd, b.body.bytes, body_size, route.blob);
RELEASE_ASSERT(0 < err, "Failed to read from: {}", shard_file.string());

b.user_key = blob_json["user_key"].get< std::string >();
Expand Down
23 changes: 21 additions & 2 deletions src/lib/file_backend/file_homeobject.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ void FileHomeObject::_recover() {
RELEASE_ASSERT(0 < err, "Failed to read from: {}", shard_file);
auto shard_json = nlohmann::json::parse(j_str);

// Reconstitute the ShardInfo
auto info = ShardInfo();
info.id = shard_json["shard_id"].get< shard_id_t >();
info.placement_group = shard_json["pg_id"].get< pg_id_t >();
Expand All @@ -56,10 +57,27 @@ void FileHomeObject::_recover() {
info.total_capacity_bytes = shard_json["total_capacity"].get< uint64_t >();
info.deleted_capacity_bytes = shard_json["deleted_capacity"].get< uint64_t >();

auto [it, happened] = index_.try_emplace(info.id, std::make_unique< ShardIndex >());
RELEASE_ASSERT(happened, "duplicate Shard recovery!");

// Scan Shard for shard_offset_ (next blob_id)
auto blob_id = sizeof(h_size) + h_size;
while (max_shard_size() > blob_id) {
auto route = BlobRoute{info.id, blob_id};
auto blob_hdr = _read_blob_header(shard_fd, blob_id);
if (blob_hdr.is_null()) break;
auto blob_size = blob_hdr["body_size"].get< uint64_t >();
LOGT("found [blob={}], [user_key={}], [size={}]", route, blob_hdr["user_key"].get< std::string >(),
blob_size);
blob_id += blob_size;
}
LOGI("[shard={}] reconstituted to: [offset={}]", info.id, blob_id);
it->second->shard_offset_.store(blob_id);
close(shard_fd);

auto iter = s_list.emplace(s_list.end(), Shard(info));
auto [_, s_happened] = _shard_map.emplace(info.id, iter);
RELEASE_ASSERT(s_happened, "Duplicate Shard insertion!");
close(shard_fd);
RELEASE_ASSERT(s_happened, "duplicate Shard recovery!");
}
}
}
Expand All @@ -71,6 +89,7 @@ FileHomeObject::FileHomeObject(std::weak_ptr< HomeObjectApplication >&& applicat
if (std::filesystem::exists(file_store_)) {
auto id_fd = open(id_file.string().c_str(), O_RDONLY);
auto err = pread(id_fd, &_our_id, sizeof(_our_id), 0ull);
close(id_fd);
RELEASE_ASSERT(0 < err, "Failed to write to: {}", id_file.string());
LOGI("recovering: {}", to_string(_our_id));
_recover();
Expand Down
1 change: 1 addition & 0 deletions src/lib/file_backend/file_homeobject.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ class FileHomeObject : public HomeObjectImpl {
PGMember const& new_member) override;
///

static nlohmann::json _read_blob_header(int shard_fd, blob_id_t& blob_id);
void _recover();

public:
Expand Down

0 comments on commit da8686f

Please sign in to comment.