diff --git a/src/lib/homestore_backend/hs_homeobject.hpp b/src/lib/homestore_backend/hs_homeobject.hpp
index f541cf5..a155763 100644
--- a/src/lib/homestore_backend/hs_homeobject.hpp
+++ b/src/lib/homestore_backend/hs_homeobject.hpp
@@ -316,8 +316,8 @@ class HSHomeObject : public HomeObjectImpl {
         uint64_t max_batch_size_;
     };

-    // SnapshotReceiverContext is the context used in follower side snapshot receiving. [drafting] The functions is not the final version.
-    struct SnapshotReceiveHandler {
+    class SnapshotReceiveHandler {
+      public:
        enum ErrorCode {
            ALLOC_BLK_ERR = 1,
            WRITE_DATA_ERR,
@@ -329,24 +329,37 @@ class HSHomeObject : public HomeObjectImpl {
        constexpr static shard_id_t invalid_shard_id = 0;
        constexpr static shard_id_t shard_list_end_marker = ULLONG_MAX;

-        SnapshotReceiveHandler(HSHomeObject& home_obj, pg_id_t pg_id_, homestore::group_id_t group_id, int64_t lsn,
-                               shared< homestore::ReplDev > repl_dev);
+        SnapshotReceiveHandler(HSHomeObject& home_obj, shared< homestore::ReplDev > repl_dev);
+
        void process_pg_snapshot_data(ResyncPGMetaData const& pg_meta);
        int process_shard_snapshot_data(ResyncShardMetaData const& shard_meta);
-        int process_blobs_snapshot_data(ResyncBlobDataBatch const& data_blobs, snp_batch_id_t batch_num);
+        int process_blobs_snapshot_data(ResyncBlobDataBatch const& data_blobs, snp_batch_id_t batch_num,
+                                        bool is_last_batch);
+
+        int64_t get_context_lsn() const;
+        void reset_context(int64_t lsn, pg_id_t pg_id);
+        shard_id_t get_shard_cursor() const;
        shard_id_t get_next_shard() const;

-        shard_id_t shard_cursor_{invalid_shard_id};
-        snp_batch_id_t cur_batch_num_{0};
-        std::vector< shard_id_t > shard_list_;
+      private:
+        // SnapshotContext holds the context data of the current snapshot transmission
+        struct SnapshotContext {
+            shard_id_t shard_cursor{invalid_shard_id};
+            snp_batch_id_t cur_batch_num{0};
+            std::vector< shard_id_t > shard_list;
+            const int64_t snp_lsn;
+            const pg_id_t pg_id;
+            shared< BlobIndexTable > index_table;
+
+            SnapshotContext(int64_t lsn, pg_id_t pg_id) : snp_lsn{lsn}, pg_id{pg_id} {}
+        };

-        const int64_t snp_lsn_{0};
        HSHomeObject& home_obj_;
-        const homestore::group_id_t group_id_;
-        const pg_id_t pg_id_;
        const shared< homestore::ReplDev > repl_dev_;
-        //snapshot info, can be used as a checkpoint for recovery
+        std::unique_ptr< SnapshotContext > ctx_;
+
+        // snapshot info, can be used as a checkpoint for recovery
        snapshot_info_superblk snp_info_;
        // other stats for snapshot transmission progress
    };
diff --git a/src/lib/homestore_backend/replication_state_machine.cpp b/src/lib/homestore_backend/replication_state_machine.cpp
index 96625b0..7b024df 100644
--- a/src/lib/homestore_backend/replication_state_machine.cpp
+++ b/src/lib/homestore_backend/replication_state_machine.cpp
@@ -198,7 +198,7 @@ int ReplicationStateMachine::read_snapshot_data(std::shared_ptr< homestore::snap
    if (snp_data->user_ctx == nullptr) {
        // Create the pg blob iterator for the first time.
-        pg_iter = new HSHomeObject::PGBlobIterator(*home_object_, repl_dev()->group_id());
+        pg_iter = new HSHomeObject::PGBlobIterator(*home_object_, repl_dev()->group_id(), context->get_lsn());
        snp_data->user_ctx = (void*)pg_iter;
    } else {
        pg_iter = r_cast< HSHomeObject::PGBlobIterator* >(snp_data->user_ctx);
    }
@@ -268,10 +268,13 @@ void ReplicationStateMachine::write_snapshot_data(std::shared_ptr< homestore::sn
                                                  std::shared_ptr< homestore::snapshot_data > snp_data) {
    RELEASE_ASSERT(context != nullptr, "Context null");
    RELEASE_ASSERT(snp_data != nullptr, "Snapshot data null");
-    auto s = dynamic_pointer_cast< homestore::nuraft_snapshot_context >(context)->nuraft_snapshot();
-    auto obj_id = objId(static_cast< snp_obj_id_t >(snp_data->offset));
    auto r_dev = repl_dev();
+    if (!m_snp_rcv_handler) {
+        m_snp_rcv_handler = std::make_unique< HSHomeObject::SnapshotReceiveHandler >(*home_object_, r_dev);
+    }
+    auto s = dynamic_pointer_cast< homestore::nuraft_snapshot_context >(context)->nuraft_snapshot();
+    auto obj_id = objId(static_cast< snp_obj_id_t >(snp_data->offset));

    auto log_suffix = fmt::format("group={} term={} lsn={} shard={} batch_num={} size={}",
                                  uuids::to_string(r_dev->group_id()), s->get_last_log_term(), s->get_last_log_idx(),
                                  obj_id.shard_seq_num, obj_id.batch_id, snp_data->blob.size());
@@ -281,10 +284,6 @@ void ReplicationStateMachine::write_snapshot_data(std::shared_ptr< homestore::sn
        return;
    }

-    // Check if the snapshot context is same as the current snapshot context.
-    // If not, drop the previous context and re-init a new one
-    if (m_snp_rcv_handler && m_snp_rcv_handler->snp_lsn_ != context->get_lsn()) { m_snp_rcv_handler.reset(nullptr); }
-
    // Check message integrity
    // TODO: add a flip here to simulate corrupted message
    auto header = r_cast< const SyncMessageHeader* >(snp_data->blob.cbytes());
@@ -304,11 +303,15 @@ void ReplicationStateMachine::write_snapshot_data(std::shared_ptr< homestore::sn
        // PG metadata & shard list message
        RELEASE_ASSERT(obj_id.batch_id == 0, "Invalid obj_id");

-        // TODO: Reset all data of current PG - let's resync on a pristine base
        auto pg_data = GetSizePrefixedResyncPGMetaData(data_buf);
-        m_snp_rcv_handler = std::make_unique< HSHomeObject::SnapshotReceiveHandler >(
-            *home_object_, pg_data->pg_id(), r_dev->group_id(), context->get_lsn(), r_dev);
+
+        // Check if the incoming snapshot is the same one this handler is already tracking.
+        // If not, drop the previous context and re-init a new one
+        if (m_snp_rcv_handler->get_context_lsn() != context->get_lsn()) {
+            m_snp_rcv_handler->reset_context(context->get_lsn(), pg_data->pg_id());
+            // TODO: Reset all data of current PG - let's resync on a pristine base
+        }
+
        m_snp_rcv_handler->process_pg_snapshot_data(*pg_data);

        snp_data->offset =
            objId(HSHomeObject::get_sequence_num_from_shard_id(m_snp_rcv_handler->get_next_shard()), 0).value;
@@ -316,8 +319,8 @@ void ReplicationStateMachine::write_snapshot_data(std::shared_ptr< homestore::sn
        return;
    }

-    RELEASE_ASSERT(m_snp_rcv_handler,
-                   "Snapshot receiver not initialized"); // Here we should have a valid snapshot receiver context
+    RELEASE_ASSERT(m_snp_rcv_handler->get_context_lsn() == context->get_lsn(), "Snapshot context lsn not matching");
+
    if (obj_id.batch_id == 0) {
        // Shard metadata message
        RELEASE_ASSERT(obj_id.shard_seq_num != 0, "Invalid obj_id");
@@ -339,10 +342,11 @@ void ReplicationStateMachine::write_snapshot_data(std::shared_ptr< homestore::sn
        // Blob data message
        // TODO: enhance error handling for wrong shard id - what may cause this?
        RELEASE_ASSERT(obj_id.shard_seq_num ==
-                           HSHomeObject::get_sequence_num_from_shard_id(m_snp_rcv_handler->shard_cursor_),
+                           HSHomeObject::get_sequence_num_from_shard_id(m_snp_rcv_handler->get_shard_cursor()),
                       "Shard id not matching with the current shard cursor");
        auto blob_batch = GetSizePrefixedResyncBlobDataBatch(data_buf);
-        auto ret = m_snp_rcv_handler->process_blobs_snapshot_data(*blob_batch, obj_id.batch_id);
+        auto ret =
+            m_snp_rcv_handler->process_blobs_snapshot_data(*blob_batch, obj_id.batch_id, blob_batch->is_last_batch());
        if (ret) {
            // Do not proceed, will request for resending the current blob batch
            LOGE("Failed to process blob snapshot data lsn:{} obj_id {} shard {} batch {}, err {}", s->get_last_log_idx(),
diff --git a/src/lib/homestore_backend/snapshot_receive_handler.cpp b/src/lib/homestore_backend/snapshot_receive_handler.cpp
index 840109b..a167a80 100644
--- a/src/lib/homestore_backend/snapshot_receive_handler.cpp
+++ b/src/lib/homestore_backend/snapshot_receive_handler.cpp
@@ -5,19 +5,18 @@
 #include

 namespace homeobject {
-HSHomeObject::SnapshotReceiveHandler::SnapshotReceiveHandler(HSHomeObject& home_obj, pg_id_t pg_id_,
-                                                             homestore::group_id_t group_id, int64_t lsn,
+HSHomeObject::SnapshotReceiveHandler::SnapshotReceiveHandler(HSHomeObject& home_obj,
                                                             shared< homestore::ReplDev > repl_dev) :
-        snp_lsn_(lsn), home_obj_(home_obj), group_id_(group_id), pg_id_(pg_id_), repl_dev_(std::move(repl_dev)) {}
+        home_obj_(home_obj), repl_dev_(std::move(repl_dev)) {}

 void HSHomeObject::SnapshotReceiveHandler::process_pg_snapshot_data(ResyncPGMetaData const& pg_meta) {
    LOGI("process_pg_snapshot_data pg_id:{}", pg_meta.pg_id());

    // Init shard list
-    shard_list_.clear();
+    ctx_->shard_list.clear();
    const auto ids = pg_meta.shard_ids();
    for (unsigned int i = 0; i < ids->size(); i++) {
-        shard_list_.push_back(ids->Get(i));
+        ctx_->shard_list.push_back(ids->Get(i));
    }

    // Create local PG
@@ -46,29 +45,31 @@ int HSHomeObject::SnapshotReceiveHandler::process_shard_snapshot_data(ResyncShar
    // Persist shard meta on chunk data
    homestore::chunk_num_t chunk_id = shard_meta.vchunk_id(); // FIXME: vchunk id to chunk id
-    shard_info_superblk shard_sb;
-    shard_sb.info.id = shard_meta.shard_id();
-    shard_sb.info.placement_group = shard_meta.pg_id();
-    shard_sb.info.state = static_cast< ShardInfo::State >(shard_meta.state());
-    shard_sb.info.lsn = shard_meta.created_lsn();
-    shard_sb.info.created_time = shard_meta.created_time();
-    shard_sb.info.last_modified_time = shard_meta.last_modified_time();
-    shard_sb.info.available_capacity_bytes = shard_meta.total_capacity_bytes();
-    shard_sb.info.total_capacity_bytes = shard_meta.total_capacity_bytes();
-    shard_sb.info.deleted_capacity_bytes = 0;
-    shard_sb.chunk_id = chunk_id;
+    sisl::io_blob_safe aligned_buf(sisl::round_up(sizeof(shard_info_superblk), io_align), io_align);
+    shard_info_superblk* shard_sb = r_cast< shard_info_superblk* >(aligned_buf.bytes());
+    shard_sb->info.id = shard_meta.shard_id();
+    shard_sb->info.placement_group = shard_meta.pg_id();
+    shard_sb->info.state = static_cast< ShardInfo::State >(shard_meta.state());
+    shard_sb->info.lsn = shard_meta.created_lsn();
+    shard_sb->info.created_time = shard_meta.created_time();
+    shard_sb->info.last_modified_time = shard_meta.last_modified_time();
+    shard_sb->info.available_capacity_bytes = shard_meta.total_capacity_bytes();
+    shard_sb->info.total_capacity_bytes = shard_meta.total_capacity_bytes();
+    shard_sb->info.deleted_capacity_bytes = 0;
+    shard_sb->chunk_id = chunk_id;
    homestore::MultiBlkId blk_id;
-    const auto hints = home_obj_.chunk_selector()->chunk_to_hints(chunk_id);
+    const auto hints = home_obj_.chunk_selector()->chunk_to_hints(shard_sb->chunk_id);
    auto status = homestore::data_service().alloc_blks(
-        sisl::round_up(sizeof(shard_sb), homestore::data_service().get_blk_size()), hints, blk_id);
+        sisl::round_up(aligned_buf.size(), homestore::data_service().get_blk_size()), hints, blk_id);
    if (status != homestore::BlkAllocStatus::SUCCESS) {
        LOGE("Failed to allocate blocks for shard {}", shard_meta.shard_id());
        return ALLOC_BLK_ERR;
    }
+    shard_sb->chunk_id = blk_id.to_single_blkid().chunk_num(); // FIXME: remove this after integrating vchunk

    const auto ret = homestore::data_service()
-                         .async_write(r_cast< char const* >(&shard_sb), sizeof(shard_sb), blk_id)
+                         .async_write(r_cast< char const* >(aligned_buf.cbytes()), aligned_buf.size(), blk_id)
                         .thenValue([&blk_id](auto&& err) -> BlobManager::AsyncResult< blob_id_t > {
                             // TODO: do we need to update repl_dev metrics?
                             if (err) {
@@ -79,21 +80,29 @@ int HSHomeObject::SnapshotReceiveHandler::process_shard_snapshot_data(ResyncShar
                             return 0;
                         })
                         .get();
-    if (ret) {
+    if (ret.hasError()) {
        LOGE("Failed to write shard info of shard_id {} to blk_id:{}", shard_meta.shard_id(), blk_id.to_string());
        return WRITE_DATA_ERR;
    }

    // Now let's create local shard
-    home_obj_.local_create_shard(shard_sb.info, chunk_id, blk_id.blk_count());
-    shard_cursor_ = shard_meta.shard_id();
-    cur_batch_num_ = 0;
+    home_obj_.local_create_shard(shard_sb->info, shard_sb->chunk_id, blk_id.blk_count());
+    ctx_->shard_cursor = shard_meta.shard_id();
+    ctx_->cur_batch_num = 0;

    return 0;
}

int HSHomeObject::SnapshotReceiveHandler::process_blobs_snapshot_data(ResyncBlobDataBatch const& data_blobs,
-                                                                      const snp_batch_id_t batch_num) {
-    cur_batch_num_ = batch_num;
+                                                                      const snp_batch_id_t batch_num,
+                                                                      bool is_last_batch) {
+    ctx_->cur_batch_num = batch_num;
+
+    // Find chunk id for current shard
+    auto chunk_id = home_obj_.get_shard_chunk(ctx_->shard_cursor);
+    RELEASE_ASSERT(chunk_id.has_value(), "Failed to load chunk of current shard_cursor:{}", ctx_->shard_cursor);
+    homestore::blk_alloc_hints hints;
+    hints.chunk_id_hint = *chunk_id;
+
    for (unsigned int i = 0; i < data_blobs.blob_list()->size(); i++) {
        const auto blob = data_blobs.blob_list()->Get(i);
@@ -103,16 +112,15 @@ int HSHomeObject::SnapshotReceiveHandler::process_blobs_snapshot_data(ResyncBlob
            continue;
        }

-        // Check duplication to avoid reprocessing
-        shared< BlobIndexTable > index_table;
-        {
+        // Check duplication to avoid reprocessing. This may happen on resent blob batches.
+        if (!ctx_->index_table) {
            std::shared_lock lock_guard(home_obj_._pg_lock);
-            auto iter = home_obj_._pg_map.find(pg_id_);
+            auto iter = home_obj_._pg_map.find(ctx_->pg_id);
            RELEASE_ASSERT(iter != home_obj_._pg_map.end(), "PG not found");
-            index_table = dynamic_cast< HS_PG* >(iter->second.get())->index_table_;
+            ctx_->index_table = dynamic_cast< HS_PG* >(iter->second.get())->index_table_;
        }
-        RELEASE_ASSERT(index_table != nullptr, "Index table instance null");
-        if (home_obj_.get_blob_from_index_table(index_table, shard_cursor_, blob->blob_id())) {
+        RELEASE_ASSERT(ctx_->index_table != nullptr, "Index table instance null");
+        if (home_obj_.get_blob_from_index_table(ctx_->index_table, ctx_->shard_cursor, blob->blob_id())) {
            LOGD("Skip already persisted blob_id:{}", blob->blob_id());
            continue;
        }
@@ -142,17 +150,15 @@ int HSHomeObject::SnapshotReceiveHandler::process_blobs_snapshot_data(ResyncBlob
        }

        // Alloc & persist blob data
-        auto chunk_id = home_obj_.get_shard_chunk(shard_cursor_);
-        RELEASE_ASSERT(chunk_id.has_value(), "Failed to load chunk of current shard_cursor:{}", shard_cursor_);
-        homestore::blk_alloc_hints hints;
-        hints.chunk_id_hint = *chunk_id;
-
        auto data_size = blob->data()->size();
+        sisl::io_blob_safe aligned_buf(sisl::round_up(data_size, io_align), io_align);
+        std::memcpy(aligned_buf.bytes(), blob_data, data_size);
+
        homestore::MultiBlkId blk_id;
        auto status = homestore::data_service().alloc_blks(
-            sisl::round_up(data_size, homestore::data_service().get_blk_size()), hints, blk_id);
+            sisl::round_up(aligned_buf.size(), homestore::data_service().get_blk_size()), hints, blk_id);
        if (status != homestore::BlkAllocStatus::SUCCESS) {
-            LOGE("Failed to allocate blocks for shard {} blob {}", shard_cursor_, blob->blob_id());
+            LOGE("Failed to allocate blocks for shard {} blob {}", ctx_->shard_cursor, blob->blob_id());
            return ALLOC_BLK_ERR;
        }
@@ -164,7 +170,7 @@ int HSHomeObject::SnapshotReceiveHandler::process_blobs_snapshot_data(ResyncBlob
        };

        const auto ret = homestore::data_service()
-                             .async_write(r_cast< char const* >(blob_data), data_size, blk_id)
+                             .async_write(r_cast< char const* >(aligned_buf.cbytes()), aligned_buf.size(), blk_id)
                             .thenValue([&blk_id](auto&& err) -> BlobManager::AsyncResult< blob_id_t > {
                                 // TODO: do we need to update repl_dev metrics?
                                 if (err) {
@@ -175,14 +181,15 @@ int HSHomeObject::SnapshotReceiveHandler::process_blobs_snapshot_data(ResyncBlob
                                 return 0;
                             })
                             .get();
-        if (ret) {
+        if (ret.hasError()) {
            LOGE("Failed to write shard info of blob_id {} to blk_id:{}", blob->blob_id(), blk_id.to_string());
            free_allocated_blks();
            return WRITE_DATA_ERR;
        }

        // Add local blob info to index & PG
-        bool success = home_obj_.local_add_blob_info(pg_id_, BlobInfo{shard_cursor_, blob->blob_id(), {0, 0, 0}});
+        bool success =
+            home_obj_.local_add_blob_info(ctx_->pg_id, BlobInfo{ctx_->shard_cursor, blob->blob_id(), blk_id});
        if (!success) {
            LOGE("Failed to add blob info for blob_id:{}", blob->blob_id());
            free_allocated_blks();
@@ -190,17 +197,36 @@ int HSHomeObject::SnapshotReceiveHandler::process_blobs_snapshot_data(ResyncBlob
        }
    }

+    if (is_last_batch) {
+        // Release chunk for sealed shard
+        ShardInfo::State state;
+        {
+            std::scoped_lock lock_guard(home_obj_._shard_lock);
+            auto iter = home_obj_._shard_map.find(ctx_->shard_cursor);
+            state = (*iter->second)->info.state;
+        }
+        if (state == ShardInfo::State::SEALED) { home_obj_.chunk_selector()->release_chunk(chunk_id.value()); }
+    }
+
    return 0;
}

+int64_t HSHomeObject::SnapshotReceiveHandler::get_context_lsn() const { return ctx_ ? ctx_->snp_lsn : -1; }
+
+void HSHomeObject::SnapshotReceiveHandler::reset_context(int64_t lsn, pg_id_t pg_id) {
+    ctx_ = std::make_unique< SnapshotContext >(lsn, pg_id);
+}
+
+shard_id_t HSHomeObject::SnapshotReceiveHandler::get_shard_cursor() const { return ctx_->shard_cursor; }
+
 shard_id_t HSHomeObject::SnapshotReceiveHandler::get_next_shard() const {
-    if (shard_list_.empty()) { return shard_list_end_marker; }
+    if (ctx_->shard_list.empty()) { return shard_list_end_marker; }

-    if (shard_cursor_ == 0) { return shard_list_[0]; }
+    if (ctx_->shard_cursor == 0) { return ctx_->shard_list[0]; }

-    for (size_t i = 0; i < shard_list_.size(); ++i) {
-        if (shard_list_[i] == shard_cursor_) {
-            return (i + 1 < shard_list_.size()) ? shard_list_[i + 1] : shard_list_end_marker;
+    for (size_t i = 0; i < ctx_->shard_list.size(); ++i) {
+        if (ctx_->shard_list[i] == ctx_->shard_cursor) {
+            return (i + 1 < ctx_->shard_list.size()) ? ctx_->shard_list[i + 1] : shard_list_end_marker;
        }
    }
diff --git a/src/lib/homestore_backend/tests/hs_blob_tests.cpp b/src/lib/homestore_backend/tests/hs_blob_tests.cpp
index b297ec8..d13690b 100644
--- a/src/lib/homestore_backend/tests/hs_blob_tests.cpp
+++ b/src/lib/homestore_backend/tests/hs_blob_tests.cpp
@@ -3,6 +3,8 @@
 #include "lib/homestore_backend/index_kv.hpp"
 #include "generated/resync_blob_data_generated.h"

+#include <random>
+
 TEST_F(HomeObjectFixture, BasicEquivalence) {
    auto shard_mgr = _obj_inst->shard_manager();
    auto pg_mgr = _obj_inst->pg_manager();
@@ -285,4 +287,166 @@ TEST_F(HomeObjectFixture, PGBlobIterator) {
    ASSERT_TRUE(pg_iter->update_cursor(objId(LAST_OBJ_ID)));
 }

-TEST_F(HomeObjectFixture, SnapshotReceiveHandler) {}
+TEST_F(HomeObjectFixture, SnapshotReceiveHandler) {
+    // TODO: add flips to test corrupted data
+    // MUST TEST WITH replica=1
+    constexpr uint64_t snp_lsn = 1;
+    constexpr uint64_t num_shards_per_pg = 3;
+    constexpr uint64_t num_open_shards_per_pg = 2; // Should be less than num_shards_per_pg
+    constexpr uint64_t num_batches_per_shard = 3;
+    constexpr uint64_t num_blobs_per_batch = 5;
+    constexpr int corrupted_blob_percentage = 10;
+
+    // We have to create a PG first to init repl_dev
+    constexpr pg_id_t pg_id = 1;
+    create_pg(pg_id); // to create repl dev
+    PGStats stats;
+    ASSERT_TRUE(_obj_inst->pg_manager()->get_stats(pg_id, stats));
+    auto r_dev = homestore::HomeStore::instance()->repl_service().get_repl_dev(stats.replica_set_uuid);
+    ASSERT_TRUE(r_dev.hasValue());
+
+    auto handler = std::make_unique< homeobject::HSHomeObject::SnapshotReceiveHandler >(*_obj_inst, r_dev.value());
+    handler->reset_context(snp_lsn, pg_id);
+
+    // Step 1: Test write pg meta - cannot test full logic since the PG already exists
+    // Generate ResyncPGMetaData message
+    LOGINFO("TESTING: applying meta for pg {}", pg_id);
+    constexpr auto blob_seq_num = num_shards_per_pg * num_batches_per_shard * num_blobs_per_batch;
+    flatbuffers::FlatBufferBuilder builder;
+    std::vector< flatbuffers::Offset< Member > > members;
+    std::vector uuid(stats.replica_set_uuid.begin(), stats.replica_set_uuid.end());
+    for (auto& member : stats.members) {
+        auto id = std::vector< std::uint8_t >(std::get< 0 >(member).begin(), std::get< 0 >(member).end());
+        members.push_back(CreateMemberDirect(builder, &id, std::get< 1 >(member).c_str(), 100));
+    }
+    std::vector< uint64_t > shard_ids;
+    for (uint64_t i = 1; i <= num_shards_per_pg; i++) {
+        shard_ids.push_back(i);
+    }
+    auto pg_entry =
+        CreateResyncPGMetaDataDirect(builder, pg_id, &uuid, blob_seq_num, num_shards_per_pg, &members, &shard_ids);
+    builder.Finish(pg_entry);
+    auto pg_meta = GetResyncPGMetaData(builder.GetBufferPointer());
+    handler->process_pg_snapshot_data(*pg_meta);
+    builder.Reset();
+
+    // Step 2: Test shard and blob batches
+    std::random_device rd; // Random generators for blob corruption
+    std::mt19937 gen(rd());
+    std::uniform_int_distribution<> corrupt_dis(1, 100);
+    std::uniform_int_distribution<> random_bytes_dis(1, 16 * 1024);
+
+    blob_id_t cur_blob_id{0};
+    for (uint64_t i = 1; i <= num_shards_per_pg; i++) {
+        LOGINFO("TESTING: applying meta for shard {}", i);
+
+        // Step 2-1: Test write shard meta
+        // Generate ResyncShardMetaData message
+        ShardInfo shard;
+        shard.id = i;
+        shard.state =
+            i <= num_shards_per_pg - num_open_shards_per_pg ? ShardInfo::State::SEALED : ShardInfo::State::OPEN;
+        shard.created_time = get_time_since_epoch_ms();
+        shard.last_modified_time = shard.created_time;
+        shard.total_capacity_bytes = 1024 * Mi;
+        shard.lsn = snp_lsn;
+
+        auto shard_entry =
+            CreateResyncShardMetaData(builder, shard.id, pg_id, static_cast< uint8_t >(shard.state), shard.lsn,
+                                      shard.created_time, shard.last_modified_time, shard.total_capacity_bytes, 0);
+        builder.Finish(shard_entry);
+        auto shard_meta = GetResyncShardMetaData(builder.GetBufferPointer());
+        auto ret = handler->process_shard_snapshot_data(*shard_meta);
+        builder.Reset();
+        ASSERT_EQ(ret, 0);
+
+        auto res = _obj_inst->shard_manager()->get_shard(shard.id).get();
+        ASSERT_TRUE(!!res);
+        auto shard_res = std::move(res.value());
+        ASSERT_EQ(shard_res.id, shard.id);
+        ASSERT_EQ(shard_res.state, shard.state);
+        ASSERT_EQ(shard_res.created_time, shard.created_time);
+        ASSERT_EQ(shard_res.last_modified_time, shard.last_modified_time);
+        ASSERT_EQ(shard_res.total_capacity_bytes, shard.total_capacity_bytes);
+        ASSERT_EQ(shard_res.lsn, shard.lsn);
+        // TODO: vchunk id
+
+        // Step 2-2: Test write blob batch data
+        // Generate ResyncBlobDataBatch message
+        std::map< blob_id_t, std::tuple< Blob, bool > > blob_map;
+        for (uint64_t j = 1; j <= num_batches_per_shard; j++) {
+            LOGINFO("TESTING: applying blobs for shard {} batch {}", shard.id, j);
+            std::vector< ::flatbuffers::Offset< ResyncBlobData > > blob_entries;
+            for (uint64_t k = 0; k < num_blobs_per_batch; k++) {
+                auto blob_state = corrupt_dis(gen) <= corrupted_blob_percentage ? ResyncBlobState::CORRUPTED
+                                                                                : ResyncBlobState::NORMAL;
+
+                // Construct raw blob buffer
+                auto blob = build_blob(cur_blob_id);
+                const auto aligned_hdr_size =
+                    sisl::round_up(sizeof(HSHomeObject::BlobHeader) + blob.user_key.size(), io_align);
+                sisl::io_blob_safe blob_raw(aligned_hdr_size + blob.body.size(), io_align);
+                HSHomeObject::BlobHeader hdr;
+                hdr.type = HSHomeObject::DataHeader::data_type_t::BLOB_INFO;
+                hdr.shard_id = shard.id;
+                hdr.blob_id = cur_blob_id;
+                hdr.hash_algorithm = HSHomeObject::BlobHeader::HashAlgorithm::CRC32;
+                hdr.blob_size = blob.body.size();
+                hdr.user_key_size = blob.user_key.size();
+                hdr.object_offset = blob.object_off;
+                hdr.data_offset = aligned_hdr_size;
+                _obj_inst->compute_blob_payload_hash(hdr.hash_algorithm, blob.body.cbytes(), blob.body.size(),
+                                                     reinterpret_cast< uint8_t* >(blob.user_key.data()),
+                                                     blob.user_key.size(), hdr.hash,
+                                                     HSHomeObject::BlobHeader::blob_max_hash_len);
+
+                std::memcpy(blob_raw.bytes(), &hdr, sizeof(HSHomeObject::BlobHeader));
+                if (!blob.user_key.empty()) {
+                    std::memcpy((blob_raw.bytes() + sizeof(HSHomeObject::BlobHeader)), blob.user_key.data(),
+                                blob.user_key.size());
+                }
+                std::memcpy(blob_raw.bytes() + aligned_hdr_size, blob.body.cbytes(), blob.body.size());
+
+                // Simulate blob data corruption - tamper with random bytes
+                if (blob_state == ResyncBlobState::CORRUPTED) {
+                    constexpr int corrupted_bytes = 5;
+                    for (auto n = 0; n < corrupted_bytes; n++) {
+                        auto offset = random_bytes_dis(gen) % blob_raw.size();
+                        auto byte = random_bytes_dis(gen) % 256;
+                        blob_raw.bytes()[offset] = byte;
+                        LOGINFO("Changing byte at offset {} to {} to simulate data corruption", offset, byte);
+                    }
+                }
+
+                std::vector data(blob_raw.bytes(), blob_raw.bytes() + blob_raw.size());
+                blob_entries.push_back(
+                    CreateResyncBlobDataDirect(builder, cur_blob_id, static_cast< uint8_t >(blob_state), &data));
+                blob_map[cur_blob_id++] =
+                    std::make_tuple< Blob, bool >(std::move(blob), blob_state == ResyncBlobState::CORRUPTED);
+            }
+            builder.Finish(CreateResyncBlobDataBatchDirect(builder, &blob_entries, true));
+            auto blob_batch = GetResyncBlobDataBatch(builder.GetBufferPointer());
+            ret = handler->process_blobs_snapshot_data(*blob_batch, j, j == num_batches_per_shard);
+            builder.Reset();
+            ASSERT_EQ(ret, 0);
+        }
+        for (const auto& b : blob_map) {
+            auto blob_id = b.first;
+            auto& blob = std::get< 0 >(b.second);
+            auto is_corrupted = std::get< 1 >(b.second);
+
+            auto res = _obj_inst->blob_manager()->get(shard.id, blob_id, 0, blob.body.size()).get();
+            if (is_corrupted) {
+                ASSERT_FALSE(!!res);
+            } else {
+                ASSERT_TRUE(!!res);
+                auto blob_res = std::move(res.value());
+                ASSERT_EQ(blob_res.body.size(), blob.body.size());
+                ASSERT_EQ(std::memcmp(blob_res.body.bytes(), blob.body.cbytes(), blob_res.body.size()), 0);
+            }
+        }
+        if (shard.state == ShardInfo::State::SEALED) {
+            // TODO: Verify chunk is released. Currently we don't have chunk_id, so let's do this after rebasing
+        }
+    }
+}
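
The central change above is the split between the long-lived SnapshotReceiveHandler (one per state machine) and the disposable SnapshotContext (one per snapshot, keyed by the snapshot LSN). The standalone toy below models that lifecycle with simplified types; it is an illustrative sketch, not the real homestore/homeobject classes, and only mirrors the -1 "no context yet" sentinel that get_context_lsn() returns.

    // toy_context.cpp - model of the receiver-side context protocol (illustrative only)
    #include <cassert>
    #include <cstdint>
    #include <memory>

    struct SnapshotContext {
        const int64_t snp_lsn; // identity of the snapshot being received
        const uint16_t pg_id;  // PG being resynced
        SnapshotContext(int64_t lsn, uint16_t pg) : snp_lsn{lsn}, pg_id{pg} {}
    };

    class Handler {
      public:
        // -1 means "no context yet", as in the real get_context_lsn()
        int64_t get_context_lsn() const { return ctx_ ? ctx_->snp_lsn : -1; }
        void reset_context(int64_t lsn, uint16_t pg_id) { ctx_ = std::make_unique< SnapshotContext >(lsn, pg_id); }

      private:
        std::unique_ptr< SnapshotContext > ctx_; // swapped per snapshot; the handler lives on
    };

    int main() {
        Handler h; // created lazily once, like m_snp_rcv_handler
        if (h.get_context_lsn() != 100) h.reset_context(100, 1); // first PG-meta message of snapshot lsn=100
        assert(h.get_context_lsn() == 100);
        if (h.get_context_lsn() != 100) h.reset_context(100, 1); // resent message of the same snapshot: no reset
        if (h.get_context_lsn() != 200) h.reset_context(200, 1); // new snapshot: fresh context
        assert(h.get_context_lsn() == 200);
        return 0;
    }

Note the argument order: reset_context(lsn, pg_id) takes the LSN first, which is why the call site in write_snapshot_data passes context->get_lsn() before pg_data->pg_id().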
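write_snapshot_data drives the sender by returning the objId of the next message it expects, computed from get_next_shard(). Below is a standalone sketch of that cursor walk with local typedefs; the logic matches the diff, except that the final "unknown cursor" return is an assumption, since the tail of the real function lies outside the hunk.

    // toy_cursor.cpp - shard-cursor iteration as in get_next_shard() (illustrative only)
    #include <cassert>
    #include <climits>
    #include <cstdint>
    #include <vector>

    using shard_id_t = uint64_t;
    constexpr shard_id_t invalid_shard_id = 0; // "no shard applied yet"
    constexpr shard_id_t shard_list_end_marker = ULLONG_MAX;

    shard_id_t next_shard(std::vector< shard_id_t > const& shard_list, shard_id_t cursor) {
        if (shard_list.empty()) return shard_list_end_marker;
        if (cursor == invalid_shard_id) return shard_list[0]; // start from the first shard
        for (size_t i = 0; i < shard_list.size(); ++i) {
            if (shard_list[i] == cursor) {
                return (i + 1 < shard_list.size()) ? shard_list[i + 1] : shard_list_end_marker;
            }
        }
        return shard_list_end_marker; // cursor not in list (assumption: treat as done)
    }

    int main() {
        std::vector< shard_id_t > const shards{11, 12, 13}; // ids taken from the PG meta message
        assert(next_shard(shards, invalid_shard_id) == 11); // before any shard meta is applied
        assert(next_shard(shards, 11) == 12);               // advance after each shard completes
        assert(next_shard(shards, 13) == shard_list_end_marker); // signal the sender we are done
        return 0;
    }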
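Both write paths now stage their payload in a sisl::io_blob_safe sized with sisl::round_up(..., io_align) instead of handing a stack object straight to async_write(). The diff itself does not state the motivation; presumably the data service write path performs direct I/O, which needs aligned, padded buffers. A standalone sketch of the same round-up-and-copy pattern using only the standard library (the 512-byte io_align value and ShardSb layout are assumptions for illustration):

    // toy_aligned_write.cpp - alignment-padded staging buffer (illustrative only)
    #include <cstdint>
    #include <cstdio>
    #include <cstdlib>
    #include <cstring>

    constexpr size_t io_align = 512; // assumed typical direct-I/O alignment

    size_t round_up(size_t n, size_t align) { return (n + align - 1) / align * align; }

    int main() {
        struct ShardSb { uint64_t id; uint64_t chunk_id; } sb{42, 7}; // stand-in for shard_info_superblk

        // Allocate an alignment-padded buffer and copy the struct into it, as the
        // patch does with sisl::io_blob_safe before calling async_write().
        const size_t buf_len = round_up(sizeof(ShardSb), io_align);
        void* buf = std::aligned_alloc(io_align, buf_len);
        if (!buf) return 1;
        std::memset(buf, 0, buf_len);
        std::memcpy(buf, &sb, sizeof(sb));

        // buf/buf_len are now safe to hand to an O_DIRECT-style write; the old code
        // passed &shard_sb and sizeof(shard_sb) directly, which is neither aligned
        // nor a multiple of the block size.
        std::printf("payload %zu bytes padded to %zu\n", sizeof(ShardSb), buf_len);
        std::free(buf);
        return 0;
    }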