From f757100849cf4e4d45bd8f91464f44ed8e5d8327 Mon Sep 17 00:00:00 2001 From: Sanal P Date: Tue, 20 Aug 2024 22:37:08 -0700 Subject: [PATCH] Add baseline resync read part in homeobject. Add read_snapshot_data to go over all shards and blobs of a PG. If obj_id is zero, send all shards. obj_id is concatenation of blob sequence number and batch number. For all other values of obj_id, we send batch of blobs for a shard. Once all blobs are finished in a shard, we move to next shard_id, and batch_num is reset to 0. Add LSN in shard metadata to ignore all reads of shards which are created later that the snapshot LSN. --- src/include/homeobject/shard_manager.hpp | 1 + src/lib/blob_route.hpp | 2 + src/lib/homestore_backend/CMakeLists.txt | 1 + .../homestore_backend/hs_backend_config.fbs | 6 + src/lib/homestore_backend/hs_blob_manager.cpp | 13 +- src/lib/homestore_backend/hs_homeobject.hpp | 35 +++++ .../homestore_backend/hs_shard_manager.cpp | 6 +- src/lib/homestore_backend/index_kv.cpp | 37 +++++ .../homestore_backend/pg_blob_iterator.cpp | 128 ++++++++++++++++ .../replication_state_machine.cpp | 143 ++++++++++++++++-- .../replication_state_machine.hpp | 2 + .../homestore_backend/tests/hs_blob_tests.cpp | 74 +++++++++ .../tests/hs_shard_tests.cpp | 2 + src/lib/memory_backend/mem_shard_manager.cpp | 2 +- src/lib/pg_manager.cpp | 2 +- 15 files changed, 438 insertions(+), 16 deletions(-) create mode 100644 src/lib/homestore_backend/pg_blob_iterator.cpp diff --git a/src/include/homeobject/shard_manager.hpp b/src/include/homeobject/shard_manager.hpp index ff3401fe..812ebe9f 100644 --- a/src/include/homeobject/shard_manager.hpp +++ b/src/include/homeobject/shard_manager.hpp @@ -22,6 +22,7 @@ struct ShardInfo { shard_id_t id; pg_id_t placement_group; State state; + uint64_t lsn; uint64_t created_time; uint64_t last_modified_time; uint64_t available_capacity_bytes; diff --git a/src/lib/blob_route.hpp b/src/lib/blob_route.hpp index adfcde10..e873da89 100644 --- a/src/lib/blob_route.hpp +++ b/src/lib/blob_route.hpp @@ -1,3 +1,5 @@ +#pragma once + #include #include diff --git a/src/lib/homestore_backend/CMakeLists.txt b/src/lib/homestore_backend/CMakeLists.txt index 32ac26c7..aa4b32df 100644 --- a/src/lib/homestore_backend/CMakeLists.txt +++ b/src/lib/homestore_backend/CMakeLists.txt @@ -22,6 +22,7 @@ target_sources("${PROJECT_NAME}_homestore" PRIVATE hs_blob_manager.cpp hs_shard_manager.cpp hs_pg_manager.cpp + pg_blob_iterator.cpp index_kv.cpp heap_chunk_selector.cpp replication_state_machine.cpp diff --git a/src/lib/homestore_backend/hs_backend_config.fbs b/src/lib/homestore_backend/hs_backend_config.fbs index 3daf317e..db55a66f 100644 --- a/src/lib/homestore_backend/hs_backend_config.fbs +++ b/src/lib/homestore_backend/hs_backend_config.fbs @@ -9,6 +9,12 @@ attribute "deprecated"; table HSBackendSettings { // timer thread freq in us backend_timer_us: uint64 = 60000000 (hotswap); + + // Maximum number of blobs in a snapshot batch + max_num_blobs_in_snapshot_batch: uint64 = 1024 (hotswap); + + // Maximum size of a snapshot batch + max_snapshot_batch_size_mb: uint64 = 128 (hotswap); } root_type HSBackendSettings; diff --git a/src/lib/homestore_backend/hs_blob_manager.cpp b/src/lib/homestore_backend/hs_blob_manager.cpp index 6e791504..1ec1f569 100644 --- a/src/lib/homestore_backend/hs_blob_manager.cpp +++ b/src/lib/homestore_backend/hs_blob_manager.cpp @@ -4,6 +4,7 @@ #include "lib/homeobject_impl.hpp" #include "lib/blob_route.hpp" #include +#include SISL_LOGGING_DECL(blobmgr) @@ -250,7 +251,13 @@ BlobManager::AsyncResult< Blob > HSHomeObject::_get_blob(ShardInfo const& shard, return folly::makeUnexpected(r.error()); } - auto const blkid = r.value(); + return _get_blob_data(repl_dev, shard.id, blob_id, req_offset, req_len, r.value() /* blkid*/); +} + +BlobManager::AsyncResult< Blob > HSHomeObject::_get_blob_data(const shared< homestore::ReplDev >& repl_dev, + shard_id_t shard_id, blob_id_t blob_id, + uint64_t req_offset, uint64_t req_len, + const homestore::MultiBlkId& blkid) const { auto const total_size = blkid.blk_count() * repl_dev->get_blk_size(); sisl::io_blob_safe read_buf{total_size, io_align}; @@ -258,9 +265,9 @@ BlobManager::AsyncResult< Blob > HSHomeObject::_get_blob(ShardInfo const& shard, sgs.size = total_size; sgs.iovs.emplace_back(iovec{.iov_base = read_buf.bytes(), .iov_len = read_buf.size()}); - BLOGT(shard.id, blob_id, "Blob get request: blkid={}, buf={}", blkid.to_string(), (void*)read_buf.bytes()); + BLOGT(shard_id, blob_id, "Blob get request: blkid={}, buf={}", blkid.to_string(), (void*)read_buf.bytes()); return repl_dev->async_read(blkid, sgs, total_size) - .thenValue([this, blob_id, shard_id = shard.id, req_len, req_offset, blkid, + .thenValue([this, blob_id, shard_id, req_len, req_offset, blkid, read_buf = std::move(read_buf)](auto&& result) mutable -> BlobManager::AsyncResult< Blob > { if (result) { BLOGE(shard_id, blob_id, "Failed to get blob: err={}", blob_id, shard_id, result.value()); diff --git a/src/lib/homestore_backend/hs_homeobject.hpp b/src/lib/homestore_backend/hs_homeobject.hpp index ad7a9965..2832dc50 100644 --- a/src/lib/homestore_backend/hs_homeobject.hpp +++ b/src/lib/homestore_backend/hs_homeobject.hpp @@ -11,6 +11,8 @@ #include "heap_chunk_selector.h" #include "lib/homeobject_impl.hpp" #include "replication_message.hpp" +#include "homeobject/common.hpp" +#include "index_kv.hpp" namespace homestore { struct meta_blk; @@ -267,6 +269,10 @@ class HSHomeObject : public HomeObjectImpl { homestore::MultiBlkId pbas; }; + struct BlobInfoData : public BlobInfo { + Blob blob; + }; + enum class BlobState : uint8_t { ALIVE = 0, TOMBSTONE = 1, @@ -275,6 +281,25 @@ class HSHomeObject : public HomeObjectImpl { inline const static homestore::MultiBlkId tombstone_pbas{0, 0, 0}; + struct PGBlobIterator { + PGBlobIterator(HSHomeObject& home_obj, homestore::group_id_t group_id, uint64_t upto_lsn = 0); + PG* get_pg_metadata(); + int64_t get_next_blobs(uint64_t max_num_blobs_in_batch, uint64_t max_batch_size_bytes, + std::vector< HSHomeObject::BlobInfoData >& blob_vec, bool& end_of_shard); + void create_pg_shard_snapshot_data(sisl::io_blob_safe& meta_blob); + void create_blobs_snapshot_data(std::vector< HSHomeObject::BlobInfoData >& blob_vec, + sisl::io_blob_safe& data_blob, bool end_of_shard); + bool end_of_scan() const; + + uint64_t cur_shard_seq_num_{1}; + int64_t cur_blob_id_{-1}; + uint64_t max_shard_seq_num_{0}; + HSHomeObject& home_obj_; + homestore::group_id_t group_id_; + pg_id_t pg_id_; + shared< homestore::ReplDev > repl_dev_; + }; + private: shared< HeapChunkSelector > chunk_selector_; unique< HttpManager > http_mgr_; @@ -286,6 +311,11 @@ class HSHomeObject : public HomeObjectImpl { private: static homestore::ReplicationService& hs_repl_service() { return homestore::hs()->repl_service(); } + // blob related + BlobManager::AsyncResult< Blob > _get_blob_data(const shared< homestore::ReplDev >& repl_dev, shard_id_t shard_id, + blob_id_t blob_id, uint64_t req_offset, uint64_t req_len, + const homestore::MultiBlkId& blkid) const; + // create pg related PGManager::NullAsyncResult do_create_pg(cshared< homestore::ReplDev > repl_dev, PGInfo&& pg_info); static std::string serialize_pg_info(const PGInfo& info); @@ -414,6 +444,11 @@ class HSHomeObject : public HomeObjectImpl { const BlobInfo& blob_info); void print_btree_index(pg_id_t pg_id); + shared< BlobIndexTable > get_index_table(pg_id_t pg_id); + + BlobManager::Result< std::vector< BlobInfo > > + query_blobs_in_shard(pg_id_t pg_id, uint64_t cur_shard_seq_num, blob_id_t start_blob_id, uint64_t max_num_in_batch); + // Zero padding buffer related. size_t max_pad_size() const; sisl::io_blob_safe& get_pad_buf(uint32_t pad_len); diff --git a/src/lib/homestore_backend/hs_shard_manager.cpp b/src/lib/homestore_backend/hs_shard_manager.cpp index 14df4153..35855a0f 100644 --- a/src/lib/homestore_backend/hs_shard_manager.cpp +++ b/src/lib/homestore_backend/hs_shard_manager.cpp @@ -67,6 +67,7 @@ std::string HSHomeObject::serialize_shard_info(const ShardInfo& info) { j["shard_info"]["shard_id_t"] = info.id; j["shard_info"]["pg_id_t"] = info.placement_group; j["shard_info"]["state"] = info.state; + j["shard_info"]["lsn"] = info.lsn; j["shard_info"]["created_time"] = info.created_time; j["shard_info"]["modified_time"] = info.last_modified_time; j["shard_info"]["total_capacity"] = info.total_capacity_bytes; @@ -81,6 +82,7 @@ ShardInfo HSHomeObject::deserialize_shard_info(const char* json_str, size_t str_ shard_info.id = shard_json["shard_info"]["shard_id_t"].get< shard_id_t >(); shard_info.placement_group = shard_json["shard_info"]["pg_id_t"].get< pg_id_t >(); shard_info.state = static_cast< ShardInfo::State >(shard_json["shard_info"]["state"].get< int >()); + shard_info.lsn = shard_json["shard_info"]["lsn"].get< uint64_t >(); shard_info.created_time = shard_json["shard_info"]["created_time"].get< uint64_t >(); shard_info.last_modified_time = shard_json["shard_info"]["modified_time"].get< uint64_t >(); shard_info.available_capacity_bytes = shard_json["shard_info"]["available_capacity"].get< uint64_t >(); @@ -116,6 +118,7 @@ ShardManager::AsyncResult< ShardInfo > HSHomeObject::_create_shard(pg_id_t pg_ow sb->info = ShardInfo{.id = new_shard_id, .placement_group = pg_owner, .state = ShardInfo::State::OPEN, + .lsn = 0, .created_time = create_time, .last_modified_time = create_time, .available_capacity_bytes = size_bytes, @@ -313,7 +316,8 @@ void HSHomeObject::on_shard_message_commit(int64_t lsn, sisl::blob const& h, hom switch (header->msg_type) { case ReplicationMessageType::CREATE_SHARD_MSG: { auto sb = r_cast< shard_info_superblk const* >(h.cbytes() + sizeof(ReplicationMessageHeader)); - auto const shard_info = sb->info; + auto shard_info = sb->info; + shard_info.lsn = lsn; bool shard_exist = false; { diff --git a/src/lib/homestore_backend/index_kv.cpp b/src/lib/homestore_backend/index_kv.cpp index a4fd158b..f3f76ce1 100644 --- a/src/lib/homestore_backend/index_kv.cpp +++ b/src/lib/homestore_backend/index_kv.cpp @@ -111,4 +111,41 @@ void HSHomeObject::print_btree_index(pg_id_t pg_id) { index_table->dump_tree_to_file(); } +shared< BlobIndexTable > HSHomeObject::get_index_table(pg_id_t pg_id) { + std::shared_lock lock_guard(_pg_lock); + auto iter = _pg_map.find(pg_id); + RELEASE_ASSERT(iter != _pg_map.end(), "PG not found"); + auto hs_pg = static_cast< HSHomeObject::HS_PG* >(iter->second.get()); + RELEASE_ASSERT(hs_pg->index_table_ != nullptr, "Index table not found for PG"); + return hs_pg->index_table_; +} + +BlobManager::Result< std::vector< HSHomeObject::BlobInfo > > +HSHomeObject::query_blobs_in_shard(pg_id_t pg_id, uint64_t cur_shard_seq_num, blob_id_t start_blob_id, + uint64_t max_num_in_batch) { + // Query all blobs from start_blob_id to the maximum blob_id value. + std::vector< std::pair< BlobRouteKey, BlobRouteValue > > out_vector; + auto shard_id = make_new_shard_id(pg_id, cur_shard_seq_num); + auto start_key = BlobRouteKey{BlobRoute{shard_id, start_blob_id}}; + auto end_key = BlobRouteKey{BlobRoute{shard_id, std::numeric_limits< uint64_t >::max()}}; + homestore::BtreeQueryRequest< BlobRouteKey > query_req{ + homestore::BtreeKeyRange< BlobRouteKey >{std::move(start_key), true /* inclusive */, std::move(end_key), + true /* inclusive */}, + homestore::BtreeQueryType::SWEEP_NON_INTRUSIVE_PAGINATION_QUERY, static_cast< uint32_t >(max_num_in_batch)}; + auto index_table = get_index_table(pg_id); + auto const ret = index_table->query(query_req, out_vector); + if (ret != homestore::btree_status_t::success && ret != homestore::btree_status_t::has_more) { + LOGE("Failed to query blobs in index table for ret={} shard={} start_blob_id={}", ret, shard_id, start_blob_id); + return folly::makeUnexpected(BlobError::INDEX_ERROR); + } + + std::vector< BlobInfo > blob_info_vec; + blob_info_vec.reserve(out_vector.size()); + for (auto& [r, v] : out_vector) { + blob_info_vec.push_back(BlobInfo{r.key().shard, r.key().blob, v.pbas()}); + } + + return blob_info_vec; +} + } // namespace homeobject diff --git a/src/lib/homestore_backend/pg_blob_iterator.cpp b/src/lib/homestore_backend/pg_blob_iterator.cpp new file mode 100644 index 00000000..5d543bd5 --- /dev/null +++ b/src/lib/homestore_backend/pg_blob_iterator.cpp @@ -0,0 +1,128 @@ +#include "hs_homeobject.hpp" +#include +#include +#include +#include "generated/resync_pg_shard_generated.h" +#include "generated/resync_blob_data_generated.h" + +namespace homeobject { + +HSHomeObject::PGBlobIterator::PGBlobIterator(HSHomeObject& home_obj, homestore::group_id_t group_id, + uint64_t upto_lsn) : + home_obj_(home_obj), group_id_(group_id) { + auto pg = get_pg_metadata(); + pg_id_ = pg->pg_info_.id; + repl_dev_ = static_cast< HS_PG* >(pg)->repl_dev_; + if (upto_lsn != 0) { + // Iterate all shards and its blob which have lsn <= upto_lsn + for (auto& shard : pg->shards_) { + auto sequence_num = home_obj_.get_sequence_num_from_shard_id(shard->info.id); + if (shard->info.lsn <= upto_lsn) { max_shard_seq_num_ = std::max(max_shard_seq_num_, sequence_num); } + } + } else { + max_shard_seq_num_ = pg->shard_sequence_num_; + } +} + +PG* HSHomeObject::PGBlobIterator::get_pg_metadata() { + std::scoped_lock lock_guard(home_obj_._pg_lock); + auto iter = home_obj_._pg_map.begin(); + for (; iter != home_obj_._pg_map.end(); iter++) { + if (iter->second->pg_info_.replica_set_uuid == group_id_) { break; } + } + + RELEASE_ASSERT(iter != home_obj_._pg_map.end(), "PG not found replica_set_uuid={}", + boost::uuids::to_string(group_id_)); + return iter->second.get(); +} + +void HSHomeObject::PGBlobIterator::create_pg_shard_snapshot_data(sisl::io_blob_safe& meta_blob) { + auto pg = get_pg_metadata(); + auto& pg_info = pg->pg_info_; + auto& pg_shards = pg->shards_; + + flatbuffers::FlatBufferBuilder builder; + std::vector< std::uint8_t > uuid(pg_info.replica_set_uuid.size()); + std::copy(pg_info.replica_set_uuid.begin(), pg_info.replica_set_uuid.end(), uuid.begin()); + auto pg_entry = CreatePGInfoEntry(builder, pg_info.id, 0 /* priority*/, builder.CreateVector(uuid)); + + std::vector< ::flatbuffers::Offset< ShardInfoEntry > > shard_entries; + for (auto& shard : pg_shards) { + auto& shard_info = shard->info; + // TODO add lsn. + shard_entries.push_back(CreateShardInfoEntry( + builder, static_cast< uint8_t >(shard_info.state), shard_info.placement_group, shard_info.id, + shard_info.total_capacity_bytes, shard_info.created_time, shard_info.last_modified_time)); + } + builder.FinishSizePrefixed(CreateResyncPGShardInfo(builder, pg_entry, builder.CreateVector(shard_entries))); + meta_blob = sisl::io_blob_safe{builder.GetSize()}; + std::memcpy(meta_blob.bytes(), builder.GetBufferPointer(), builder.GetSize()); +} + +int64_t HSHomeObject::PGBlobIterator::get_next_blobs(uint64_t max_num_blobs_in_batch, uint64_t max_batch_size_bytes, + std::vector< BlobInfoData >& blob_data_vec, bool& end_of_shard) { + end_of_shard = false; + uint64_t total_bytes = 0, num_blobs = 0; + while (true) { + auto r = home_obj_.query_blobs_in_shard(pg_id_, cur_shard_seq_num_, cur_blob_id_ + 1, max_num_blobs_in_batch); + if (!r) { return -1; } + auto& blob_info_vec = r.value(); + for (auto& info : blob_info_vec) { + if (info.pbas == HSHomeObject::tombstone_pbas) { + // Skip deleted blobs + continue; + } + auto result = home_obj_ + ._get_blob_data(repl_dev_, info.shard_id, info.blob_id, 0 /*start_offset*/, + 0 /* req_len */, info.pbas) + .get(); + if (!result) { + LOGE("Failed to retrieve blob for shard={} blob={} pbas={}", info.shard_id, info.blob_id, + info.pbas.to_string(), result.error()); + return -1; + } + + auto& blob = result.value(); + num_blobs++; + total_bytes += blob.body.size() + blob.user_key.size(); + if (num_blobs > max_num_blobs_in_batch || total_bytes > max_batch_size_bytes) { return 0; } + + BlobInfoData blob_data{{info.shard_id, info.blob_id, std::move(info.pbas)}, std::move(blob)}; + blob_data_vec.push_back(std::move(blob_data)); + cur_blob_id_ = info.blob_id; + } + + if (blob_info_vec.empty()) { + // We read all the blobs in the current shard + end_of_shard = true; + cur_shard_seq_num_++; + cur_blob_id_ = -1; + break; + } + } + + return 0; +} + +void HSHomeObject::PGBlobIterator::create_blobs_snapshot_data(std::vector< BlobInfoData >& blob_data_vec, + sisl::io_blob_safe& data_blob, bool end_of_shard) { + std::vector< ::flatbuffers::Offset< BlobData > > blob_entries; + flatbuffers::FlatBufferBuilder builder; + for (auto& b : blob_data_vec) { + blob_entries.push_back( + CreateBlobData(builder, b.shard_id, b.blob_id, b.blob.user_key.size(), b.blob.body.size(), + builder.CreateVector(r_cast< uint8_t* >(const_cast< char* >(b.blob.user_key.data())), + b.blob.user_key.size()), + builder.CreateVector(b.blob.body.bytes(), b.blob.body.size()))); + } + builder.FinishSizePrefixed( + CreateResyncBlobDataBatch(builder, builder.CreateVector(blob_entries), end_of_shard /* end_of_batch */)); + data_blob = sisl::io_blob_safe{builder.GetSize()}; + std::memcpy(data_blob.bytes(), builder.GetBufferPointer(), builder.GetSize()); +} + +bool HSHomeObject::PGBlobIterator::end_of_scan() const { + return max_shard_seq_num_ == 0 || cur_shard_seq_num_ > max_shard_seq_num_; +} + +} // namespace homeobject \ No newline at end of file diff --git a/src/lib/homestore_backend/replication_state_machine.cpp b/src/lib/homestore_backend/replication_state_machine.cpp index fbe629ef..8e471dff 100644 --- a/src/lib/homestore_backend/replication_state_machine.cpp +++ b/src/lib/homestore_backend/replication_state_machine.cpp @@ -1,5 +1,10 @@ #include "replication_message.hpp" #include "replication_state_machine.hpp" +#include "hs_backend_config.hpp" +#include "lib/blob_route.hpp" + +#include "generated/resync_pg_shard_generated.h" +#include "generated/resync_blob_data_generated.h" namespace homeobject { void ReplicationStateMachine::on_commit(int64_t lsn, const sisl::blob& header, const sisl::blob& key, @@ -159,30 +164,148 @@ ReplicationStateMachine::create_snapshot(std::shared_ptr< homestore::snapshot_co auto ctx = dynamic_pointer_cast< homestore::nuraft_snapshot_context >(context); auto s = ctx->nuraft_snapshot(); LOGI("create snapshot, last_log_idx_: {} , last_log_term_: {}", s->get_last_log_idx(), s->get_last_log_term()); + m_snapshot_context = context; return folly::makeSemiFuture< homestore::ReplResult< folly::Unit > >(folly::Unit{}); } bool ReplicationStateMachine::apply_snapshot(std::shared_ptr< homestore::snapshot_context > context) { - LOGE("apply_snapshot not implemented"); - return false; + // TODO persist snapshot + m_snapshot_context = context; + return true; } -std::shared_ptr< homestore::snapshot_context > ReplicationStateMachine::last_snapshot() { - LOGE("last_snapshot not implemented"); - return nullptr; -} +std::shared_ptr< homestore::snapshot_context > ReplicationStateMachine::last_snapshot() { return m_snapshot_context; } int ReplicationStateMachine::read_snapshot_data(std::shared_ptr< homestore::snapshot_context > context, std::shared_ptr< homestore::snapshot_data > snp_data) { - LOGE("read_snapshot_data not implemented"); - return -1; + HSHomeObject::PGBlobIterator* pg_iter = nullptr; + auto s = dynamic_pointer_cast< homestore::nuraft_snapshot_context >(context)->nuraft_snapshot(); + + if (snp_data->user_ctx == nullptr) { + // Create the pg blob iterator for the first time. + pg_iter = new HSHomeObject::PGBlobIterator(*home_object_, repl_dev()->group_id()); + snp_data->user_ctx = (void*)pg_iter; + } else { + pg_iter = r_cast< HSHomeObject::PGBlobIterator* >(snp_data->user_ctx); + } + + // Nuraft uses obj_id as a way to track the state of the snapshot read and write. + // Nuraft starts with obj_id == 0 as first message always, leader send all the shards and + // PG metadata as response. Follower responds with next obj_id it expects. obj_id's are + // encoded in the form of obj_id = shard_seq_num(6 bytes) | batch_number(2 bytes) + // Leader starts with shard sequence number 1 and read upto maximum size of data + // and send to follower in a batch. Once all blob's are send in a shard, + // leader notifies the follower by setting end_of_batch in the payload. Follower + // moves to the next shard by incrementing shard_seq_num and reset batch number to 0. + // Batch number is used to identify which batch in the current shard sequence number. + // We use pg blob iterator to go over all the blobs in all the shards in that PG. + // Once all the blobs are finished sending, end_of_scan will be true. + int64_t obj_id = snp_data->offset; + uint64_t shard_seq_num = obj_id >> 16; + uint64_t batch_number = obj_id & 0xFFFF; + auto log_str = fmt::format("group={} term={} lsn={} shard_seq={} batch_num={} size={}", + boost::uuids::to_string(repl_dev()->group_id()), s->get_last_log_term(), + s->get_last_log_idx(), shard_seq_num, batch_number, snp_data->blob.size()); + if (obj_id == 0) { + // obj_id = 0 means its the first message and we send the pg and its shards metadata. + cur_snapshot_batch_num = 0; + pg_iter->create_pg_shard_snapshot_data(snp_data->blob); + RELEASE_ASSERT(snp_data->blob.size() > 0, "Empty metadata snapshot data"); + LOGD("Read snapshot data first message {}", log_str); + return 0; + } + + if (shard_seq_num != pg_iter->cur_shard_seq_num_ || batch_number != cur_snapshot_batch_num) { + // Follower can request for the old shard again. This may be due to error in writing or + // it crashed and want to continue from where it left. + LOGW("Shard or batch number not same as in iterator shard={}/{} batch_num={}/{}", shard_seq_num, + pg_iter->cur_shard_seq_num_, batch_number, cur_snapshot_batch_num); + if (shard_seq_num > pg_iter->cur_shard_seq_num_ || batch_number > cur_snapshot_batch_num) { + // If we retrieve some invalid values, return error. + return -1; + } + + // Use the shard sequence number provided by the follower and we restart the batch. + pg_iter->cur_shard_seq_num_ = shard_seq_num; + pg_iter->cur_blob_id_ = -1; + cur_snapshot_batch_num = 0; + } + + if (pg_iter->end_of_scan()) { + // No more shards to read, baseline resync is finished after this. + snp_data->is_last_obj = true; + LOGD("Read snapshot reached is_last_obj true {}", log_str); + return 0; + } + + // Get next set of blobs in the batch. + std::vector< HSHomeObject::BlobInfoData > blob_data_vec; + bool end_of_shard; + auto result = pg_iter->get_next_blobs(HS_BACKEND_DYNAMIC_CONFIG(max_num_blobs_in_snapshot_batch), + HS_BACKEND_DYNAMIC_CONFIG(max_snapshot_batch_size_mb) * 1024 * 1024, + blob_data_vec, end_of_shard); + if (result != 0) { + LOGE("Failed to get next blobs in snapshot read result={} {}", result, log_str); + return -1; + } + + // Create snapshot flatbuffer data. + pg_iter->create_blobs_snapshot_data(blob_data_vec, snp_data->blob, end_of_shard); + if (end_of_shard) { + cur_snapshot_batch_num = 0; + } else { + cur_snapshot_batch_num++; + } + + LOGT("Read snapshot num_blobs={} end_of_shard={} {}", blob_data_vec.size(), end_of_shard, log_str); + return 0; } void ReplicationStateMachine::write_snapshot_data(std::shared_ptr< homestore::snapshot_context > context, std::shared_ptr< homestore::snapshot_data > snp_data) { - LOGE("write_snapshot_data not implemented"); + RELEASE_ASSERT(context != nullptr, "Context null"); + RELEASE_ASSERT(snp_data != nullptr, "Snapshot data null"); + auto s = dynamic_pointer_cast< homestore::nuraft_snapshot_context >(context)->nuraft_snapshot(); + int64_t obj_id = snp_data->offset; + uint64_t shard_seq_num = obj_id >> 16; + uint64_t batch_number = obj_id & 0xFFFF; + + auto log_str = fmt::format("group={} term={} lsn={} shard_seq={} batch_num={} size={}", + boost::uuids::to_string(repl_dev()->group_id()), s->get_last_log_term(), + s->get_last_log_idx(), shard_seq_num, batch_number, snp_data->blob.size()); + + if (snp_data->is_last_obj) { + LOGD("Write snapshot reached is_last_obj true {}", log_str); + return; + } + + if (obj_id == 0) { + snp_data->offset = 1 << 16; + // TODO add metadata. + return; + } + + auto snp = GetSizePrefixedResyncBlobDataBatch(snp_data->blob.bytes()); + // TODO Add blob puts + + if (snp->end_of_batch()) { + snp_data->offset = (shard_seq_num + 1) << 16; + } else { + snp_data->offset = (shard_seq_num << 16) | (batch_number + 1); + } + + LOGT("Read snapshot num_blobs={} end_of_batch={} {}", snp->data_array()->size(), snp->end_of_batch(), log_str); } -void ReplicationStateMachine::free_user_snp_ctx(void*& user_snp_ctx) { LOGE("free_user_snp_ctx not implemented"); } +void ReplicationStateMachine::free_user_snp_ctx(void*& user_snp_ctx) { + if (user_snp_ctx) { + LOGE("User snapshot context null group={}", boost::uuids::to_string(repl_dev()->group_id())); + return; + } + + auto pg_iter = r_cast< HSHomeObject::PGBlobIterator* >(user_snp_ctx); + LOGD("Freeing snapshot iterator pg_id={} group={}", pg_iter->pg_id_, boost::uuids::to_string(pg_iter->group_id_)); + delete pg_iter; +} } // namespace homeobject diff --git a/src/lib/homestore_backend/replication_state_machine.hpp b/src/lib/homestore_backend/replication_state_machine.hpp index dad9ee7f..6a7e446c 100644 --- a/src/lib/homestore_backend/replication_state_machine.hpp +++ b/src/lib/homestore_backend/replication_state_machine.hpp @@ -184,6 +184,8 @@ class ReplicationStateMachine : public homestore::ReplDevListener { private: HSHomeObject* home_object_{nullptr}; + uint64_t cur_snapshot_batch_num{0}; + std::shared_ptr< homestore::snapshot_context > m_snapshot_context; }; } // namespace homeobject diff --git a/src/lib/homestore_backend/tests/hs_blob_tests.cpp b/src/lib/homestore_backend/tests/hs_blob_tests.cpp index 78922791..ba78b4d5 100644 --- a/src/lib/homestore_backend/tests/hs_blob_tests.cpp +++ b/src/lib/homestore_backend/tests/hs_blob_tests.cpp @@ -1,4 +1,7 @@ #include "homeobj_fixture.hpp" +#include "lib/homestore_backend/index_kv.hpp" +#include "generated/resync_pg_shard_generated.h" +#include "generated/resync_blob_data_generated.h" TEST(HomeObject, BasicEquivalence) { auto app = std::make_shared< FixtureApp >(); @@ -181,3 +184,74 @@ TEST_F(HomeObjectFixture, SealShardWithRestart) { ASSERT_EQ(b.error(), BlobError::SEALED_SHARD); LOGINFO("Put blob {}", b.error()); } + +TEST_F(HomeObjectFixture, PGBlobIterator) { + uint64_t num_shards_per_pg = 3; + uint64_t num_blobs_per_shard = 5; + std::vector< std::pair< pg_id_t, shard_id_t > > pg_shard_id_vec; + blob_map_t blob_map; + + // Create blob size in range (1, 16kb) and user key in range (1, 1kb) + const uint32_t max_blob_size = 16 * 1024; + + create_pg(1 /* pg_id */); + for (uint64_t j = 0; j < num_shards_per_pg; j++) { + auto shard = _obj_inst->shard_manager()->create_shard(1 /* pg_id */, 64 * Mi).get(); + ASSERT_TRUE(!!shard); + pg_shard_id_vec.emplace_back(1, shard->id); + LOGINFO("pg {} shard {}", 1, shard->id); + } + + // Put blob for all shards in all pg's. + put_blob(blob_map, pg_shard_id_vec, num_blobs_per_shard, max_blob_size); + + auto ho = dynamic_cast< homeobject::HSHomeObject* >(_obj_inst.get()); + PG* pg1; + { + auto lg = std::shared_lock(ho->_pg_lock); + auto iter = ho->_pg_map.find(1); + ASSERT_TRUE(iter != ho->_pg_map.end()); + pg1 = iter->second.get(); + } + + auto pg1_iter = std::make_shared< homeobject::HSHomeObject::PGBlobIterator >(*ho, pg1->pg_info_.replica_set_uuid); + ASSERT_EQ(pg1_iter->end_of_scan(), false); + + // Verify PG shard meta data. + sisl::io_blob_safe meta_blob; + pg1_iter->create_pg_shard_snapshot_data(meta_blob); + ASSERT_TRUE(meta_blob.size() > 0); + + auto pg_req = GetSizePrefixedResyncPGShardInfo(meta_blob.bytes()); + ASSERT_EQ(pg_req->pg()->pg_id(), pg1->pg_info_.id); + auto u1 = pg_req->pg()->replica_set_uuid(); + auto u2 = pg1->pg_info_.replica_set_uuid; + ASSERT_EQ(std::string(u1->begin(), u1->end()), std::string(u2.begin(), u2.end())); + + // Verify get blobs for pg. + uint64_t max_num_blobs_in_batch = 3, max_batch_size_bytes = 128 * Mi; + std::vector< HSHomeObject::BlobInfoData > blob_data_vec; + while (!pg1_iter->end_of_scan()) { + std::vector< HSHomeObject::BlobInfoData > vec; + bool end_of_shard; + auto result = pg1_iter->get_next_blobs(max_num_blobs_in_batch, max_batch_size_bytes, vec, end_of_shard); + ASSERT_EQ(result, 0); + for (auto& v : vec) { + blob_data_vec.push_back(std::move(v)); + } + } + + ASSERT_EQ(blob_data_vec.size(), num_shards_per_pg * num_blobs_per_shard); + for (auto& b : blob_data_vec) { + auto g = _obj_inst->blob_manager()->get(b.shard_id, b.blob_id, 0, 0).get(); + ASSERT_TRUE(!!g); + auto result = std::move(g.value()); + LOGINFO("Get blob pg {} shard {} blob {} len {} data {}", 1, b.shard_id, b.blob_id, b.blob.body.size(), + hex_bytes(result.body.cbytes(), 5)); + EXPECT_EQ(result.body.size(), b.blob.body.size()); + EXPECT_EQ(std::memcmp(result.body.bytes(), b.blob.body.cbytes(), result.body.size()), 0); + EXPECT_EQ(result.user_key.size(), b.blob.user_key.size()); + EXPECT_EQ(result.user_key, b.blob.user_key); + EXPECT_EQ(result.object_off, b.blob.object_off); + } +} diff --git a/src/lib/homestore_backend/tests/hs_shard_tests.cpp b/src/lib/homestore_backend/tests/hs_shard_tests.cpp index b2d86dca..9598fab1 100644 --- a/src/lib/homestore_backend/tests/hs_shard_tests.cpp +++ b/src/lib/homestore_backend/tests/hs_shard_tests.cpp @@ -80,6 +80,7 @@ TEST_F(TestFixture, MockSealShard) { j["shard_info"]["shard_id_t"] = shard_info.id; j["shard_info"]["pg_id_t"] = shard_info.placement_group; j["shard_info"]["state"] = shard_info.state; + j["shard_info"]["lsn"] = shard_info.lsn; j["shard_info"]["created_time"] = shard_info.created_time; j["shard_info"]["modified_time"] = shard_info.last_modified_time; j["shard_info"]["total_capacity"] = shard_info.total_capacity_bytes; @@ -135,6 +136,7 @@ class ShardManagerTestingRecovery : public ::testing::Test { EXPECT_EQ(lhs.id, rhs.id); EXPECT_EQ(lhs.placement_group, rhs.placement_group); EXPECT_EQ(lhs.state, rhs.state); + EXPECT_EQ(lhs.lsn, rhs.lsn); EXPECT_EQ(lhs.created_time, rhs.created_time); EXPECT_EQ(lhs.last_modified_time, rhs.last_modified_time); EXPECT_EQ(lhs.available_capacity_bytes, rhs.available_capacity_bytes); diff --git a/src/lib/memory_backend/mem_shard_manager.cpp b/src/lib/memory_backend/mem_shard_manager.cpp index 47936678..7546f998 100644 --- a/src/lib/memory_backend/mem_shard_manager.cpp +++ b/src/lib/memory_backend/mem_shard_manager.cpp @@ -8,7 +8,7 @@ uint64_t ShardManager::max_shard_size() { return Gi; } ShardManager::AsyncResult< ShardInfo > MemoryHomeObject::_create_shard(pg_id_t pg_owner, uint64_t size_bytes) { auto const now = get_current_timestamp(); - auto info = ShardInfo(0ull, pg_owner, ShardInfo::State::OPEN, now, now, size_bytes, size_bytes, 0); + auto info = ShardInfo(0ull, pg_owner, ShardInfo::State::OPEN, 0, now, now, size_bytes, size_bytes, 0); { auto lg = std::scoped_lock(_pg_lock, _shard_lock); auto pg_it = _pg_map.find(pg_owner); diff --git a/src/lib/pg_manager.cpp b/src/lib/pg_manager.cpp index 4bb3d43d..f38410f3 100644 --- a/src/lib/pg_manager.cpp +++ b/src/lib/pg_manager.cpp @@ -16,7 +16,7 @@ PGManager::NullAsyncResult HomeObjectImpl::create_pg(PGInfo&& pg_info) { if (member.priority > 0) saw_leader = true; peers.insert(member.id); } - if (!saw_ourself || !saw_leader) return folly::makeUnexpected(PGError::INVALID_ARG); + if (!saw_ourself || !saw_leader) { return folly::makeUnexpected(PGError::INVALID_ARG); } return _create_pg(std::move(pg_info), peers); }