Skip to content

Commit

Permalink
fix raft repl dev ut
Browse files Browse the repository at this point in the history
  • Loading branch information
JacksonYao287 committed Sep 10, 2024
1 parent 7100c80 commit c9ada64
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 15 deletions.
2 changes: 1 addition & 1 deletion conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

class HomestoreConan(ConanFile):
name = "homestore"
version = "6.4.56"
version = "6.4.57"

homepage = "https://github.com/eBay/Homestore"
description = "HomeStore Storage Engine"
Expand Down
38 changes: 24 additions & 14 deletions src/tests/test_raft_repl_dev.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ class TestReplicatedDB : public homestore::ReplDevListener {
std::unique_lock lk(db_mtx_);
inmem_db_.insert_or_assign(k, v);
lsn_index_.emplace(lsn, v);
last_data_committed_lsn = lsn;
++commit_count_;
}

Expand Down Expand Up @@ -203,20 +204,20 @@ class TestReplicatedDB : public homestore::ReplDevListener {
if (kv_snapshot_data.size() >= 1000) { break; }
}

if (kv_snapshot_data.size() == 0) {
snp_data->is_last_obj = true;
LOGINFOMOD(replication, "Snapshot is_last_obj is true");
return 0;
if (kv_snapshot_data.size()) {
int64_t kv_snapshot_data_size = sizeof(KeyValuePair) * kv_snapshot_data.size();
sisl::io_blob_safe blob{static_cast< uint32_t >(kv_snapshot_data_size)};
std::memcpy(blob.bytes(), kv_snapshot_data.data(), kv_snapshot_data_size);
snp_data->blob = std::move(blob);
LOGINFOMOD(replication, "[Replica={}] Read logical snapshot callback obj_id={} term={} idx={} num_items={}",
g_helper->replica_num(), snp_data->offset, s->get_last_log_term(), s->get_last_log_idx(),
kv_snapshot_data.size());
} else {
LOGINFOMOD(replication, "no data need to be sent for snapshot");
}

int64_t kv_snapshot_data_size = sizeof(KeyValuePair) * kv_snapshot_data.size();
sisl::io_blob_safe blob{static_cast< uint32_t >(kv_snapshot_data_size)};
std::memcpy(blob.bytes(), kv_snapshot_data.data(), kv_snapshot_data_size);
snp_data->blob = std::move(blob);
// we always read all the data in one shot, so we need to set is_last_obj to true
snp_data->is_last_obj = true;
LOGINFOMOD(replication, "[Replica={}] Read logical snapshot callback obj_id={} term={} idx={} num_items={}",
g_helper->replica_num(), snp_data->offset, s->get_last_log_term(), s->get_last_log_idx(),
kv_snapshot_data.size());

return 0;
}
Expand All @@ -233,9 +234,9 @@ class TestReplicatedDB : public homestore::ReplDevListener {

void write_snapshot_data(shared< snapshot_context > context, shared< snapshot_data > snp_data) override {
auto s = std::dynamic_pointer_cast< nuraft_snapshot_context >(context)->nuraft_snapshot();
auto last_committed_idx =
std::dynamic_pointer_cast< RaftReplDev >(repl_dev())->raft_server()->get_committed_log_idx();
if (snp_data->offset == 0) {
auto last_committed_idx =
std::dynamic_pointer_cast< RaftReplDev >(repl_dev())->raft_server()->get_committed_log_idx();
snp_data->offset = last_committed_idx + 1;
LOGINFOMOD(replication, "[Replica={}] Save logical snapshot callback return obj_id={}",
g_helper->replica_num(), snp_data->offset);
Expand All @@ -260,6 +261,7 @@ class TestReplicatedDB : public homestore::ReplDevListener {
snapshot_data_write(value.data_size_, value.data_pattern_, out_blkids);
value.blkid_ = out_blkids;
}
last_data_committed_lsn = value.lsn_;
inmem_db_.insert_or_assign(key, value);
++commit_count_;
ptr++;
Expand All @@ -269,7 +271,12 @@ class TestReplicatedDB : public homestore::ReplDevListener {
"[Replica={}] Save logical snapshot callback obj_id={} term={} idx={} is_last={} num_items={}",
g_helper->replica_num(), snp_data->offset, s->get_last_log_term(), s->get_last_log_idx(),
snp_data->is_last_obj, num_items);
snp_data->offset = last_committed_idx + 1;

// before we finish install snapshot, raft_server()->get_committed_log_idx() will always be the same. so we need
// last_data_committed_lsn to notify leader to transfer new data to follower.
// actually, leader will transfer all the data in one shot, so this is will not trigger any error, but we should
// make the logic correct.
snp_data->offset = last_data_committed_lsn + 1;
}

bool apply_snapshot(shared< snapshot_context > context) override {
Expand Down Expand Up @@ -391,6 +398,9 @@ class TestReplicatedDB : public homestore::ReplDevListener {
std::map< Key, Value > inmem_db_;
std::map< int64_t, Value > lsn_index_;
uint64_t commit_count_{0};
// this is the last lsn for data, might not be the same with the real last committed lsn
// which should be get by raft_server()->get_committed_log_idx()
uint64_t last_data_committed_lsn{0};
std::shared_mutex db_mtx_;
std::shared_ptr< snapshot_context > m_last_snapshot{nullptr};
std::mutex m_snapshot_lock;
Expand Down

0 comments on commit c9ada64

Please sign in to comment.