Release chunk for sealed shards & Enhance UT
koujl committed Nov 27, 2024
1 parent a62ca5a commit 0847b98
Showing 4 changed files with 69 additions and 19 deletions.
3 changes: 2 additions & 1 deletion src/lib/homestore_backend/hs_homeobject.hpp
@@ -333,7 +333,8 @@ class HSHomeObject : public HomeObjectImpl {

     void process_pg_snapshot_data(ResyncPGMetaData const& pg_meta);
     int process_shard_snapshot_data(ResyncShardMetaData const& shard_meta);
-    int process_blobs_snapshot_data(ResyncBlobDataBatch const& data_blobs, snp_batch_id_t batch_num);
+    int process_blobs_snapshot_data(ResyncBlobDataBatch const& data_blobs, snp_batch_id_t batch_num,
+                                    bool is_last_batch);

     int64_t get_context_lsn() const;
     void reset_context(int64_t lsn, pg_id_t pg_id);
3 changes: 2 additions & 1 deletion src/lib/homestore_backend/replication_state_machine.cpp
@@ -345,7 +345,8 @@ void ReplicationStateMachine::write_snapshot_data(std::shared_ptr< homestore::sn
                        HSHomeObject::get_sequence_num_from_shard_id(m_snp_rcv_handler->get_shard_cursor()),
                        "Shard id not matching with the current shard cursor");
         auto blob_batch = GetSizePrefixedResyncBlobDataBatch(data_buf);
-        auto ret = m_snp_rcv_handler->process_blobs_snapshot_data(*blob_batch, obj_id.batch_id);
+        auto ret =
+            m_snp_rcv_handler->process_blobs_snapshot_data(*blob_batch, obj_id.batch_id, blob_batch->is_last_batch());
         if (ret) {
             // Do not proceed, will request for resending the current blob batch
             LOGE("Failed to process blob snapshot data lsn:{} obj_id {} shard {} batch {}, err {}", s->get_last_log_idx(),
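The caller simply forwards the batch's own is_last_batch flag, read straight off the size-prefixed flatbuffer. Worth noting: flatc generates a verifier alongside GetSizePrefixedResyncBlobDataBatch, so an untrusted resync payload can be checked before any field is read. A minimal sketch of that guard, assuming the conventional generated names (VerifySizePrefixedResyncBlobDataBatchBuffer) and a data_buf/size pair; this is illustrative, not the handler's actual code:

    // Hedged sketch: verify a size-prefixed flatbuffer before trusting fields
    // such as is_last_batch(). The verifier name follows standard flatc codegen;
    // confirm it against the generated resync header (assumed include below).
    #include <flatbuffers/flatbuffers.h>
    // #include "resync_generated.h" // hypothetical generated header

    const ResyncBlobDataBatch* parse_blob_batch(const uint8_t* data_buf, size_t size) {
        flatbuffers::Verifier verifier(data_buf, size);
        if (!VerifySizePrefixedResyncBlobDataBatchBuffer(verifier)) {
            return nullptr; // malformed payload: caller should request a resend
        }
        return GetSizePrefixedResyncBlobDataBatch(data_buf);
    }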
26 changes: 20 additions & 6 deletions src/lib/homestore_backend/snapshot_receive_handler.cpp
@@ -92,8 +92,16 @@ int HSHomeObject::SnapshotReceiveHandler::process_shard_snapshot_data(ResyncShar
 }

 int HSHomeObject::SnapshotReceiveHandler::process_blobs_snapshot_data(ResyncBlobDataBatch const& data_blobs,
-                                                                      const snp_batch_id_t batch_num) {
+                                                                      const snp_batch_id_t batch_num,
+                                                                      bool is_last_batch) {
     ctx_->cur_batch_num = batch_num;

+    // Find chunk id for current shard
+    auto chunk_id = home_obj_.get_shard_chunk(ctx_->shard_cursor);
+    RELEASE_ASSERT(chunk_id.has_value(), "Failed to load chunk of current shard_cursor:{}", ctx_->shard_cursor);
+    homestore::blk_alloc_hints hints;
+    hints.chunk_id_hint = *chunk_id;
+
     for (unsigned int i = 0; i < data_blobs.blob_list()->size(); i++) {
         const auto blob = data_blobs.blob_list()->Get(i);
@@ -141,11 +149,6 @@ int HSHomeObject::SnapshotReceiveHandler::process_blobs_snapshot_data(ResyncBlob
         }

         // Alloc & persist blob data
-        auto chunk_id = home_obj_.get_shard_chunk(ctx_->shard_cursor);
-        RELEASE_ASSERT(chunk_id.has_value(), "Failed to load chunk of current shard_cursor:{}", ctx_->shard_cursor);
-        homestore::blk_alloc_hints hints;
-        hints.chunk_id_hint = *chunk_id;
-
         auto data_size = blob->data()->size();
         homestore::MultiBlkId blk_id;
         auto status = homestore::data_service().alloc_blks(
@@ -190,6 +193,17 @@ int HSHomeObject::SnapshotReceiveHandler::process_blobs_snapshot_data(ResyncBlob
         }
     }

+    if (is_last_batch) {
+        // Release chunk for sealed shard
+        ShardInfo::State state;
+        {
+            std::scoped_lock lock_guard(home_obj_._shard_lock);
+            auto iter = home_obj_._shard_map.find(ctx_->shard_cursor);
+            state = (*iter->second)->info.state;
+        }
+        if (state == ShardInfo::State::SEALED) { home_obj_.chunk_selector()->release_chunk(chunk_id.value()); }
+    }
+
     return 0;
 }

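These hunks hoist the chunk lookup out of the per-blob loop (the chunk is fixed for a given shard cursor, so one lookup per batch is enough) and add the new tail-end handling: once the final batch of a shard has been applied and the shard is already SEALED, its chunk can be handed back to the selector. A self-contained sketch of that control flow, using stand-in types rather than the real HomeStore/HomeObject ones:

    #include <cstdint>
    #include <iostream>
    #include <optional>
    #include <vector>

    enum class ShardState { OPEN, SEALED };

    struct ChunkSelector {
        void release_chunk(uint16_t chunk_id) { std::cout << "released chunk " << chunk_id << "\n"; }
    };

    // Stand-in for one resync batch: blob payloads plus the last-batch flag.
    struct BlobBatch {
        std::vector< std::vector< uint8_t > > blobs;
        bool is_last_batch{false};
    };

    int process_batch(const BlobBatch& batch, std::optional< uint16_t > chunk_id, ShardState shard_state,
                      ChunkSelector& selector) {
        if (!chunk_id) return -1; // cannot place blobs without the shard's chunk

        for (const auto& blob : batch.blobs) {
            // Real code allocates blocks with chunk_id as the allocation hint and
            // persists the blob; any failure returns non-zero so the batch is resent.
            (void)blob;
        }

        // Only a sealed shard has stopped growing, so only then is the chunk released.
        if (batch.is_last_batch && shard_state == ShardState::SEALED) {
            selector.release_chunk(*chunk_id);
        }
        return 0;
    }

    int main() {
        ChunkSelector selector;
        BlobBatch last{{{0x1, 0x2}}, /*is_last_batch=*/true};
        return process_batch(last, std::optional< uint16_t >{7}, ShardState::SEALED, selector);
    }

The ordering mirrors the real handler: the chunk stays usable across every batch of the shard and is only handed back after the last one, and only when the shard is sealed.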
56 changes: 45 additions & 11 deletions src/lib/homestore_backend/tests/hs_blob_tests.cpp
@@ -292,8 +292,10 @@ TEST_F(HomeObjectFixture, SnapshotReceiveHandler) {
     // MUST TEST WITH replica=1
     constexpr uint64_t snp_lsn = 1;
     constexpr uint64_t num_shards_per_pg = 3;
+    constexpr uint64_t num_open_shards_per_pg = 2; // Should be less than num_shards_per_pg
     constexpr uint64_t num_batches_per_shard = 3;
     constexpr uint64_t num_blobs_per_batch = 5;
+    constexpr int corrupted_blob_percentage = 10;

     // We have to create a PG first to init repl_dev
     constexpr pg_id_t pg_id = 1;
@@ -330,6 +332,11 @@ TEST_F(HomeObjectFixture, SnapshotReceiveHandler) {
     handler->process_pg_snapshot_data(*pg_meta);

     // Step 2: Test shard and blob batches
+    std::random_device rd; // Random generators for blob corruption
+    std::mt19937 gen(rd());
+    std::uniform_int_distribution<> corrupt_dis(1, 100);
+    std::uniform_int_distribution<> random_bytes_dis(1, 16 * 1024);
+
     blob_id_t cur_blob_id{0};
     for (uint64_t i = 1; i <= num_shards_per_pg; i++) {
         LOGINFO("TESTING: applying meta for shard {}", i);
@@ -338,7 +345,8 @@ TEST_F(HomeObjectFixture, SnapshotReceiveHandler) {
         // Generate ResyncShardMetaData message
         ShardInfo shard;
         shard.id = i;
-        shard.state = ShardInfo::State::OPEN;
+        shard.state =
+            i <= num_shards_per_pg - num_open_shards_per_pg ? ShardInfo::State::SEALED : ShardInfo::State::OPEN;
         shard.created_time = get_time_since_epoch_ms();
         shard.last_modified_time = shard.created_time;
         shard.total_capacity_bytes = 1024 * Mi;
@@ -367,11 +375,14 @@ TEST_F(HomeObjectFixture, SnapshotReceiveHandler) {

         // Step 2-2: Test write blob batch data
         // Generate ResyncBlobDataBatch message
-        std::map< blob_id_t, Blob > blob_map;
+        std::map< blob_id_t, std::tuple< Blob, bool > > blob_map;
         for (uint64_t j = 1; j <= num_batches_per_shard; j++) {
             LOGINFO("TESTING: applying blobs for shard {} batch {}", shard.id, j);
             std::vector< ::flatbuffers::Offset< ResyncBlobData > > blob_entries;
             for (uint64_t k = 0; k < num_blobs_per_batch; k++) {
+                auto blob_state = corrupt_dis(gen) <= corrupted_blob_percentage ? ResyncBlobState::CORRUPTED
+                                                                                : ResyncBlobState::NORMAL;
+
                 // Construct raw blob buffer
                 auto blob = build_blob(cur_blob_id);
                 const auto aligned_hdr_size =
@@ -398,24 +409,47 @@
                 }
                 std::memcpy(blob_raw.bytes() + aligned_hdr_size, blob.body.cbytes(), blob.body.size());

+                // Simulate blob data corruption - tamper with random bytes
+                if (blob_state == ResyncBlobState::CORRUPTED) {
+                    constexpr int corrupted_bytes = 5;
+                    for (auto i = 0; i < corrupted_bytes; i++) {
+                        auto offset = random_bytes_dis(gen) % blob_raw.size();
+                        auto byte = random_bytes_dis(gen) % 256;
+                        blob_raw.bytes()[offset] = byte;
+                        LOGINFO("Changing byte at offset {} to {} to simulate data corruption", offset, byte);
+                    }
+                }
+
                 std::vector data(blob_raw.bytes(), blob_raw.bytes() + blob_raw.size());
-                blob_entries.push_back(CreateResyncBlobDataDirect(
-                    builder, cur_blob_id, static_cast< uint8_t >(ResyncBlobState::NORMAL), &data));
-                blob_map[cur_blob_id++] = std::move(blob);
+                blob_entries.push_back(
+                    CreateResyncBlobDataDirect(builder, cur_blob_id, static_cast< uint8_t >(blob_state), &data));
+                blob_map[cur_blob_id++] =
+                    std::make_tuple< Blob, bool >(std::move(blob), blob_state == ResyncBlobState::CORRUPTED);
             }
             builder.Finish(CreateResyncBlobDataBatchDirect(builder, &blob_entries, true));
             auto blob_batch = GetResyncBlobDataBatch(builder.GetBufferPointer());
             builder.Reset();

-            ret = handler->process_blobs_snapshot_data(*blob_batch, j);
+            ret = handler->process_blobs_snapshot_data(*blob_batch, j, j == num_batches_per_shard);
             ASSERT_EQ(ret, 0);
         }
-        for (const auto& [blob_id, blob] : blob_map) {
+        for (const auto& b : blob_map) {
+            auto blob_id = b.first;
+            auto& blob = std::get< 0 >(b.second);
+            auto is_corrupted = std::get< 1 >(b.second);
+
             auto res = _obj_inst->blob_manager()->get(shard.id, blob_id, 0, blob.body.size()).get();
-            ASSERT_TRUE(!!res);
-            auto blob_res = std::move(res.value());
-            ASSERT_EQ(blob_res.body.size(), blob.body.size());
-            ASSERT_EQ(std::memcmp(blob_res.body.bytes(), blob.body.cbytes(), blob_res.body.size()), 0);
+            if (is_corrupted) {
+                ASSERT_FALSE(!!res);
+            } else {
+                ASSERT_TRUE(!!res);
+                auto blob_res = std::move(res.value());
+                ASSERT_EQ(blob_res.body.size(), blob.body.size());
+                ASSERT_EQ(std::memcmp(blob_res.body.bytes(), blob.body.cbytes(), blob_res.body.size()), 0);
+            }
         }
+        if (shard.state == ShardInfo::State::SEALED) {
+            // TODO: Verify chunk is released. Currently we don't have chunk_id, so let's do this after rebasing
+        }
     }
 }
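The corruption arm of the test reduces to one reusable idea: flip a few random bytes of the serialized blob so checksum verification fails on the receive path, then assert that blob_manager()->get() fails for exactly those blobs after resync. A distilled standalone version of that step, with hypothetical names of my own choosing:

    #include <cstdint>
    #include <random>
    #include <vector>

    // Hedged sketch of the test's corruption step: tamper with `count` random
    // bytes of a serialized blob so verification on the receive path rejects it.
    void corrupt_random_bytes(std::vector< uint8_t >& raw, int count, std::mt19937& gen) {
        std::uniform_int_distribution< size_t > offset_dis(0, raw.size() - 1);
        std::uniform_int_distribution< int > byte_dis(0, 255);
        for (int i = 0; i < count; ++i) {
            raw[offset_dis(gen)] = static_cast< uint8_t >(byte_dis(gen));
        }
    }

    int main() {
        std::mt19937 gen(std::random_device{}());
        std::vector< uint8_t > blob(4096, 0xAB);
        corrupt_random_bytes(blob, 5, gen); // mirrors corrupted_bytes = 5 in the test
        return 0;
    }

Unlike the test's `random_bytes_dis(gen) % blob_raw.size()`, the offset distribution here is bounded to the buffer directly, avoiding modulo bias; either way the goal is only to make the payload almost certainly fail its checksum.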
