Skip to content

Commit

Permalink
Add UT for PGBlobIterator
Browse files Browse the repository at this point in the history
  • Loading branch information
yuwmao committed Nov 20, 2024
1 parent 7b9819d commit a454abe
Show file tree
Hide file tree
Showing 5 changed files with 172 additions and 79 deletions.
2 changes: 1 addition & 1 deletion src/lib/homestore_backend/hs_homeobject.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ class HSHomeObject : public HomeObjectImpl {
objId cur_obj_id_{1, 0};
uint64_t cur_shard_idx_{0};
std::vector<BlobInfo> cur_blob_list_{0};
int64_t last_end_blob_idx_{-1};
uint64_t cur_start_blob_idx_{0};
uint64_t cur_batch_blob_count_{0};
flatbuffers::FlatBufferBuilder builder_;

Expand Down
22 changes: 12 additions & 10 deletions src/lib/homestore_backend/pg_blob_iterator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,18 @@ bool HSHomeObject::PGBlobIterator::update_cursor(objId id) {
//resend batch
if (id.value == cur_obj_id_.value) { return true; }
auto next_obj_id = expected_next_obj_id();
if (id.value != next_obj_id.value) { return false; }
if (id.value != next_obj_id.value) {
LOGE("invalid objId, expected={}, actual={}", next_obj_id.to_string(), id.to_string());
return false;
}
//next shard
if (cur_obj_id_.shard_seq_num != next_obj_id.shard_seq_num) {
cur_shard_idx_++;
last_end_blob_idx_ = -1;
cur_start_blob_idx_ = 0;
cur_batch_blob_count_ = 0;
} else {
//next batch
last_end_blob_idx_ = last_end_blob_idx_ + cur_batch_blob_count_;
cur_start_blob_idx_ = cur_start_blob_idx_ + cur_batch_blob_count_;
cur_batch_blob_count_ = 0;
}
cur_obj_id_ = id;
Expand All @@ -55,12 +58,12 @@ bool HSHomeObject::PGBlobIterator::update_cursor(objId id) {

objId HSHomeObject::PGBlobIterator::expected_next_obj_id() {
//next batch
if (last_end_blob_idx_ + cur_batch_blob_count_ < cur_blob_list_.size() - 1) {
if (cur_start_blob_idx_ + cur_batch_blob_count_ < cur_blob_list_.size()) {
return objId(cur_obj_id_.shard_seq_num, cur_obj_id_.batch_id + 1);
}
//next shard
if (cur_shard_idx_ < shard_list_.size() - 1) {
auto next_shard_seq_num = shard_list_[cur_shard_idx_++].id & 0xFFFFFFFFFFFF;
auto next_shard_seq_num = shard_list_[cur_shard_idx_+1].id & 0xFFFFFFFFFFFF;
return objId(next_shard_seq_num, 0);
}
return objId(LAST_OBJ_ID);
Expand Down Expand Up @@ -194,10 +197,10 @@ bool HSHomeObject::PGBlobIterator::create_blobs_snapshot_data(sisl::io_blob_safe

bool end_of_shard = false;
uint64_t total_bytes = 0;
auto idx = (uint64_t)(last_end_blob_idx_ + 1);
auto idx = cur_start_blob_idx_;

while (total_bytes < max_batch_size_ && idx < cur_blob_list_.size()) {
auto info = cur_blob_list_[idx];
auto info = cur_blob_list_[idx++];
ResyncBlobState state = ResyncBlobState::NORMAL;
//handle deleted object
if (info.pbas == tombstone_pbas) {
Expand Down Expand Up @@ -231,10 +234,9 @@ bool HSHomeObject::PGBlobIterator::create_blobs_snapshot_data(sisl::io_blob_safe
blob_entries.push_back(CreateResyncBlobDataDirect(builder_, info.blob_id, (uint8_t)state, &data));
total_bytes += blob.size();
}
idx++;

//should include the deleted blobs
cur_batch_blob_count_ = idx - cur_start_blob_idx_;
if (idx == cur_blob_list_.size()) { end_of_shard = true; }
cur_batch_blob_count_ = blob_entries.size();
builder_.FinishSizePrefixed(
CreateResyncBlobDataBatchDirect(builder_, &blob_entries, end_of_shard));

Expand Down
2 changes: 1 addition & 1 deletion src/lib/homestore_backend/replication_message.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ struct objId {
}

std::string to_string() const {
return fmt::format("{}[shardId={}, batchId={} ]", value, shard_seq_num, batch_id);
return fmt::format("{}[shardSeqNum={}, batchId={} ]", value, shard_seq_num, batch_id);
}
};

Expand Down
14 changes: 14 additions & 0 deletions src/lib/homestore_backend/tests/homeobj_fixture.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,20 @@ class HomeObjectFixture : public ::testing::Test {
}
}

void del_blob(pg_id_t pg_id, shard_id_t shard_id, blob_id_t blob_id) {
g_helper->sync();
run_on_pg_leader(pg_id, [&]()
{
auto g = _obj_inst->blob_manager()->del(shard_id, blob_id).get();
ASSERT_TRUE(g);
LOGINFO("delete blob, pg {} shard {} blob {}", pg_id, shard_id, blob_id);
});
while (blob_exist(shard_id, blob_id)) {
LOGINFO("waiting for shard {} blob {} to be deleted locally", shard_id, blob_id);
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
}
}

// TODO:make this run in parallel
void del_all_blobs(std::map< pg_id_t, std::vector< shard_id_t > > const& pg_shard_id_vec,
uint64_t const num_blobs_per_shard, std::map< pg_id_t, blob_id_t >& pg_blob_id) {
Expand Down
211 changes: 144 additions & 67 deletions src/lib/homestore_backend/tests/hs_blob_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -139,71 +139,148 @@ TEST_F(HomeObjectFixture, BasicPutGetDelBlobWRestart) {
}

TEST_F(HomeObjectFixture, PGBlobIterator) {
// uint64_t num_shards_per_pg = 3;
// uint64_t num_blobs_per_shard = 5;
// std::map< pg_id_t, std::vector< shard_id_t > > pg_shard_id_vec;
// // pg -> next blob_id in this pg
// std::map< pg_id_t, blob_id_t > pg_blob_id;
//
// pg_id_t pg_id{1};
// create_pg(pg_id);
// for (uint64_t i = 0; i < num_shards_per_pg; i++) {
// auto shard = create_shard(1, 64 * Mi);
// pg_shard_id_vec[1].emplace_back(shard.id);
// pg_blob_id[i] = 0;
// LOGINFO("pg {} shard {}", pg_id, shard.id);
// }
//
// // Put blob for all shards in all pg's.
// put_blobs(pg_shard_id_vec, num_blobs_per_shard, pg_blob_id);
//
// PG* pg1;
// {
// auto lg = std::shared_lock(_obj_inst->_pg_lock);
// auto iter = _obj_inst->_pg_map.find(pg_id);
// ASSERT_TRUE(iter != _obj_inst->_pg_map.end());
// pg1 = iter->second.get();
// }
//
// auto pg1_iter =
// std::make_shared< homeobject::HSHomeObject::PGBlobIterator >(*_obj_inst, pg1->pg_info_.replica_set_uuid);
// ASSERT_EQ(pg1_iter->end_of_scan(), false);
//
// // Verify PG shard meta data.
// sisl::io_blob_safe meta_blob;
// pg1_iter->create_pg_shard_snapshot_data(meta_blob);
// ASSERT_TRUE(meta_blob.size() > 0);
//
// auto pg_req = GetSizePrefixedResyncPGShardInfo(meta_blob.bytes());
// ASSERT_EQ(pg_req->pg()->pg_id(), pg1->pg_info_.id);
// auto u1 = pg_req->pg()->replica_set_uuid();
// auto u2 = pg1->pg_info_.replica_set_uuid;
// ASSERT_EQ(std::string(u1->begin(), u1->end()), std::string(u2.begin(), u2.end()));
//
// // Verify get blobs for pg.
// uint64_t max_num_blobs_in_batch = 3, max_batch_size_bytes = 128 * Mi;
// std::vector< HSHomeObject::BlobInfoData > blob_data_vec;
// while (!pg1_iter->end_of_scan()) {
// std::vector< HSHomeObject::BlobInfoData > vec;
// bool end_of_shard;
// auto result = pg1_iter->get_next_blobs(max_num_blobs_in_batch, max_batch_size_bytes, vec, end_of_shard);
// ASSERT_EQ(result, 0);
// for (auto& v : vec) {
// blob_data_vec.push_back(std::move(v));
// }
// }
//
// ASSERT_EQ(blob_data_vec.size(), num_shards_per_pg * num_blobs_per_shard);
// for (auto& b : blob_data_vec) {
// auto g = _obj_inst->blob_manager()->get(b.shard_id, b.blob_id, 0, 0).get();
// ASSERT_TRUE(!!g);
// auto result = std::move(g.value());
// LOGINFO("Get blob pg {} shard {} blob {} len {} data {}", 1, b.shard_id, b.blob_id, b.blob.body.size(),
// hex_bytes(result.body.cbytes(), 5));
// EXPECT_EQ(result.body.size(), b.blob.body.size());
// EXPECT_EQ(std::memcmp(result.body.bytes(), b.blob.body.cbytes(), result.body.size()), 0);
// EXPECT_EQ(result.user_key.size(), b.blob.user_key.size());
// EXPECT_EQ(result.user_key, b.blob.user_key);
// EXPECT_EQ(result.object_off, b.blob.object_off);
// }
// Generate test data
uint64_t num_shards_per_pg = 3;
uint64_t num_blobs_per_shard = 5;
std::map< pg_id_t, std::vector< shard_id_t > > pg_shard_id_vec;
std::map< pg_id_t, blob_id_t > pg_blob_id;

pg_id_t pg_id{1};
create_pg(pg_id);
for (uint64_t i = 0; i < num_shards_per_pg; i++) {
auto shard = create_shard(1, 64 * Mi);
pg_shard_id_vec[1].emplace_back(shard.id);
pg_blob_id[i] = 0;
LOGINFO("pg {} shard {}", pg_id, shard.id);
}
put_blobs(pg_shard_id_vec, num_blobs_per_shard, pg_blob_id);

PG* pg;
{
auto lg = std::shared_lock(_obj_inst->_pg_lock);
auto iter = _obj_inst->_pg_map.find(pg_id);
ASSERT_TRUE(iter != _obj_inst->_pg_map.end());
pg = iter->second.get();
}

//Construct shards as [sealed, open, filtered]
seal_shard(pg->shards_.front()->info.id);
ASSERT_EQ(pg->shards_.front()->info.state, homeobject::ShardInfo::State::SEALED);
//Filter out the last shard
auto snp_lsn = pg->shards_.back()->info.lsn - 1;
//Delete some blobs: delete the first blob of each shard
blob_id_t current_blob_id{0};
for (auto& shard : pg->shards_) {
del_blob(pg->pg_info_.id, shard->info.id, current_blob_id);
current_blob_id += num_blobs_per_shard;
}

auto pg_iter =
std::make_shared< HSHomeObject::PGBlobIterator >(*_obj_inst, pg->pg_info_.replica_set_uuid, snp_lsn);
ASSERT_EQ(pg_iter->shard_list_.size(), num_shards_per_pg - 1);
// Created blob sizes are distributed in range (1, 16kb)
pg_iter->max_batch_size_ = 16 * 1024;

// Verify PG meta data
sisl::io_blob_safe meta_blob;
pg_iter->create_pg_snapshot_data(meta_blob);
ASSERT_TRUE(meta_blob.size() > 0);

SyncMessageHeader* header = r_cast< SyncMessageHeader* >(meta_blob.bytes());
ASSERT_EQ(header->msg_type, SyncMessageType::PG_META);
auto pg_msg = GetSizePrefixedResyncPGMetaData(meta_blob.cbytes() + sizeof(SyncMessageHeader));
ASSERT_EQ(pg_msg->pg_id(), pg->pg_info_.id);
auto u1 = pg_msg->replica_set_uuid();
auto u2 = pg->pg_info_.replica_set_uuid;
ASSERT_EQ(std::string(u1->begin(), u1->end()), std::string(u2.begin(), u2.end()));
ASSERT_EQ(pg_msg->blob_seq_num(), pg->durable_entities().blob_sequence_num.load());
ASSERT_EQ(pg_msg->shard_seq_num(), pg->shard_sequence_num_);

auto msg_members = pg_msg->members();
ASSERT_EQ(msg_members->size(), pg->pg_info_.members.size());
for (auto m : *msg_members) {
uuids::uuid id{};
std::copy_n(m->uuid()->data(), 16, id.begin());
auto it = pg->pg_info_.members.find(PGMember{id});
ASSERT_TRUE(it != pg->pg_info_.members.end());
ASSERT_EQ(m->name()->str(), it->name);
ASSERT_EQ(m->priority(), it->priority);
}

auto idx = 0;
ASSERT_EQ(pg->shards_.size()-1, pg_msg->shard_ids()->size());
for (auto& shard : pg->shards_) {
if (shard->info.lsn > snp_lsn) { continue; }
ASSERT_EQ(shard->info.id, pg_msg->shard_ids()->Get(idx++));
}

//Verify shard meta data
auto shard_list = pg_shard_id_vec[1];
current_blob_id = 0;
for (auto& shard : pg->shards_) {
auto shard_seq_num = shard->info.id & 0xFFFFFFFFFFFF;
auto batch_id = 0;
objId oid(shard_seq_num, batch_id++);
if (shard->info.lsn > snp_lsn) {
ASSERT_FALSE(pg_iter->update_cursor(oid));
continue;
}
LOGINFO("shard meta, oid {}", oid.to_string());
ASSERT_TRUE(pg_iter->update_cursor(oid));
ASSERT_TRUE(pg_iter->generate_shard_blob_list());
ASSERT_EQ(pg_iter->cur_blob_list_.size(), num_blobs_per_shard);
sisl::io_blob_safe meta_data;
ASSERT_TRUE(pg_iter->create_shard_snapshot_data(meta_data));

SyncMessageHeader* header = r_cast< SyncMessageHeader* >(meta_data.bytes());
ASSERT_EQ(header->msg_type, SyncMessageType::SHARD_META);
auto shard_msg = GetSizePrefixedResyncShardMetaData(meta_data.cbytes() + sizeof(SyncMessageHeader));
ASSERT_EQ(shard_msg->shard_id(), shard->info.id);
ASSERT_EQ(shard_msg->pg_id(), pg->pg_info_.id);
ASSERT_EQ(shard_msg->state(), static_cast<uint8_t>(shard->info.state));
ASSERT_EQ(shard_msg->created_lsn(), shard->info.lsn);
ASSERT_EQ(shard_msg->created_time(), shard->info.created_time);
ASSERT_EQ(shard_msg->last_modified_time(), shard->info.last_modified_time);
ASSERT_EQ(shard_msg->total_capacity_bytes(), shard->info.total_capacity_bytes);

//Verify blob data
uint64_t packed_blob_size{0};
auto is_finished = false;
//skip the first blob(deleted) of the shard
current_blob_id++;
while (!is_finished) {
oid = objId(shard_seq_num, batch_id++);
ASSERT_TRUE(pg_iter->update_cursor(oid));
sisl::io_blob_safe blob_batch;
ASSERT_TRUE(pg_iter->create_blobs_snapshot_data(blob_batch));
header = r_cast< SyncMessageHeader* >(blob_batch.bytes());
ASSERT_EQ(header->msg_type, SyncMessageType::SHARD_BATCH);
auto blob_msg = GetSizePrefixedResyncBlobDataBatch(blob_batch.cbytes() + sizeof(SyncMessageHeader));
LOGINFO("blob batch, oid {}, blob_cnt {}", oid.to_string(), blob_msg->blob_list()->size());
for (auto i = 0; i < static_cast< int >(blob_msg->blob_list()->size()); i++) {
auto b = blob_msg->blob_list()->Get(i);
ASSERT_EQ(b->blob_id(), current_blob_id++);
ASSERT_EQ(b->state(), static_cast<uint8_t>(ResyncBlobState::NORMAL));
auto blob_data = b->data()->Data();
auto header = r_cast< HSHomeObject::BlobHeader const* >(blob_data);
ASSERT_TRUE(header->valid());
auto g = _obj_inst->blob_manager()->get(shard->info.id, b->blob_id(), 0, 0).get();
ASSERT_TRUE(!!g);
auto result = std::move(g.value());
EXPECT_EQ(result.body.size(), header->blob_size);
ASSERT_TRUE(
memcmp(result.body.cbytes(), blob_data+header->data_offset, header->blob_size) == 0);
packed_blob_size++;
LOGDEBUG(
"[{}]Get blob pg {}, shard {}, blob {}, data_len {}, blob_len {}, header_len {}, user_key_len {}, data {}",
packed_blob_size, pg->pg_info_.id, shard->info.id, b->blob_id(),
b->data()->size(), header->blob_size, sizeof(HSHomeObject::BlobHeader), header->user_key_size,
hex_bytes(result.body.cbytes(), 5));
}
is_finished = blob_msg->is_last_batch();
}
ASSERT_EQ(packed_blob_size, num_blobs_per_shard-1);
}
//Verify last obj
ASSERT_TRUE(pg_iter->update_cursor(objId(LAST_OBJ_ID)));
}

0 comments on commit a454abe

Please sign in to comment.