Add UT for PGBlobIterator #228

Merged · 2 commits · Nov 26, 2024
2 changes: 1 addition & 1 deletion conanfile.py
@@ -9,7 +9,7 @@

class HomeObjectConan(ConanFile):
name = "homeobject"
version = "2.1.8"
version = "2.1.9"

homepage = "https://github.com/eBay/HomeObject"
description = "Blob Store built on HomeReplication"
Expand Down
2 changes: 1 addition & 1 deletion src/lib/homestore_backend/hs_homeobject.hpp
@@ -304,7 +304,7 @@ class HSHomeObject : public HomeObjectImpl {
objId cur_obj_id_{1, 0};
uint64_t cur_shard_idx_{0};
std::vector<BlobInfo> cur_blob_list_{0};
- int64_t last_end_blob_idx_{-1};
+ uint64_t cur_start_blob_idx_{0};
uint64_t cur_batch_blob_count_{0};
flatbuffers::FlatBufferBuilder builder_;

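The rename above is more than cosmetic: the signed last_end_blob_idx_ with its -1 sentinel is replaced by an unsigned cursor that directly names the first blob of the in-flight batch. A minimal standalone sketch of the equivalence (member names mirror the diff; this is an illustration, not code from the PR):

#include <cassert>
#include <cstdint>

// Standalone model of the two cursor encodings.
struct OldCursor {
    int64_t last_end_blob_idx{-1};  // -1 sentinel: nothing sent yet
    uint64_t first_unsent() const { return uint64_t(last_end_blob_idx + 1); }
};
struct NewCursor {
    uint64_t cur_start_blob_idx{0};  // first blob of the in-flight batch
    uint64_t first_unsent() const { return cur_start_blob_idx; }
};

int main() {
    OldCursor o;
    NewCursor n;
    assert(o.first_unsent() == n.first_unsent());  // both start at index 0
    // After a 3-blob batch is acknowledged, both advance by the batch size:
    o.last_end_blob_idx += 3;
    n.cur_start_blob_idx += 3;
    assert(o.first_unsent() == n.first_unsent());  // both point at index 3
    return 0;
}

Dropping the sentinel also removes the (uint64_t) cast at the batch-fill site in create_blobs_snapshot_data below.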
22 changes: 12 additions & 10 deletions src/lib/homestore_backend/pg_blob_iterator.cpp
@@ -38,15 +38,18 @@ bool HSHomeObject::PGBlobIterator::update_cursor(objId id) {
//resend batch
if (id.value == cur_obj_id_.value) { return true; }
auto next_obj_id = expected_next_obj_id();
- if (id.value != next_obj_id.value) { return false; }
+ if (id.value != next_obj_id.value) {
+     LOGE("invalid objId, expected={}, actual={}", next_obj_id.to_string(), id.to_string());
+     return false;
+ }
//next shard
if (cur_obj_id_.shard_seq_num != next_obj_id.shard_seq_num) {
cur_shard_idx_++;
- last_end_blob_idx_ = -1;
+ cur_start_blob_idx_ = 0;
cur_batch_blob_count_ = 0;
} else {
//next batch
- last_end_blob_idx_ = last_end_blob_idx_ + cur_batch_blob_count_;
+ cur_start_blob_idx_ = cur_start_blob_idx_ + cur_batch_blob_count_;
cur_batch_blob_count_ = 0;
}
cur_obj_id_ = id;
@@ -55,12 +58,12 @@ bool HSHomeObject::PGBlobIterator::update_cursor(objId id) {

objId HSHomeObject::PGBlobIterator::expected_next_obj_id() {
//next batch
- if (last_end_blob_idx_ + cur_batch_blob_count_ < cur_blob_list_.size() - 1) {
+ if (cur_start_blob_idx_ + cur_batch_blob_count_ < cur_blob_list_.size()) {
return objId(cur_obj_id_.shard_seq_num, cur_obj_id_.batch_id + 1);
}
//next shard
if (cur_shard_idx_ < shard_list_.size() - 1) {
- auto next_shard_seq_num = shard_list_[cur_shard_idx_++].id & 0xFFFFFFFFFFFF;
+ auto next_shard_seq_num = shard_list_[cur_shard_idx_+1].id & 0xFFFFFFFFFFFF;
return objId(next_shard_seq_num, 0);
}
return objId(LAST_OBJ_ID);
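The subtle fix in this hunk is the shard lookahead: the old shard_list_[cur_shard_idx_++] returned the shard at the *current* index (not the next one) and advanced the cursor as a side effect, so merely asking for the expected next objId changed the iterator's state. A self-contained sketch of that hazard (hypothetical stand-in type, not the PR's code):

#include <cassert>
#include <cstdint>
#include <vector>

struct IterModel {
    std::vector<uint64_t> shard_ids{100, 200, 300};
    uint64_t cur_shard_idx{0};

    // Old behaviour: returns the *current* shard and mutates the cursor.
    uint64_t peek_next_buggy() { return shard_ids[cur_shard_idx++]; }
    // Fixed behaviour: a pure peek at the next shard; only update_cursor()
    // is allowed to advance cur_shard_idx.
    uint64_t peek_next() const { return shard_ids[cur_shard_idx + 1]; }
};

int main() {
    IterModel buggy;
    assert(buggy.peek_next_buggy() == 100);  // wrong shard (current, not next)
    assert(buggy.peek_next_buggy() == 200);  // and repeated peeks drift

    IterModel fixed;
    assert(fixed.peek_next() == 200);  // the actual next shard
    assert(fixed.peek_next() == 200);  // idempotent, as a peek should be
    return 0;
}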
@@ -194,10 +197,10 @@ bool HSHomeObject::PGBlobIterator::create_blobs_snapshot_data(sisl::io_blob_safe

bool end_of_shard = false;
uint64_t total_bytes = 0;
- auto idx = (uint64_t)(last_end_blob_idx_ + 1);
+ auto idx = cur_start_blob_idx_;

while (total_bytes < max_batch_size_ && idx < cur_blob_list_.size()) {
- auto info = cur_blob_list_[idx];
+ auto info = cur_blob_list_[idx++];
ResyncBlobState state = ResyncBlobState::NORMAL;
//handle deleted object
if (info.pbas == tombstone_pbas) {
@@ -231,10 +234,9 @@ bool HSHomeObject::PGBlobIterator::create_blobs_snapshot_data(sisl::io_blob_safe
blob_entries.push_back(CreateResyncBlobDataDirect(builder_, info.blob_id, (uint8_t)state, &data));
total_bytes += blob.size();
}
- idx++;

//should include the deleted blobs
+ cur_batch_blob_count_ = idx - cur_start_blob_idx_;
if (idx == cur_blob_list_.size()) { end_of_shard = true; }
- cur_batch_blob_count_ = blob_entries.size();
builder_.FinishSizePrefixed(
CreateResyncBlobDataBatchDirect(builder_, &blob_entries, end_of_shard));

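With the increment folded into cur_blob_list_[idx++], the batch counter is now derived from positions scanned rather than entries packed, which keeps the cursor aligned with cur_blob_list_ even when tombstoned blobs contribute nothing to the batch. A standalone model of that accounting (it assumes, as the test below suggests, that deleted blobs are not packed into blob_entries):

#include <cstdint>
#include <cstdio>
#include <vector>

int main() {
    // Three blobs in the shard's list; the blob at index 1 is tombstoned
    // and is not packed into blob_entries.
    std::vector<bool> tombstoned{false, true, false};
    uint64_t cur_start_blob_idx = 0;
    uint64_t idx = cur_start_blob_idx;
    uint64_t packed = 0;

    while (idx < tombstoned.size()) {
        if (!tombstoned[idx++]) { packed++; }  // idx advances for every entry
    }

    // New accounting: counts every scanned entry, deleted ones included.
    uint64_t cur_batch_blob_count = idx - cur_start_blob_idx;  // == 3
    // The old accounting (blob_entries.size()) would yield 2 here and
    // desynchronize the next batch's start index from cur_blob_list_.
    std::printf("scanned=%llu packed=%llu\n",
                static_cast<unsigned long long>(cur_batch_blob_count),
                static_cast<unsigned long long>(packed));
    return 0;
}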
2 changes: 1 addition & 1 deletion src/lib/homestore_backend/replication_message.hpp
@@ -129,7 +129,7 @@ struct objId {
}

std::string to_string() const {
return fmt::format("{}[shardId={}, batchId={} ]", value, shard_seq_num, batch_id);
return fmt::format("{}[shardSeqNum={}, batchId={} ]", value, shard_seq_num, batch_id);
}
};

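The label fix matters because shard_seq_num is the sequence-number component extracted from a shard id, not the shard id itself. A minimal stand-in showing the corrected output (the real packing of `value` is defined elsewhere in this header; this sketch only assumes the fmt library is available):

#include <cstdint>
#include <iostream>
#include <string>
#include <fmt/format.h>

// Hypothetical stand-in for objId, fields as declared in the real struct.
struct ObjIdModel {
    uint64_t value;
    uint64_t shard_seq_num;
    uint16_t batch_id;
    std::string to_string() const {
        return fmt::format("{}[shardSeqNum={}, batchId={} ]", value, shard_seq_num, batch_id);
    }
};

int main() {
    ObjIdModel oid{42 /*value*/, 1 /*shard_seq_num*/, 2 /*batch_id*/};
    std::cout << oid.to_string() << "\n";  // prints: 42[shardSeqNum=1, batchId=2 ]
    return 0;
}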
14 changes: 14 additions & 0 deletions src/lib/homestore_backend/tests/homeobj_fixture.hpp
@@ -201,6 +201,20 @@ class HomeObjectFixture : public ::testing::Test {
}
}

void del_blob(pg_id_t pg_id, shard_id_t shard_id, blob_id_t blob_id) {
g_helper->sync();
run_on_pg_leader(pg_id, [&]()
{
auto g = _obj_inst->blob_manager()->del(shard_id, blob_id).get();
ASSERT_TRUE(g);
LOGINFO("delete blob, pg {} shard {} blob {}", pg_id, shard_id, blob_id);
});
while (blob_exist(shard_id, blob_id)) {
LOGINFO("waiting for shard {} blob {} to be deleted locally", shard_id, blob_id);
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
}
}

// TODO:make this run in parallel
void del_all_blobs(std::map< pg_id_t, std::vector< shard_id_t > > const& pg_shard_id_vec,
uint64_t const num_blobs_per_shard, std::map< pg_id_t, blob_id_t >& pg_blob_id) {
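A hypothetical usage sketch for the new helper, built only from fixture calls visible in this diff: del_blob() issues the delete on the PG leader, then polls blob_exist() until the tombstone is applied locally, so the test can proceed deterministically.

// Hypothetical test, assuming the shard has been populated with blobs
// 0..N-1 (e.g. via put_blobs()); the test name is illustrative only.
TEST_F(HomeObjectFixture, DelBlobUsageSketch) {
    pg_id_t pg_id{1};
    create_pg(pg_id);
    auto shard = create_shard(pg_id, 64 * Mi);
    // ... put blobs into the shard ...
    del_blob(pg_id, shard.id, 0 /*blob_id*/);  // returns once deleted locally
}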
211 changes: 144 additions & 67 deletions src/lib/homestore_backend/tests/hs_blob_tests.cpp
@@ -139,73 +139,150 @@ TEST_F(HomeObjectFixture, BasicPutGetDelBlobWRestart) {
}

TEST_F(HomeObjectFixture, PGBlobIterator) {
// uint64_t num_shards_per_pg = 3;
// uint64_t num_blobs_per_shard = 5;
// std::map< pg_id_t, std::vector< shard_id_t > > pg_shard_id_vec;
// // pg -> next blob_id in this pg
// std::map< pg_id_t, blob_id_t > pg_blob_id;
//
// pg_id_t pg_id{1};
// create_pg(pg_id);
// for (uint64_t i = 0; i < num_shards_per_pg; i++) {
// auto shard = create_shard(1, 64 * Mi);
// pg_shard_id_vec[1].emplace_back(shard.id);
// pg_blob_id[i] = 0;
// LOGINFO("pg {} shard {}", pg_id, shard.id);
// }
//
// // Put blob for all shards in all pg's.
// put_blobs(pg_shard_id_vec, num_blobs_per_shard, pg_blob_id);
//
// PG* pg1;
// {
// auto lg = std::shared_lock(_obj_inst->_pg_lock);
// auto iter = _obj_inst->_pg_map.find(pg_id);
// ASSERT_TRUE(iter != _obj_inst->_pg_map.end());
// pg1 = iter->second.get();
// }
//
// auto pg1_iter =
// std::make_shared< homeobject::HSHomeObject::PGBlobIterator >(*_obj_inst, pg1->pg_info_.replica_set_uuid);
// ASSERT_EQ(pg1_iter->end_of_scan(), false);
//
// // Verify PG shard meta data.
// sisl::io_blob_safe meta_blob;
// pg1_iter->create_pg_shard_snapshot_data(meta_blob);
// ASSERT_TRUE(meta_blob.size() > 0);
//
// auto pg_req = GetSizePrefixedResyncPGShardInfo(meta_blob.bytes());
// ASSERT_EQ(pg_req->pg()->pg_id(), pg1->pg_info_.id);
// auto u1 = pg_req->pg()->replica_set_uuid();
// auto u2 = pg1->pg_info_.replica_set_uuid;
// ASSERT_EQ(std::string(u1->begin(), u1->end()), std::string(u2.begin(), u2.end()));
//
// // Verify get blobs for pg.
// uint64_t max_num_blobs_in_batch = 3, max_batch_size_bytes = 128 * Mi;
// std::vector< HSHomeObject::BlobInfoData > blob_data_vec;
// while (!pg1_iter->end_of_scan()) {
// std::vector< HSHomeObject::BlobInfoData > vec;
// bool end_of_shard;
// auto result = pg1_iter->get_next_blobs(max_num_blobs_in_batch, max_batch_size_bytes, vec, end_of_shard);
// ASSERT_EQ(result, 0);
// for (auto& v : vec) {
// blob_data_vec.push_back(std::move(v));
// }
// }
//
// ASSERT_EQ(blob_data_vec.size(), num_shards_per_pg * num_blobs_per_shard);
// for (auto& b : blob_data_vec) {
// auto g = _obj_inst->blob_manager()->get(b.shard_id, b.blob_id, 0, 0).get();
// ASSERT_TRUE(!!g);
// auto result = std::move(g.value());
// LOGINFO("Get blob pg {} shard {} blob {} len {} data {}", 1, b.shard_id, b.blob_id, b.blob.body.size(),
// hex_bytes(result.body.cbytes(), 5));
// EXPECT_EQ(result.body.size(), b.blob.body.size());
// EXPECT_EQ(std::memcmp(result.body.bytes(), b.blob.body.cbytes(), result.body.size()), 0);
// EXPECT_EQ(result.user_key.size(), b.blob.user_key.size());
// EXPECT_EQ(result.user_key, b.blob.user_key);
// EXPECT_EQ(result.object_off, b.blob.object_off);
// }
constexpr pg_id_t pg_id{1};
// Generate test data
uint64_t num_shards_per_pg = 3;
uint64_t num_blobs_per_shard = 5;
std::map< pg_id_t, std::vector< shard_id_t > > pg_shard_id_vec;
std::map< pg_id_t, blob_id_t > pg_blob_id;

auto& shard_list = pg_shard_id_vec[pg_id];
create_pg(pg_id);
for (uint64_t i = 0; i < num_shards_per_pg; i++) {
auto shard = create_shard(pg_id, 64 * Mi);
shard_list.emplace_back(shard.id);
pg_blob_id[i] = 0;
LOGINFO("pg {} shard {}", pg_id, shard.id);
}
put_blobs(pg_shard_id_vec, num_blobs_per_shard, pg_blob_id);

PG* pg;
{
auto lg = std::shared_lock(_obj_inst->_pg_lock);
auto iter = _obj_inst->_pg_map.find(pg_id);
ASSERT_TRUE(iter != _obj_inst->_pg_map.end());
pg = iter->second.get();
}

//Construct shards as [sealed, open, filtered]
seal_shard(pg->shards_.front()->info.id);
ASSERT_EQ(pg->shards_.front()->info.state, homeobject::ShardInfo::State::SEALED);
//Filter out the last shard
auto snp_lsn = pg->shards_.back()->info.lsn - 1;
//Delete some blobs: delete the first blob of each shard
blob_id_t current_blob_id{0};
for (auto& shard : pg->shards_) {
del_blob(pg->pg_info_.id, shard->info.id, current_blob_id);
current_blob_id += num_blobs_per_shard;
}

auto pg_iter =
std::make_shared< HSHomeObject::PGBlobIterator >(*_obj_inst, pg->pg_info_.replica_set_uuid, snp_lsn);
ASSERT_EQ(pg_iter->shard_list_.size(), num_shards_per_pg - 1);
// Created blob sizes are distributed in range (1, 16kb)
pg_iter->max_batch_size_ = 16 * 1024;

// Verify PG meta data
sisl::io_blob_safe meta_blob;
pg_iter->create_pg_snapshot_data(meta_blob);
ASSERT_TRUE(meta_blob.size() > 0);

SyncMessageHeader* header = r_cast< SyncMessageHeader* >(meta_blob.bytes());
ASSERT_EQ(header->msg_type, SyncMessageType::PG_META);
auto pg_msg = GetSizePrefixedResyncPGMetaData(meta_blob.cbytes() + sizeof(SyncMessageHeader));
ASSERT_EQ(pg_msg->pg_id(), pg->pg_info_.id);
auto u1 = pg_msg->replica_set_uuid();
auto u2 = pg->pg_info_.replica_set_uuid;
ASSERT_EQ(std::string(u1->begin(), u1->end()), std::string(u2.begin(), u2.end()));
ASSERT_EQ(pg_msg->blob_seq_num(), pg->durable_entities().blob_sequence_num.load());
ASSERT_EQ(pg_msg->shard_seq_num(), pg->shard_sequence_num_);

auto msg_members = pg_msg->members();
ASSERT_EQ(msg_members->size(), pg->pg_info_.members.size());
for (auto m : *msg_members) {
uuids::uuid id{};
std::copy_n(m->uuid()->data(), 16, id.begin());
auto it = pg->pg_info_.members.find(PGMember{id});
ASSERT_TRUE(it != pg->pg_info_.members.end());
ASSERT_EQ(m->name()->str(), it->name);
ASSERT_EQ(m->priority(), it->priority);
}

auto idx = 0;
ASSERT_EQ(pg->shards_.size()-1, pg_msg->shard_ids()->size());
for (auto& shard : pg->shards_) {
if (shard->info.lsn > snp_lsn) { continue; }
ASSERT_EQ(shard->info.id, pg_msg->shard_ids()->Get(idx++));
}

//Verify shard meta data
current_blob_id = 0;
for (auto& shard : pg->shards_) {
auto shard_seq_num = HSHomeObject::get_sequence_num_from_shard_id(shard->info.id);
auto batch_id = 0;
objId oid(shard_seq_num, batch_id++);
if (shard->info.lsn > snp_lsn) {
ASSERT_FALSE(pg_iter->update_cursor(oid));
continue;
}
LOGINFO("shard meta, oid {}", oid.to_string());
ASSERT_TRUE(pg_iter->update_cursor(oid));
ASSERT_TRUE(pg_iter->generate_shard_blob_list());
ASSERT_EQ(pg_iter->cur_blob_list_.size(), num_blobs_per_shard);
sisl::io_blob_safe meta_data;
ASSERT_TRUE(pg_iter->create_shard_snapshot_data(meta_data));

SyncMessageHeader* header = r_cast< SyncMessageHeader* >(meta_data.bytes());
ASSERT_EQ(header->msg_type, SyncMessageType::SHARD_META);
auto shard_msg = GetSizePrefixedResyncShardMetaData(meta_data.cbytes() + sizeof(SyncMessageHeader));
ASSERT_EQ(shard_msg->shard_id(), shard->info.id);
ASSERT_EQ(shard_msg->pg_id(), pg->pg_info_.id);
ASSERT_EQ(shard_msg->state(), static_cast<uint8_t>(shard->info.state));
ASSERT_EQ(shard_msg->created_lsn(), shard->info.lsn);
ASSERT_EQ(shard_msg->created_time(), shard->info.created_time);
ASSERT_EQ(shard_msg->last_modified_time(), shard->info.last_modified_time);
ASSERT_EQ(shard_msg->total_capacity_bytes(), shard->info.total_capacity_bytes);

//Verify blob data
uint64_t packed_blob_size{0};
auto is_finished = false;
//skip the first blob(deleted) of the shard
current_blob_id++;
while (!is_finished) {
oid = objId(shard_seq_num, batch_id++);
ASSERT_TRUE(pg_iter->update_cursor(oid));
sisl::io_blob_safe blob_batch;
ASSERT_TRUE(pg_iter->create_blobs_snapshot_data(blob_batch));
header = r_cast< SyncMessageHeader* >(blob_batch.bytes());
ASSERT_EQ(header->msg_type, SyncMessageType::SHARD_BATCH);
auto blob_msg = GetSizePrefixedResyncBlobDataBatch(blob_batch.cbytes() + sizeof(SyncMessageHeader));
LOGINFO("blob batch, oid {}, blob_cnt {}", oid.to_string(), blob_msg->blob_list()->size());
for (auto i = 0; i < static_cast< int >(blob_msg->blob_list()->size()); i++) {
auto b = blob_msg->blob_list()->Get(i);
ASSERT_EQ(b->blob_id(), current_blob_id++);
ASSERT_EQ(b->state(), static_cast<uint8_t>(ResyncBlobState::NORMAL));
auto blob_data = b->data()->Data();
auto header = r_cast< HSHomeObject::BlobHeader const* >(blob_data);
ASSERT_TRUE(header->valid());
auto g = _obj_inst->blob_manager()->get(shard->info.id, b->blob_id(), 0, 0).get();
ASSERT_TRUE(!!g);
auto result = std::move(g.value());
EXPECT_EQ(result.body.size(), header->blob_size);
ASSERT_TRUE(
memcmp(result.body.cbytes(), blob_data+header->data_offset, header->blob_size) == 0);
packed_blob_size++;
LOGDEBUG(
"[{}]Get blob pg {}, shard {}, blob {}, data_len {}, blob_len {}, header_len {}, user_key_len {}, data {}",
packed_blob_size, pg->pg_info_.id, shard->info.id, b->blob_id(),
b->data()->size(), header->blob_size, sizeof(HSHomeObject::BlobHeader), header->user_key_size,
hex_bytes(result.body.cbytes(), 5));
}
is_finished = blob_msg->is_last_batch();
}
ASSERT_EQ(packed_blob_size, num_blobs_per_shard-1);
}
//Verify last obj
ASSERT_TRUE(pg_iter->update_cursor(objId(LAST_OBJ_ID)));
}
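For reference, the traversal protocol this test exercises, condensed into one loop (names and APIs as used above; assertions and error handling elided, so this is a sketch rather than runnable code on its own):

// For each shard: batch 0 carries shard metadata, batches 1..n carry blobs,
// and objId(LAST_OBJ_ID) closes the scan.
sisl::io_blob_safe buf;
for (auto& shard : pg->shards_) {
    auto seq = HSHomeObject::get_sequence_num_from_shard_id(shard->info.id);
    int batch_id = 0;
    // Shards created after snp_lsn are rejected by update_cursor().
    if (!pg_iter->update_cursor(objId(seq, batch_id++))) { continue; }
    pg_iter->generate_shard_blob_list();
    pg_iter->create_shard_snapshot_data(buf);      // SHARD_META message
    bool last_batch = false;
    while (!last_batch) {                          // batches 1..n carry blobs
        pg_iter->update_cursor(objId(seq, batch_id++));
        pg_iter->create_blobs_snapshot_data(buf);  // SHARD_BATCH message
        last_batch = GetSizePrefixedResyncBlobDataBatch(
                         buf.cbytes() + sizeof(SyncMessageHeader))->is_last_batch();
    }
}
pg_iter->update_cursor(objId(LAST_OBJ_ID));        // closes the scan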

TEST_F(HomeObjectFixture, SnapshotReceiveHandler) {}