From 272b1caafe23ddbc4c6872a9c3dd28a8c604e039 Mon Sep 17 00:00:00 2001 From: Sanal P Date: Fri, 16 Aug 2024 13:44:17 -0700 Subject: [PATCH] Add batching in UT to support million ios for baseline test. --- conanfile.py | 2 +- .../log_store/home_raft_log_store.cpp | 3 +- .../test_common/homestore_test_common.hpp | 12 +- src/tests/test_common/hs_repl_test_common.hpp | 7 + src/tests/test_raft_repl_dev.cpp | 147 ++++++++++++------ 5 files changed, 118 insertions(+), 53 deletions(-) diff --git a/conanfile.py b/conanfile.py index c31e269b7..30d53f709 100644 --- a/conanfile.py +++ b/conanfile.py @@ -9,7 +9,7 @@ class HomestoreConan(ConanFile): name = "homestore" - version = "6.4.50" + version = "6.4.51" homepage = "https://github.com/eBay/Homestore" description = "HomeStore Storage Engine" diff --git a/src/lib/replication/log_store/home_raft_log_store.cpp b/src/lib/replication/log_store/home_raft_log_store.cpp index 6a589dc84..c57267f91 100644 --- a/src/lib/replication/log_store/home_raft_log_store.cpp +++ b/src/lib/replication/log_store/home_raft_log_store.cpp @@ -315,7 +315,8 @@ bool HomeRaftLogStore::compact(ulong compact_lsn) { // release this assert if for some use case, we should tolorant this case; // for now, don't expect this case to happen. // RELEASE_ASSERT(false, "compact_lsn={} is beyond the current max_lsn={}", compact_lsn, cur_max_lsn); - + REPL_STORE_LOG(DEBUG, "Adding dummy entries during compact from={} upto={}", cur_max_lsn + 1, + to_store_lsn(compact_lsn)); // We need to fill the remaining entries with dummy data. for (auto lsn{cur_max_lsn + 1}; lsn <= to_store_lsn(compact_lsn); ++lsn) { append(m_dummy_log_entry); diff --git a/src/tests/test_common/homestore_test_common.hpp b/src/tests/test_common/homestore_test_common.hpp index 3b5484966..9010874db 100644 --- a/src/tests/test_common/homestore_test_common.hpp +++ b/src/tests/test_common/homestore_test_common.hpp @@ -185,7 +185,16 @@ class HSTestHelper { do_start_homestore(true /* fake_restart*/, false /* init_device */, shutdown_delay_sec); } + virtual void start_homestore() { + do_start_homestore(true /* fake_restart*/, false /* init_device */, 1 /* shutdown_delay_sec */); + } + virtual void shutdown_homestore(bool cleanup = true) { + if (homestore::HomeStore::safe_instance() == nullptr) { + /* Already shutdown */ + return; + } + homestore::HomeStore::instance()->shutdown(); homestore::HomeStore::reset_instance(); iomanager.stop(); @@ -370,7 +379,8 @@ class HSTestHelper { if (init_device) { init_raw_devices(m_token.devs_); } } else { for (uint32_t i{0}; i < ndevices; ++i) { - m_generated_devs.emplace_back(std::string{"/tmp/" + m_token.name_ + "_" + std::to_string(i + 1)}); + m_generated_devs.emplace_back( + std::string{"/tmp/source/tests/" + m_token.name_ + "_" + std::to_string(i + 1)}); } if (init_device) { LOGINFO("creating {} device files with each of size {} ", ndevices, homestore::in_bytes(dev_size)); diff --git a/src/tests/test_common/hs_repl_test_common.hpp b/src/tests/test_common/hs_repl_test_common.hpp index 356b58a6b..67abe2f8e 100644 --- a/src/tests/test_common/hs_repl_test_common.hpp +++ b/src/tests/test_common/hs_repl_test_common.hpp @@ -242,6 +242,13 @@ class HSReplTestHelper : public HSTestHelper { }); } + void shutdown() { shutdown_homestore(false /* cleanup */); } + + void start() { + m_token.params(HS_SERVICE::REPLICATION).repl_app = std::make_unique< TestReplApplication >(*this); + start_homestore(); + } + uint16_t replica_num() const { return replica_num_; } homestore::replica_id_t my_replica_id() const { return my_replica_id_; } homestore::replica_id_t replica_id(uint16_t member_id) const { diff --git a/src/tests/test_raft_repl_dev.cpp b/src/tests/test_raft_repl_dev.cpp index a741a5b6b..675cb517c 100644 --- a/src/tests/test_raft_repl_dev.cpp +++ b/src/tests/test_raft_repl_dev.cpp @@ -80,6 +80,7 @@ class TestReplicatedDB : public homestore::ReplDevListener { uint64_t data_size_; uint64_t data_pattern_; MultiBlkId blkid_; + uint64_t id_; }; struct KeyValuePair { @@ -127,8 +128,11 @@ class TestReplicatedDB : public homestore::ReplDevListener { auto jheader = r_cast< test_req::journal_header const* >(header.cbytes()); Key k{.id_ = *(r_cast< uint64_t const* >(key.cbytes()))}; - Value v{ - .lsn_ = lsn, .data_size_ = jheader->data_size, .data_pattern_ = jheader->data_pattern, .blkid_ = blkids}; + Value v{.lsn_ = lsn, + .data_size_ = jheader->data_size, + .data_pattern_ = jheader->data_pattern, + .blkid_ = blkids, + .id_ = k.id_}; LOGINFOMOD(replication, "[Replica={}] Received commit on lsn={} dsn={} key={} value[blkid={} pattern={}]", g_helper->replica_num(), lsn, ctx->dsn(), k.id_, v.blkid_.to_string(), v.data_pattern_); @@ -136,6 +140,7 @@ class TestReplicatedDB : public homestore::ReplDevListener { { std::unique_lock lk(db_mtx_); inmem_db_.insert_or_assign(k, v); + lsn_index_.emplace(lsn, v); last_committed_lsn = lsn; ++commit_count_; } @@ -167,6 +172,7 @@ class TestReplicatedDB : public homestore::ReplDevListener { } AsyncReplResult<> create_snapshot(shared< snapshot_context > context) override { + std::lock_guard< std::mutex > lock(m_snapshot_lock); auto s = std::dynamic_pointer_cast< nuraft_snapshot_context >(context)->nuraft_snapshot(); LOGINFOMOD(replication, "[Replica={}] Got snapshot callback term={} idx={}", g_helper->replica_num(), s->get_last_log_term(), s->get_last_log_idx()); @@ -176,33 +182,40 @@ class TestReplicatedDB : public homestore::ReplDevListener { int read_snapshot_data(shared< snapshot_context > context, shared< snapshot_data > snp_data) override { auto s = std::dynamic_pointer_cast< nuraft_snapshot_context >(context)->nuraft_snapshot(); - LOGINFOMOD(replication, "[Replica={}] Read logical snapshot callback obj_id={} term={} idx={}", - g_helper->replica_num(), snp_data->offset, s->get_last_log_term(), s->get_last_log_idx()); if (snp_data->offset == 0) { snp_data->is_last_obj = false; snp_data->blob = sisl::io_blob_safe(sizeof(ulong)); + LOGINFOMOD(replication, "[Replica={}] Read logical snapshot callback obj_id={} term={} idx={}", + g_helper->replica_num(), snp_data->offset, s->get_last_log_term(), s->get_last_log_idx()); return 0; } - int64_t follower_last_lsn = snp_data->offset; + + int64_t next_lsn = snp_data->offset; std::vector< KeyValuePair > kv_snapshot_data; - LOGINFOMOD(replication, "[Replica={}] Read logical snapshot callback follower lsn={}", g_helper->replica_num(), - follower_last_lsn); - for (auto& [k, v] : inmem_db_) { - if (v.lsn_ > follower_last_lsn) { - kv_snapshot_data.emplace_back(k, v); - LOGINFOMOD(replication, "[Replica={}] Read logical snapshot callback fetching lsn={} {} {}", - g_helper->replica_num(), v.lsn_, v.data_size_, v.data_pattern_); - } + for (auto iter = lsn_index_.find(next_lsn); iter != lsn_index_.end(); iter++) { + auto& v = iter->second; + kv_snapshot_data.emplace_back(Key{v.id_}, v); + LOGTRACEMOD(replication, "[Replica={}] Read logical snapshot callback fetching lsn={} size={} pattern={}", + g_helper->replica_num(), v.lsn_, v.data_size_, v.data_pattern_); + if (kv_snapshot_data.size() >= 1000) { break; } } - int64_t kv_snapshot_data_size = sizeof(KeyValuePair) * kv_snapshot_data.size(); - LOGINFOMOD(replication, "Snapshot size {}", kv_snapshot_data_size); + if (kv_snapshot_data.size() == 0) { + snp_data->is_last_obj = true; + LOGINFOMOD(replication, "Snapshot is_last_obj is true"); + return 0; + } + int64_t kv_snapshot_data_size = sizeof(KeyValuePair) * kv_snapshot_data.size(); sisl::io_blob_safe blob{static_cast< uint32_t >(kv_snapshot_data_size)}; std::memcpy(blob.bytes(), kv_snapshot_data.data(), kv_snapshot_data_size); snp_data->blob = std::move(blob); - snp_data->is_last_obj = true; + snp_data->is_last_obj = false; + LOGINFOMOD(replication, "[Replica={}] Read logical snapshot callback obj_id={} term={} idx={} num_items={}", + g_helper->replica_num(), snp_data->offset, s->get_last_log_term(), s->get_last_log_idx(), + kv_snapshot_data.size()); + return 0; } @@ -218,27 +231,27 @@ class TestReplicatedDB : public homestore::ReplDevListener { void write_snapshot_data(shared< snapshot_context > context, shared< snapshot_data > snp_data) override { auto s = std::dynamic_pointer_cast< nuraft_snapshot_context >(context)->nuraft_snapshot(); - LOGINFOMOD(replication, "[Replica={}] Save logical snapshot callback obj_id={} term={} idx={} is_last={}", - g_helper->replica_num(), snp_data->offset, s->get_last_log_term(), s->get_last_log_idx(), - snp_data->is_last_obj); if (snp_data->offset == 0) { // For obj_id 0 we sent back the last committed lsn. snp_data->offset = last_committed_lsn; - LOGINFOMOD(replication, "[Replica={}] Save logical snapshot callback return obj_id={}", - g_helper->replica_num(), snp_data->offset); + LOGINFOMOD(replication, "[Replica={}] Save logical snapshot callback obj_id={} term={} idx={} is_last={}", + g_helper->replica_num(), snp_data->offset, s->get_last_log_term(), s->get_last_log_idx(), + snp_data->is_last_obj); return; } - std::unique_lock lk(db_mtx_); size_t kv_snapshot_data_size = snp_data->blob.size(); - LOGINFOMOD(replication, "Snapshot size {}", kv_snapshot_data_size); + if (kv_snapshot_data_size == 0) { return; } + + size_t num_items = kv_snapshot_data_size / sizeof(KeyValuePair); + std::unique_lock lk(db_mtx_); auto ptr = r_cast< const KeyValuePair* >(snp_data->blob.bytes()); - for (size_t i = 0; i < kv_snapshot_data_size / sizeof(KeyValuePair); i++) { + for (size_t i = 0; i < num_items; i++) { auto key = ptr->key; auto value = ptr->value; - LOGINFOMOD(replication, "[Replica={}] Save logical snapshot got lsn={} data_size={} data_pattern={}", - g_helper->replica_num(), value.lsn_, value.data_size_, value.data_pattern_); + LOGTRACEMOD(replication, "[Replica={}] Save logical snapshot got lsn={} data_size={} data_pattern={}", + g_helper->replica_num(), value.lsn_, value.data_size_, value.data_pattern_); // Write to data service and inmem map. MultiBlkId out_blkids; @@ -251,16 +264,25 @@ class TestReplicatedDB : public homestore::ReplDevListener { ++commit_count_; ptr++; } + + LOGINFOMOD(replication, + "[Replica={}] Save logical snapshot callback obj_id={} term={} idx={} is_last={} num_items={}", + g_helper->replica_num(), snp_data->offset, s->get_last_log_term(), s->get_last_log_idx(), + snp_data->is_last_obj, num_items); + snp_data->offset = last_committed_lsn + 1; } bool apply_snapshot(shared< snapshot_context > context) override { + std::lock_guard< std::mutex > lock(m_snapshot_lock); auto s = std::dynamic_pointer_cast< nuraft_snapshot_context >(context)->nuraft_snapshot(); LOGINFOMOD(replication, "[Replica={}] Apply snapshot term={} idx={}", g_helper->replica_num(), s->get_last_log_term(), s->get_last_log_idx()); + m_last_snapshot = context; return true; } shared< snapshot_context > last_snapshot() override { + std::lock_guard< std::mutex > lock(m_snapshot_lock); if (!m_last_snapshot) return nullptr; auto s = std::dynamic_pointer_cast< nuraft_snapshot_context >(m_last_snapshot)->nuraft_snapshot(); @@ -367,10 +389,12 @@ class TestReplicatedDB : public homestore::ReplDevListener { private: std::map< Key, Value > inmem_db_; + std::map< int64_t, Value > lsn_index_; uint64_t commit_count_{0}; std::shared_mutex db_mtx_; uint64_t last_committed_lsn{0}; std::shared_ptr< snapshot_context > m_last_snapshot{nullptr}; + std::mutex m_snapshot_lock; bool zombie_{false}; }; @@ -546,6 +570,23 @@ class RaftReplDevTest : public testing::Test { } } + void shutdown_replica(uint16_t replica) { + if (g_helper->replica_num() == replica) { + LOGINFO("Shutdown homestore: replica_num = {}", replica); + g_helper->shutdown(); + } else { + LOGINFO("Wait for replica={} to completely go down and removed from alive raft-groups", replica); + std::this_thread::sleep_for(std::chrono::seconds{5}); + } + } + + void start_replica(uint16_t replica) { + if (g_helper->replica_num() == replica) { + LOGINFO("Start homestore: replica_num = {}", replica); + g_helper->start(); + } + } + void create_snapshot() { dbs_[0]->create_snapshot(); } void truncate(int num_reserved_entries) { dbs_[0]->truncate(num_reserved_entries); } @@ -869,12 +910,12 @@ TEST_F(RaftReplDevTest, GCReplReqs) { TEST_F(RaftReplDevTest, BaselineTest) { // Testing the baseline resync where leader creates snapshot and truncate entries. - // To simulate that write 10 entries to leader. Restart follower 1 with sleep 20s. - // Write to leader again to create 10 additional entries which follower 1 doesnt have. + // To simulate that write 50 entries to leader. Shutdown follower 1. + // Write to leader again to create num_io entries which follower 1 doesnt have. // This is the baseline data. Truncate and snapshot on leader. Wait for commit for leader - // and follower 2. Write to leader again 10 entries after snapshot to create entries + // and follower 2. Write to leader again 50 entries after snapshot to create entries // for incremental resync. We can create snapshot manually or triggered by raft. - // Verify all nodes got 20 entries. + // Verify all nodes got entries. LOGINFO("Homestore replica={} setup completed", g_helper->replica_num()); g_helper->sync_for_test_start(); @@ -886,41 +927,47 @@ TEST_F(RaftReplDevTest, BaselineTest) { } #endif - // Write on leader. - uint64_t entries_per_attempt = 10; + // Write some entries on leader. + uint64_t entries_per_attempt = 50; LOGINFO("Write on leader num_entries={}", entries_per_attempt); this->write_on_leader(entries_per_attempt, true /* wait_for_commit */); - // Restart follower-1 with delay. - this->restart_replica(1, 20 /* shutdown_delay_sec */); + // Shutdown replica 1. + LOGINFO("Shutdown replica 1"); + this->shutdown_replica(1); + // Write lot of entries on leader. + entries_per_attempt = SISL_OPTIONS["num_io"].as< uint64_t >(); LOGINFO("Write on leader num_entries={}", entries_per_attempt); - this->write_on_leader(entries_per_attempt, true /* wait_for_commit */); - if (g_helper->replica_num() == 0 || g_helper->replica_num() == 2) { - // Wait for commmit on leader and follower-2 + this->write_on_leader(entries_per_attempt, true /* wait_for_commit */); + + // Wait for commmit on leader and follower 2 this->wait_for_all_commits(); LOGINFO("Got all commits for replica 0 and 2"); - } - if (g_helper->replica_num() == 0) { - // Leader does manual snapshot and truncate - LOGINFO("Leader create snapshot and truncate"); - this->create_snapshot(); - this->truncate(0); + if (g_helper->replica_num() == 0) { + // Leader does manual snapshot and truncate + LOGINFO("Leader create snapshot and truncate"); + this->create_snapshot(); + this->truncate(0); + } } + // Wait till all writes are down and snapshot is created. + g_helper->sync_for_verify_start(); + + // Start replica 1 after this. + LOGINFO("Start replica 1"); + this->start_replica(1); + g_helper->sync_for_test_start(); + // Write on leader to have some entries for increment resync. + entries_per_attempt = 50; LOGINFO("Write on leader num_entries={}", entries_per_attempt); this->write_on_leader(entries_per_attempt, true /* wait_for_commit */); - if (g_helper->replica_num() == 0 || g_helper->replica_num() == 2) { - // Wait for commmit on leader and follower-2 - this->wait_for_all_commits(); - LOGINFO("Got all commits for replica 0 and 2 second time"); - } - - // Validate all have 30 log entries and corresponding entries. g_helper->sync_for_verify_start(); + LOGINFO("Validate all data written so far by reading them"); this->validate_data(); g_helper->sync_for_cleanup_start();