Add batching in UT to support a million IOs for the baseline test.
sanebay committed Aug 22, 2024
1 parent 3030851 commit 272b1ca
Showing 5 changed files with 118 additions and 53 deletions.
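The substantive change is a batched, resumable baseline (snapshot) transfer in the test's TestReplicatedDB listener: committed values are additionally indexed by LSN, the leader streams at most 1000 key/value pairs per read_snapshot_data call starting from the offset the follower reports, and the follower advances that offset to last_committed_lsn + 1 after each write_snapshot_data call, so a million-IO baseline no longer has to fit in a single snapshot object. Below is a minimal standalone sketch of the leader-side loop, not HomeStore code: SnapshotBatch, SketchValue, SketchKV, read_batch, and kMaxBatchSize are illustrative names, and lower_bound is used here where the test iterates from find on its LSN index.

// Illustrative sketch only (not HomeStore API): leader-side batched snapshot read
// over an LSN-ordered index, mirroring the pattern read_snapshot_data now follows.
#include <cstdint>
#include <map>
#include <vector>

struct SketchValue {                       // stand-in for the test's Value
    int64_t lsn;
    uint64_t data_size, data_pattern, id;
};

struct SketchKV {                          // stand-in for the test's KeyValuePair
    uint64_t key;
    SketchValue value;
};

struct SnapshotBatch {                     // stand-in for snapshot_data
    std::vector< SketchKV > items;
    bool is_last_obj{false};
};

constexpr size_t kMaxBatchSize = 1000;     // same per-object cap the test applies

// next_lsn is the offset the follower sent back (its last committed LSN + 1).
SnapshotBatch read_batch(std::map< int64_t, SketchValue > const& lsn_index, int64_t next_lsn) {
    SnapshotBatch batch;
    for (auto it = lsn_index.lower_bound(next_lsn); it != lsn_index.end(); ++it) {
        batch.items.push_back(SketchKV{it->second.id, it->second});
        if (batch.items.size() >= kMaxBatchSize) { break; }
    }
    // An empty batch means the follower has caught up to the snapshot point.
    batch.is_last_obj = batch.items.empty();
    return batch;
}

Note that the diff deliberately sets is_last_obj to false for non-empty batches and only signals completion on an empty batch; that is what makes the transfer resumable across many objects.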
2 changes: 1 addition & 1 deletion conanfile.py
@@ -9,7 +9,7 @@

class HomestoreConan(ConanFile):
name = "homestore"
version = "6.4.50"
version = "6.4.51"

homepage = "https://github.com/eBay/Homestore"
description = "HomeStore Storage Engine"
3 changes: 2 additions & 1 deletion src/lib/replication/log_store/home_raft_log_store.cpp
@@ -315,7 +315,8 @@ bool HomeRaftLogStore::compact(ulong compact_lsn) {
// release this assert if, for some use case, we should tolerate this case;
// for now, don't expect this case to happen.
// RELEASE_ASSERT(false, "compact_lsn={} is beyond the current max_lsn={}", compact_lsn, cur_max_lsn);

REPL_STORE_LOG(DEBUG, "Adding dummy entries during compact from={} upto={}", cur_max_lsn + 1,
to_store_lsn(compact_lsn));
// We need to fill the remaining entries with dummy data.
for (auto lsn{cur_max_lsn + 1}; lsn <= to_store_lsn(compact_lsn); ++lsn) {
append(m_dummy_log_entry);
12 changes: 11 additions & 1 deletion src/tests/test_common/homestore_test_common.hpp
@@ -185,7 +185,16 @@ class HSTestHelper {
do_start_homestore(true /* fake_restart*/, false /* init_device */, shutdown_delay_sec);
}

virtual void start_homestore() {
do_start_homestore(true /* fake_restart*/, false /* init_device */, 1 /* shutdown_delay_sec */);
}

virtual void shutdown_homestore(bool cleanup = true) {
if (homestore::HomeStore::safe_instance() == nullptr) {
/* Already shutdown */
return;
}

homestore::HomeStore::instance()->shutdown();
homestore::HomeStore::reset_instance();
iomanager.stop();
@@ -370,7 +379,8 @@ class HSTestHelper {
if (init_device) { init_raw_devices(m_token.devs_); }
} else {
for (uint32_t i{0}; i < ndevices; ++i) {
m_generated_devs.emplace_back(std::string{"/tmp/" + m_token.name_ + "_" + std::to_string(i + 1)});
m_generated_devs.emplace_back(
std::string{"/tmp/source/tests/" + m_token.name_ + "_" + std::to_string(i + 1)});
}
if (init_device) {
LOGINFO("creating {} device files with each of size {} ", ndevices, homestore::in_bytes(dev_size));
7 changes: 7 additions & 0 deletions src/tests/test_common/hs_repl_test_common.hpp
@@ -242,6 +242,13 @@ class HSReplTestHelper : public HSTestHelper {
});
}

void shutdown() { shutdown_homestore(false /* cleanup */); }

void start() {
m_token.params(HS_SERVICE::REPLICATION).repl_app = std::make_unique< TestReplApplication >(*this);
start_homestore();
}

uint16_t replica_num() const { return replica_num_; }
homestore::replica_id_t my_replica_id() const { return my_replica_id_; }
homestore::replica_id_t replica_id(uint16_t member_id) const {
147 changes: 97 additions & 50 deletions src/tests/test_raft_repl_dev.cpp
@@ -80,6 +80,7 @@ class TestReplicatedDB : public homestore::ReplDevListener {
uint64_t data_size_;
uint64_t data_pattern_;
MultiBlkId blkid_;
uint64_t id_;
};

struct KeyValuePair {
@@ -127,15 +128,19 @@ class TestReplicatedDB : public homestore::ReplDevListener {

auto jheader = r_cast< test_req::journal_header const* >(header.cbytes());
Key k{.id_ = *(r_cast< uint64_t const* >(key.cbytes()))};
Value v{
.lsn_ = lsn, .data_size_ = jheader->data_size, .data_pattern_ = jheader->data_pattern, .blkid_ = blkids};
Value v{.lsn_ = lsn,
.data_size_ = jheader->data_size,
.data_pattern_ = jheader->data_pattern,
.blkid_ = blkids,
.id_ = k.id_};

LOGINFOMOD(replication, "[Replica={}] Received commit on lsn={} dsn={} key={} value[blkid={} pattern={}]",
g_helper->replica_num(), lsn, ctx->dsn(), k.id_, v.blkid_.to_string(), v.data_pattern_);

{
std::unique_lock lk(db_mtx_);
inmem_db_.insert_or_assign(k, v);
lsn_index_.emplace(lsn, v);
last_committed_lsn = lsn;
++commit_count_;
}
@@ -167,6 +172,7 @@ class TestReplicatedDB : public homestore::ReplDevListener {
}

AsyncReplResult<> create_snapshot(shared< snapshot_context > context) override {
std::lock_guard< std::mutex > lock(m_snapshot_lock);
auto s = std::dynamic_pointer_cast< nuraft_snapshot_context >(context)->nuraft_snapshot();
LOGINFOMOD(replication, "[Replica={}] Got snapshot callback term={} idx={}", g_helper->replica_num(),
s->get_last_log_term(), s->get_last_log_idx());
@@ -176,33 +182,40 @@

int read_snapshot_data(shared< snapshot_context > context, shared< snapshot_data > snp_data) override {
auto s = std::dynamic_pointer_cast< nuraft_snapshot_context >(context)->nuraft_snapshot();
LOGINFOMOD(replication, "[Replica={}] Read logical snapshot callback obj_id={} term={} idx={}",
g_helper->replica_num(), snp_data->offset, s->get_last_log_term(), s->get_last_log_idx());

if (snp_data->offset == 0) {
snp_data->is_last_obj = false;
snp_data->blob = sisl::io_blob_safe(sizeof(ulong));
LOGINFOMOD(replication, "[Replica={}] Read logical snapshot callback obj_id={} term={} idx={}",
g_helper->replica_num(), snp_data->offset, s->get_last_log_term(), s->get_last_log_idx());
return 0;
}
int64_t follower_last_lsn = snp_data->offset;

int64_t next_lsn = snp_data->offset;
std::vector< KeyValuePair > kv_snapshot_data;
LOGINFOMOD(replication, "[Replica={}] Read logical snapshot callback follower lsn={}", g_helper->replica_num(),
follower_last_lsn);
for (auto& [k, v] : inmem_db_) {
if (v.lsn_ > follower_last_lsn) {
kv_snapshot_data.emplace_back(k, v);
LOGINFOMOD(replication, "[Replica={}] Read logical snapshot callback fetching lsn={} {} {}",
g_helper->replica_num(), v.lsn_, v.data_size_, v.data_pattern_);
}
for (auto iter = lsn_index_.find(next_lsn); iter != lsn_index_.end(); iter++) {
auto& v = iter->second;
kv_snapshot_data.emplace_back(Key{v.id_}, v);
LOGTRACEMOD(replication, "[Replica={}] Read logical snapshot callback fetching lsn={} size={} pattern={}",
g_helper->replica_num(), v.lsn_, v.data_size_, v.data_pattern_);
if (kv_snapshot_data.size() >= 1000) { break; }
}

int64_t kv_snapshot_data_size = sizeof(KeyValuePair) * kv_snapshot_data.size();
LOGINFOMOD(replication, "Snapshot size {}", kv_snapshot_data_size);
if (kv_snapshot_data.size() == 0) {
snp_data->is_last_obj = true;
LOGINFOMOD(replication, "Snapshot is_last_obj is true");
return 0;
}

int64_t kv_snapshot_data_size = sizeof(KeyValuePair) * kv_snapshot_data.size();
sisl::io_blob_safe blob{static_cast< uint32_t >(kv_snapshot_data_size)};
std::memcpy(blob.bytes(), kv_snapshot_data.data(), kv_snapshot_data_size);
snp_data->blob = std::move(blob);
snp_data->is_last_obj = true;
snp_data->is_last_obj = false;
LOGINFOMOD(replication, "[Replica={}] Read logical snapshot callback obj_id={} term={} idx={} num_items={}",
g_helper->replica_num(), snp_data->offset, s->get_last_log_term(), s->get_last_log_idx(),
kv_snapshot_data.size());

return 0;
}

Expand All @@ -218,27 +231,27 @@ class TestReplicatedDB : public homestore::ReplDevListener {

void write_snapshot_data(shared< snapshot_context > context, shared< snapshot_data > snp_data) override {
auto s = std::dynamic_pointer_cast< nuraft_snapshot_context >(context)->nuraft_snapshot();
LOGINFOMOD(replication, "[Replica={}] Save logical snapshot callback obj_id={} term={} idx={} is_last={}",
g_helper->replica_num(), snp_data->offset, s->get_last_log_term(), s->get_last_log_idx(),
snp_data->is_last_obj);

if (snp_data->offset == 0) {
// For obj_id 0 we send back the last committed lsn.
snp_data->offset = last_committed_lsn;
LOGINFOMOD(replication, "[Replica={}] Save logical snapshot callback return obj_id={}",
g_helper->replica_num(), snp_data->offset);
LOGINFOMOD(replication, "[Replica={}] Save logical snapshot callback obj_id={} term={} idx={} is_last={}",
g_helper->replica_num(), snp_data->offset, s->get_last_log_term(), s->get_last_log_idx(),
snp_data->is_last_obj);
return;
}

std::unique_lock lk(db_mtx_);
size_t kv_snapshot_data_size = snp_data->blob.size();
LOGINFOMOD(replication, "Snapshot size {}", kv_snapshot_data_size);
if (kv_snapshot_data_size == 0) { return; }

size_t num_items = kv_snapshot_data_size / sizeof(KeyValuePair);
std::unique_lock lk(db_mtx_);
auto ptr = r_cast< const KeyValuePair* >(snp_data->blob.bytes());
for (size_t i = 0; i < kv_snapshot_data_size / sizeof(KeyValuePair); i++) {
for (size_t i = 0; i < num_items; i++) {
auto key = ptr->key;
auto value = ptr->value;
LOGINFOMOD(replication, "[Replica={}] Save logical snapshot got lsn={} data_size={} data_pattern={}",
g_helper->replica_num(), value.lsn_, value.data_size_, value.data_pattern_);
LOGTRACEMOD(replication, "[Replica={}] Save logical snapshot got lsn={} data_size={} data_pattern={}",
g_helper->replica_num(), value.lsn_, value.data_size_, value.data_pattern_);

// Write to data service and inmem map.
MultiBlkId out_blkids;
Expand All @@ -251,16 +264,25 @@ class TestReplicatedDB : public homestore::ReplDevListener {
++commit_count_;
ptr++;
}

LOGINFOMOD(replication,
"[Replica={}] Save logical snapshot callback obj_id={} term={} idx={} is_last={} num_items={}",
g_helper->replica_num(), snp_data->offset, s->get_last_log_term(), s->get_last_log_idx(),
snp_data->is_last_obj, num_items);
snp_data->offset = last_committed_lsn + 1;
}

bool apply_snapshot(shared< snapshot_context > context) override {
std::lock_guard< std::mutex > lock(m_snapshot_lock);
auto s = std::dynamic_pointer_cast< nuraft_snapshot_context >(context)->nuraft_snapshot();
LOGINFOMOD(replication, "[Replica={}] Apply snapshot term={} idx={}", g_helper->replica_num(),
s->get_last_log_term(), s->get_last_log_idx());
m_last_snapshot = context;
return true;
}

shared< snapshot_context > last_snapshot() override {
std::lock_guard< std::mutex > lock(m_snapshot_lock);
if (!m_last_snapshot) return nullptr;

auto s = std::dynamic_pointer_cast< nuraft_snapshot_context >(m_last_snapshot)->nuraft_snapshot();
@@ -367,10 +389,12 @@

private:
std::map< Key, Value > inmem_db_;
std::map< int64_t, Value > lsn_index_;
uint64_t commit_count_{0};
std::shared_mutex db_mtx_;
uint64_t last_committed_lsn{0};
std::shared_ptr< snapshot_context > m_last_snapshot{nullptr};
std::mutex m_snapshot_lock;
bool zombie_{false};
};

@@ -546,6 +570,23 @@ class RaftReplDevTest : public testing::Test {
}
}

void shutdown_replica(uint16_t replica) {
if (g_helper->replica_num() == replica) {
LOGINFO("Shutdown homestore: replica_num = {}", replica);
g_helper->shutdown();
} else {
LOGINFO("Wait for replica={} to completely go down and removed from alive raft-groups", replica);
std::this_thread::sleep_for(std::chrono::seconds{5});
}
}

void start_replica(uint16_t replica) {
if (g_helper->replica_num() == replica) {
LOGINFO("Start homestore: replica_num = {}", replica);
g_helper->start();
}
}

void create_snapshot() { dbs_[0]->create_snapshot(); }
void truncate(int num_reserved_entries) { dbs_[0]->truncate(num_reserved_entries); }

Expand Down Expand Up @@ -869,12 +910,12 @@ TEST_F(RaftReplDevTest, GCReplReqs) {

TEST_F(RaftReplDevTest, BaselineTest) {
// Testing the baseline resync where the leader creates a snapshot and truncates entries.
// To simulate that write 10 entries to leader. Restart follower 1 with sleep 20s.
// Write to leader again to create 10 additional entries which follower 1 doesnt have.
// To simulate that, write 50 entries to the leader, then shut down follower 1.
// Write to the leader again to create num_io entries which follower 1 doesn't have.
// This is the baseline data. Truncate and snapshot on leader. Wait for commit for leader
// and follower 2. Write to leader again 10 entries after snapshot to create entries
// and follower 2. Write to leader again 50 entries after snapshot to create entries
// for incremental resync. We can create the snapshot manually or have raft trigger it.
// Verify all nodes got 20 entries.
// Verify all nodes got all the entries.
LOGINFO("Homestore replica={} setup completed", g_helper->replica_num());
g_helper->sync_for_test_start();

@@ -886,41 +927,47 @@
}
#endif

// Write on leader.
uint64_t entries_per_attempt = 10;
// Write some entries on leader.
uint64_t entries_per_attempt = 50;
LOGINFO("Write on leader num_entries={}", entries_per_attempt);
this->write_on_leader(entries_per_attempt, true /* wait_for_commit */);

// Restart follower-1 with delay.
this->restart_replica(1, 20 /* shutdown_delay_sec */);
// Shutdown replica 1.
LOGINFO("Shutdown replica 1");
this->shutdown_replica(1);

// Write lot of entries on leader.
entries_per_attempt = SISL_OPTIONS["num_io"].as< uint64_t >();
LOGINFO("Write on leader num_entries={}", entries_per_attempt);
this->write_on_leader(entries_per_attempt, true /* wait_for_commit */);

if (g_helper->replica_num() == 0 || g_helper->replica_num() == 2) {
// Wait for commmit on leader and follower-2
this->write_on_leader(entries_per_attempt, true /* wait_for_commit */);

// Wait for commit on leader and follower 2
this->wait_for_all_commits();
LOGINFO("Got all commits for replica 0 and 2");
}

if (g_helper->replica_num() == 0) {
// Leader does manual snapshot and truncate
LOGINFO("Leader create snapshot and truncate");
this->create_snapshot();
this->truncate(0);
if (g_helper->replica_num() == 0) {
// Leader does manual snapshot and truncate
LOGINFO("Leader create snapshot and truncate");
this->create_snapshot();
this->truncate(0);
}
}

// Wait till all writes are done and the snapshot is created.
g_helper->sync_for_verify_start();

// Start replica 1 after this.
LOGINFO("Start replica 1");
this->start_replica(1);
g_helper->sync_for_test_start();

// Write on leader to have some entries for incremental resync.
entries_per_attempt = 50;
LOGINFO("Write on leader num_entries={}", entries_per_attempt);
this->write_on_leader(entries_per_attempt, true /* wait_for_commit */);
if (g_helper->replica_num() == 0 || g_helper->replica_num() == 2) {
// Wait for commit on leader and follower-2
this->wait_for_all_commits();
LOGINFO("Got all commits for replica 0 and 2 second time");
}

// Validate that all replicas have the expected log entries and corresponding data.
g_helper->sync_for_verify_start();

LOGINFO("Validate all data written so far by reading them");
this->validate_data();
g_helper->sync_for_cleanup_start();
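For scale context (rough figures, since sizeof(KeyValuePair) depends on MultiBlkId's layout and struct padding): with the 1000-entry cap per snapshot object, a baseline resync of the one million IOs this commit targets is shipped as roughly 1000 read_snapshot_data/write_snapshot_data round trips, each carrying about 1000 * sizeof(KeyValuePair) bytes, on the order of tens of kilobytes per object, instead of a single unbounded blob containing every key/value pair.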
