Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add batching in UT to support million ios for baseline test. #510

Merged
merged 1 commit into from
Aug 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

class HomestoreConan(ConanFile):
name = "homestore"
version = "6.4.51"
version = "6.4.52"

homepage = "https://github.com/eBay/Homestore"
description = "HomeStore Storage Engine"
Expand Down
3 changes: 2 additions & 1 deletion src/lib/replication/log_store/home_raft_log_store.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,8 @@ bool HomeRaftLogStore::compact(ulong compact_lsn) {
// release this assert if for some use case, we should tolorant this case;
// for now, don't expect this case to happen.
// RELEASE_ASSERT(false, "compact_lsn={} is beyond the current max_lsn={}", compact_lsn, cur_max_lsn);

REPL_STORE_LOG(DEBUG, "Adding dummy entries during compact from={} upto={}", cur_max_lsn + 1,
to_store_lsn(compact_lsn));
// We need to fill the remaining entries with dummy data.
for (auto lsn{cur_max_lsn + 1}; lsn <= to_store_lsn(compact_lsn); ++lsn) {
append(m_dummy_log_entry);
Expand Down
9 changes: 9 additions & 0 deletions src/tests/test_common/homestore_test_common.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,16 @@ class HSTestHelper {
do_start_homestore(true /* fake_restart*/, false /* init_device */, shutdown_delay_sec);
}

virtual void start_homestore() {
do_start_homestore(true /* fake_restart*/, false /* init_device */, 1 /* shutdown_delay_sec */);
}

virtual void shutdown_homestore(bool cleanup = true) {
if (homestore::HomeStore::safe_instance() == nullptr) {
/* Already shutdown */
return;
}

homestore::HomeStore::instance()->shutdown();
homestore::HomeStore::reset_instance();
iomanager.stop();
Expand Down
7 changes: 7 additions & 0 deletions src/tests/test_common/hs_repl_test_common.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,13 @@ class HSReplTestHelper : public HSTestHelper {
});
}

void shutdown() { shutdown_homestore(false /* cleanup */); }

void start() {
m_token.params(HS_SERVICE::REPLICATION).repl_app = std::make_unique< TestReplApplication >(*this);
start_homestore();
}

uint16_t replica_num() const { return replica_num_; }
homestore::replica_id_t my_replica_id() const { return my_replica_id_; }
homestore::replica_id_t replica_id(uint16_t member_id) const {
Expand Down
147 changes: 97 additions & 50 deletions src/tests/test_raft_repl_dev.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ class TestReplicatedDB : public homestore::ReplDevListener {
uint64_t data_size_;
uint64_t data_pattern_;
MultiBlkId blkid_;
uint64_t id_;
};

struct KeyValuePair {
Expand Down Expand Up @@ -127,15 +128,19 @@ class TestReplicatedDB : public homestore::ReplDevListener {

auto jheader = r_cast< test_req::journal_header const* >(header.cbytes());
Key k{.id_ = *(r_cast< uint64_t const* >(key.cbytes()))};
Value v{
.lsn_ = lsn, .data_size_ = jheader->data_size, .data_pattern_ = jheader->data_pattern, .blkid_ = blkids};
Value v{.lsn_ = lsn,
.data_size_ = jheader->data_size,
.data_pattern_ = jheader->data_pattern,
.blkid_ = blkids,
.id_ = k.id_};

LOGINFOMOD(replication, "[Replica={}] Received commit on lsn={} dsn={} key={} value[blkid={} pattern={}]",
g_helper->replica_num(), lsn, ctx->dsn(), k.id_, v.blkid_.to_string(), v.data_pattern_);

{
std::unique_lock lk(db_mtx_);
inmem_db_.insert_or_assign(k, v);
lsn_index_.emplace(lsn, v);
last_committed_lsn = lsn;
++commit_count_;
}
Expand Down Expand Up @@ -167,6 +172,7 @@ class TestReplicatedDB : public homestore::ReplDevListener {
}

AsyncReplResult<> create_snapshot(shared< snapshot_context > context) override {
std::lock_guard< std::mutex > lock(m_snapshot_lock);
auto s = std::dynamic_pointer_cast< nuraft_snapshot_context >(context)->nuraft_snapshot();
LOGINFOMOD(replication, "[Replica={}] Got snapshot callback term={} idx={}", g_helper->replica_num(),
s->get_last_log_term(), s->get_last_log_idx());
Expand All @@ -176,33 +182,40 @@ class TestReplicatedDB : public homestore::ReplDevListener {

int read_snapshot_data(shared< snapshot_context > context, shared< snapshot_data > snp_data) override {
auto s = std::dynamic_pointer_cast< nuraft_snapshot_context >(context)->nuraft_snapshot();
LOGINFOMOD(replication, "[Replica={}] Read logical snapshot callback obj_id={} term={} idx={}",
g_helper->replica_num(), snp_data->offset, s->get_last_log_term(), s->get_last_log_idx());

if (snp_data->offset == 0) {
snp_data->is_last_obj = false;
snp_data->blob = sisl::io_blob_safe(sizeof(ulong));
LOGINFOMOD(replication, "[Replica={}] Read logical snapshot callback obj_id={} term={} idx={}",
g_helper->replica_num(), snp_data->offset, s->get_last_log_term(), s->get_last_log_idx());
return 0;
}
int64_t follower_last_lsn = snp_data->offset;

int64_t next_lsn = snp_data->offset;
std::vector< KeyValuePair > kv_snapshot_data;
LOGINFOMOD(replication, "[Replica={}] Read logical snapshot callback follower lsn={}", g_helper->replica_num(),
follower_last_lsn);
for (auto& [k, v] : inmem_db_) {
if (v.lsn_ > follower_last_lsn) {
kv_snapshot_data.emplace_back(k, v);
LOGINFOMOD(replication, "[Replica={}] Read logical snapshot callback fetching lsn={} {} {}",
g_helper->replica_num(), v.lsn_, v.data_size_, v.data_pattern_);
}
for (auto iter = lsn_index_.find(next_lsn); iter != lsn_index_.end(); iter++) {
auto& v = iter->second;
kv_snapshot_data.emplace_back(Key{v.id_}, v);
LOGTRACEMOD(replication, "[Replica={}] Read logical snapshot callback fetching lsn={} size={} pattern={}",
g_helper->replica_num(), v.lsn_, v.data_size_, v.data_pattern_);
if (kv_snapshot_data.size() >= 1000) { break; }
}

int64_t kv_snapshot_data_size = sizeof(KeyValuePair) * kv_snapshot_data.size();
LOGINFOMOD(replication, "Snapshot size {}", kv_snapshot_data_size);
if (kv_snapshot_data.size() == 0) {
snp_data->is_last_obj = true;
LOGINFOMOD(replication, "Snapshot is_last_obj is true");
return 0;
}

int64_t kv_snapshot_data_size = sizeof(KeyValuePair) * kv_snapshot_data.size();
sisl::io_blob_safe blob{static_cast< uint32_t >(kv_snapshot_data_size)};
std::memcpy(blob.bytes(), kv_snapshot_data.data(), kv_snapshot_data_size);
snp_data->blob = std::move(blob);
snp_data->is_last_obj = true;
snp_data->is_last_obj = false;
LOGINFOMOD(replication, "[Replica={}] Read logical snapshot callback obj_id={} term={} idx={} num_items={}",
g_helper->replica_num(), snp_data->offset, s->get_last_log_term(), s->get_last_log_idx(),
kv_snapshot_data.size());

return 0;
}

Expand All @@ -218,27 +231,27 @@ class TestReplicatedDB : public homestore::ReplDevListener {

void write_snapshot_data(shared< snapshot_context > context, shared< snapshot_data > snp_data) override {
auto s = std::dynamic_pointer_cast< nuraft_snapshot_context >(context)->nuraft_snapshot();
LOGINFOMOD(replication, "[Replica={}] Save logical snapshot callback obj_id={} term={} idx={} is_last={}",
g_helper->replica_num(), snp_data->offset, s->get_last_log_term(), s->get_last_log_idx(),
snp_data->is_last_obj);

if (snp_data->offset == 0) {
// For obj_id 0 we sent back the last committed lsn.
snp_data->offset = last_committed_lsn;
LOGINFOMOD(replication, "[Replica={}] Save logical snapshot callback return obj_id={}",
g_helper->replica_num(), snp_data->offset);
LOGINFOMOD(replication, "[Replica={}] Save logical snapshot callback obj_id={} term={} idx={} is_last={}",
g_helper->replica_num(), snp_data->offset, s->get_last_log_term(), s->get_last_log_idx(),
snp_data->is_last_obj);
return;
}

std::unique_lock lk(db_mtx_);
size_t kv_snapshot_data_size = snp_data->blob.size();
LOGINFOMOD(replication, "Snapshot size {}", kv_snapshot_data_size);
if (kv_snapshot_data_size == 0) { return; }

size_t num_items = kv_snapshot_data_size / sizeof(KeyValuePair);
std::unique_lock lk(db_mtx_);
auto ptr = r_cast< const KeyValuePair* >(snp_data->blob.bytes());
for (size_t i = 0; i < kv_snapshot_data_size / sizeof(KeyValuePair); i++) {
for (size_t i = 0; i < num_items; i++) {
auto key = ptr->key;
auto value = ptr->value;
LOGINFOMOD(replication, "[Replica={}] Save logical snapshot got lsn={} data_size={} data_pattern={}",
g_helper->replica_num(), value.lsn_, value.data_size_, value.data_pattern_);
LOGTRACEMOD(replication, "[Replica={}] Save logical snapshot got lsn={} data_size={} data_pattern={}",
g_helper->replica_num(), value.lsn_, value.data_size_, value.data_pattern_);

// Write to data service and inmem map.
MultiBlkId out_blkids;
Expand All @@ -251,16 +264,25 @@ class TestReplicatedDB : public homestore::ReplDevListener {
++commit_count_;
ptr++;
}

LOGINFOMOD(replication,
"[Replica={}] Save logical snapshot callback obj_id={} term={} idx={} is_last={} num_items={}",
g_helper->replica_num(), snp_data->offset, s->get_last_log_term(), s->get_last_log_idx(),
snp_data->is_last_obj, num_items);
snp_data->offset = last_committed_lsn + 1;
}

bool apply_snapshot(shared< snapshot_context > context) override {
std::lock_guard< std::mutex > lock(m_snapshot_lock);
auto s = std::dynamic_pointer_cast< nuraft_snapshot_context >(context)->nuraft_snapshot();
LOGINFOMOD(replication, "[Replica={}] Apply snapshot term={} idx={}", g_helper->replica_num(),
s->get_last_log_term(), s->get_last_log_idx());
m_last_snapshot = context;
return true;
}

shared< snapshot_context > last_snapshot() override {
std::lock_guard< std::mutex > lock(m_snapshot_lock);
if (!m_last_snapshot) return nullptr;

auto s = std::dynamic_pointer_cast< nuraft_snapshot_context >(m_last_snapshot)->nuraft_snapshot();
Expand Down Expand Up @@ -367,10 +389,12 @@ class TestReplicatedDB : public homestore::ReplDevListener {

private:
std::map< Key, Value > inmem_db_;
std::map< int64_t, Value > lsn_index_;
uint64_t commit_count_{0};
std::shared_mutex db_mtx_;
uint64_t last_committed_lsn{0};
std::shared_ptr< snapshot_context > m_last_snapshot{nullptr};
std::mutex m_snapshot_lock;
bool zombie_{false};
};

Expand Down Expand Up @@ -546,6 +570,23 @@ class RaftReplDevTest : public testing::Test {
}
}

void shutdown_replica(uint16_t replica) {
if (g_helper->replica_num() == replica) {
LOGINFO("Shutdown homestore: replica_num = {}", replica);
g_helper->shutdown();
} else {
LOGINFO("Wait for replica={} to completely go down and removed from alive raft-groups", replica);
std::this_thread::sleep_for(std::chrono::seconds{5});
}
}

void start_replica(uint16_t replica) {
if (g_helper->replica_num() == replica) {
LOGINFO("Start homestore: replica_num = {}", replica);
g_helper->start();
}
}

void create_snapshot() { dbs_[0]->create_snapshot(); }
void truncate(int num_reserved_entries) { dbs_[0]->truncate(num_reserved_entries); }

Expand Down Expand Up @@ -869,12 +910,12 @@ TEST_F(RaftReplDevTest, GCReplReqs) {

TEST_F(RaftReplDevTest, BaselineTest) {
// Testing the baseline resync where leader creates snapshot and truncate entries.
// To simulate that write 10 entries to leader. Restart follower 1 with sleep 20s.
// Write to leader again to create 10 additional entries which follower 1 doesnt have.
// To simulate that write 50 entries to leader. Shutdown follower 1.
// Write to leader again to create num_io entries which follower 1 doesnt have.
// This is the baseline data. Truncate and snapshot on leader. Wait for commit for leader
// and follower 2. Write to leader again 10 entries after snapshot to create entries
// and follower 2. Write to leader again 50 entries after snapshot to create entries
// for incremental resync. We can create snapshot manually or triggered by raft.
// Verify all nodes got 20 entries.
// Verify all nodes got entries.
LOGINFO("Homestore replica={} setup completed", g_helper->replica_num());
g_helper->sync_for_test_start();

Expand All @@ -886,41 +927,47 @@ TEST_F(RaftReplDevTest, BaselineTest) {
}
#endif

// Write on leader.
uint64_t entries_per_attempt = 10;
// Write some entries on leader.
uint64_t entries_per_attempt = 50;
LOGINFO("Write on leader num_entries={}", entries_per_attempt);
this->write_on_leader(entries_per_attempt, true /* wait_for_commit */);

// Restart follower-1 with delay.
this->restart_replica(1, 20 /* shutdown_delay_sec */);
// Shutdown replica 1.
LOGINFO("Shutdown replica 1");
this->shutdown_replica(1);

// Write lot of entries on leader.
entries_per_attempt = SISL_OPTIONS["num_io"].as< uint64_t >();
LOGINFO("Write on leader num_entries={}", entries_per_attempt);
this->write_on_leader(entries_per_attempt, true /* wait_for_commit */);

if (g_helper->replica_num() == 0 || g_helper->replica_num() == 2) {
// Wait for commmit on leader and follower-2
this->write_on_leader(entries_per_attempt, true /* wait_for_commit */);

// Wait for commmit on leader and follower 2
this->wait_for_all_commits();
LOGINFO("Got all commits for replica 0 and 2");
}

if (g_helper->replica_num() == 0) {
// Leader does manual snapshot and truncate
LOGINFO("Leader create snapshot and truncate");
this->create_snapshot();
this->truncate(0);
if (g_helper->replica_num() == 0) {
// Leader does manual snapshot and truncate
LOGINFO("Leader create snapshot and truncate");
this->create_snapshot();
this->truncate(0);
}
}

// Wait till all writes are down and snapshot is created.
g_helper->sync_for_verify_start();

// Start replica 1 after this.
LOGINFO("Start replica 1");
this->start_replica(1);
g_helper->sync_for_test_start();

// Write on leader to have some entries for increment resync.
entries_per_attempt = 50;
LOGINFO("Write on leader num_entries={}", entries_per_attempt);
this->write_on_leader(entries_per_attempt, true /* wait_for_commit */);
if (g_helper->replica_num() == 0 || g_helper->replica_num() == 2) {
// Wait for commmit on leader and follower-2
this->wait_for_all_commits();
LOGINFO("Got all commits for replica 0 and 2 second time");
}

// Validate all have 30 log entries and corresponding entries.
g_helper->sync_for_verify_start();

LOGINFO("Validate all data written so far by reading them");
this->validate_data();
g_helper->sync_for_cleanup_start();
Expand Down
Loading