diff --git a/conanfile.py b/conanfile.py index 64d35df54..d8f6720b1 100644 --- a/conanfile.py +++ b/conanfile.py @@ -9,7 +9,7 @@ class HomestoreConan(ConanFile): name = "homestore" - version = "6.4.56" + version = "6.4.57" homepage = "https://github.com/eBay/Homestore" description = "HomeStore Storage Engine" diff --git a/src/tests/test_raft_repl_dev.cpp b/src/tests/test_raft_repl_dev.cpp index f2919c760..720324090 100644 --- a/src/tests/test_raft_repl_dev.cpp +++ b/src/tests/test_raft_repl_dev.cpp @@ -141,6 +141,7 @@ class TestReplicatedDB : public homestore::ReplDevListener { std::unique_lock lk(db_mtx_); inmem_db_.insert_or_assign(k, v); lsn_index_.emplace(lsn, v); + last_data_committed_lsn = lsn; ++commit_count_; } @@ -203,20 +204,20 @@ class TestReplicatedDB : public homestore::ReplDevListener { if (kv_snapshot_data.size() >= 1000) { break; } } - if (kv_snapshot_data.size() == 0) { - snp_data->is_last_obj = true; - LOGINFOMOD(replication, "Snapshot is_last_obj is true"); - return 0; + if (kv_snapshot_data.size()) { + int64_t kv_snapshot_data_size = sizeof(KeyValuePair) * kv_snapshot_data.size(); + sisl::io_blob_safe blob{static_cast< uint32_t >(kv_snapshot_data_size)}; + std::memcpy(blob.bytes(), kv_snapshot_data.data(), kv_snapshot_data_size); + snp_data->blob = std::move(blob); + LOGINFOMOD(replication, "[Replica={}] Read logical snapshot callback obj_id={} term={} idx={} num_items={}", + g_helper->replica_num(), snp_data->offset, s->get_last_log_term(), s->get_last_log_idx(), + kv_snapshot_data.size()); + } else { + LOGINFOMOD(replication, "no data need to be sent for snapshot"); } - int64_t kv_snapshot_data_size = sizeof(KeyValuePair) * kv_snapshot_data.size(); - sisl::io_blob_safe blob{static_cast< uint32_t >(kv_snapshot_data_size)}; - std::memcpy(blob.bytes(), kv_snapshot_data.data(), kv_snapshot_data_size); - snp_data->blob = std::move(blob); + // we always read all the data in one shot, so we need to set is_last_obj to true snp_data->is_last_obj = true; - LOGINFOMOD(replication, "[Replica={}] Read logical snapshot callback obj_id={} term={} idx={} num_items={}", - g_helper->replica_num(), snp_data->offset, s->get_last_log_term(), s->get_last_log_idx(), - kv_snapshot_data.size()); return 0; } @@ -233,9 +234,9 @@ class TestReplicatedDB : public homestore::ReplDevListener { void write_snapshot_data(shared< snapshot_context > context, shared< snapshot_data > snp_data) override { auto s = std::dynamic_pointer_cast< nuraft_snapshot_context >(context)->nuraft_snapshot(); - auto last_committed_idx = - std::dynamic_pointer_cast< RaftReplDev >(repl_dev())->raft_server()->get_committed_log_idx(); if (snp_data->offset == 0) { + auto last_committed_idx = + std::dynamic_pointer_cast< RaftReplDev >(repl_dev())->raft_server()->get_committed_log_idx(); snp_data->offset = last_committed_idx + 1; LOGINFOMOD(replication, "[Replica={}] Save logical snapshot callback return obj_id={}", g_helper->replica_num(), snp_data->offset); @@ -260,6 +261,7 @@ class TestReplicatedDB : public homestore::ReplDevListener { snapshot_data_write(value.data_size_, value.data_pattern_, out_blkids); value.blkid_ = out_blkids; } + last_data_committed_lsn = value.lsn_; inmem_db_.insert_or_assign(key, value); ++commit_count_; ptr++; @@ -269,7 +271,12 @@ class TestReplicatedDB : public homestore::ReplDevListener { "[Replica={}] Save logical snapshot callback obj_id={} term={} idx={} is_last={} num_items={}", g_helper->replica_num(), snp_data->offset, s->get_last_log_term(), s->get_last_log_idx(), snp_data->is_last_obj, num_items); - snp_data->offset = last_committed_idx + 1; + + // before we finish install snapshot, raft_server()->get_committed_log_idx() will always be the same. so we need + // last_data_committed_lsn to notify leader to transfer new data to follower. + // actually, leader will transfer all the data in one shot, so this is will not trigger any error, but we should + // make the logic correct. + snp_data->offset = last_data_committed_lsn + 1; } bool apply_snapshot(shared< snapshot_context > context) override { @@ -391,6 +398,9 @@ class TestReplicatedDB : public homestore::ReplDevListener { std::map< Key, Value > inmem_db_; std::map< int64_t, Value > lsn_index_; uint64_t commit_count_{0}; + // this is the last lsn for data, might not be the same with the real last committed lsn + // which should be get by raft_server()->get_committed_log_idx() + uint64_t last_data_committed_lsn{0}; std::shared_mutex db_mtx_; std::shared_ptr< snapshot_context > m_last_snapshot{nullptr}; std::mutex m_snapshot_lock;