From 766786f54cdd53bc92b3af799f802365193dab72 Mon Sep 17 00:00:00 2001
From: Jilong Kou
Date: Wed, 27 Nov 2024 17:32:06 +0800
Subject: [PATCH] Release chunk for sealed shards & Enhance UT

---
 src/lib/homestore_backend/hs_homeobject.hpp   |  3 +-
 .../replication_state_machine.cpp             |  3 +-
 .../snapshot_receive_handler.cpp              | 26 +++++++--
 .../homestore_backend/tests/hs_blob_tests.cpp | 56 +++++++++++++++----
 4 files changed, 69 insertions(+), 19 deletions(-)

diff --git a/src/lib/homestore_backend/hs_homeobject.hpp b/src/lib/homestore_backend/hs_homeobject.hpp
index 0c86cc6..a155763 100644
--- a/src/lib/homestore_backend/hs_homeobject.hpp
+++ b/src/lib/homestore_backend/hs_homeobject.hpp
@@ -333,7 +333,8 @@ class HSHomeObject : public HomeObjectImpl {
 
         void process_pg_snapshot_data(ResyncPGMetaData const& pg_meta);
         int process_shard_snapshot_data(ResyncShardMetaData const& shard_meta);
-        int process_blobs_snapshot_data(ResyncBlobDataBatch const& data_blobs, snp_batch_id_t batch_num);
+        int process_blobs_snapshot_data(ResyncBlobDataBatch const& data_blobs, snp_batch_id_t batch_num,
+                                        bool is_last_batch);
 
         int64_t get_context_lsn() const;
         void reset_context(int64_t lsn, pg_id_t pg_id);
diff --git a/src/lib/homestore_backend/replication_state_machine.cpp b/src/lib/homestore_backend/replication_state_machine.cpp
index 4615fc8..9fe30bf 100644
--- a/src/lib/homestore_backend/replication_state_machine.cpp
+++ b/src/lib/homestore_backend/replication_state_machine.cpp
@@ -343,7 +343,8 @@ void ReplicationStateMachine::write_snapshot_data(std::shared_ptr< homestore::sn
                        HSHomeObject::get_sequence_num_from_shard_id(m_snp_rcv_handler->get_shard_cursor()),
                        "Shard id not matching with the current shard cursor");
         auto blob_batch = GetSizePrefixedResyncBlobDataBatch(data_buf);
-        auto ret = m_snp_rcv_handler->process_blobs_snapshot_data(*blob_batch, obj_id.batch_id);
+        auto ret =
+            m_snp_rcv_handler->process_blobs_snapshot_data(*blob_batch, obj_id.batch_id, blob_batch->is_last_batch());
         if (ret) {
             // Do not proceed, will request for resending the current blob batch
             LOGE("Failed to process blob snapshot data lsn:{} obj_id {} shard {} batch {}, err {}", s->get_last_log_idx(),
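Note on the signature change above: the receiver cannot infer locally that a batch is the final one for a shard; that fact is carried in the sender's ResyncBlobDataBatch flatbuffer (read via blob_batch->is_last_batch() in write_snapshot_data) and threaded into the handler as a plain bool. A minimal compilable sketch of the resulting control flow, using invented toy types rather than the real HomeObject/HomeStore APIs:

    #include <cstdint>
    #include <iostream>
    #include <optional>
    #include <vector>

    enum class ShardState { OPEN, SEALED };

    // Stand-in for ResyncBlobDataBatch: only the fields this sketch needs.
    struct BlobBatch {
        std::vector<std::vector<uint8_t>> blobs;
        bool is_last_batch;
    };

    struct ToyReceiver {
        ShardState shard_state{ShardState::SEALED};
        std::optional<uint16_t> chunk_id{42}; // chunk pinned for the current shard cursor

        // Same shape as process_blobs_snapshot_data(batch, batch_num, is_last_batch).
        int process(const BlobBatch& batch, uint64_t batch_num) {
            for (const auto& b : batch.blobs) { (void)b; /* persist blob data */ }
            if (batch.is_last_batch && shard_state == ShardState::SEALED) {
                // Final batch of a sealed shard: no further writes can target the
                // chunk, so it can be handed back to the chunk selector.
                std::cout << "release chunk " << *chunk_id << " after batch " << batch_num << '\n';
                chunk_id.reset();
            }
            return 0;
        }
    };

    int main() {
        ToyReceiver r;
        r.process({{{1, 2, 3}}, /*is_last_batch=*/false}, 1);
        r.process({{{4, 5, 6}}, /*is_last_batch=*/true}, 2); // triggers the release path
    }

Passing the flag as an explicit parameter, instead of re-reading it from the flatbuffer inside the handler, is also what lets the unit test below drive the cleanup path directly with j == num_batches_per_shard.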
diff --git a/src/lib/homestore_backend/snapshot_receive_handler.cpp b/src/lib/homestore_backend/snapshot_receive_handler.cpp
index 573bbf3..1bd60f3 100644
--- a/src/lib/homestore_backend/snapshot_receive_handler.cpp
+++ b/src/lib/homestore_backend/snapshot_receive_handler.cpp
@@ -92,8 +92,16 @@ int HSHomeObject::SnapshotReceiveHandler::process_shard_snapshot_data(ResyncShar
 }
 
 int HSHomeObject::SnapshotReceiveHandler::process_blobs_snapshot_data(ResyncBlobDataBatch const& data_blobs,
-                                                                      const snp_batch_id_t batch_num) {
+                                                                      const snp_batch_id_t batch_num,
+                                                                      bool is_last_batch) {
     ctx_->cur_batch_num = batch_num;
+
+    // Find chunk id for current shard
+    auto chunk_id = home_obj_.get_shard_chunk(ctx_->shard_cursor);
+    RELEASE_ASSERT(chunk_id.has_value(), "Failed to load chunk of current shard_cursor:{}", ctx_->shard_cursor);
+    homestore::blk_alloc_hints hints;
+    hints.chunk_id_hint = *chunk_id;
+
     for (unsigned int i = 0; i < data_blobs.blob_list()->size(); i++) {
         const auto blob = data_blobs.blob_list()->Get(i);
 
@@ -141,11 +149,6 @@ int HSHomeObject::SnapshotReceiveHandler::process_blobs_snapshot_data(ResyncBlob
         }
 
         // Alloc & persist blob data
-        auto chunk_id = home_obj_.get_shard_chunk(ctx_->shard_cursor);
-        RELEASE_ASSERT(chunk_id.has_value(), "Failed to load chunk of current shard_cursor:{}", ctx_->shard_cursor);
-        homestore::blk_alloc_hints hints;
-        hints.chunk_id_hint = *chunk_id;
-
         auto data_size = blob->data()->size();
         homestore::MultiBlkId blk_id;
         auto status = homestore::data_service().alloc_blks(
@@ -190,6 +193,17 @@ int HSHomeObject::SnapshotReceiveHandler::process_blobs_snapshot_data(ResyncBlob
         }
     }
 
+    if (is_last_batch) {
+        // Release chunk for sealed shard
+        ShardInfo::State state;
+        {
+            std::scoped_lock lock_guard(home_obj_._shard_lock);
+            auto iter = home_obj_._shard_map.find(ctx_->shard_cursor);
+            state = (*iter->second)->info.state;
+        }
+        if (state == ShardInfo::State::SEALED) { home_obj_.chunk_selector()->release_chunk(chunk_id.value()); }
+    }
+
     return 0;
 }
 
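The release path above copies the shard state inside a short _shard_lock critical section and calls release_chunk() only after the lock is dropped. A compilable sketch of that copy-under-lock pattern (the names are stand-ins, not the real HSHomeObject members); unlike the patch, which relies on the shard cursor having been validated earlier in the handler, the sketch also guards the map lookup:

    #include <cstdint>
    #include <map>
    #include <memory>
    #include <mutex>

    enum class State { OPEN, SEALED };
    struct Info { State state; };

    struct ShardTable {
        std::mutex lock;
        std::map<uint64_t, std::shared_ptr<Info>> map;
    };

    // Copy the state inside a short critical section, then act on the copy so
    // the release callback runs without holding the shard lock.
    template <typename ReleaseFn>
    void maybe_release_chunk(ShardTable& t, uint64_t shard_id, uint16_t chunk_id, ReleaseFn release) {
        State state;
        {
            std::scoped_lock g(t.lock);
            auto it = t.map.find(shard_id);
            if (it == t.map.end()) return; // unknown shard: nothing to release
            state = it->second->state;
        }
        if (state == State::SEALED) release(chunk_id);
    }

    int main() {
        ShardTable t;
        t.map[7] = std::make_shared<Info>(Info{State::SEALED});
        maybe_release_chunk(t, 7, 42, [](uint16_t id) { (void)id; /* chunk_selector()->release_chunk(id) */ });
    }

Keeping release_chunk() outside the critical section avoids holding _shard_lock across a call into the chunk selector, which may take locks of its own.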
diff --git a/src/lib/homestore_backend/tests/hs_blob_tests.cpp b/src/lib/homestore_backend/tests/hs_blob_tests.cpp
index 13f9914..4bb3577 100644
--- a/src/lib/homestore_backend/tests/hs_blob_tests.cpp
+++ b/src/lib/homestore_backend/tests/hs_blob_tests.cpp
@@ -292,8 +292,10 @@ TEST_F(HomeObjectFixture, SnapshotReceiveHandler) {
     // MUST TEST WITH replica=1
     constexpr uint64_t snp_lsn = 1;
     constexpr uint64_t num_shards_per_pg = 3;
+    constexpr uint64_t num_open_shards_per_pg = 2; // Should be less than num_shards_per_pg
     constexpr uint64_t num_batches_per_shard = 3;
     constexpr uint64_t num_blobs_per_batch = 5;
+    constexpr int corrupted_blob_percentage = 10;
 
     // We have to create a PG first to init repl_dev
     constexpr pg_id_t pg_id = 1;
@@ -330,6 +332,11 @@ TEST_F(HomeObjectFixture, SnapshotReceiveHandler) {
     handler->process_pg_snapshot_data(*pg_meta);
 
     // Step 2: Test shard and blob batches
+    std::random_device rd; // Random generators for blob corruption
+    std::mt19937 gen(rd());
+    std::uniform_int_distribution<> corrupt_dis(1, 100);
+    std::uniform_int_distribution<> random_bytes_dis(1, 16 * 1024);
+
     blob_id_t cur_blob_id{0};
     for (uint64_t i = 1; i <= num_shards_per_pg; i++) {
         LOGINFO("TESTING: applying meta for shard {}", i);
@@ -338,7 +345,8 @@ TEST_F(HomeObjectFixture, SnapshotReceiveHandler) {
         // Generate ResyncShardMetaData message
         ShardInfo shard;
         shard.id = i;
-        shard.state = ShardInfo::State::OPEN;
+        shard.state =
+            i <= num_shards_per_pg - num_open_shards_per_pg ? ShardInfo::State::SEALED : ShardInfo::State::OPEN;
         shard.created_time = get_time_since_epoch_ms();
         shard.last_modified_time = shard.created_time;
         shard.total_capacity_bytes = 1024 * Mi;
@@ -367,11 +375,14 @@ TEST_F(HomeObjectFixture, SnapshotReceiveHandler) {
 
         // Step 2-2: Test write blob batch data
         // Generate ResyncBlobDataBatch message
-        std::map< blob_id_t, Blob > blob_map;
+        std::map< blob_id_t, std::tuple< Blob, bool > > blob_map;
         for (uint64_t j = 1; j <= num_batches_per_shard; j++) {
             LOGINFO("TESTING: applying blobs for shard {} batch {}", shard.id, j);
             std::vector< ::flatbuffers::Offset< ResyncBlobData > > blob_entries;
             for (uint64_t k = 0; k < num_blobs_per_batch; k++) {
+                auto blob_state = corrupt_dis(gen) <= corrupted_blob_percentage ? ResyncBlobState::CORRUPTED
+                                                                                : ResyncBlobState::NORMAL;
+
                 // Construct raw blob buffer
                 auto blob = build_blob(cur_blob_id);
                 const auto aligned_hdr_size =
@@ -398,24 +409,47 @@ TEST_F(HomeObjectFixture, SnapshotReceiveHandler) {
                 }
                 std::memcpy(blob_raw.bytes() + aligned_hdr_size, blob.body.cbytes(), blob.body.size());
+                // Simulate blob data corruption - tamper with random bytes
+                if (blob_state == ResyncBlobState::CORRUPTED) {
+                    constexpr int corrupted_bytes = 5;
+                    for (auto n = 0; n < corrupted_bytes; n++) {
+                        auto offset = random_bytes_dis(gen) % blob_raw.size();
+                        auto byte = random_bytes_dis(gen) % 256;
+                        blob_raw.bytes()[offset] = byte;
+                        LOGINFO("Changing byte at offset {} to {} to simulate data corruption", offset, byte);
+                    }
+                }
+
                 std::vector data(blob_raw.bytes(), blob_raw.bytes() + blob_raw.size());
-                blob_entries.push_back(CreateResyncBlobDataDirect(
-                    builder, cur_blob_id, static_cast< uint8_t >(ResyncBlobState::NORMAL), &data));
-                blob_map[cur_blob_id++] = std::move(blob);
+                blob_entries.push_back(
+                    CreateResyncBlobDataDirect(builder, cur_blob_id, static_cast< uint8_t >(blob_state), &data));
+                blob_map[cur_blob_id++] =
+                    std::make_tuple(std::move(blob), blob_state == ResyncBlobState::CORRUPTED);
             }
 
             builder.Finish(CreateResyncBlobDataBatchDirect(builder, &blob_entries, true));
             auto blob_batch = GetResyncBlobDataBatch(builder.GetBufferPointer());
             builder.Reset();
-            ret = handler->process_blobs_snapshot_data(*blob_batch, j);
+            ret = handler->process_blobs_snapshot_data(*blob_batch, j, j == num_batches_per_shard);
             ASSERT_EQ(ret, 0);
         }
 
-        for (const auto& [blob_id, blob] : blob_map) {
+        for (const auto& b : blob_map) {
+            auto blob_id = b.first;
+            auto& blob = std::get< 0 >(b.second);
+            auto is_corrupted = std::get< 1 >(b.second);
+
             auto res = _obj_inst->blob_manager()->get(shard.id, blob_id, 0, blob.body.size()).get();
-            ASSERT_TRUE(!!res);
-            auto blob_res = std::move(res.value());
-            ASSERT_EQ(blob_res.body.size(), blob.body.size());
-            ASSERT_EQ(std::memcmp(blob_res.body.bytes(), blob.body.cbytes(), blob_res.body.size()), 0);
+            if (is_corrupted) {
+                ASSERT_FALSE(!!res);
+            } else {
+                ASSERT_TRUE(!!res);
+                auto blob_res = std::move(res.value());
+                ASSERT_EQ(blob_res.body.size(), blob.body.size());
+                ASSERT_EQ(std::memcmp(blob_res.body.bytes(), blob.body.cbytes(), blob_res.body.size()), 0);
+            }
         }
+        if (shard.state == ShardInfo::State::SEALED) {
+            // TODO: Verify chunk is released. Currently we don't have chunk_id, so let's do this after rebasing
+        }
     }
 }
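One caveat on the corruption simulation: seeding std::mt19937 from std::random_device makes a failing run hard to reproduce, and because the five tampered offsets range over the whole aligned buffer, a rare run could land every change in header padding (or rewrite a byte to its current value), leaving the blob valid and the ASSERT_FALSE flaky. A small sketch of a seed-pinning helper that would make such runs replayable (TEST_RANDOM_SEED is an invented name, not an existing fixture option):

    #include <cstdint>
    #include <cstdlib>
    #include <iostream>
    #include <random>

    // Returns a generator whose seed is printed, and which can be pinned via an
    // environment variable so a failing corruption pattern can be replayed.
    std::mt19937 make_seeded_gen() {
        uint32_t seed = std::random_device{}();
        if (const char* s = std::getenv("TEST_RANDOM_SEED")) {
            seed = static_cast<uint32_t>(std::strtoul(s, nullptr, 10));
        }
        std::cout << "blob corruption seed: " << seed << '\n';
        return std::mt19937{seed};
    }

    int main() {
        auto gen = make_seeded_gen();
        std::uniform_int_distribution<> corrupt_dis(1, 100);
        std::cout << corrupt_dis(gen) << '\n'; // same sequence whenever the seed matches
    }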