diff --git a/src/include/homeobject/shard_manager.hpp b/src/include/homeobject/shard_manager.hpp
index 812ebe9f..89095f42 100644
--- a/src/include/homeobject/shard_manager.hpp
+++ b/src/include/homeobject/shard_manager.hpp
@@ -22,7 +22,7 @@ struct ShardInfo {
     shard_id_t id;
     pg_id_t placement_group;
     State state;
-    uint64_t lsn;
+    uint64_t lsn; // created_lsn
     uint64_t created_time;
     uint64_t last_modified_time;
     uint64_t available_capacity_bytes;
diff --git a/src/lib/homestore_backend/hs_backend_config.fbs b/src/lib/homestore_backend/hs_backend_config.fbs
index db55a66f..ad917da7 100644
--- a/src/lib/homestore_backend/hs_backend_config.fbs
+++ b/src/lib/homestore_backend/hs_backend_config.fbs
@@ -10,9 +10,6 @@ table HSBackendSettings {
     // timer thread freq in us
     backend_timer_us: uint64 = 60000000 (hotswap);
 
-    // Maximum number of blobs in a snapshot batch
-    max_num_blobs_in_snapshot_batch: uint64 = 1024 (hotswap);
-
     // Maximum size of a snapshot batch
     max_snapshot_batch_size_mb: uint64 = 128 (hotswap);
 }
diff --git a/src/lib/homestore_backend/hs_homeobject.hpp b/src/lib/homestore_backend/hs_homeobject.hpp
index 29bab6f9..ef8ba328 100644
--- a/src/lib/homestore_backend/hs_homeobject.hpp
+++ b/src/lib/homestore_backend/hs_homeobject.hpp
@@ -289,27 +289,30 @@ class HSHomeObject : public HomeObjectImpl {
     struct PGBlobIterator {
         PGBlobIterator(HSHomeObject& home_obj, homestore::group_id_t group_id, uint64_t upto_lsn = 0);
         PG* get_pg_metadata();
-        int64_t get_next_blobs(uint64_t max_num_blobs_in_batch, uint64_t max_batch_size_bytes,
-                               std::vector< HSHomeObject::BlobInfoData >& blob_vec, bool& end_of_shard);
-        void create_pg_shard_snapshot_data(sisl::io_blob_safe& meta_blob);
-        void create_pg_snapshot_data(sisl::io_blob_safe& meta_blob);
-        void create_shard_snapshot_data(sisl::io_blob_safe& meta_blob);
-        void create_blobs_snapshot_data(std::vector< HSHomeObject::BlobInfoData >& blob_vec,
-                                        sisl::io_blob_safe& data_blob, bool end_of_shard);
+        bool updateCursor(objId id);
+        bool generate_shard_blob_list();
+        BlobManager::AsyncResult< sisl::io_blob_safe > load_blob_data(const BlobInfo& blob_info, ResyncBlobState& state);
+        bool create_pg_snapshot_data(sisl::io_blob_safe& meta_blob);
+        bool create_shard_snapshot_data(sisl::io_blob_safe& meta_blob);
+        bool create_blobs_snapshot_data(sisl::io_blob_safe& data_blob);
+        void pack_resync_message(sisl::io_blob_safe& meta_blob, SyncMessageType type, sisl::io_blob_safe& payload);
         bool end_of_scan() const;
 
-        uint64_t snp_start_lsn{0};
         std::vector< ShardInfo > shard_list{0};
-        shard_id_t cur_shard_seq_num_{1};
+
+        shard_id_t cur_shard_seq_num{1};
+        uint64_t cur_shard_idx{0};
         std::vector< BlobInfo > cur_blob_list{0};
         int64_t last_end_blob_idx{-1};
-        int64_t last_batch_size{0};
-        uint64_t max_shard_seq_num_{0};
-        uint64_t cur_snapshot_batch_num{0};
+        uint64_t cur_batch_num{0};
+        uint64_t cur_batch_blob_count{0};
+
        HSHomeObject& home_obj_;
        homestore::group_id_t group_id_;
+        uint64_t snp_start_lsn;
        pg_id_t pg_id_;
        shared< homestore::ReplDev > repl_dev_;
+        uint64_t max_batch_size;
     };
 
     // SnapshotReceiverContext is the context used on the follower side while receiving a snapshot. [drafting] These functions are not the final version.
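Reviewer note (not part of the patch): the new PGBlobIterator API splits the leader-side snapshot read into per-objId steps. A minimal sketch of the expected call sequence, using only the declarations above; the function name `serve_one_obj` and the error handling are illustrative, not part of this change:

```cpp
// Sketch only: how a single snapshot-read request is expected to be served with the new API.
// Mirrors the flow implemented in read_snapshot_data() later in this diff.
bool serve_one_obj(HSHomeObject::PGBlobIterator& iter, objId id, sisl::io_blob_safe& out) {
    if (!iter.updateCursor(id)) { return false; }       // reject stale or invalid cursors
    if (id.shard_seq_num == 0) {                         // obj 0 carries the PG metadata
        return iter.create_pg_snapshot_data(out);
    }
    if (id.batch_id == 0) {                              // first request of a shard: shard metadata
        return iter.generate_shard_blob_list() && iter.create_shard_snapshot_data(out);
    }
    return iter.create_blobs_snapshot_data(out);         // subsequent batches carry blob payloads
}
```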
diff --git a/src/lib/homestore_backend/pg_blob_iterator.cpp b/src/lib/homestore_backend/pg_blob_iterator.cpp
index 02aba46e..fc7acf9f 100644
--- a/src/lib/homestore_backend/pg_blob_iterator.cpp
+++ b/src/lib/homestore_backend/pg_blob_iterator.cpp
@@ -5,23 +5,69 @@
 #include "generated/resync_blob_data_generated.h"
 #include "generated/resync_pg_data_generated.h"
 #include "generated/resync_shard_data_generated.h"
+#include "replication_message.hpp"
+#include "lib/homeobject_impl.hpp"
+#include "hs_backend_config.hpp"
 
 namespace homeobject {
 HSHomeObject::PGBlobIterator::PGBlobIterator(HSHomeObject& home_obj, homestore::group_id_t group_id,
                                              uint64_t upto_lsn) :
-        home_obj_(home_obj), group_id_(group_id) {
+        home_obj_(home_obj),
+        group_id_(group_id),
+        snp_start_lsn(upto_lsn) {
     auto pg = get_pg_metadata();
     pg_id_ = pg->pg_info_.id;
     repl_dev_ = static_cast< HS_PG* >(pg)->repl_dev_;
+    max_batch_size = HS_BACKEND_DYNAMIC_CONFIG(max_snapshot_batch_size_mb) * Mi;
+    if (max_batch_size == 0) { max_batch_size = DEFAULT_MAX_BATCH_SIZE_MB * Mi; }
+
     if (upto_lsn != 0) {
         // Iterate over all shards (and their blobs) whose lsn <= upto_lsn
-        for (auto& shard : pg->shards_) {
-            auto sequence_num = home_obj_.get_sequence_num_from_shard_id(shard->info.id);
-            if (shard->info.lsn <= upto_lsn) { max_shard_seq_num_ = std::max(max_shard_seq_num_, sequence_num); }
-        }
-    } else {
-        max_shard_seq_num_ = pg->shard_sequence_num_;
+        for (auto& shard : pg->shards_) {
+            if (shard->info.lsn <= upto_lsn) { shard_list.emplace_back(shard->info); }
+        }
+        // sort the shard list by created lsn
+        std::sort(shard_list.begin(), shard_list.end(), [](const ShardInfo& a, const ShardInfo& b) {
+            // TODO: compare vchunk id
+            return a.lsn < b.lsn;
+        });
     }
 }
 
+// Returns whether the requested objId is valid; on success the cursors are updated accordingly.
+bool HSHomeObject::PGBlobIterator::updateCursor(objId id) {
+    bool next_shard = false;
+    auto target_shard_id = make_new_shard_id(pg_id_, id.shard_seq_num);
+    auto cur_shard_id = shard_list[cur_shard_idx].id;
+    // check shard id
+    if (cur_shard_idx < shard_list.size() - 1 && target_shard_id == shard_list[cur_shard_idx + 1].id) {
+        next_shard = true;
+    }
+    if (target_shard_id != cur_shard_id && !next_shard) {
+        shard_id_t next = cur_shard_idx == shard_list.size() - 1 ? 0 : shard_list[cur_shard_idx + 1].id;
+        LOGW("invalid shard requested in snapshot read, requested shard_seq_num={}, current shard_seq_num={}, "
+             "current shard_idx={}, shard_list size={}, next shard in shard_list={}",
+             id.shard_seq_num, cur_shard_seq_num, cur_shard_idx, shard_list.size(), next);
+        return false;
+    }
+
+    // moving to the next shard: its first request must carry batch id 0
+    if (next_shard && id.batch_id == 0) {
+        cur_shard_idx++;
+        cur_batch_num = 0;
+        last_end_blob_idx = -1;
+        cur_shard_seq_num = shard_list[cur_shard_idx].id & 0xFFFFFFFFFFFF;
+        return true;
+    }
+    // resend the current batch
+    if (!next_shard && id.batch_id == cur_batch_num) { return true; }
+    // next batch within the current shard
+    if (!next_shard && id.batch_id == cur_batch_num + 1) {
+        cur_batch_num = id.batch_id;
+        last_end_blob_idx = last_end_blob_idx + cur_batch_blob_count;
+        cur_batch_blob_count = 0;
+        return true;
+    }
+
+    return false;
+}
 
 PG* HSHomeObject::PGBlobIterator::get_pg_metadata() {
@@ -36,102 +82,178 @@ PG* HSHomeObject::PGBlobIterator::get_pg_metadata() {
     return iter->second.get();
 }
 
-void HSHomeObject::PGBlobIterator::create_pg_shard_snapshot_data(sisl::io_blob_safe& meta_blob) {
-    // auto pg = get_pg_metadata();
-    // auto& pg_info = pg->pg_info_;
-    // auto& pg_shards = pg->shards_;
-    //
-    // flatbuffers::FlatBufferBuilder builder;
-    // std::vector< std::uint8_t > uuid(pg_info.replica_set_uuid.size());
-    // std::copy(pg_info.replica_set_uuid.begin(), pg_info.replica_set_uuid.end(), uuid.begin());
-    // auto pg_entry = CreatePGInfoEntry(builder, pg_info.id, 0 /* priority*/, builder.CreateVector(uuid));
-    //
-    // std::vector< ::flatbuffers::Offset< ShardInfoEntry > > shard_entries;
-    // for (auto& shard : pg_shards) {
-    //     auto& shard_info = shard->info;
-    //     // TODO add lsn.
-    //     shard_entries.push_back(CreateShardInfoEntry(
-    //         builder, static_cast< uint8_t >(shard_info.state), shard_info.placement_group, shard_info.id,
-    //         shard_info.total_capacity_bytes, shard_info.created_time, shard_info.last_modified_time));
-    // }
-    // builder.FinishSizePrefixed(CreateResyncPGShardInfo(builder, pg_entry, builder.CreateVector(shard_entries)));
-    // meta_blob = sisl::io_blob_safe{builder.GetSize()};
-    // std::memcpy(meta_blob.bytes(), builder.GetBufferPointer(), builder.GetSize());
+bool HSHomeObject::PGBlobIterator::create_pg_snapshot_data(sisl::io_blob_safe& meta_blob) {
+    auto pg = home_obj_._pg_map[pg_id_].get();
+    if (pg == nullptr) {
+        LOGE("PG not found in pg_map, pg_id={}", pg_id_);
+        return false;
+    }
+    auto pg_info = pg->pg_info_;
+
+    flatbuffers::FlatBufferBuilder builder;
+    std::vector< std::uint8_t > uuid(pg_info.replica_set_uuid.begin(), pg_info.replica_set_uuid.end());
+
+    std::vector< ::flatbuffers::Offset< homeobject::Member > > members;
+    members.reserve(pg_info.members.size()); // reserve, not size-construct, so push_back does not append after empty slots
+    for (auto& member : pg_info.members) {
+        auto id = std::vector< std::uint8_t >(member.id.begin(), member.id.end());
+        members.push_back(CreateMemberDirect(builder, &id, member.name.c_str(), member.priority));
+    }
+    std::vector< uint64_t > shard_ids;
+    for (auto& shard : pg->shards_) { shard_ids.push_back(shard->info.id); }
+
+    auto pg_entry = CreateResyncPGMetaDataDirect(builder, pg_info.id, &uuid, pg->durable_entities().blob_sequence_num,
+                                                 pg->shard_sequence_num_, &members, &shard_ids);
+    builder.FinishSizePrefixed(pg_entry);
+    auto payload = sisl::io_blob_safe{builder.GetSize()};
+    std::memcpy(payload.bytes(), builder.GetBufferPointer(), builder.GetSize());
+
+    pack_resync_message(meta_blob, SyncMessageType::PG_META, payload);
+    return true;
 }
 
-void HSHomeObject::PGBlobIterator::create_pg_snapshot_data(sisl::io_blob_safe& meta_blob) {
-    //TODO
+bool HSHomeObject::PGBlobIterator::generate_shard_blob_list() {
+    //TODO do we need pagination query?
+    auto r = home_obj_.query_blobs_in_shard(pg_id_, cur_shard_seq_num, 0, UINT64_MAX);
+    if (!r) { return false; }
+    cur_blob_list = r.value();
+    return true;
 }
 
-void HSHomeObject::PGBlobIterator::create_shard_snapshot_data(sisl::io_blob_safe& meta_blob) {
-    //TODO
+bool HSHomeObject::PGBlobIterator::create_shard_snapshot_data(sisl::io_blob_safe& meta_blob) {
+    auto shard = shard_list[cur_shard_idx];
+    flatbuffers::FlatBufferBuilder builder;
+
+    //TODO fill vchunk
+    auto shard_entry = CreateResyncShardMetaData(builder, shard.id, pg_id_, uint8_t(shard.state), shard.lsn,
+                                                 shard.created_time, shard.last_modified_time,
+                                                 shard.total_capacity_bytes, 0);
+
+    builder.FinishSizePrefixed(shard_entry);
+    auto payload = sisl::io_blob_safe{builder.GetSize()};
+    std::memcpy(payload.bytes(), builder.GetBufferPointer(), builder.GetSize());
+
+    pack_resync_message(meta_blob, SyncMessageType::SHARD_META, payload);
+    return true;
 }
 
-int64_t HSHomeObject::PGBlobIterator::get_next_blobs(uint64_t max_num_blobs_in_batch, uint64_t max_batch_size_bytes,
-                                                     std::vector< BlobInfoData >& blob_data_vec, bool& end_of_shard) {
-    // end_of_shard = false;
-    // uint64_t total_bytes = 0, num_blobs = 0;
-    // while (true) {
-    //     auto r = home_obj_.query_blobs_in_shard(pg_id_, cur_shard_seq_num_, cur_blob_id_ + 1, max_num_blobs_in_batch);
-    //     if (!r) { return -1; }
-    //     auto& index_results_vec = r.value();
-    //     for (auto& info : index_results_vec) {
-    //         if (info.pbas == HSHomeObject::tombstone_pbas) {
-    //             // Skip deleted blobs
-    //             continue;
-    //         }
-    //         auto result = home_obj_
-    //                           ._get_blob_data(repl_dev_, info.shard_id, info.blob_id, 0 /*start_offset*/,
-    //                                           0 /* req_len */, info.pbas)
-    //                           .get();
-    //         if (!result) {
-    //             LOGE("Failed to retrieve blob for shard={} blob={} pbas={}", info.shard_id, info.blob_id,
-    //                  info.pbas.to_string(), result.error());
-    //             return -1;
-    //         }
-    //
-    //         auto& blob = result.value();
-    //         num_blobs++;
-    //         total_bytes += blob.body.size() + blob.user_key.size();
-    //         if (num_blobs > max_num_blobs_in_batch || total_bytes > max_batch_size_bytes) { return 0; }
-    //
-    //         BlobInfoData blob_data{{info.shard_id, info.blob_id, std::move(info.pbas)}, std::move(blob)};
-    //         blob_data_vec.push_back(std::move(blob_data));
-    //         cur_blob_id_ = info.blob_id;
-    //     }
-    //
-    //     if (index_results_vec.empty()) {
-    //         // We got empty results from index, which means we read
-    //         // all the blobs in the current shard
-    //         end_of_shard = true;
-    //         cur_shard_seq_num_++;
-    //         cur_blob_id_ = -1;
-    //         break;
-    //     }
-    // }
-    //
-    return 0;
+BlobManager::AsyncResult< sisl::io_blob_safe > HSHomeObject::PGBlobIterator::load_blob_data(
+    const BlobInfo& blob_info, ResyncBlobState& state) {
+    auto shard_id = blob_info.shard_id;
+    auto blob_id = blob_info.blob_id;
+    auto blkid = blob_info.pbas;
+    auto const total_size = blob_info.pbas.blk_count() * repl_dev_->get_blk_size();
+    sisl::io_blob_safe read_buf{total_size, io_align};
+
+    sisl::sg_list sgs;
+    sgs.size = total_size;
+    sgs.iovs.emplace_back(iovec{.iov_base = read_buf.bytes(), .iov_len = read_buf.size()});
+
+    LOGD("Blob get request: shard_id={}, blob_id={}, blkid={}", shard_id, blob_id, blkid.to_string());
+    return repl_dev_->async_read(blkid, sgs, total_size)
+        .thenValue([this, blob_id, shard_id, &state, read_buf = std::move(read_buf)](auto&& result) mutable
                       -> BlobManager::AsyncResult< sisl::io_blob_safe > {
+            if (result) {
+                // a non-zero error code means the read failed
+                LOGE("Failed to get blob, shard_id={}, blob_id={}, err={}", shard_id, blob_id, result.value());
+                return folly::makeUnexpected(BlobError(BlobErrorCode::READ_FAILED));
+            }
+
+            BlobHeader const* header = r_cast< BlobHeader const* >(read_buf.cbytes());
+            if (!header->valid()) {
+                LOGE("Invalid header found, shard_id={}, blob_id={}, [header={}]", shard_id, blob_id,
+                     header->to_string());
+                return folly::makeUnexpected(BlobError(BlobErrorCode::READ_FAILED));
+            }
+
+            if (header->shard_id != shard_id) {
+                LOGE("Invalid shard_id in header, shard_id={}, blob_id={}, [header={}]", shard_id, blob_id,
+                     header->to_string());
+                return folly::makeUnexpected(BlobError(BlobErrorCode::READ_FAILED));
+            }
+
+            std::string user_key = header->user_key_size
+                ? std::string((const char*)(read_buf.bytes() + sizeof(BlobHeader)), (size_t)header->user_key_size)
+                : std::string{};
+
+            uint8_t const* blob_bytes = read_buf.bytes() + header->data_offset;
+            uint8_t computed_hash[BlobHeader::blob_max_hash_len]{};
+            home_obj_.compute_blob_payload_hash(header->hash_algorithm, blob_bytes, header->blob_size,
+                                                uintptr_cast(user_key.data()), header->user_key_size, computed_hash,
+                                                BlobHeader::blob_max_hash_len);
+            if (std::memcmp(computed_hash, header->hash, BlobHeader::blob_max_hash_len) != 0) {
+                LOGE("corrupted blob found, shard_id={}, blob_id={}, hash mismatch header [{}] [computed={:np}]",
+                     shard_id, blob_id, header->to_string(),
+                     spdlog::to_hex(computed_hash, computed_hash + BlobHeader::blob_max_hash_len));
+                state = ResyncBlobState::CORRUPTED;
+            }
+
+            LOGD("Blob get success: shard_id={}, blob_id={}", shard_id, blob_id);
+            return std::move(read_buf);
+        });
 }
 
-void HSHomeObject::PGBlobIterator::create_blobs_snapshot_data(std::vector< BlobInfoData >& blob_data_vec,
-                                                              sisl::io_blob_safe& data_blob, bool end_of_shard) {
-    // std::vector< ::flatbuffers::Offset< BlobData > > blob_entries;
-    // flatbuffers::FlatBufferBuilder builder;
-    // for (auto& b : blob_data_vec) {
-    //     blob_entries.push_back(
-    //         CreateBlobData(builder, b.shard_id, b.blob_id, b.blob.user_key.size(), b.blob.body.size(),
-    //                        builder.CreateVector(r_cast< uint8_t* >(const_cast< char* >(b.blob.user_key.data())),
-    //                                             b.blob.user_key.size()),
-    //                        builder.CreateVector(b.blob.body.bytes(), b.blob.body.size())));
-    // }
-    // builder.FinishSizePrefixed(
-    //     CreateResyncBlobDataBatch(builder, builder.CreateVector(blob_entries), end_of_shard /* end_of_batch */));
-    // data_blob = sisl::io_blob_safe{builder.GetSize()};
-    // std::memcpy(data_blob.bytes(), builder.GetBufferPointer(), builder.GetSize());
+bool HSHomeObject::PGBlobIterator::create_blobs_snapshot_data(sisl::io_blob_safe& data_blob) {
+    std::vector< ::flatbuffers::Offset< ResyncBlobData > > blob_entries;
+    flatbuffers::FlatBufferBuilder builder;
+
+    bool end_of_shard = false;
+    uint64_t total_bytes = 0;
+    auto idx = (uint64_t)(last_end_blob_idx + 1);
+
+    while (total_bytes < max_batch_size && idx < cur_blob_list.size()) {
+        auto info = cur_blob_list[idx++]; // advance the cursor for every blob examined in this batch
+        ResyncBlobState state = ResyncBlobState::NORMAL;
+        // handle deleted object
+        if (info.pbas == tombstone_pbas) {
+            state = ResyncBlobState::DELETED;
+            LOGD("Blob is deleted: shard_id={}, blob_id={}, blkid={}", info.shard_id, info.blob_id,
+                 info.pbas.to_string());
+            blob_entries.push_back(CreateResyncBlobDataDirect(builder, info.blob_id, (uint8_t)state, nullptr));
+            continue;
+        }
+        auto result = load_blob_data(info, state).get();
+        if (result.hasError() && result.error().code == BlobErrorCode::READ_FAILED) {
+            LOGW("Failed to retrieve blob for shard={} blob={} pbas={}, err={}", info.shard_id, info.blob_id,
+                 info.pbas.to_string(), result.error());
+            //TODO add metrics or events to track this
+            return false;
+        }
+
+        auto& blob = result.value();
+        std::vector< uint8_t > data(blob.cbytes(), blob.cbytes() + blob.size());
+        blob_entries.push_back(CreateResyncBlobDataDirect(builder, info.blob_id, (uint8_t)state, &data));
+        total_bytes += blob.size();
+    }
+
+    if (idx == cur_blob_list.size()) { end_of_shard = true; }
+    cur_batch_blob_count = blob_entries.size();
+    builder.FinishSizePrefixed(CreateResyncBlobDataBatchDirect(builder, &blob_entries, end_of_shard));
+    auto payload = sisl::io_blob_safe{builder.GetSize()};
+    std::memcpy(payload.bytes(), builder.GetBufferPointer(), builder.GetSize());
+    LOGD("create blobs snapshot data batch: shard_id={}, batch_num={}, total_bytes={}, blob_num={}, end_of_shard={}",
+         cur_shard_seq_num, cur_batch_num, total_bytes, blob_entries.size(), end_of_shard);
+
+    pack_resync_message(data_blob, SyncMessageType::SHARD_BATCH, payload);
+    return true;
 }
 
-bool HSHomeObject::PGBlobIterator::end_of_scan() const {
-    return max_shard_seq_num_ == 0 || cur_shard_seq_num_ > max_shard_seq_num_;
+void HSHomeObject::PGBlobIterator::pack_resync_message(sisl::io_blob_safe& dest_blob, SyncMessageType type,
+                                                       sisl::io_blob_safe& payload) {
+    SyncMessageHeader header;
+    header.msg_type = type;
+    header.payload_size = payload.size();
+    header.payload_crc = crc32_ieee(init_crc32, payload.cbytes(), payload.size());
+    header.seal();
+    LOGD("Creating resync message in pg[{}] with header={}", pg_id_, header.to_string());
+
+    dest_blob = sisl::io_blob_safe{static_cast< unsigned int >(payload.size() + sizeof(SyncMessageHeader))};
+    std::memcpy(dest_blob.bytes(), &header, sizeof(SyncMessageHeader));
+    std::memcpy(dest_blob.bytes() + sizeof(SyncMessageHeader), payload.cbytes(), payload.size());
 }
 
 } // namespace homeobject
\ No newline at end of file
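Reviewer note (not part of the patch): pack_resync_message frames every resync payload as [SyncMessageHeader | payload], with the payload CRC carried in the header. A minimal sketch of the matching unpack/validate step on the receiving side, using only the fields visible in this diff; header-level validation (magic, version, header CRC via seal()) presumably exists in BaseMessageHeader but is deliberately elided here:

```cpp
// Sketch only: validate a blob produced by pack_resync_message() before decoding the flatbuffer payload.
bool validate_resync_message(const sisl::io_blob_safe& blob) {
    if (blob.size() < sizeof(SyncMessageHeader)) { return false; }
    auto const* header = r_cast< SyncMessageHeader const* >(blob.cbytes());
    auto const* payload = blob.cbytes() + sizeof(SyncMessageHeader);
    auto const payload_size = blob.size() - sizeof(SyncMessageHeader);

    if (header->payload_size != payload_size) { return false; }
    // recompute the payload CRC exactly as the sender did
    return header->payload_crc == crc32_ieee(init_crc32, payload, payload_size);
}
```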
diff --git a/src/lib/homestore_backend/replication_message.hpp b/src/lib/homestore_backend/replication_message.hpp
index 4e6ce856..03f20d4d 100644
--- a/src/lib/homestore_backend/replication_message.hpp
+++ b/src/lib/homestore_backend/replication_message.hpp
@@ -12,6 +12,7 @@ namespace homeobject {
 VENUM(ReplicationMessageType, uint16_t, CREATE_PG_MSG = 0, CREATE_SHARD_MSG = 1, SEAL_SHARD_MSG = 2, PUT_BLOB_MSG = 3,
       DEL_BLOB_MSG = 4, UNKNOWN_MSG = 5);
 VENUM(SyncMessageType, uint16_t, PG_META = 0, SHARD_META = 1, SHARD_BATCH = 2, LAST_MSG = 3);
+VENUM(ResyncBlobState, uint8_t, NORMAL = 0, DELETED = 1, CORRUPTED = 2);
 
 // magic num comes from the first 8 bytes of 'echo homeobject_replication | md5sum'
 static constexpr uint64_t HOMEOBJECT_REPLICATION_MAGIC = 0x11153ca24efc8d34;
@@ -21,6 +22,7 @@
 static constexpr uint32_t HOMEOBJECT_REPLICATION_PROTOCOL_VERSION_V1 = 0x01;
 static constexpr uint32_t HOMEOBJECT_RESYNC_PROTOCOL_VERSION_V1 = 0x01;
 static constexpr uint32_t init_crc32 = 0;
 static constexpr uint64_t LAST_OBJ_ID =ULLONG_MAX;
+static constexpr uint64_t DEFAULT_MAX_BATCH_SIZE_MB = 128;
 
 #pragma pack(1)
 template < typename T >
@@ -108,10 +110,10 @@ struct SyncMessageHeader : public BaseMessageHeader {
 // type_bit = 0 for HomeStore, 1 for HomeObject
 struct objId {
     snp_obj_id_t value;
-    shard_id_t shardId;
-    snp_batch_id_t batchId;
+    shard_id_t shard_seq_num;
+    snp_batch_id_t batch_id;
 
-    objId(shard_id_t shard_id, snp_batch_id_t batch_id) : shardId(shard_id), batchId(batch_id) {
+    objId(shard_id_t shard_id, snp_batch_id_t batch_id) : shard_seq_num(shard_id), batch_id(batch_id) {
         if (shard_id != (shard_id & 0xFFFFFFFFFFFF)) { throw std::invalid_argument("shard_id is too large"); }
@@ -122,12 +124,12 @@ struct objId {
         value = static_cast< uint64_t >(1) << 63 | (shard_id) << 15 | batch_id;
     }
     explicit objId(snp_obj_id_t value) : value(value) {
-        shardId = (value >> 15) & 0xFFFFFFFFFFFF;
-        batchId = value & 0x7FFF;
+        shard_seq_num = (value >> 15) & 0xFFFFFFFFFFFF;
+        batch_id = value & 0x7FFF;
     }
 
     std::string to_string() const {
-        return fmt::format("{}[shardId={}, batchId={} ]", value, shardId, batchId);
+        return fmt::format("{}[shard_seq_num={}, batch_id={}]", value, shard_seq_num, batch_id);
     }
 };
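Reviewer note (not part of the patch): the objId encoding packs type_bit (1 bit) | shard_seq_num (48 bits) | batch_id (15 bits) into one 64-bit value. A quick, self-contained sanity check of the round trip; the concrete values are arbitrary examples:

```cpp
#include <cassert>
#include <cstdint>

int main() {
    // encode the way objId's constructor does, with type_bit = 1 for HomeObject
    uint64_t shard_seq_num = 3, batch_id = 2;
    uint64_t value = (uint64_t(1) << 63) | (shard_seq_num << 15) | batch_id; // 0x8000000000018002

    // decode the way objId(snp_obj_id_t) does
    assert(((value >> 15) & 0xFFFFFFFFFFFF) == shard_seq_num);
    assert((value & 0x7FFF) == batch_id);
    return 0;
}
```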
diff --git a/src/lib/homestore_backend/replication_state_machine.cpp b/src/lib/homestore_backend/replication_state_machine.cpp
index f8123f06..b35d9fb3 100644
--- a/src/lib/homestore_backend/replication_state_machine.cpp
+++ b/src/lib/homestore_backend/replication_state_machine.cpp
@@ -1,11 +1,6 @@
 #include "replication_message.hpp"
 #include "replication_state_machine.hpp"
 #include "hs_backend_config.hpp"
-#include "lib/blob_route.hpp"
-
-#include "generated/resync_blob_data_generated.h"
-#include "generated/resync_pg_data_generated.h"
-#include "generated/resync_shard_data_generated.h"
 
 namespace homeobject {
 void ReplicationStateMachine::on_commit(int64_t lsn, const sisl::blob& header, const sisl::blob& key,
@@ -198,86 +193,66 @@ std::shared_ptr< homestore::snapshot_context > ReplicationStateMachine::last_sna
 int ReplicationStateMachine::read_snapshot_data(std::shared_ptr< homestore::snapshot_context > context,
                                                 std::shared_ptr< homestore::snapshot_data > snp_data) {
-    // HSHomeObject::PGBlobIterator* pg_iter = nullptr;
-    // auto s = dynamic_pointer_cast< homestore::nuraft_snapshot_context >(context)->nuraft_snapshot();
-    //
-    // if (snp_data->user_ctx == nullptr) {
-    //     // Create the pg blob iterator for the first time.
-    //     pg_iter = new HSHomeObject::PGBlobIterator(*home_object_, repl_dev()->group_id());
-    //     snp_data->user_ctx = (void*)pg_iter;
-    // } else {
-    //     pg_iter = r_cast< HSHomeObject::PGBlobIterator* >(snp_data->user_ctx);
-    // }
-    //
-    // // Nuraft uses obj_id as a way to track the state of the snapshot read and write.
-    // // Nuraft starts with obj_id == 0 as first message always, leader send all the shards and
-    // PG metadata as response. Follower responds with next obj_id it expects. obj_id's are
-    // encoded in the form of obj_id = shard_seq_num(6 bytes) | batch_number(2 bytes)
-    // Leader starts with shard sequence number 1 and read upto maximum size of data
-    // and send to follower in a batch. Once all blob's are send in a shard,
-    // leader notifies the follower by setting end_of_batch in the payload. Follower
-    // moves to the next shard by incrementing shard_seq_num and reset batch number to 0.
-    // Batch number is used to identify which batch in the current shard sequence number.
-    // We use pg blob iterator to go over all the blobs in all the shards in that PG.
-    // Once all the blobs are finished sending, end_of_scan will be true.
-    // int64_t obj_id = snp_data->offset;
-    // uint64_t shard_seq_num = obj_id >> 16;
-    // uint64_t batch_number = obj_id & 0xFFFF;
-    // auto log_str = fmt::format("group={} term={} lsn={} shard_seq={} batch_num={} size={}",
-    //                            boost::uuids::to_string(repl_dev()->group_id()), s->get_last_log_term(),
-    //                            s->get_last_log_idx(), shard_seq_num, batch_number, snp_data->blob.size());
-    // if (obj_id == 0) {
-    //     // obj_id = 0 means its the first message and we send the pg and its shards metadata.
-    //     pg_iter->cur_snapshot_batch_num = 0;
-    //     pg_iter->create_pg_shard_snapshot_data(snp_data->blob);
-    //     RELEASE_ASSERT(snp_data->blob.size() > 0, "Empty metadata snapshot data");
-    //     LOGD("Read snapshot data first message {}", log_str);
-    //     return 0;
-    // }
-    //
-    // if (shard_seq_num != pg_iter->cur_shard_seq_num_ || batch_number != pg_iter->cur_snapshot_batch_num) {
-    //     // Follower can request for the old shard again. This may be due to error in writing or
-    //     // it crashed and want to continue from where it left.
-    //     LOGW("Shard or batch number not same as in iterator shard={}/{} batch_num={}/{}", shard_seq_num,
-    //          pg_iter->cur_shard_seq_num_, batch_number, pg_iter->cur_snapshot_batch_num);
-    //     if (shard_seq_num > pg_iter->cur_shard_seq_num_ || batch_number > pg_iter->cur_snapshot_batch_num) {
-    //         // If we retrieve some invalid values, return error.
-    //         return -1;
-    //     }
-    //
-    //     // Use the shard sequence number provided by the follower and we restart the batch.
-    //     pg_iter->cur_shard_seq_num_ = shard_seq_num;
-    //     pg_iter->cur_snapshot_batch_num = 0;
-    // }
-    //
-    // if (pg_iter->end_of_scan()) {
-    //     // No more shards to read, baseline resync is finished after this.
-    //     snp_data->is_last_obj = true;
-    //     LOGD("Read snapshot reached is_last_obj true {}", log_str);
-    //     return 0;
-    // }
-    //
-    // // Get next set of blobs in the batch.
-    // std::vector< HSHomeObject::BlobInfoData > blob_data_vec;
-    // bool end_of_shard;
-    // auto result = pg_iter->get_next_blobs(HS_BACKEND_DYNAMIC_CONFIG(max_num_blobs_in_snapshot_batch),
-    //                                       HS_BACKEND_DYNAMIC_CONFIG(max_snapshot_batch_size_mb) * 1024 * 1024,
-    //                                       blob_data_vec, end_of_shard);
-    // if (result != 0) {
-    //     LOGE("Failed to get next blobs in snapshot read result={} {}", result, log_str);
-    //     return -1;
-    // }
-    //
-    // // Create snapshot flatbuffer data.
-    // pg_iter->create_blobs_snapshot_data(blob_data_vec, snp_data->blob, end_of_shard);
-    // if (end_of_shard) {
-    //     pg_iter->cur_snapshot_batch_num = 0;
-    // } else {
-    //     pg_iter->cur_snapshot_batch_num++;
-    // }
-    //
-    // LOGT("Read snapshot num_blobs={} end_of_shard={} {}", blob_data_vec.size(), end_of_shard, log_str);
-    return 0;
+    HSHomeObject::PGBlobIterator* pg_iter = nullptr;
+    auto s = dynamic_pointer_cast< homestore::nuraft_snapshot_context >(context)->nuraft_snapshot();
+
+    if (snp_data->user_ctx == nullptr) {
+        // Create the pg blob iterator for the first time.
+        pg_iter = new HSHomeObject::PGBlobIterator(*home_object_, repl_dev()->group_id());
+        snp_data->user_ctx = (void*)pg_iter;
+    } else { pg_iter = r_cast< HSHomeObject::PGBlobIterator* >(snp_data->user_ctx); }
+
+    // Nuraft uses obj_id as a way to track the state of the snapshot read and write.
+    // Nuraft always starts with obj_id == 0 as the first message; the leader sends the PG metadata
+    // (including its shard list) as the response, and the follower replies with the next obj_id it expects.
+    // obj_id (64 bits) is encoded as: type_bit (1 bit) | shard_seq_num (48 bits) | batch_id (15 bits).
+    // The leader starts with shard sequence number 1, reads up to the maximum batch size of data,
+    // and sends it to the follower as one batch. Once all blobs of a shard have been sent, the leader
+    // notifies the follower by setting is_last_batch in the payload. The follower then moves to the
+    // next shard by incrementing shard_seq_num and resetting the batch number to 0.
+    // The batch number identifies which batch within the current shard sequence number is requested.
+    // The pg blob iterator is used to walk all the blobs of all the shards in the PG.
+    // Once all the shards are done, the follower returns obj_id = LAST_OBJ_ID (ULLONG_MAX) as an end
+    // marker and the leader stops sending snapshot data.
+    auto log_str = fmt::format("group={}, term={}, lsn={},",
+                               boost::uuids::to_string(repl_dev()->group_id()), s->get_last_log_term(),
+                               s->get_last_log_idx());
+    // TODO: snp_data->offset is int64_t, needs to be changed to uint64_t in homestore
+    if (snp_data->offset == int64_t(LAST_OBJ_ID)) {
+        // No more shards to read, baseline resync is finished after this.
+        snp_data->is_last_obj = true;
+        LOGD("Read snapshot end, {}", log_str);
+        return 0;
+    }
+
+    auto obj_id = objId(snp_data->offset);
+    log_str = fmt::format("{} shard_seq_num={} batch_num={}", log_str, obj_id.shard_seq_num, obj_id.batch_id);
+
+    // invalid obj_id
+    if (!pg_iter->updateCursor(obj_id)) {
+        LOGW("Invalid objId in snapshot read, {}, current shard_seq_num={}, current batch_num={}",
+             log_str, pg_iter->cur_shard_seq_num, pg_iter->cur_batch_num);
+        return -1;
+    }
+
+    // pg metadata message; shard_seq_num starts from 1, so 0 means PG metadata
+    if (obj_id.shard_seq_num == 0) {
+        pg_iter->create_pg_snapshot_data(snp_data->blob);
+        return 0;
+    }
+    // shard metadata message
+    if (obj_id.shard_seq_num != 0 && obj_id.batch_id == 0) {
+        if (!pg_iter->generate_shard_blob_list()) {
+            LOGE("Failed to generate shard blob list for snapshot read, {}", log_str);
+            return -1;
+        }
+        pg_iter->create_shard_snapshot_data(snp_data->blob);
+        return 0;
+    }
+    // blob data batch message
+    pg_iter->create_blobs_snapshot_data(snp_data->blob);
+    return 0;
 }
 
 void ReplicationStateMachine::write_snapshot_data(std::shared_ptr< homestore::snapshot_context > context,
diff --git a/src/lib/homestore_backend/resync_pg_data.fbs b/src/lib/homestore_backend/resync_pg_data.fbs
index 9ba1b1ef..3d280979 100644
--- a/src/lib/homestore_backend/resync_pg_data.fbs
+++ b/src/lib/homestore_backend/resync_pg_data.fbs
@@ -4,7 +4,7 @@ namespace homeobject;
 
 table Member {
     uuid : [ubyte];
-    name : [ubyte];
+    name : string;
     priority: int;
 }
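Reviewer note (not part of the patch): to make the obj_id handshake in read_snapshot_data concrete, this is the request sequence a follower is expected to produce for a PG with two shards, where shard 1 needs two blob batches and shard 2 needs one. The shard sequence numbers and batch counts are made-up example values; the encoding mirrors objId above:

```cpp
// Illustrative only: follower-side obj_id progression for one baseline resync.
#include <cstdint>
#include <cstdio>

static uint64_t make_obj_id(uint64_t shard_seq_num, uint64_t batch_id) {
    // type_bit(1) | shard_seq_num(48) | batch_id(15), type_bit = 1 for HomeObject
    return (uint64_t(1) << 63) | (shard_seq_num << 15) | batch_id;
}

int main() {
    const uint64_t requests[] = {
        0,                  // PG metadata (always the first request)
        make_obj_id(1, 0),  // shard 1 metadata
        make_obj_id(1, 1),  // shard 1, blob batch 1
        make_obj_id(1, 2),  // shard 1, blob batch 2 (leader marks it as the last batch of the shard)
        make_obj_id(2, 0),  // shard 2 metadata
        make_obj_id(2, 1),  // shard 2, blob batch 1 (last batch)
        UINT64_MAX,         // LAST_OBJ_ID: follower signals the end of the baseline resync
    };
    for (auto id : requests) { std::printf("0x%016llx\n", (unsigned long long)id); }
    return 0;
}
```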