diff --git a/src/lib/homestore_backend/hs_homeobject.hpp b/src/lib/homestore_backend/hs_homeobject.hpp index 6101ec48..9dd82c38 100644 --- a/src/lib/homestore_backend/hs_homeobject.hpp +++ b/src/lib/homestore_backend/hs_homeobject.hpp @@ -304,7 +304,7 @@ class HSHomeObject : public HomeObjectImpl { objId cur_obj_id_{1, 0}; uint64_t cur_shard_idx_{0}; std::vector cur_blob_list_{0}; - int64_t last_end_blob_idx_{-1}; + uint64_t cur_start_blob_idx_{0}; uint64_t cur_batch_blob_count_{0}; flatbuffers::FlatBufferBuilder builder_; diff --git a/src/lib/homestore_backend/pg_blob_iterator.cpp b/src/lib/homestore_backend/pg_blob_iterator.cpp index 86889f8b..3a0b0b17 100644 --- a/src/lib/homestore_backend/pg_blob_iterator.cpp +++ b/src/lib/homestore_backend/pg_blob_iterator.cpp @@ -38,15 +38,18 @@ bool HSHomeObject::PGBlobIterator::update_cursor(objId id) { //resend batch if (id.value == cur_obj_id_.value) { return true; } auto next_obj_id = expected_next_obj_id(); - if (id.value != next_obj_id.value) { return false; } + if (id.value != next_obj_id.value) { + LOGE("invalid objId, expected={}, actual={}", next_obj_id.to_string(), id.to_string()); + return false; + } //next shard if (cur_obj_id_.shard_seq_num != next_obj_id.shard_seq_num) { cur_shard_idx_++; - last_end_blob_idx_ = -1; + cur_start_blob_idx_ = 0; cur_batch_blob_count_ = 0; } else { //next batch - last_end_blob_idx_ = last_end_blob_idx_ + cur_batch_blob_count_; + cur_start_blob_idx_ = cur_start_blob_idx_ + cur_batch_blob_count_; cur_batch_blob_count_ = 0; } cur_obj_id_ = id; @@ -55,12 +58,12 @@ bool HSHomeObject::PGBlobIterator::update_cursor(objId id) { objId HSHomeObject::PGBlobIterator::expected_next_obj_id() { //next batch - if (last_end_blob_idx_ + cur_batch_blob_count_ < cur_blob_list_.size() - 1) { + if (cur_start_blob_idx_ + cur_batch_blob_count_ < cur_blob_list_.size()) { return objId(cur_obj_id_.shard_seq_num, cur_obj_id_.batch_id + 1); } //next shard if (cur_shard_idx_ < shard_list_.size() - 1) { - auto next_shard_seq_num = shard_list_[cur_shard_idx_++].id & 0xFFFFFFFFFFFF; + auto next_shard_seq_num = shard_list_[cur_shard_idx_+1].id & 0xFFFFFFFFFFFF; return objId(next_shard_seq_num, 0); } return objId(LAST_OBJ_ID); @@ -194,10 +197,10 @@ bool HSHomeObject::PGBlobIterator::create_blobs_snapshot_data(sisl::io_blob_safe bool end_of_shard = false; uint64_t total_bytes = 0; - auto idx = (uint64_t)(last_end_blob_idx_ + 1); + auto idx = cur_start_blob_idx_; while (total_bytes < max_batch_size_ && idx < cur_blob_list_.size()) { - auto info = cur_blob_list_[idx]; + auto info = cur_blob_list_[idx++]; ResyncBlobState state = ResyncBlobState::NORMAL; //handle deleted object if (info.pbas == tombstone_pbas) { @@ -231,10 +234,9 @@ bool HSHomeObject::PGBlobIterator::create_blobs_snapshot_data(sisl::io_blob_safe blob_entries.push_back(CreateResyncBlobDataDirect(builder_, info.blob_id, (uint8_t)state, &data)); total_bytes += blob.size(); } - idx++; - + //should include the deleted blobs + cur_batch_blob_count_ = idx - cur_start_blob_idx_; if (idx == cur_blob_list_.size()) { end_of_shard = true; } - cur_batch_blob_count_ = blob_entries.size(); builder_.FinishSizePrefixed( CreateResyncBlobDataBatchDirect(builder_, &blob_entries, end_of_shard)); diff --git a/src/lib/homestore_backend/replication_message.hpp b/src/lib/homestore_backend/replication_message.hpp index 88c6b694..10bc83d2 100644 --- a/src/lib/homestore_backend/replication_message.hpp +++ b/src/lib/homestore_backend/replication_message.hpp @@ -129,7 +129,7 @@ struct objId { } std::string to_string() const { - return fmt::format("{}[shardId={}, batchId={} ]", value, shard_seq_num, batch_id); + return fmt::format("{}[shardSeqNum={}, batchId={} ]", value, shard_seq_num, batch_id); } }; diff --git a/src/lib/homestore_backend/tests/homeobj_fixture.hpp b/src/lib/homestore_backend/tests/homeobj_fixture.hpp index c0f3bb66..de36c4df 100644 --- a/src/lib/homestore_backend/tests/homeobj_fixture.hpp +++ b/src/lib/homestore_backend/tests/homeobj_fixture.hpp @@ -201,6 +201,20 @@ class HomeObjectFixture : public ::testing::Test { } } + void del_blob(pg_id_t pg_id, shard_id_t shard_id, blob_id_t blob_id) { + g_helper->sync(); + run_on_pg_leader(pg_id, [&]() + { + auto g = _obj_inst->blob_manager()->del(shard_id, blob_id).get(); + ASSERT_TRUE(g); + LOGINFO("delete blob, pg {} shard {} blob {}", pg_id, shard_id, blob_id); + }); + while (blob_exist(shard_id, blob_id)) { + LOGINFO("waiting for shard {} blob {} to be deleted locally", shard_id, blob_id); + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + } + } + // TODO:make this run in parallel void del_all_blobs(std::map< pg_id_t, std::vector< shard_id_t > > const& pg_shard_id_vec, uint64_t const num_blobs_per_shard, std::map< pg_id_t, blob_id_t >& pg_blob_id) { diff --git a/src/lib/homestore_backend/tests/hs_blob_tests.cpp b/src/lib/homestore_backend/tests/hs_blob_tests.cpp index df2c3355..576211fb 100644 --- a/src/lib/homestore_backend/tests/hs_blob_tests.cpp +++ b/src/lib/homestore_backend/tests/hs_blob_tests.cpp @@ -139,71 +139,148 @@ TEST_F(HomeObjectFixture, BasicPutGetDelBlobWRestart) { } TEST_F(HomeObjectFixture, PGBlobIterator) { - // uint64_t num_shards_per_pg = 3; - // uint64_t num_blobs_per_shard = 5; - // std::map< pg_id_t, std::vector< shard_id_t > > pg_shard_id_vec; - // // pg -> next blob_id in this pg - // std::map< pg_id_t, blob_id_t > pg_blob_id; - // - // pg_id_t pg_id{1}; - // create_pg(pg_id); - // for (uint64_t i = 0; i < num_shards_per_pg; i++) { - // auto shard = create_shard(1, 64 * Mi); - // pg_shard_id_vec[1].emplace_back(shard.id); - // pg_blob_id[i] = 0; - // LOGINFO("pg {} shard {}", pg_id, shard.id); - // } - // - // // Put blob for all shards in all pg's. - // put_blobs(pg_shard_id_vec, num_blobs_per_shard, pg_blob_id); - // - // PG* pg1; - // { - // auto lg = std::shared_lock(_obj_inst->_pg_lock); - // auto iter = _obj_inst->_pg_map.find(pg_id); - // ASSERT_TRUE(iter != _obj_inst->_pg_map.end()); - // pg1 = iter->second.get(); - // } - // - // auto pg1_iter = - // std::make_shared< homeobject::HSHomeObject::PGBlobIterator >(*_obj_inst, pg1->pg_info_.replica_set_uuid); - // ASSERT_EQ(pg1_iter->end_of_scan(), false); - // - // // Verify PG shard meta data. - // sisl::io_blob_safe meta_blob; - // pg1_iter->create_pg_shard_snapshot_data(meta_blob); - // ASSERT_TRUE(meta_blob.size() > 0); - // - // auto pg_req = GetSizePrefixedResyncPGShardInfo(meta_blob.bytes()); - // ASSERT_EQ(pg_req->pg()->pg_id(), pg1->pg_info_.id); - // auto u1 = pg_req->pg()->replica_set_uuid(); - // auto u2 = pg1->pg_info_.replica_set_uuid; - // ASSERT_EQ(std::string(u1->begin(), u1->end()), std::string(u2.begin(), u2.end())); - // - // // Verify get blobs for pg. - // uint64_t max_num_blobs_in_batch = 3, max_batch_size_bytes = 128 * Mi; - // std::vector< HSHomeObject::BlobInfoData > blob_data_vec; - // while (!pg1_iter->end_of_scan()) { - // std::vector< HSHomeObject::BlobInfoData > vec; - // bool end_of_shard; - // auto result = pg1_iter->get_next_blobs(max_num_blobs_in_batch, max_batch_size_bytes, vec, end_of_shard); - // ASSERT_EQ(result, 0); - // for (auto& v : vec) { - // blob_data_vec.push_back(std::move(v)); - // } - // } - // - // ASSERT_EQ(blob_data_vec.size(), num_shards_per_pg * num_blobs_per_shard); - // for (auto& b : blob_data_vec) { - // auto g = _obj_inst->blob_manager()->get(b.shard_id, b.blob_id, 0, 0).get(); - // ASSERT_TRUE(!!g); - // auto result = std::move(g.value()); - // LOGINFO("Get blob pg {} shard {} blob {} len {} data {}", 1, b.shard_id, b.blob_id, b.blob.body.size(), - // hex_bytes(result.body.cbytes(), 5)); - // EXPECT_EQ(result.body.size(), b.blob.body.size()); - // EXPECT_EQ(std::memcmp(result.body.bytes(), b.blob.body.cbytes(), result.body.size()), 0); - // EXPECT_EQ(result.user_key.size(), b.blob.user_key.size()); - // EXPECT_EQ(result.user_key, b.blob.user_key); - // EXPECT_EQ(result.object_off, b.blob.object_off); - // } + // Generate test data + uint64_t num_shards_per_pg = 3; + uint64_t num_blobs_per_shard = 5; + std::map< pg_id_t, std::vector< shard_id_t > > pg_shard_id_vec; + std::map< pg_id_t, blob_id_t > pg_blob_id; + + pg_id_t pg_id{1}; + create_pg(pg_id); + for (uint64_t i = 0; i < num_shards_per_pg; i++) { + auto shard = create_shard(1, 64 * Mi); + pg_shard_id_vec[1].emplace_back(shard.id); + pg_blob_id[i] = 0; + LOGINFO("pg {} shard {}", pg_id, shard.id); + } + put_blobs(pg_shard_id_vec, num_blobs_per_shard, pg_blob_id); + + PG* pg; + { + auto lg = std::shared_lock(_obj_inst->_pg_lock); + auto iter = _obj_inst->_pg_map.find(pg_id); + ASSERT_TRUE(iter != _obj_inst->_pg_map.end()); + pg = iter->second.get(); + } + + //Construct shards as [sealed, open, filtered] + seal_shard(pg->shards_.front()->info.id); + ASSERT_EQ(pg->shards_.front()->info.state, homeobject::ShardInfo::State::SEALED); + //Filter out the last shard + auto snp_lsn = pg->shards_.back()->info.lsn - 1; + //Delete some blobs: delete the first blob of each shard + blob_id_t current_blob_id{0}; + for (auto& shard : pg->shards_) { + del_blob(pg->pg_info_.id, shard->info.id, current_blob_id); + current_blob_id += num_blobs_per_shard; + } + + auto pg_iter = + std::make_shared< HSHomeObject::PGBlobIterator >(*_obj_inst, pg->pg_info_.replica_set_uuid, snp_lsn); + ASSERT_EQ(pg_iter->shard_list_.size(), num_shards_per_pg - 1); + // Created blob sizes are distributed in range (1, 16kb) + pg_iter->max_batch_size_ = 16 * 1024; + + // Verify PG meta data + sisl::io_blob_safe meta_blob; + pg_iter->create_pg_snapshot_data(meta_blob); + ASSERT_TRUE(meta_blob.size() > 0); + + SyncMessageHeader* header = r_cast< SyncMessageHeader* >(meta_blob.bytes()); + ASSERT_EQ(header->msg_type, SyncMessageType::PG_META); + auto pg_msg = GetSizePrefixedResyncPGMetaData(meta_blob.cbytes() + sizeof(SyncMessageHeader)); + ASSERT_EQ(pg_msg->pg_id(), pg->pg_info_.id); + auto u1 = pg_msg->replica_set_uuid(); + auto u2 = pg->pg_info_.replica_set_uuid; + ASSERT_EQ(std::string(u1->begin(), u1->end()), std::string(u2.begin(), u2.end())); + ASSERT_EQ(pg_msg->blob_seq_num(), pg->durable_entities().blob_sequence_num.load()); + ASSERT_EQ(pg_msg->shard_seq_num(), pg->shard_sequence_num_); + + auto msg_members = pg_msg->members(); + ASSERT_EQ(msg_members->size(), pg->pg_info_.members.size()); + for (auto m : *msg_members) { + uuids::uuid id{}; + std::copy_n(m->uuid()->data(), 16, id.begin()); + auto it = pg->pg_info_.members.find(PGMember{id}); + ASSERT_TRUE(it != pg->pg_info_.members.end()); + ASSERT_EQ(m->name()->str(), it->name); + ASSERT_EQ(m->priority(), it->priority); + } + + auto idx = 0; + ASSERT_EQ(pg->shards_.size()-1, pg_msg->shard_ids()->size()); + for (auto& shard : pg->shards_) { + if (shard->info.lsn > snp_lsn) { continue; } + ASSERT_EQ(shard->info.id, pg_msg->shard_ids()->Get(idx++)); + } + + //Verify shard meta data + auto shard_list = pg_shard_id_vec[1]; + current_blob_id = 0; + for (auto& shard : pg->shards_) { + auto shard_seq_num = shard->info.id & 0xFFFFFFFFFFFF; + auto batch_id = 0; + objId oid(shard_seq_num, batch_id++); + if (shard->info.lsn > snp_lsn) { + ASSERT_FALSE(pg_iter->update_cursor(oid)); + continue; + } + LOGINFO("shard meta, oid {}", oid.to_string()); + ASSERT_TRUE(pg_iter->update_cursor(oid)); + ASSERT_TRUE(pg_iter->generate_shard_blob_list()); + ASSERT_EQ(pg_iter->cur_blob_list_.size(), num_blobs_per_shard); + sisl::io_blob_safe meta_data; + ASSERT_TRUE(pg_iter->create_shard_snapshot_data(meta_data)); + + SyncMessageHeader* header = r_cast< SyncMessageHeader* >(meta_data.bytes()); + ASSERT_EQ(header->msg_type, SyncMessageType::SHARD_META); + auto shard_msg = GetSizePrefixedResyncShardMetaData(meta_data.cbytes() + sizeof(SyncMessageHeader)); + ASSERT_EQ(shard_msg->shard_id(), shard->info.id); + ASSERT_EQ(shard_msg->pg_id(), pg->pg_info_.id); + ASSERT_EQ(shard_msg->state(), static_cast(shard->info.state)); + ASSERT_EQ(shard_msg->created_lsn(), shard->info.lsn); + ASSERT_EQ(shard_msg->created_time(), shard->info.created_time); + ASSERT_EQ(shard_msg->last_modified_time(), shard->info.last_modified_time); + ASSERT_EQ(shard_msg->total_capacity_bytes(), shard->info.total_capacity_bytes); + + //Verify blob data + uint64_t packed_blob_size{0}; + auto is_finished = false; + //skip the first blob(deleted) of the shard + current_blob_id++; + while (!is_finished) { + oid = objId(shard_seq_num, batch_id++); + ASSERT_TRUE(pg_iter->update_cursor(oid)); + sisl::io_blob_safe blob_batch; + ASSERT_TRUE(pg_iter->create_blobs_snapshot_data(blob_batch)); + header = r_cast< SyncMessageHeader* >(blob_batch.bytes()); + ASSERT_EQ(header->msg_type, SyncMessageType::SHARD_BATCH); + auto blob_msg = GetSizePrefixedResyncBlobDataBatch(blob_batch.cbytes() + sizeof(SyncMessageHeader)); + LOGINFO("blob batch, oid {}, blob_cnt {}", oid.to_string(), blob_msg->blob_list()->size()); + for (auto i = 0; i < static_cast< int >(blob_msg->blob_list()->size()); i++) { + auto b = blob_msg->blob_list()->Get(i); + ASSERT_EQ(b->blob_id(), current_blob_id++); + ASSERT_EQ(b->state(), static_cast(ResyncBlobState::NORMAL)); + auto blob_data = b->data()->Data(); + auto header = r_cast< HSHomeObject::BlobHeader const* >(blob_data); + ASSERT_TRUE(header->valid()); + auto g = _obj_inst->blob_manager()->get(shard->info.id, b->blob_id(), 0, 0).get(); + ASSERT_TRUE(!!g); + auto result = std::move(g.value()); + EXPECT_EQ(result.body.size(), header->blob_size); + ASSERT_TRUE( + memcmp(result.body.cbytes(), blob_data+header->data_offset, header->blob_size) == 0); + packed_blob_size++; + LOGDEBUG( + "[{}]Get blob pg {}, shard {}, blob {}, data_len {}, blob_len {}, header_len {}, user_key_len {}, data {}", + packed_blob_size, pg->pg_info_.id, shard->info.id, b->blob_id(), + b->data()->size(), header->blob_size, sizeof(HSHomeObject::BlobHeader), header->user_key_size, + hex_bytes(result.body.cbytes(), 5)); + } + is_finished = blob_msg->is_last_batch(); + } + ASSERT_EQ(packed_blob_size, num_blobs_per_shard-1); + } + //Verify last obj + ASSERT_TRUE(pg_iter->update_cursor(objId(LAST_OBJ_ID))); }