Skip to content

Commit

Permalink
Add test and flip for snapshot resync
Browse files Browse the repository at this point in the history
  • Loading branch information
koujl committed Dec 20, 2024
1 parent 4739ccf commit ba51e6b
Show file tree
Hide file tree
Showing 8 changed files with 112 additions and 10 deletions.
2 changes: 1 addition & 1 deletion conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

class HomeObjectConan(ConanFile):
name = "homeobject"
version = "2.1.17"
version = "2.1.18"

homepage = "https://github.com/eBay/HomeObject"
description = "Blob Store built on HomeReplication"
Expand Down
9 changes: 8 additions & 1 deletion src/lib/homestore_backend/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -69,5 +69,12 @@ target_link_libraries(homestore_test_dynamic PUBLIC
${COMMON_TEST_DEPS}
)

add_test(NAME HomestoreTestDynamic
         COMMAND homestore_test_dynamic -csv error --executor immediate --config_path ./
         --override_config homestore_config.consensus.snapshot_freq_distance:0)

# To test both baseline & incremental resync functionality, we use 13 to minimize the
# likelihood of it being a divisor of the total LSN (currently 30).
# NOTE: override_config entries use ':' as the key/value separator, matching the
# snapshot_freq_distance override above (the previous '=' form was inconsistent).
add_test(NAME HomestoreTestDynamicWithResync
         COMMAND homestore_test_dynamic -csv error --executor immediate --config_path ./
         --override_config homestore_config.consensus.snapshot_freq_distance:13
         --override_config homestore_config.consensus.num_reserved_log_items:13)
18 changes: 18 additions & 0 deletions src/lib/homestore_backend/pg_blob_iterator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,12 @@ PG* HSHomeObject::PGBlobIterator::get_pg_metadata() {
}

bool HSHomeObject::PGBlobIterator::create_pg_snapshot_data(sisl::io_blob_safe& meta_blob) {
#ifdef _PRERELEASE
if (iomgr_flip::instance()->test_flip("pg_blob_iterator_create_snapshot_data_error")) {
LOGW("Simulating creating pg snapshot data error");
return false;
}
#endif
auto pg = home_obj_._pg_map[pg_id_].get();
if (pg == nullptr) {
LOGE("PG not found in pg_map, pg_id={}", pg_id_);
Expand Down Expand Up @@ -112,6 +118,12 @@ bool HSHomeObject::PGBlobIterator::create_pg_snapshot_data(sisl::io_blob_safe& m
}

bool HSHomeObject::PGBlobIterator::generate_shard_blob_list() {
#ifdef _PRERELEASE
if (iomgr_flip::instance()->test_flip("pg_blob_iterator_generate_shard_blob_list_error")) {
LOGW("Simulating generating shard blob list error");
return false;
}
#endif
auto r = home_obj_.query_blobs_in_shard(pg_id_, cur_obj_id_.shard_seq_num, 0, UINT64_MAX);
if (!r) { return false; }
cur_blob_list_ = r.value();
Expand Down Expand Up @@ -211,6 +223,12 @@ bool HSHomeObject::PGBlobIterator::create_blobs_snapshot_data(sisl::io_blob_safe
continue;
}

#ifdef _PRERELEASE
if (iomgr_flip::instance()->test_flip("pg_blob_iterator_load_blob_data_error")) {
LOGW("Simulating loading blob data error");
return false;
}
#endif
sisl::io_blob_safe blob;
uint8_t retries = HS_BACKEND_DYNAMIC_CONFIG(snapshot_blob_load_retry);
for (int i = 0; i < retries; i++) {
Expand Down
20 changes: 14 additions & 6 deletions src/lib/homestore_backend/replication_state_machine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,6 @@ int ReplicationStateMachine::read_snapshot_obj(std::shared_ptr< homestore::snaps
auto log_str = fmt::format("group={}, term={}, lsn={},",
boost::uuids::to_string(repl_dev()->group_id()), s->get_last_log_term(),
s->get_last_log_idx());
//TODO snp_data->offset is int64, need to change to uint64 in homestore
if (snp_obj->offset == LAST_OBJ_ID) {
// No more shards to read, baseline resync is finished after this.
snp_obj->is_last_obj = true;
Expand Down Expand Up @@ -287,8 +286,9 @@ int ReplicationStateMachine::read_snapshot_obj(std::shared_ptr< homestore::snaps
}
return 0;
}
//shard metadata message
if (obj_id.shard_seq_num != 0 && obj_id.batch_id == 0) {

// shard metadata message
if (obj_id.batch_id == 0) {
if (!pg_iter->generate_shard_blob_list()) {
LOGE("Failed to generate shard blob list for snapshot read, {}", log_str);
return -1;
Expand All @@ -299,7 +299,8 @@ int ReplicationStateMachine::read_snapshot_obj(std::shared_ptr< homestore::snaps
}
return 0;
}
//general blob message

// general blob message
if (!pg_iter->create_blobs_snapshot_data(snp_obj->blob)) {
LOGE("Failed to create blob batch data for snapshot read, {}", log_str);
return -1;
Expand Down Expand Up @@ -328,7 +329,13 @@ void ReplicationStateMachine::write_snapshot_obj(std::shared_ptr< homestore::sna
}

// Check message integrity
// TODO: add a flip here to simulate corrupted message
#ifdef _PRERELEASE
if (iomgr_flip::instance()->test_flip("state_machine_write_corrupted_data")) {
LOGW("Simulating writing corrupted snapshot data, lsn:{}, obj_id {} shard {} batch {}", s->get_last_log_idx(),
obj_id.value, obj_id.shard_seq_num, obj_id.batch_id);
return;
}
#endif
auto header = r_cast< const SyncMessageHeader* >(snp_obj->blob.cbytes());
if (header->corrupted()) {
LOGE("corrupted message in write_snapshot_data, lsn:{}, obj_id {} shard {} batch {}", s->get_last_log_idx(),
Expand Down Expand Up @@ -420,14 +427,15 @@ void ReplicationStateMachine::write_snapshot_obj(std::shared_ptr< homestore::sna
}

// Releases the PGBlobIterator allocated for a snapshot read session.
// The caller's pointer is reset to nullptr so a stale context cannot be
// reused or double-freed.
void ReplicationStateMachine::free_user_snp_ctx(void*& user_snp_ctx) {
    // Guard clause: a null context means there is nothing to free; log and bail.
    if (user_snp_ctx == nullptr) {
        LOGE("User snapshot context null group={}", boost::uuids::to_string(repl_dev()->group_id()));
        return;
    }

    auto pg_iter = r_cast< HSHomeObject::PGBlobIterator* >(user_snp_ctx);
    LOGD("Freeing snapshot iterator pg_id={} group={}", pg_iter->pg_id_, boost::uuids::to_string(pg_iter->group_id_));
    delete pg_iter;
    user_snp_ctx = nullptr; // prevent dangling reuse by the caller
}

} // namespace homeobject
29 changes: 29 additions & 0 deletions src/lib/homestore_backend/snapshot_receive_handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,13 @@ int HSHomeObject::SnapshotReceiveHandler::process_pg_snapshot_data(ResyncPGMetaD
pg_member.priority = member->priority();
pg_info.members.insert(pg_member);
}

#ifdef _PRERELEASE
if (iomgr_flip::instance()->test_flip("snapshot_receiver_pg_error")) {
LOGW("Simulating PG snapshot error");
return CREATE_PG_ERR;
}
#endif
auto ret = home_obj_.local_create_pg(repl_dev_, pg_info);
if (ret.hasError()) {
LOGE("Failed to create PG {}, err {}", pg_meta.pg_id(), ret.error());
Expand Down Expand Up @@ -76,6 +83,20 @@ int HSHomeObject::SnapshotReceiveHandler::process_shard_snapshot_data(ResyncShar
}
shard_sb->p_chunk_id = blk_id.to_single_blkid().chunk_num();

auto free_allocated_blks = [blk_id]() {
homestore::data_service().async_free_blk(blk_id).thenValue([blk_id](auto&& err) {
LOGD("Freed blk_id:{} due to failure in persisting shard info, err {}", blk_id.to_string(),
err ? err.message() : "nil");
});
};

#ifdef _PRERELEASE
if (iomgr_flip::instance()->test_flip("snapshot_receiver_shard_write_data_error")) {
LOGW("Simulating shard snapshot write data error");
free_allocated_blks();
return WRITE_DATA_ERR;
}
#endif
const auto ret = homestore::data_service()
.async_write(r_cast< char const* >(aligned_buf.cbytes()), aligned_buf.size(), blk_id)
.thenValue([&blk_id](auto&& err) -> BlobManager::AsyncResult< blob_id_t > {
Expand All @@ -90,6 +111,7 @@ int HSHomeObject::SnapshotReceiveHandler::process_shard_snapshot_data(ResyncShar
.get();
if (ret.hasError()) {
LOGE("Failed to write shard info of shard_id {} to blk_id:{}", shard_meta.shard_id(), blk_id.to_string());
free_allocated_blks();
return WRITE_DATA_ERR;
}

Expand Down Expand Up @@ -178,6 +200,13 @@ int HSHomeObject::SnapshotReceiveHandler::process_blobs_snapshot_data(ResyncBlob
});
};

#ifdef _PRERELEASE
if (iomgr_flip::instance()->test_flip("snapshot_receiver_blob_write_data_error")) {
LOGW("Simulating blob snapshot write data error");
free_allocated_blks();
return WRITE_DATA_ERR;
}
#endif
const auto ret = homestore::data_service()
.async_write(r_cast< char const* >(aligned_buf.cbytes()), aligned_buf.size(), blk_id)
.thenValue([&blk_id](auto&& err) -> BlobManager::AsyncResult< blob_id_t > {
Expand Down
28 changes: 28 additions & 0 deletions src/lib/homestore_backend/tests/homeobj_fixture.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -458,9 +458,37 @@ class HomeObjectFixture : public ::testing::Test {
return blob;
}

#ifdef _PRERELEASE
// Injects a no-return flip point (fault injection) with the given name.
// The flip fires up to `count` times, each with `percent` probability.
// Takes the name by const reference to avoid an unnecessary string copy.
void set_basic_flip(const std::string& flip_name, uint32_t count = 1, uint32_t percent = 100) {
    flip::FlipCondition null_cond; // unconditional — no predicate on the flip
    flip::FlipFrequency freq;
    freq.set_count(count);
    freq.set_percent(percent);
    m_fc.inject_noreturn_flip(flip_name, {null_cond}, freq);
    LOGINFO("Flip {} set", flip_name);
}

// Injects a delay flip point (fault injection) with the given name, which
// stalls the hit code path for `delay_usec` microseconds. The flip fires up
// to `count` times, each with `percent` probability.
// Takes the name by const reference to avoid an unnecessary string copy.
void set_delay_flip(const std::string& flip_name, uint64_t delay_usec, uint32_t count = 1, uint32_t percent = 100) {
    flip::FlipCondition null_cond; // unconditional — no predicate on the flip
    flip::FlipFrequency freq;
    freq.set_count(count);
    freq.set_percent(percent);
    m_fc.inject_delay_flip(flip_name, {null_cond}, freq, delay_usec);
    LOGINFO("Flip {} set", flip_name);
}

// Removes a previously injected flip point by name.
// Takes the name by const reference to avoid an unnecessary string copy.
void remove_flip(const std::string& flip_name) {
    m_fc.remove_flip(flip_name);
    LOGINFO("Flip {} removed", flip_name);
}
#endif

private:
std::random_device rnd{};
std::default_random_engine rnd_engine{rnd()};
std::uniform_int_distribution< uint32_t > rand_blob_size;
std::uniform_int_distribution< uint32_t > rand_user_key_size;
#ifdef _PRERELEASE
flip::FlipClient m_fc{iomgr_flip::instance()};
#endif
};
1 change: 0 additions & 1 deletion src/lib/homestore_backend/tests/hs_blob_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,6 @@ TEST_F(HomeObjectFixture, PGBlobIterator) {
}

TEST_F(HomeObjectFixture, SnapshotReceiveHandler) {
// MUST TEST WITH replica=1
constexpr uint64_t snp_lsn = 1;
constexpr uint64_t num_shards_per_pg = 3;
constexpr uint64_t num_open_shards_per_pg = 2; // Should be less than num_shards_per_pg
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ TEST_F(HomeObjectFixture, ReplaceMember) {
put_blobs(pg_shard_id_vec, num_blobs_per_shard, pg_blob_id);

verify_get_blob(pg_shard_id_vec, num_blobs_per_shard);
verify_obj_count(1, num_blobs_per_shard, num_shards_per_pg, false);
verify_obj_count(1, num_shards_per_pg, num_blobs_per_shard, false);

// all the replicas , including the spare ones, sync at this point
g_helper->sync();
Expand All @@ -66,6 +66,19 @@ TEST_F(HomeObjectFixture, ReplaceMember) {
auto out_member_id = g_helper->replica_id(num_replicas - 1);
auto in_member_id = g_helper->replica_id(num_replicas); /*spare replica*/

#ifdef _PRERELEASE
// Set flips (name, count, percentage) to simulate different error scenarios
set_basic_flip("pg_blob_iterator_create_snapshot_data_error", 1); // simulate read pg snapshot data error
set_basic_flip("pg_blob_iterator_generate_shard_blob_list_error", 1); // simulate generate shard blob list error
// simulate load blob data error - do not enable until completing the resume logic
// set_basic_flip("pg_blob_iterator_load_blob_data_error", 1, 10);

set_basic_flip("state_machine_write_corrupted_data", 3, 25); // simulate random data corruption
set_basic_flip("snapshot_receiver_pg_error", 1); // simulate pg creation error
set_basic_flip("snapshot_receiver_shard_write_data_error", 2, 33); // simulate shard write data error
set_basic_flip("snapshot_receiver_blob_write_data_error", 4, 15); // simulate blob write data error
#endif

run_on_pg_leader(pg_id, [&]() {
auto r = _obj_inst->pg_manager()
->replace_member(pg_id, out_member_id, PGMember{in_member_id, "new_member", 0})
Expand Down

0 comments on commit ba51e6b

Please sign in to comment.