From da8686fea57448547fa095931c1f78e45aacf751 Mon Sep 17 00:00:00 2001 From: Brian Szmyd Date: Fri, 29 Sep 2023 09:18:15 -0700 Subject: [PATCH] Reconstitute shards. --- src/lib/file_backend/file_blob_manager.cpp | 29 ++++++++++++++-------- src/lib/file_backend/file_homeobject.cpp | 23 +++++++++++++++-- src/lib/file_backend/file_homeobject.hpp | 1 + 3 files changed, 41 insertions(+), 12 deletions(-) diff --git a/src/lib/file_backend/file_blob_manager.cpp b/src/lib/file_backend/file_blob_manager.cpp index 7c3643fc..2fbd1c5c 100644 --- a/src/lib/file_backend/file_blob_manager.cpp +++ b/src/lib/file_backend/file_blob_manager.cpp @@ -13,7 +13,7 @@ using std::filesystem::path; auto& shard = *index_it->second; #define WITH_ROUTE(blob) \ - auto const route = BlobRoute{_shard.id, (blob)}; \ + auto route = BlobRoute{_shard.id, (blob)}; \ LOGTRACEMOD(homeobject, "[route={}]", route); #define IF_BLOB_ALIVE \ @@ -55,24 +55,33 @@ BlobManager::Result< blob_id_t > FileHomeObject::_put_blob(ShardInfo const& _sha return route.blob; } +nlohmann::json FileHomeObject::_read_blob_header(int shard_fd, blob_id_t& blob_id) { + size_t h_size = 0ull; + auto err = pread(shard_fd, &h_size, sizeof(h_size), blob_id); + RELEASE_ASSERT(0 < err, "failed to read from shard"); + blob_id += sizeof(h_size); + + auto j_str = std::string(h_size, '\0'); + err = pread(shard_fd, const_cast< char* >(j_str.c_str()), h_size, blob_id); + blob_id += h_size; + try { + if (0 <= err) return nlohmann::json::parse(j_str); + LOGE("failed to read: {}", strerror(errno)); + } catch (nlohmann::exception const&) { LOGT("no blob @ [blob_id={}]", blob_id); } + return nlohmann::json{}; +} + // Lookup and duplicate underyling Blob for user; only *safe* because we defer GC. BlobManager::Result< Blob > FileHomeObject::_get_blob(ShardInfo const& _shard, blob_id_t _blob) const { WITH_SHARD WITH_ROUTE(_blob) IF_BLOB_ALIVE { WITH_SHARD_FILE(O_RDONLY) - size_t h_size = 0ull; - auto err = pread(shard_fd, &h_size, sizeof(h_size), route.blob); - RELEASE_ASSERT(0 < err, "Failed to read from: {}", shard_file.string()); - - auto j_str = std::string(h_size, '\0'); - err = pread(shard_fd, const_cast< char* >(j_str.c_str()), h_size, sizeof(h_size) + route.blob); - RELEASE_ASSERT(0 < err, "Failed to read from: {}", shard_file.string()); - auto blob_json = nlohmann::json::parse(j_str); + auto blob_json = _read_blob_header(shard_fd, route.blob); auto const body_size = blob_json["body_size"].get< uint64_t >(); auto b = Blob{sisl::io_blob_safe(body_size), "", 0}; - err = pread(shard_fd, b.body.bytes, body_size, sizeof(h_size) + h_size + route.blob); + auto err = pread(shard_fd, b.body.bytes, body_size, route.blob); RELEASE_ASSERT(0 < err, "Failed to read from: {}", shard_file.string()); b.user_key = blob_json["user_key"].get< std::string >(); diff --git a/src/lib/file_backend/file_homeobject.cpp b/src/lib/file_backend/file_homeobject.cpp index 0401b18d..b57d2466 100644 --- a/src/lib/file_backend/file_homeobject.cpp +++ b/src/lib/file_backend/file_homeobject.cpp @@ -46,6 +46,7 @@ void FileHomeObject::_recover() { RELEASE_ASSERT(0 < err, "Failed to read from: {}", shard_file); auto shard_json = nlohmann::json::parse(j_str); + // Reconsitute the ShardInfo auto info = ShardInfo(); info.id = shard_json["shard_id"].get< shard_id_t >(); info.placement_group = shard_json["pg_id"].get< pg_id_t >(); @@ -56,10 +57,27 @@ void FileHomeObject::_recover() { info.total_capacity_bytes = shard_json["total_capacity"].get< uint64_t >(); info.deleted_capacity_bytes = shard_json["deleted_capacity"].get< uint64_t >(); + auto [it, happened] = index_.try_emplace(info.id, std::make_unique< ShardIndex >()); + RELEASE_ASSERT(happened, "duplicate Shard recovery!"); + + // Scan Shard for shard_offset_ (next blob_id) + auto blob_id = sizeof(h_size) + h_size; + while (max_shard_size() > blob_id) { + auto route = BlobRoute{info.id, blob_id}; + auto blob_hdr = _read_blob_header(shard_fd, blob_id); + if (blob_hdr.is_null()) break; + auto blob_size = blob_hdr["body_size"].get< uint64_t >(); + LOGT("found [blob={}], [user_key={}], [size={}]", route, blob_hdr["user_key"].get< std::string >(), + blob_size); + blob_id += blob_size; + } + LOGI("[shard={}] reconstituted to: [offset={}]", info.id, blob_id); + it->second->shard_offset_.store(blob_id); + close(shard_fd); + auto iter = s_list.emplace(s_list.end(), Shard(info)); auto [_, s_happened] = _shard_map.emplace(info.id, iter); - RELEASE_ASSERT(s_happened, "Duplicate Shard insertion!"); - close(shard_fd); + RELEASE_ASSERT(s_happened, "duplicate Shard recovery!"); } } } @@ -71,6 +89,7 @@ FileHomeObject::FileHomeObject(std::weak_ptr< HomeObjectApplication >&& applicat if (std::filesystem::exists(file_store_)) { auto id_fd = open(id_file.string().c_str(), O_RDONLY); auto err = pread(id_fd, &_our_id, sizeof(_our_id), 0ull); + close(id_fd); RELEASE_ASSERT(0 < err, "Failed to write to: {}", id_file.string()); LOGI("recovering: {}", to_string(_our_id)); _recover(); diff --git a/src/lib/file_backend/file_homeobject.hpp b/src/lib/file_backend/file_homeobject.hpp index a6045e42..4b0f5398 100644 --- a/src/lib/file_backend/file_homeobject.hpp +++ b/src/lib/file_backend/file_homeobject.hpp @@ -41,6 +41,7 @@ class FileHomeObject : public HomeObjectImpl { PGMember const& new_member) override; /// + static nlohmann::json _read_blob_header(int shard_fd, blob_id_t& blob_id); void _recover(); public: