diff --git a/src/include/homeobject/blob_manager.hpp b/src/include/homeobject/blob_manager.hpp index 4297e955..6d6b9544 100644 --- a/src/include/homeobject/blob_manager.hpp +++ b/src/include/homeobject/blob_manager.hpp @@ -9,8 +9,8 @@ namespace homeobject { -ENUM(BlobError, uint16_t, UNKNOWN = 1, TIMEOUT, INVALID_ARG, NOT_LEADER, UNKNOWN_SHARD, UNKNOWN_BLOB, - CHECKSUM_MISMATCH); +ENUM(BlobError, uint16_t, UNKNOWN = 1, TIMEOUT, INVALID_ARG, NOT_LEADER, UNKNOWN_SHARD, UNKNOWN_BLOB, CHECKSUM_MISMATCH, + UNKNOWN_PG, PG_NOT_READY, READ_FAILED, INDEX_ERROR); struct Blob { Blob(sisl::io_blob_safe b, std::string const& u, uint64_t o) : body(std::move(b)), user_key(u), object_off(o) {} @@ -18,8 +18,8 @@ struct Blob { Blob clone() const; sisl::io_blob_safe body; - std::string user_key; - uint64_t object_off; + std::string user_key{}; + uint64_t object_off{}; std::optional< peer_id_t > current_leader{std::nullopt}; }; diff --git a/src/include/homeobject/common.hpp b/src/include/homeobject/common.hpp index 8802e989..c63c4707 100644 --- a/src/include/homeobject/common.hpp +++ b/src/include/homeobject/common.hpp @@ -11,7 +11,7 @@ SISL_LOGGING_DECL(homeobject); -#define HOMEOBJECT_LOG_MODS grpc_server, HOMESTORE_LOG_MODS, homeobject +#define HOMEOBJECT_LOG_MODS grpc_server, HOMESTORE_LOG_MODS, homeobject, blobmgr #ifndef Ki constexpr uint64_t Ki = 1024ul; diff --git a/src/lib/blob_manager.cpp b/src/lib/blob_manager.cpp index 3a19fd95..33af471f 100644 --- a/src/lib/blob_manager.cpp +++ b/src/lib/blob_manager.cpp @@ -4,17 +4,17 @@ namespace homeobject { std::shared_ptr< BlobManager > HomeObjectImpl::blob_manager() { return shared_from_this(); } -BlobManager::AsyncResult< Blob > HomeObjectImpl::get(shard_id_t shard, blob_id_t const& blob, uint64_t off, +BlobManager::AsyncResult< Blob > HomeObjectImpl::get(shard_id_t shard, blob_id_t const& blob_id, uint64_t off, uint64_t len) const { - return _get_shard(shard).thenValue([this, blob](auto const e) -> BlobManager::Result< Blob > { + 
return _get_shard(shard).thenValue([this, blob_id, off, len](auto const e) -> BlobManager::AsyncResult< Blob > { if (!e) return folly::makeUnexpected(BlobError::UNKNOWN_SHARD); - return _get_blob(e.value(), blob); + return _get_blob(e.value(), blob_id, off, len); }); } BlobManager::AsyncResult< blob_id_t > HomeObjectImpl::put(shard_id_t shard, Blob&& blob) { return _get_shard(shard).thenValue( - [this, blob = std::move(blob)](auto const e) mutable -> BlobManager::Result< blob_id_t > { + [this, blob = std::move(blob)](auto const e) mutable -> BlobManager::AsyncResult< blob_id_t > { if (!e) return folly::makeUnexpected(BlobError::UNKNOWN_SHARD); if (ShardInfo::State::SEALED == e.value().state) return folly::makeUnexpected(BlobError::INVALID_ARG); return _put_blob(e.value(), std::move(blob)); @@ -22,7 +22,7 @@ BlobManager::AsyncResult< blob_id_t > HomeObjectImpl::put(shard_id_t shard, Blob } BlobManager::NullAsyncResult HomeObjectImpl::del(shard_id_t shard, blob_id_t const& blob) { - return _get_shard(shard).thenValue([this, blob](auto const e) mutable -> BlobManager::NullResult { + return _get_shard(shard).thenValue([this, blob](auto const e) mutable -> BlobManager::NullAsyncResult { if (!e) return folly::makeUnexpected(BlobError::UNKNOWN_SHARD); return _del_blob(e.value(), blob); }); diff --git a/src/lib/blob_route.hpp b/src/lib/blob_route.hpp index e8bd47db..431dc2b9 100644 --- a/src/lib/blob_route.hpp +++ b/src/lib/blob_route.hpp @@ -14,6 +14,9 @@ struct BlobRoute { shard_id_t shard; blob_id_t blob; auto operator<=>(BlobRoute const&) const = default; + sisl::blob to_blob() const { + return sisl::blob{uintptr_cast(const_cast< BlobRoute* >(this)), sizeof(*this)}; + } }; } // namespace homeobject diff --git a/src/lib/homeobject_impl.hpp b/src/lib/homeobject_impl.hpp index f54aca7d..8ff9a1f5 100644 --- a/src/lib/homeobject_impl.hpp +++ b/src/lib/homeobject_impl.hpp @@ -76,9 +76,10 @@ class HomeObjectImpl : public HomeObject, virtual ShardManager::AsyncResult< 
ShardInfo > _create_shard(pg_id_t, uint64_t size_bytes) = 0; virtual ShardManager::Result< ShardInfo > _seal_shard(shard_id_t) = 0; - virtual BlobManager::Result< blob_id_t > _put_blob(ShardInfo const&, Blob&&) = 0; - virtual BlobManager::Result< Blob > _get_blob(ShardInfo const&, blob_id_t) const = 0; - virtual BlobManager::NullResult _del_blob(ShardInfo const&, blob_id_t) = 0; + virtual BlobManager::AsyncResult< blob_id_t > _put_blob(ShardInfo const&, Blob&&) = 0; + virtual BlobManager::AsyncResult< Blob > _get_blob(ShardInfo const&, blob_id_t, uint64_t off = 0, + uint64_t len = 0) const = 0; + virtual BlobManager::NullAsyncResult _del_blob(ShardInfo const&, blob_id_t) = 0; /// folly::Future< ShardManager::Result< ShardInfo > > _get_shard(shard_id_t id) const; auto _defer() const { return folly::makeSemiFuture().via(folly::getGlobalCPUExecutor()); } diff --git a/src/lib/homestore_backend/CMakeLists.txt b/src/lib/homestore_backend/CMakeLists.txt index 1efe7db3..f0d24ce7 100644 --- a/src/lib/homestore_backend/CMakeLists.txt +++ b/src/lib/homestore_backend/CMakeLists.txt @@ -2,18 +2,19 @@ cmake_minimum_required (VERSION 3.11) add_library ("${PROJECT_NAME}_homestore") target_sources("${PROJECT_NAME}_homestore" PRIVATE - hs_homeobject.cpp - hs_blob_manager.cpp - hs_shard_manager.cpp - hs_pg_manager.cpp - heap_chunk_selector.cpp - replication_state_machine.cpp + hs_homeobject.cpp + hs_blob_manager.cpp + hs_shard_manager.cpp + hs_pg_manager.cpp + index_kv.cpp + heap_chunk_selector.cpp + replication_state_machine.cpp $ - ) + ) target_link_libraries("${PROJECT_NAME}_homestore" ${COMMON_DEPS} ) if(BUILD_TESTING) - add_subdirectory(tests) + add_subdirectory(tests) endif() diff --git a/src/lib/homestore_backend/hs_blob_manager.cpp b/src/lib/homestore_backend/hs_blob_manager.cpp index de8e0084..c64007f0 100644 --- a/src/lib/homestore_backend/hs_blob_manager.cpp +++ b/src/lib/homestore_backend/hs_blob_manager.cpp @@ -1,17 +1,271 @@ #include "hs_homeobject.hpp" +#include 
"replication_message.hpp" +#include "replication_state_machine.hpp" +#include "lib/homeobject_impl.hpp" + +SISL_LOGGING_DECL(blobmgr) namespace homeobject { -BlobManager::Result< blob_id_t > HSHomeObject::_put_blob(ShardInfo const&, Blob&&) { - return folly::makeUnexpected(BlobError::UNKNOWN); +BlobManager::AsyncResult< blob_id_t > HSHomeObject::_put_blob(ShardInfo const& shard, Blob&& blob) { + auto& pg_id = shard.placement_group; + shared< homestore::ReplDev > repl_dev; + { + std::shared_lock lock_guard(_pg_lock); + auto iter = _pg_map.find(pg_id); + if (iter == _pg_map.end()) { + LOGERROR("failed to put blob with non-exist pg [{}]", pg_id); + return folly::makeUnexpected(BlobError::UNKNOWN_PG); + } + repl_dev = static_cast< HS_PG* >(iter->second.get())->repl_dev_; + } + + RELEASE_ASSERT(repl_dev != nullptr, "Repl dev instance null"); + + const uint32_t needed_size = sizeof(ReplicationMessageHeader); + auto req = repl_result_ctx< BlobManager::Result< BlobInfo > >::make(needed_size, 512); + + uint8_t* raw_ptr = req->hdr_buf_.bytes; + ReplicationMessageHeader* header = new (raw_ptr) ReplicationMessageHeader(); + header->msg_type = ReplicationMessageType::PUT_BLOB_MSG; + header->payload_size = 0; + header->payload_crc = 0; + header->shard_id = shard.id; + header->pg_id = pg_id; + header->header_crc = header->calculate_crc(); + + BlobHeader blob_header{}; + blob_header.shard_id = shard.id; + blob_header.total_size = blob.body.size; + blob_header.hash_algorithm = BlobHeader::HashAlgorithm::CRC32; + blob_header.compute_blob_hash(blob.body, false /* verify */); + + sisl::sg_list sgs; + sgs.size = 0; + auto block_size = repl_dev->get_blk_size(); + auto size = sisl::round_up(sizeof(blob_header), block_size); + auto buf_header = iomanager.iobuf_alloc(block_size, size); + std::memcpy(buf_header, &blob_header, sizeof(blob_header)); + + // Create blob header. + sgs.iovs.emplace_back(iovec{.iov_base = buf_header, .iov_len = size}); + sgs.size += size; + + // Append blob bytes. 
+ sgs.iovs.emplace_back(iovec{.iov_base = blob.body.bytes, .iov_len = blob.body.size}); + sgs.size += blob.body.size; + + // Append metadata and update the offsets and total size. + if (!blob.user_key.empty()) { + size_t user_key_size = blob.user_key.size(); + sgs.iovs.emplace_back(iovec{.iov_base = blob.user_key.data(), .iov_len = user_key_size}); + sgs.size += user_key_size; + blob_header.total_size += user_key_size; + blob_header.meta_data_offset = blob.body.size; + // buf_header was filled from blob_header before these two fields changed; copy again so the written header records them. + std::memcpy(buf_header, &blob_header, sizeof(blob_header)); + } + + repl_dev->async_alloc_write(req->hdr_buf_, sisl::blob{}, sgs, req); + return req->result().deferValue([this, header, buf_header, blob = std::move(blob)]( + const auto& result) -> BlobManager::AsyncResult< blob_id_t > { + header->~ReplicationMessageHeader(); + iomanager.iobuf_free(buf_header); + + if (result.hasError()) { return folly::makeUnexpected(result.error()); } + auto blob_info = result.value(); + LOGTRACEMOD(blobmgr, "Put blob success shard {} blob {} pbas {}", blob_info.shard_id, blob_info.blob_id, + blob_info.pbas.to_string()); + + return blob_info.blob_id; + }); } -BlobManager::Result< Blob > HSHomeObject::_get_blob(ShardInfo const&, blob_id_t) const { - return folly::makeUnexpected(BlobError::UNKNOWN); +void HSHomeObject::on_blob_put_commit(int64_t lsn, sisl::blob const& header, sisl::blob const& key, + const homestore::MultiBlkId& pbas, + cintrusive< homestore::repl_req_ctx >& hs_ctx) { + repl_result_ctx< BlobManager::Result< BlobInfo > >* ctx{nullptr}; + if (hs_ctx != nullptr) { + ctx = boost::static_pointer_cast< repl_result_ctx< BlobManager::Result< BlobInfo > > >(hs_ctx).get(); + } + + auto msg_header = r_cast< ReplicationMessageHeader* >(header.bytes); + if (msg_header->corrupted()) { + LOGERROR("replication message header is corrupted with crc error, lsn:{}", lsn); + if (ctx) { ctx->promise_.setValue(folly::makeUnexpected(BlobError::CHECKSUM_MISMATCH)); } + return; + } + + shared< BlobIndexTable > index_table; + { + std::shared_lock lock_guard(_pg_lock); + auto iter = 
_pg_map.find(msg_header->pg_id); + if (iter == _pg_map.end()) { + LOGERROR("Couldnt find pg {} for blob {}", msg_header->pg_id, lsn); + // ctx stays null when hs_ctx is null, so guard the promise like the other completion paths do. + if (ctx) { ctx->promise_.setValue(folly::makeUnexpected(BlobError::UNKNOWN_PG)); } + return; + } + + index_table = static_cast< HS_PG* >(iter->second.get())->index_table_; + RELEASE_ASSERT(index_table != nullptr, "Index table not intialized"); + } + + BlobInfo blob_info; + blob_info.shard_id = msg_header->shard_id; + blob_info.blob_id = lsn; + blob_info.pbas = pbas; + + // Write to index table with key {shard id, blob id } and value {pba}. + auto r = add_to_index_table(index_table, blob_info); + if (r.hasError()) { + LOGERROR("Failed to insert into index table for blob {} err {}", lsn, r.error()); + // Same null guard as above: there is no promise to complete without a request context. + if (ctx) { ctx->promise_.setValue(folly::makeUnexpected(r.error())); } + return; + } + + if (ctx) { ctx->promise_.setValue(BlobManager::Result< BlobInfo >(blob_info)); } } -BlobManager::NullResult HSHomeObject::_del_blob(ShardInfo const&, blob_id_t) { +BlobManager::AsyncResult< Blob > HSHomeObject::_get_blob(ShardInfo const& shard, blob_id_t blob_id, uint64_t req_offset, + uint64_t req_len) const { + auto& pg_id = shard.placement_group; + shared< BlobIndexTable > index_table; + shared< homestore::ReplDev > repl_dev; + { + std::shared_lock lock_guard(_pg_lock); + auto iter = _pg_map.find(pg_id); + if (iter == _pg_map.end()) { + LOGERROR("failed to do get blob with non-exist pg [{}]", pg_id); + return folly::makeUnexpected(BlobError::UNKNOWN_PG); + } + + repl_dev = static_cast< HS_PG* >(iter->second.get())->repl_dev_; + index_table = static_cast< HS_PG* >(iter->second.get())->index_table_; + } + + RELEASE_ASSERT(repl_dev != nullptr, "Repl dev instance null"); + RELEASE_ASSERT(index_table != nullptr, "Index table instance null"); + + auto r = get_from_index_table(index_table, shard.id, blob_id); + if (!r) { + LOGWARN("Blob not found in index id {} shard {}", blob_id, shard.id); + return folly::makeUnexpected(r.error()); + } + + auto multi_blkids = r.value(); + auto 
block_size = repl_dev->get_blk_size(); + auto sgs_ptr = std::make_shared< sisl::sg_list >(); + auto total_size = multi_blkids.blk_count() * block_size; + auto iov_base = iomanager.iobuf_alloc(block_size, total_size); + sgs_ptr->size = total_size; + sgs_ptr->iovs.emplace_back(iovec{.iov_base = iov_base, .iov_len = total_size}); + + return repl_dev->async_read(multi_blkids, *sgs_ptr, total_size) + .thenValue([block_size, blob_id, req_len, req_offset, shard, multi_blkids, sgs_ptr, + iov_base](auto&& result) mutable -> BlobManager::AsyncResult< Blob > { + if (result) { + LOGERROR("Failed to read blob {} shard {} err {}", blob_id, shard.id, result.value()); + iomanager.iobuf_free(iov_base); + return folly::makeUnexpected(BlobError::READ_FAILED); + } + + BlobHeader* header = (BlobHeader*)iov_base; + if (!header->valid() || header->shard_id != shard.id) { + LOGERROR("Invalid header found for blob {} shard {}", blob_id, shard.id); + LOGERROR("Blob header {}", header->to_string()); + iomanager.iobuf_free(iov_base); + return folly::makeUnexpected(BlobError::READ_FAILED); + } + + auto header_size = sisl::round_up(sizeof(BlobHeader), block_size); + size_t blob_size = header->meta_data_offset == 0 ? header->total_size : header->meta_data_offset; + uint8_t* blob_bytes = (uint8_t*)iov_base + header_size; + if (!header->compute_blob_hash(sisl::blob{blob_bytes, (uint32_t)blob_size}, true /* verify */)) { + LOGERROR("Hash mismatch for blob {} shard {}", blob_id, shard.id); + iomanager.iobuf_free(iov_base); + return folly::makeUnexpected(BlobError::CHECKSUM_MISMATCH); + } + + if (req_offset + req_len > blob_size) { + LOGERROR("Invalid offset length request in get blob {} offset {} len {} size {}", blob_id, req_offset, + req_len, blob_size); + iomanager.iobuf_free(iov_base); + return folly::makeUnexpected(BlobError::INVALID_ARG); + } + + // Copy the blob bytes from the offset. If request len is 0, take the + // whole blob size else copy only the request length. 
+ blob_bytes += req_offset; + // With req_len == 0 return the rest of the blob from req_offset; copying blob_size bytes from the offset would read past the blob. + auto res_len = req_len == 0 ? blob_size - req_offset : req_len; + auto body = sisl::io_blob_safe(res_len); + std::memcpy(body.bytes, blob_bytes, res_len); + + // Copy the metadata if its present. + std::string user_key{}; + if (header->meta_data_offset != 0) { + // meta_data_offset is relative to the start of the blob data, so undo the req_offset advance first. + blob_bytes = blob_bytes - req_offset + header->meta_data_offset; + auto meta_size = header->total_size - header->meta_data_offset; + // resize (not reserve) so the string really owns meta_size bytes before they are written through data(). + user_key.resize(meta_size); + std::memcpy(user_key.data(), blob_bytes, meta_size); + } + + LOGTRACEMOD(blobmgr, "Blob get success for blob {} shard {} blkid {}", blob_id, shard.id, + multi_blkids.to_string()); + iomanager.iobuf_free(iov_base); + return Blob(std::move(body), std::move(user_key), 0); + }); +} + +homestore::blk_alloc_hints HSHomeObject::blob_put_get_blk_alloc_hints(sisl::blob const& header, + cintrusive< homestore::repl_req_ctx >& hs_ctx) { + repl_result_ctx< BlobManager::Result< BlobInfo > >* ctx{nullptr}; + if (hs_ctx != nullptr) { + ctx = boost::static_pointer_cast< repl_result_ctx< BlobManager::Result< BlobInfo > > >(hs_ctx).get(); + } + + auto msg_header = r_cast< ReplicationMessageHeader* >(header.bytes); + if (msg_header->corrupted()) { + LOGERROR("replication message header is corrupted with crc error shard:{}", msg_header->shard_id); + if (ctx) { ctx->promise_.setValue(folly::makeUnexpected(BlobError::CHECKSUM_MISMATCH)); } + return {}; + } + + std::scoped_lock lock_guard(_shard_lock); + auto shard_iter = _shard_map.find(msg_header->shard_id); + RELEASE_ASSERT(shard_iter != _shard_map.end(), "Couldnt find shard id"); + auto hs_shard = d_cast< HS_Shard* >((*shard_iter->second).get()); + auto chunk_id = hs_shard->sb_->chunk_id; + LOGINFO("Got shard id {} chunk id {}", msg_header->shard_id, chunk_id); + homestore::blk_alloc_hints hints; + hints.chunk_id_hint = chunk_id; + return hints; +} + +BlobManager::NullAsyncResult HSHomeObject::_del_blob(ShardInfo const&, blob_id_t) { return folly::makeUnexpected(BlobError::UNKNOWN); } +bool 
HSHomeObject::BlobHeader::compute_blob_hash(const sisl::blob& blob, bool verify) { + // If verify is false store only the computed hash in the header else do the verification also. + bool match = false; + if (hash_algorithm == HSHomeObject::BlobHeader::HashAlgorithm::NONE) { + std::memset(&hash, 0, sizeof(hash)); + match = true; + } else if (hash_algorithm == HSHomeObject::BlobHeader::HashAlgorithm::CRC32) { + auto blob_hash = crc32_ieee(init_crc32, blob.bytes, blob.size); + if (!verify) { + std::memcpy(&hash, &blob_hash, sizeof(int32_t)); + } else { + if (std::memcmp(&hash, &blob_hash, sizeof(int32_t)) == 0) { + match = true; + } else { + LOGERROR("CRC32 hash mismatch computed: {} header_hash: {}", + hex_bytes((uint8_t*)&blob_hash, sizeof(int32_t)), hex_bytes((uint8_t*)&hash, sizeof(int32_t))); + match = false; + } + } + } else { + RELEASE_ASSERT(false, "Hash not implemented"); + } + return match; +} + } // namespace homeobject diff --git a/src/lib/homestore_backend/hs_homeobject.cpp b/src/lib/homestore_backend/hs_homeobject.cpp index 2bf87216..b5c96207 100644 --- a/src/lib/homestore_backend/hs_homeobject.cpp +++ b/src/lib/homestore_backend/hs_homeobject.cpp @@ -1,12 +1,13 @@ #include -#include #include #include +#include #include #include #include "hs_homeobject.hpp" #include "heap_chunk_selector.h" +#include "index_kv.hpp" namespace homeobject { @@ -41,7 +42,7 @@ void HSHomeObject::init_homestore() { chunk_selector_ = std::make_shared< HeapChunkSelector >(); using namespace homestore; bool need_format = HomeStore::instance() - ->with_index_service(nullptr) + ->with_index_service(std::make_unique< BlobIndexServiceCallbacks >(this)) .with_repl_data_service(repl_impl_type::solo, chunk_selector_) .start(hs_input_params{.devices = device_info, .app_mem_size = app_mem_size}, [this]() { register_homestore_metablk_callback(); }); @@ -121,4 +122,13 @@ void HSHomeObject::on_shard_meta_blk_recover_completed(bool success) { 
chunk_selector_->build_per_dev_chunk_heap(excluding_chunks); } +std::string hex_bytes(uint8_t* bytes, size_t len) { + std::stringstream ss; + ss << std::hex; + for (size_t i = 0; i < len; i++) { + ss << std::setw(2) << std::setfill('0') << (int)bytes[i]; + } + return ss.str(); +} + } // namespace homeobject diff --git a/src/lib/homestore_backend/hs_homeobject.hpp b/src/lib/homestore_backend/hs_homeobject.hpp index 9aa16a92..20cac56b 100644 --- a/src/lib/homestore_backend/hs_homeobject.hpp +++ b/src/lib/homestore_backend/hs_homeobject.hpp @@ -10,26 +10,41 @@ #include "heap_chunk_selector.h" #include "lib/homeobject_impl.hpp" #include "replication_message.hpp" +#include "index_kv.hpp" namespace homestore { struct meta_blk; -} +class IndexTableBase; +} // namespace homestore namespace homeobject { +std::string hex_bytes(uint8_t* bytes, size_t len); + +using BlobIndexTable = homestore::IndexTable< BlobRouteKey, BlobRouteValue >; + class HSHomeObject : public HomeObjectImpl { /// Overridable Helpers ShardManager::AsyncResult< ShardInfo > _create_shard(pg_id_t, uint64_t size_bytes) override; ShardManager::Result< ShardInfo > _seal_shard(shard_id_t) override; - BlobManager::Result< blob_id_t > _put_blob(ShardInfo const&, Blob&&) override; - BlobManager::Result< Blob > _get_blob(ShardInfo const&, blob_id_t) const override; - BlobManager::NullResult _del_blob(ShardInfo const&, blob_id_t) override; + BlobManager::AsyncResult< blob_id_t > _put_blob(ShardInfo const&, Blob&&) override; + BlobManager::AsyncResult< Blob > _get_blob(ShardInfo const&, blob_id_t, uint64_t off = 0, + uint64_t len = 0) const override; + BlobManager::NullAsyncResult _del_blob(ShardInfo const&, blob_id_t) override; PGManager::NullAsyncResult _create_pg(PGInfo&& pg_info, std::set< std::string, std::less<> > peers) override; PGManager::NullAsyncResult _replace_member(pg_id_t id, peer_id_t const& old_member, PGMember const& new_member) override; + // Mapping from index table uuid to pg id. 
+ std::shared_mutex index_lock_; + struct PgIndexTable { + pg_id_t pg_id; + std::shared_ptr< BlobIndexTable > index_table; + }; + std::unordered_map< std::string, PgIndexTable > index_table_pg_map_; + public: #pragma pack(1) struct pg_members { @@ -43,6 +58,7 @@ class HSHomeObject : public HomeObjectImpl { pg_id_t id; uint32_t num_members; peer_id_t replica_set_uuid; + homestore::uuid_t index_table_uuid; pg_members members[1]; // ISO C++ forbids zero-size array }; @@ -62,8 +78,11 @@ class HSHomeObject : public HomeObjectImpl { struct HS_PG : public PG { homestore::superblk< pg_info_superblk > pg_sb_; shared< homestore::ReplDev > repl_dev_; + std::optional< homestore::chunk_num_t > any_allocated_chunk_id_{}; - HS_PG(PGInfo info, shared< homestore::ReplDev > rdev); + std::shared_ptr< BlobIndexTable > index_table_; + + HS_PG(PGInfo info, shared< homestore::ReplDev > rdev, shared< BlobIndexTable > index_table); HS_PG(homestore::superblk< pg_info_superblk > const& sb, shared< homestore::ReplDev > rdev); virtual ~HS_PG() = default; @@ -81,6 +100,43 @@ class HSHomeObject : public HomeObjectImpl { static ShardInfo shard_info_from_sb(homestore::superblk< shard_info_superblk > const& sb); }; +#pragma pack(1) + // Every blob stored in disk as blob header | blob data | blob metadata(optional). + struct BlobHeader { + static constexpr uint64_t blob_max_hash_len = 32; + static constexpr uint8_t blob_header_version = 0x01; + static constexpr uint64_t blob_header_magic = 0x21fdffdba8d68fc6; // echo "BlobHeader" | md5sum + + enum class HashAlgorithm : uint8_t { + NONE = 0, + CRC32 = 1, + MD5 = 2, + }; + + uint64_t magic{blob_header_magic}; + uint8_t version{blob_header_version}; + HashAlgorithm hash_algorithm; + uint8_t hash[blob_max_hash_len]{}; + shard_id_t shard_id; + int64_t meta_data_offset{}; // Metadata stored after the blob data. + int64_t total_size{}; // Size of data and metadata. 
+ + // A header is valid only when the magic matches AND the version is one this code understands. + bool valid() const { return magic == blob_header_magic && version <= blob_header_version; } + std::string to_string() { + return fmt::format("magic={} version={} algo={} hash={} shard={} meta_offset={} total_size={}", magic, + version, (uint8_t)hash_algorithm, hex_bytes(hash, blob_max_hash_len), shard_id, + meta_data_offset, total_size); + } + bool compute_blob_hash(const sisl::blob& blob, bool verify); + }; +#pragma pack() + + struct BlobInfo { + shard_id_t shard_id; + blob_id_t blob_id; + homestore::MultiBlkId pbas; + }; + private: shared< HeapChunkSelector > chunk_selector_; std::map< pg_id_t, std::list< homestore::superblk< shard_info_superblk > > > pending_recovery_shards_; @@ -118,6 +174,40 @@ class HSHomeObject : public HomeObjectImpl { std::optional< homestore::chunk_num_t > get_any_chunk_id(pg_id_t const pg); shared< HeapChunkSelector > chunk_selector() { return chunk_selector_; } + + bool on_pre_commit_shard_msg(int64_t lsn, sisl::blob const& header, sisl::blob const& key, + cintrusive< homestore::repl_req_ctx >&); + void on_rollback_shard_msg(int64_t lsn, sisl::blob const& header, sisl::blob const& key, + cintrusive< homestore::repl_req_ctx >&); + void on_shard_message_commit(int64_t lsn, sisl::blob const& header, sisl::blob const& key, + cintrusive< homestore::repl_req_ctx >& hs_ctx); + + // Blob manager related. 
+ void on_blob_put_commit(int64_t lsn, sisl::blob const& header, sisl::blob const& key, + const homestore::MultiBlkId& pbas, cintrusive< homestore::repl_req_ctx >& hs_ctx); + homestore::blk_alloc_hints blob_put_get_blk_alloc_hints(sisl::blob const& header, + cintrusive< homestore::repl_req_ctx >& ctx); + + std::shared_ptr< BlobIndexTable > create_index_table(); + std::shared_ptr< BlobIndexTable > recover_index_table(const homestore::superblk< homestore::index_table_sb >& sb); + BlobManager::NullResult add_to_index_table(shared< BlobIndexTable > index_table, const BlobInfo& blob_info); + BlobManager::Result< homestore::MultiBlkId > get_from_index_table(shared< BlobIndexTable > index_table, + shard_id_t shard_id, blob_id_t blob_id) const; + + void print_btree_index(pg_id_t pg_id); +}; + +class BlobIndexServiceCallbacks : public homestore::IndexServiceCallbacks { +public: + BlobIndexServiceCallbacks(HSHomeObject* home_object) : home_object_(home_object) {} + std::shared_ptr< homestore::IndexTableBase > + on_index_table_found(const homestore::superblk< homestore::index_table_sb >& sb) override { + LOGINFO("Recovered index table to index service"); + return home_object_->recover_index_table(sb); + } + +private: + HSHomeObject* home_object_; }; } // namespace homeobject diff --git a/src/lib/homestore_backend/hs_pg_manager.cpp b/src/lib/homestore_backend/hs_pg_manager.cpp index 43be4373..68428991 100644 --- a/src/lib/homestore_backend/hs_pg_manager.cpp +++ b/src/lib/homestore_backend/hs_pg_manager.cpp @@ -50,9 +50,24 @@ PGManager::NullAsyncResult HSHomeObject::_create_pg(PGInfo&& pg_info, std::set< pg_info.replica_set_uuid = boost::uuids::random_generator()(); return hs_repl_service() .create_repl_dev(pg_info.replica_set_uuid, std::move(peers), std::make_unique< ReplicationStateMachine >(this)) - .thenValue([this, pg_info = std::move(pg_info)](auto&& v) -> PGManager::NullResult { + .thenValue([this, pg_info = std::move(pg_info)](auto&& v) mutable -> PGManager::NullResult 
{ if (v.hasError()) { return folly::makeUnexpected(toPgError(v.error())); } - add_pg_to_map(std::make_unique< HS_PG >(std::move(pg_info), std::move(v.value()))); + + // TODO create index table during create shard. + auto index_table = create_index_table(); + auto uuid_str = boost::uuids::to_string(index_table->uuid()); + + auto pg_id = pg_info.id; + auto hs_pg = std::make_unique< HS_PG >(std::move(pg_info), std::move(v.value()), index_table); + std::scoped_lock lock_guard(index_lock_); + RELEASE_ASSERT(index_table_pg_map_.count(uuid_str) == 0, "duplicate index table found"); + index_table_pg_map_[uuid_str] = PgIndexTable{pg_id, index_table}; + + LOGINFO("Index table created for pg {} uuid {}", pg_id, uuid_str); + hs_pg->index_table_ = index_table; + // Add to index service, so that it gets cleaned up when index service is shutdown. + homestore::hs()->index_service().add_index_table(index_table); + add_pg_to_map(std::move(hs_pg)); return folly::Unit(); }); } @@ -121,7 +136,8 @@ void HSHomeObject::on_pg_meta_blk_found(sisl::byte_view const& buf, void* meta_c return; } - add_pg_to_map(std::make_unique< HS_PG >(pg_sb, std::move(v.value()))); + auto hs_pg = std::make_unique< HS_PG >(pg_sb, std::move(v.value())); + // check if any shard recovery is pending by this pg; auto iter = pending_recovery_shards_.find(pg_sb->id); if (iter != pending_recovery_shards_.end()) { @@ -130,6 +146,20 @@ void HSHomeObject::on_pg_meta_blk_found(sisl::byte_view const& buf, void* meta_c } pending_recovery_shards_.erase(iter); } + + // During PG recovery check if index is already recoverd else + // add entry in map, so that index recovery can update the PG. 
+ std::scoped_lock lg(index_lock_); + auto uuid_str = boost::uuids::to_string(pg_sb->index_table_uuid); + auto it = index_table_pg_map_.find(uuid_str); + if (it != index_table_pg_map_.end()) { + hs_pg->index_table_ = it->second.index_table; + it->second.pg_id = pg_sb->id; + } else { + index_table_pg_map_.emplace(uuid_str, PgIndexTable{pg_sb->id, nullptr}); + } + + add_pg_to_map(std::move(hs_pg)); }); } @@ -142,12 +172,13 @@ PGInfo HSHomeObject::HS_PG::pg_info_from_sb(homestore::superblk< pg_info_superbl return pginfo; } -HSHomeObject::HS_PG::HS_PG(PGInfo info, shared< homestore::ReplDev > rdev) : - PG{std::move(info)}, pg_sb_{"PGManager"}, repl_dev_{std::move(rdev)} { +HSHomeObject::HS_PG::HS_PG(PGInfo info, shared< homestore::ReplDev > rdev, shared< BlobIndexTable > index_table) : + PG{std::move(info)}, pg_sb_{"PGManager"}, repl_dev_{std::move(rdev)}, index_table_(index_table) { pg_sb_.create(sizeof(pg_info_superblk) + ((pg_info_.members.size() - 1) * sizeof(pg_members))); pg_sb_->id = pg_info_.id; pg_sb_->num_members = pg_info_.members.size(); pg_sb_->replica_set_uuid = repl_dev_->group_id(); + pg_sb_->index_table_uuid = index_table_->uuid(); uint32_t i{0}; for (auto const& m : pg_info_.members) { diff --git a/src/lib/homestore_backend/index_kv.cpp b/src/lib/homestore_backend/index_kv.cpp new file mode 100644 index 00000000..eda01437 --- /dev/null +++ b/src/lib/homestore_backend/index_kv.cpp @@ -0,0 +1,90 @@ +#include +#include +#include "hs_homeobject.hpp" +#include "index_kv.hpp" + +SISL_LOGGING_DECL(blobmgr) + +namespace homeobject { + +std::shared_ptr< BlobIndexTable > HSHomeObject::create_index_table() { + homestore::uuid_t uuid = boost::uuids::random_generator()(); + homestore::uuid_t parent_uuid = boost::uuids::random_generator()(); + std::string uuid_str = boost::uuids::to_string(uuid); + homestore::BtreeConfig bt_cfg(homestore::hs()->index_service().node_size()); + bt_cfg.m_leaf_node_type = homestore::btree_node_type::FIXED; + bt_cfg.m_int_node_type = 
homestore::btree_node_type::FIXED; + + auto index_table = + std::make_shared< homestore::IndexTable< BlobRouteKey, BlobRouteValue > >(uuid, parent_uuid, 0, bt_cfg); + + return index_table; +} + +std::shared_ptr< BlobIndexTable > +HSHomeObject::recover_index_table(const homestore::superblk< homestore::index_table_sb >& sb) { + homestore::BtreeConfig bt_cfg(homestore::hs()->index_service().node_size()); + bt_cfg.m_leaf_node_type = homestore::btree_node_type::FIXED; + bt_cfg.m_int_node_type = homestore::btree_node_type::FIXED; + + auto uuid_str = boost::uuids::to_string(sb->uuid); + auto index_table = std::make_shared< homestore::IndexTable< BlobRouteKey, BlobRouteValue > >(sb, bt_cfg); + + // Check if PG is already recovered. + std::scoped_lock lock_guard(index_lock_); + auto it = index_table_pg_map_.find(uuid_str); + if (it != index_table_pg_map_.end()) { + std::shared_lock lg(_pg_lock); + auto iter = _pg_map.find(it->second.pg_id); + RELEASE_ASSERT(iter != _pg_map.end(), "Unknown PG id"); + // Found a PG, update its index table. 
+ static_cast< HS_PG* >(iter->second.get())->index_table_ = index_table; + } else { + index_table_pg_map_.emplace(uuid_str, PgIndexTable{0, index_table}); + } + + LOGTRACEMOD(blobmgr, "Recovered index table uuid {}", uuid_str); + return index_table; +} + +BlobManager::NullResult HSHomeObject::add_to_index_table(shared< BlobIndexTable > index_table, + const BlobInfo& blob_info) { + BlobRouteKey index_key{BlobRoute{blob_info.shard_id, blob_info.blob_id}}; + BlobRouteValue index_value{blob_info.pbas}; + homestore::BtreeSinglePutRequest put_req{&index_key, &index_value, + homestore::btree_put_type::INSERT_ONLY_IF_NOT_EXISTS}; + auto status = index_table->put(put_req); + if (status != homestore::btree_status_t::success) { return folly::makeUnexpected(BlobError::INDEX_ERROR); } + + return folly::Unit(); +} + +BlobManager::Result< homestore::MultiBlkId > +HSHomeObject::get_from_index_table(shared< BlobIndexTable > index_table, shard_id_t shard_id, blob_id_t blob_id) const { + BlobRouteKey index_key{BlobRoute{shard_id, blob_id}}; + BlobRouteValue index_value; + homestore::BtreeSingleGetRequest get_req{&index_key, &index_value}; + auto status = index_table->get(get_req); + if (status != homestore::btree_status_t::success) { + LOGERROR("Failed to get from index table {}", index_key.to_string()); + return folly::makeUnexpected(BlobError::INDEX_ERROR); + } + + return index_value.pbas(); +} + +void HSHomeObject::print_btree_index(pg_id_t pg_id) { + shared< BlobIndexTable > index_table; + { + std::shared_lock lock_guard(_pg_lock); + auto iter = _pg_map.find(pg_id); + RELEASE_ASSERT (iter != _pg_map.end(), "Unknown PG"); + index_table = static_cast< HS_PG* >(iter->second.get())->index_table_; + RELEASE_ASSERT(index_table != nullptr, "Index table not intialized"); + } + + LOGINFO("Index UUID {}", boost::uuids::to_string(index_table->uuid())); + index_table->print_tree(); +} + +} // namespace homeobject diff --git a/src/lib/homestore_backend/index_kv.hpp 
b/src/lib/homestore_backend/index_kv.hpp new file mode 100644 index 00000000..047f5c8a --- /dev/null +++ b/src/lib/homestore_backend/index_kv.hpp @@ -0,0 +1,96 @@ +#pragma once + +#include +#include +#include +#include +#include +#include "lib/blob_route.hpp" + +namespace homeobject { + +class BlobRouteKey : public homestore::BtreeKey { +private: + BlobRoute key_; + +public: + + BlobRouteKey() = default; + BlobRouteKey(const BlobRoute key) : key_(key) {} + BlobRouteKey(const BlobRouteKey& other) : BlobRouteKey(other.serialize(), true) {} + BlobRouteKey(const homestore::BtreeKey& other) : BlobRouteKey(other.serialize(), true) {} + BlobRouteKey(const sisl::blob& b, bool copy) : + homestore::BtreeKey(), key_{*(r_cast< const BlobRoute* >(b.bytes))} {} + BlobRouteKey& operator=(const BlobRouteKey& other) { + clone(other); + return *this; + }; + virtual void clone(const homestore::BtreeKey& other) override { key_ = ((BlobRouteKey&)other).key_; } + + virtual ~BlobRouteKey() = default; + + int compare(const homestore::BtreeKey& o) const override { + const BlobRouteKey& other = s_cast< const BlobRouteKey& >(o); + if (key_ < other.key_) { + return -1; + } else if (key_ > other.key_) { + return 1; + } else { + return 0; + } + } + + sisl::blob serialize() const override { return key_.to_blob(); } + uint32_t serialized_size() const override { return sizeof(key_); } + static bool is_fixed_size() { return true; } + static uint32_t get_fixed_size() { return (sizeof(key_)); } + std::string to_string() const { return fmt::format("{}", key_); } + + void deserialize(const sisl::blob& b, bool copy) override { key_ = *(r_cast< const BlobRoute* >(b.bytes)); } + + static uint32_t get_estimate_max_size() { return get_fixed_size(); } + friend std::ostream& operator<<(std::ostream& os, const BlobRouteKey& k) { + os << fmt::format("{}", k.key()); + return os; + } + + BlobRoute key() const { return key_; } +}; + +class BlobRouteValue : public homestore::BtreeValue { +public: + BlobRouteValue() 
= default; + BlobRouteValue(const homestore::MultiBlkId& pbas) : pbas_(pbas) {} + BlobRouteValue(const BlobRouteValue& other) : homestore::BtreeValue() { pbas_ = other.pbas_; }; + BlobRouteValue(const sisl::blob& b, bool copy) : homestore::BtreeValue() { deserialize(b, copy); } + virtual ~BlobRouteValue() = default; + + BlobRouteValue& operator=(const BlobRouteValue& other) { + pbas_ = other.pbas_; + return *this; + } + + sisl::blob serialize() const override { + auto& pba = const_cast< homestore::MultiBlkId& >(pbas_); + return pba.serialize(); + } + + uint32_t serialized_size() const override { return pbas_.serialized_size(); } + static uint32_t get_fixed_size() { + return homestore::MultiBlkId::expected_serialized_size(1 /* num_pieces */); + } + + void deserialize(const sisl::blob& b, bool copy) override { pbas_.deserialize(b, copy); } + std::string to_string() const override { return fmt::format("{}", pbas_.to_string()); } + friend std::ostream& operator<<(std::ostream& os, const BlobRouteValue& v) { + os << v.pbas().to_string(); + return os; + } + + homestore::MultiBlkId pbas() const { return pbas_; } + +private: + homestore::MultiBlkId pbas_; +}; + +} // namespace homeobject diff --git a/src/lib/homestore_backend/replication_state_machine.cpp b/src/lib/homestore_backend/replication_state_machine.cpp index 9b3231c5..fcb202e7 100644 --- a/src/lib/homestore_backend/replication_state_machine.cpp +++ b/src/lib/homestore_backend/replication_state_machine.cpp @@ -12,14 +12,20 @@ void ReplicationStateMachine::on_commit(int64_t lsn, const sisl::blob& header, c home_object_->on_shard_message_commit(lsn, header, pbas, repl_dev(), ctx); break; } - case ReplicationMessageType::PUT_BLOB_MSG: + + case ReplicationMessageType::PUT_BLOB_MSG: { + home_object_->on_blob_put_commit(lsn, header, key, pbas, ctx); + break; + } case ReplicationMessageType::DEL_BLOB_MSG: + break; default: { break; } } } + bool ReplicationStateMachine::on_pre_commit(int64_t lsn, sisl::blob const&, 
sisl::blob const&, cintrusive< homestore::repl_req_ctx >&) { LOGI("on_pre_commit with lsn:{}", lsn); @@ -58,6 +64,7 @@ homestore::blk_alloc_hints ReplicationStateMachine::get_blk_alloc_hints(sisl::bl } case ReplicationMessageType::PUT_BLOB_MSG: + return home_object_->blob_put_get_blk_alloc_hints(header, ctx); case ReplicationMessageType::DEL_BLOB_MSG: default: { break; diff --git a/src/lib/homestore_backend/tests/bits_generator.hpp b/src/lib/homestore_backend/tests/bits_generator.hpp new file mode 100644 index 00000000..cdd34b99 --- /dev/null +++ b/src/lib/homestore_backend/tests/bits_generator.hpp @@ -0,0 +1,42 @@ +/********************************************************************************* + * Modifications Copyright 2017-2019 eBay Inc. + * + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed + * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + *********************************************************************************/ + +// Copied from homestore. 
+ +#pragma once +#include +#include +#include +#include + +namespace homeobject { + +class BitsGenerator { +public: + static void gen_random_bits(size_t size, uint8_t* buf) { + std::random_device rd; + std::default_random_engine g(rd()); + std::uniform_int_distribution< unsigned long long > dis(std::numeric_limits< std::uint8_t >::min(), + std::numeric_limits< std::uint8_t >::max()); + for (size_t i = 0; i < size; ++i) { + buf[i] = dis(g); + } + } + + static void gen_random_bits(sisl::blob& b) { gen_random_bits(b.size, b.bytes); } +}; + +}; // namespace homeobject diff --git a/src/lib/homestore_backend/tests/test_home_object.cpp b/src/lib/homestore_backend/tests/test_home_object.cpp index ca82c72c..1cd9c136 100644 --- a/src/lib/homestore_backend/tests/test_home_object.cpp +++ b/src/lib/homestore_backend/tests/test_home_object.cpp @@ -11,6 +11,7 @@ #include #include "lib/homestore_backend/hs_homeobject.hpp" +#include "bits_generator.hpp" using namespace std::chrono_literals; @@ -19,22 +20,35 @@ using homeobject::PGError; using homeobject::PGInfo; using homeobject::PGMember; using homeobject::ShardError; +using namespace homeobject; SISL_LOGGING_INIT(logging, HOMEOBJECT_LOG_MODS) -SISL_OPTIONS_ENABLE(logging) +SISL_OPTIONS_ENABLE(logging, test_home_object) + +SISL_OPTION_GROUP( + test_home_object, + (num_pgs, "", "num_pgs", "number of pgs", ::cxxopts::value< uint64_t >()->default_value("10"), "number"), + (num_shards, "", "num_shards", "number of shards", ::cxxopts::value< uint64_t >()->default_value("20"), "number"), + (num_blobs, "", "num_blobs", "number of blobs", ::cxxopts::value< uint64_t >()->default_value("50"), "number")); class FixtureApp : public homeobject::HomeObjectApplication { private: - std::string fpath_{"/tmp/test_home_object.data.{}" + std::to_string(rand())}; + std::string fpath_{fmt::format("/tmp/test_home_object.data.{}", std::to_string(rand()))}; public: bool spdk_mode() const override { return false; } uint32_t threads() const override { 
return 2; } + void set_restart(bool r = true) { restart_ = r; } std::list< std::filesystem::path > devices() const override { - LOGI("creating {} device file with size={}", fpath_, homestore::in_bytes(2 * Gi)); - if (std::filesystem::exists(fpath_)) { std::filesystem::remove(fpath_); } - std::ofstream ofs{fpath_, std::ios::binary | std::ios::out | std::ios::trunc}; - std::filesystem::resize_file(fpath_, 2 * Gi); + if (!restart_) { + /* create files */ + LOGINFO("creating {} device file with size={}", fpath_, homestore::in_bytes(2 * Gi)); + if (std::filesystem::exists(fpath_)) { std::filesystem::remove(fpath_); } + std::ofstream ofs{fpath_, std::ios::binary | std::ios::out | std::ios::trunc}; + std::filesystem::resize_file(fpath_, 2 * Gi); + } else { + LOGINFO("Skipping create device files"); + } auto device_info = std::list< std::filesystem::path >(); device_info.emplace_back(std::filesystem::canonical(fpath_)); @@ -48,9 +62,13 @@ class FixtureApp : public homeobject::HomeObjectApplication { homeobject::peer_id_t discover_svcid(std::optional< homeobject::peer_id_t > const&) const override { return boost::uuids::random_generator()(); } + /// TODO /// This will have to work if we test replication in the future std::string lookup_peer(homeobject::peer_id_t const&) const override { return "test_fixture.com"; } + +private: + bool restart_{false}; }; TEST(HomeObject, BasicEquivalence) { @@ -69,11 +87,56 @@ class HomeObjectFixture : public ::testing::Test { public: std::shared_ptr< FixtureApp > app; std::shared_ptr< homeobject::HomeObject > _obj_inst; + std::random_device rnd{}; + std::default_random_engine rnd_engine{rnd()}; void SetUp() override { app = std::make_shared< FixtureApp >(); _obj_inst = homeobject::init_homeobject(std::weak_ptr< homeobject::HomeObjectApplication >(app)); } + + void create_pg(pg_id_t pg_id) { + auto info = homeobject::PGInfo(pg_id); + auto peer1 = _obj_inst->our_uuid(); + auto peer2 = boost::uuids::random_generator()(); + auto peer3 = 
boost::uuids::random_generator()(); + info.members.insert(homeobject::PGMember{peer1, "peer1", 1}); + info.members.insert(homeobject::PGMember{peer2, "peer2", 0}); + info.members.insert(homeobject::PGMember{peer3, "peer3", 0}); + auto p = _obj_inst->pg_manager()->create_pg(std::move(info)).get(); + ASSERT_TRUE(!!p); + } + + static void trigger_cp(bool wait) { + auto fut = homestore::hs()->cp_mgr().trigger_cp_flush(true /* force */); + auto on_complete = [&](auto success) { + EXPECT_EQ(success, true); + LOGINFO("CP Flush completed"); + }; + + if (wait) { + on_complete(std::move(fut).get()); + } else { + std::move(fut).thenValue(on_complete); + } + } + + void restart() { + LOGINFO("Restarting homeobject."); + _obj_inst.reset(); + app->set_restart(); + _obj_inst = homeobject::init_homeobject(std::weak_ptr< homeobject::HomeObjectApplication >(app)); + std::this_thread::sleep_for(std::chrono::seconds{1}); + } + + std::string hex_bytes(uint8_t* bytes, size_t len) { + std::stringstream ss; + ss << std::hex; + for (size_t k = 0; k < len; k++) { + ss << std::setw(2) << std::setfill('0') << (int)bytes[k]; + } + return ss.str(); + } }; TEST_F(HomeObjectFixture, TestValidations) { @@ -109,10 +172,108 @@ TEST_F(HomeObjectFixture, DeleteBlobMissingShard) { EXPECT_EQ(BlobError::UNKNOWN_SHARD, _obj_inst->blob_manager()->del(1, 0u).get().error()); } +TEST_F(HomeObjectFixture, BasicPutGetBlob) { + auto num_pgs = SISL_OPTIONS["num_pgs"].as< uint64_t >(); + auto num_shards_per_pg = SISL_OPTIONS["num_shards"].as< uint64_t >() / num_pgs; + auto num_blobs_per_shard = SISL_OPTIONS["num_blobs"].as< uint64_t >() / num_shards_per_pg; + std::vector< std::pair< pg_id_t, shard_id_t > > pg_shard_id_vec; + std::map< std::tuple< pg_id_t, shard_id_t, blob_id_t >, homeobject::Blob > blob_map; + + uint32_t max_blob_size = 1024; + uint32_t alignment = 1024; + for (uint64_t i = 1; i <= num_pgs; i++) { + create_pg(i /* pg_id */); + for (uint64_t j = 0; j < num_shards_per_pg; j++) { + auto shard = 
_obj_inst->shard_manager()->create_shard(i /* pg_id */, 10 * Mi).get(); + ASSERT_TRUE(!!shard); + pg_shard_id_vec.emplace_back(i, shard->id); + LOGINFO("pg {} shard {}", i, shard->id); + } + } + + // Put blob for all shards in all pg's. + for (const auto& id : pg_shard_id_vec) { + int64_t pg_id = id.first, shard_id = id.second; + for (uint64_t k = 0; k < num_blobs_per_shard; k++) { + std::string user_key; + user_key.resize(32); /* resize, not reserve: reserve() leaves size() == 0, so filling data() would write out of bounds and the key would stay empty */ + BitsGenerator::gen_random_bits(32, (uint8_t*)user_key.data()); + homeobject::Blob put_blob{sisl::io_blob_safe(max_blob_size, alignment), user_key, 0ul}; + BitsGenerator::gen_random_bits(put_blob.body); + // Keep a copy of random payload to verify later. + homeobject::Blob clone{sisl::io_blob_safe(max_blob_size, alignment), user_key, 0ul}; + std::memcpy(clone.body.bytes, put_blob.body.bytes, put_blob.body.size); + auto b = _obj_inst->blob_manager()->put(shard_id, std::move(put_blob)).get(); + ASSERT_TRUE(!!b); + auto blob_id = b.value(); + + LOGINFO("Put blob pg {} shard {} blob {} data {}", pg_id, shard_id, blob_id, + hex_bytes(clone.body.bytes, 10)); + blob_map.insert({{pg_id, shard_id, blob_id}, std::move(clone)}); + } + } + + // Verify all get blobs + for (const auto& [id, blob] : blob_map) { + int64_t pg_id = std::get< 0 >(id), shard_id = std::get< 1 >(id), blob_id = std::get< 2 >(id); + auto g = _obj_inst->blob_manager()->get(shard_id, blob_id).get(); + ASSERT_TRUE(!!g); + auto result = std::move(g.value()); + LOGINFO("Get blob pg {} shard id {} blob id {} data {}", pg_id, shard_id, blob_id, + hex_bytes(result.body.bytes, 10)); + EXPECT_EQ(blob.body.size, result.body.size); + EXPECT_EQ(std::memcmp(result.body.bytes, blob.body.bytes, result.body.size), 0); + EXPECT_EQ(blob.user_key, result.user_key); /* meaningful only because user_key now really holds 32 bytes */ + } + + for (uint64_t i = 1; i <= num_pgs; i++) { + r_cast< HSHomeObject* >(_obj_inst.get())->print_btree_index(i); + } + + LOGINFO("Flushing CP."); + trigger_cp(true /* wait */); + + // Restart homeobject + restart(); + + // Verify all 
get blobs after restart + for (const auto& [id, blob] : blob_map) { + int64_t pg_id = std::get< 0 >(id), shard_id = std::get< 1 >(id), blob_id = std::get< 2 >(id); + auto g = _obj_inst->blob_manager()->get(shard_id, blob_id).get(); + ASSERT_TRUE(!!g); + auto result = std::move(g.value()); + LOGINFO("After restart get blob pg {} shard {} blob {} data {}", pg_id, shard_id, blob_id, + hex_bytes(result.body.bytes, 10u)); + EXPECT_EQ(result.body.size, blob.body.size); + EXPECT_EQ(std::memcmp(result.body.bytes, blob.body.bytes, result.body.size), 0); + EXPECT_EQ(blob.user_key, result.user_key); + } + + // Verify all get blobs with random offset and length. + std::uniform_int_distribution< uint32_t > rand_off_gen{0u, max_blob_size - 1u}; + std::uniform_int_distribution< uint32_t > rand_len_gen{1u, max_blob_size}; + + for (const auto& [id, blob] : blob_map) { + int64_t pg_id = std::get< 0 >(id), shard_id = std::get< 1 >(id), blob_id = std::get< 2 >(id); + auto off = rand_off_gen(rnd_engine); + auto len = rand_len_gen(rnd_engine); + if ((off + len) >= blob.body.size) { len = blob.body.size - off; } + + auto g = _obj_inst->blob_manager()->get(shard_id, blob_id, off, len).get(); + ASSERT_TRUE(!!g); + auto result = std::move(g.value()); + LOGINFO("After restart get blob pg {} shard {} blob {} off {} len {} data {}", pg_id, shard_id, blob_id, off, + len, hex_bytes(result.body.bytes, std::min(len, 10u))); + EXPECT_EQ(result.body.size, len); + EXPECT_EQ(std::memcmp(result.body.bytes, blob.body.bytes + off, result.body.size), 0); + EXPECT_EQ(blob.user_key, result.user_key); + } +} + int main(int argc, char* argv[]) { int parsed_argc = argc; ::testing::InitGoogleTest(&parsed_argc, argv); - SISL_OPTIONS_LOAD(parsed_argc, argv, logging); + SISL_OPTIONS_LOAD(parsed_argc, argv, logging, test_home_object); sisl::logging::SetLogger(std::string(argv[0])); spdlog::set_pattern("[%D %T.%e] [%n] [%^%l%$] [%t] %v"); parsed_argc = 1; diff --git a/src/lib/memory_backend/mem_blob_manager.cpp 
b/src/lib/memory_backend/mem_blob_manager.cpp index c88e7500..b76b66e0 100644 --- a/src/lib/memory_backend/mem_blob_manager.cpp +++ b/src/lib/memory_backend/mem_blob_manager.cpp @@ -18,7 +18,7 @@ namespace homeobject { } else // Write (move) Blob to new BlobExt on heap and Insert BlobExt to Index -BlobManager::Result< blob_id_t > MemoryHomeObject::_put_blob(ShardInfo const& _shard, Blob&& _blob) { +BlobManager::AsyncResult< blob_id_t > MemoryHomeObject::_put_blob(ShardInfo const& _shard, Blob&& _blob) { WITH_SHARD WITH_ROUTE(shard.shard_seq_num_++) @@ -29,14 +29,15 @@ BlobManager::Result< blob_id_t > MemoryHomeObject::_put_blob(ShardInfo const& _s } // Lookup BlobExt and duplicate underyling Blob for user; only *safe* because we defer GC. -BlobManager::Result< Blob > MemoryHomeObject::_get_blob(ShardInfo const& _shard, blob_id_t _blob) const { +BlobManager::AsyncResult< Blob > MemoryHomeObject::_get_blob(ShardInfo const& _shard, blob_id_t _blob, uint64_t off, + uint64_t len) const { WITH_SHARD WITH_ROUTE(_blob) IF_BLOB_ALIVE { return blob_it->second.blob_->clone(); } } // Tombstone BlobExt entry -BlobManager::NullResult MemoryHomeObject::_del_blob(ShardInfo const& _shard, blob_id_t _blob) { +BlobManager::NullAsyncResult MemoryHomeObject::_del_blob(ShardInfo const& _shard, blob_id_t _blob) { WITH_SHARD WITH_ROUTE(_blob) IF_BLOB_ALIVE { diff --git a/src/lib/memory_backend/mem_homeobject.hpp b/src/lib/memory_backend/mem_homeobject.hpp index c46e935a..f03c68e1 100644 --- a/src/lib/memory_backend/mem_homeobject.hpp +++ b/src/lib/memory_backend/mem_homeobject.hpp @@ -39,9 +39,10 @@ class MemoryHomeObject : public HomeObjectImpl { ShardManager::Result< ShardInfo > _seal_shard(shard_id_t) override; // BlobManager - BlobManager::Result< blob_id_t > _put_blob(ShardInfo const&, Blob&&) override; - BlobManager::Result< Blob > _get_blob(ShardInfo const&, blob_id_t) const override; - BlobManager::NullResult _del_blob(ShardInfo const&, blob_id_t) override; + 
BlobManager::AsyncResult< blob_id_t > _put_blob(ShardInfo const&, Blob&&) override; + BlobManager::AsyncResult< Blob > _get_blob(ShardInfo const&, blob_id_t, uint64_t off = 0, + uint64_t len = 0) const override; + BlobManager::NullAsyncResult _del_blob(ShardInfo const&, blob_id_t) override; /// // PGManager