issue 259/260: incremental resync
yamingk committed Feb 8, 2024
1 parent ccb0bdb commit d26e4d3
Showing 5 changed files with 126 additions and 53 deletions.
8 changes: 5 additions & 3 deletions conanfile.py
@@ -5,7 +5,7 @@

class HomestoreConan(ConanFile):
name = "homestore"
version = "5.1.3"
version = "5.1.4"

homepage = "https://github.com/eBay/Homestore"
description = "HomeStore Storage Engine"
@@ -56,8 +56,10 @@ def build_requirements(self):

def requirements(self):
self.requires("iomgr/[~=11, include_prerelease=True]@oss/master")
self.requires("sisl/[~=11, include_prerelease=True]@oss/master")
self.requires("nuraft_mesg/[~=2, include_prerelease=True]@oss/main")
#self.requires("sisl/[~=11, include_prerelease=True]@oss/master")
self.requires("sisl/11.1.1@arjun/oss")
#self.requires("nuraft_mesg/[~=2, include_prerelease=True]@oss/main")
self.requires("nuraft_mesg/3.0.1@arjun/oss")

self.requires("farmhash/cci.20190513@")
if self.settings.arch in ['x86', 'x86_64']:
88 changes: 65 additions & 23 deletions src/lib/replication/repl_dev/raft_repl_dev.cpp
@@ -6,6 +6,7 @@

#include <sisl/fds/buffer.hpp>
#include <sisl/grpc/generic_service.hpp>
#include <sisl/grpc/rpc_client.hpp>
#include <homestore/blkdata_service.hpp>
#include <homestore/logstore_service.hpp>
#include <homestore/superblk_handler.hpp>
@@ -147,7 +148,7 @@ void RaftReplDev::push_data_to_all_followers(repl_req_ptr_t rreq) {
flatbuffers::FlatBufferToString(builder.GetBufferPointer() + sizeof(flatbuffers::uoffset_t),
PushDataRequestTypeTable()));*/

LOGINFO("Data Channel: Pushing data to all followers: rreq=[{}]", rreq->to_compact_string());
RD_LOG(INFO, "Data Channel: Pushing data to all followers: rreq=[{}]", rreq->to_compact_string());

group_msg_service()
->data_service_request_unidirectional(nuraft_mesg::role_regex::ALL, PUSH_DATA, rreq->pkts)
@@ -193,6 +194,10 @@ void RaftReplDev::on_fetch_data_received(intrusive< sisl::GenericRpcData >& rpc_
auto const& originator = req->blkid_originator();
auto const& remote_blkid = req->remote_blkid();

// relax this assert if in the future we want to fetch from a non-originator;
RD_REL_ASSERT(originator == server_id(),
"Not expected to receive a fetch-data request from remote when I am not the originator of this request");

// fetch data based on the remote_blkid
if (originator == server_id()) {
// We are the originator of the blkid, read data locally;
@@ -420,10 +425,14 @@ void RaftReplDev::check_and_fetch_remote_data(std::vector< repl_req_ptr_t >* rre
}

void RaftReplDev::fetch_data_from_remote(std::vector< repl_req_ptr_t >* rreqs) {
if (rreqs->size() == 0) { return; }

std::vector<::flatbuffers::Offset< RequestEntry > > entries;
entries.reserve(rreqs->size());

shared< flatbuffers::FlatBufferBuilder > builder = std::make_shared< flatbuffers::FlatBufferBuilder >();
RD_LOG(INFO, "Data Channel : FetchData from remote: rreq.size={}, my server_id={}", rreqs->size(), server_id());
auto const& originator = rreqs->front()->remote_blkid.server_id;

for (auto const& rreq : *rreqs) {
entries.push_back(CreateRequestEntry(*builder, rreq->get_lsn(), rreq->term(), rreq->dsn(),
@@ -432,8 +441,14 @@ void RaftReplDev::fetch_data_from_remote(std::vector< repl_req_ptr_t >* rreqs) {
rreq->remote_blkid.server_id /* blkid_originator */,
builder->CreateVector(rreq->remote_blkid.blkid.serialize().cbytes(),
rreq->remote_blkid.blkid.serialized_size())));
LOGINFO("Fetching data from remote: rreq=[{}], remote_blkid={}", rreq->to_compact_string(),
rreq->remote_blkid.blkid.to_string());
// relax this assert if there is ever a case where originators within the same batch can differ (can't think of
// one now); if such a case arises, we would need to group rreqs by originator and send them in separate
// batches (see the grouping sketch further below);
RD_DBG_ASSERT(rreq->remote_blkid.server_id == originator, "Unexpected originator for rreq={}",
rreq->to_compact_string());

RD_LOG(TRACE, "Fetching data from originator={}, remote: rreq=[{}], remote_blkid={}, my server_id={}",
originator, rreq->to_compact_string(), rreq->remote_blkid.blkid.to_string(), server_id());
}

builder->FinishSizePrefixed(
@@ -443,15 +458,34 @@ void RaftReplDev::fetch_data_from_remote(std::vector< repl_req_ptr_t >* rreqs) {
// blkid;
group_msg_service()
->data_service_request_bidirectional(
nuraft_mesg::role_regex::LEADER, FETCH_DATA,
originator, FETCH_DATA,
sisl::io_blob_list_t{
sisl::io_blob{builder->GetBufferPointer(), builder->GetSize(), false /* is_aligned */}})
.via(&folly::InlineExecutor::instance())
.thenValue([this, builder, rreqs](auto e) {
RD_REL_ASSERT(!!e, "Error in fetching data");
if (!e) {
// if we are here, it means the originator who sent the log entries is down.
// we need to handle the error; when another member becomes leader, it will resend the log entries;
RD_LOG(INFO,
"Not able to fetch data from originator={}, error={}, probably the originator is down. Will "
"retry when the new leader starts appending log entries",
rreqs->front()->remote_blkid.server_id, e.error());
for (auto const& rreq : *rreqs) {
handle_error(rreq, RaftReplService::to_repl_error(e.error()));
}
return;
}

auto raw_data = e.value()->response_blob().cbytes();
auto total_size = e.value()->response_blob().size();

RD_DBG_ASSERT(total_size > 0, "Empty response from remote");
RD_DBG_ASSERT(raw_data, "Empty response from remote");

RD_LOG(INFO, "Data Channel: FetchData completed for reques.size()={} ", rreqs->size());

auto raw_data = e.value().cbytes();
auto total_size = e.value().size();
thread_local std::vector< folly::Future< std::error_code > > futs; // static is implied
futs.clear();

for (auto const& rreq : *rreqs) {
auto const data_size = rreq->remote_blkid.blkid.blk_count() * get_blk_size();
@@ -507,25 +541,36 @@ void RaftReplDev::fetch_data_from_remote(std::vector< repl_req_ptr_t >* rreqs) {
}

// Schedule a write and upon completion, mark the data as written.
data_service()
.async_write(r_cast< const char* >(data), data_size, rreq->local_blkid)
.thenValue([this, rreq](auto&& err) {
RD_REL_ASSERT(!err,
"Error in writing data"); // TODO: Find a way to return error to the Listener
rreq->state.fetch_or(uint32_cast(repl_req_state_t::DATA_WRITTEN));
rreq->data_written_promise.setValue();
RD_LOG(INFO, "Data Channel: Data Write completed rreq=[{}]", rreq->to_compact_string());
});
futs.emplace_back(
data_service().async_write(r_cast< const char* >(data), data_size, rreq->local_blkid));

// move the raw_data pointer to next rreq's data;
raw_data += data_size;
total_size -= data_size;

LOGINFO(
"Data Channel: Data fetched from remote: rreq=[{}], data_size: {}, total_size: {}, local_blkid: {}",
rreq->to_compact_string(), data_size, total_size, rreq->local_blkid.to_string());
RD_LOG(INFO,
"Data Channel: Data fetched from remote: rreq=[{}], data_size: {}, total_size: {}, "
"local_blkid: {}",
rreq->to_compact_string(), data_size, total_size, rreq->local_blkid.to_string());
}

folly::collectAllUnsafe(futs).thenValue([this, rreqs, e = std::move(e)](auto&& vf) {
for (auto const& err_c : vf) {
if (sisl_unlikely(err_c.value())) {
auto ec = err_c.value();
RD_LOG(ERROR, "Error in writing data: {}", ec.value());
// TODO: we never actually reach here, as iomgr will assert first (it should not assert, but
// rather raise an alert and leave the raft group);
}
}

for (auto const& rreq : *rreqs) {
rreq->state.fetch_or(uint32_cast(repl_req_state_t::DATA_WRITTEN));
rreq->data_written_promise.setValue();
RD_LOG(TRACE, "Data Channel: Data Write completed rreq=[{}]", rreq->to_compact_string());
}
});

builder->Release();

RD_DBG_ASSERT_EQ(total_size, 0, "Total size mismatch, some data is not consumed");
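If a batch ever does carry mixed originators, the grouping mentioned in the loop comment above might look roughly like the sketch below. This is illustrative only and not part of this commit; the helper name is hypothetical, and it relies only on types already visible in this function (std::vector, repl_req_ptr_t, remote_blkid).

// Hypothetical helper: split a mixed-originator batch into per-originator batches so that each one
// can go through the existing single-originator fetch path. The caller would own the returned
// vectors for as long as the async fetches are outstanding, exactly as it owns *rreqs today.
static std::vector< std::vector< repl_req_ptr_t > > group_by_originator(std::vector< repl_req_ptr_t > const& rreqs) {
    std::vector< std::vector< repl_req_ptr_t > > batches;
    for (auto const& rreq : rreqs) {
        bool placed{false};
        for (auto& b : batches) {
            if (b.front()->remote_blkid.server_id == rreq->remote_blkid.server_id) {
                b.push_back(rreq);
                placed = true;
                break;
            }
        }
        if (!placed) { batches.push_back({rreq}); }
    }
    return batches;
}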
@@ -563,17 +608,14 @@ AsyncNotify RaftReplDev::notify_after_data_written(std::vector< repl_req_ptr_t >
// if in resync mode, fetch data from remote immediately;
check_and_fetch_remote_data(rreqs);
} else {
check_and_fetch_remote_data(rreqs);
// some data are not in completed state, let's schedule a timer to check it again;
// we wait for the data channel to fill in the data; if it is still not done, we trigger a fetch from remote;
#if 0
m_wait_data_timer_hdl = iomanager.schedule_global_timer( // timer wakes up in current thread;
HS_DYNAMIC_CONFIG(consensus.wait_data_write_timer_sec) * 1000 * 1000 * 1000, false /* recurring */,
nullptr /* cookie */, iomgr::reactor_regex::all_worker, [this, rreqs](auto /*cookie*/) {
LOGINFO("Data Channel: Wait data write timer fired, checking if data is written");
RD_LOG(INFO, "Data Channel: Wait data write timer fired, checking if data is written");
check_and_fetch_remote_data(rreqs);
});
#endif
}

// block waiting here until all the futs are ready (data channel filled in and promises are made);
5 changes: 3 additions & 2 deletions src/lib/replication/repl_dev/raft_repl_dev.h
@@ -58,7 +58,8 @@ class RaftReplDev : public ReplDev,

std::atomic< uint64_t > m_next_dsn{0}; // Data Sequence Number that will keep incrementing for each data entry
//
iomgr::timer_handle_t m_wait_data_timer_hdl{iomgr::null_timer_handle};
iomgr::timer_handle_t m_wait_data_timer_hdl{
iomgr::null_timer_handle}; // non-recurring timer doesn't need to be cancelled on shutdown;
bool m_resync_mode{false};

static std::atomic< uint64_t > s_next_group_ordinal;
@@ -81,7 +82,7 @@
AsyncReplResult<> become_leader() override;
bool is_leader() const override;
const replica_id_t get_leader_id() const override;
std::vector<peer_info> get_replication_status() const override;
std::vector< peer_info > get_replication_status() const override;
group_id_t group_id() const override { return m_group_id; }
std::string group_id_str() const { return boost::uuids::to_string(m_group_id); }
std::string rdev_name() const { return m_rdev_name; }
2 changes: 1 addition & 1 deletion src/tests/test_common/homestore_test_common.hpp
@@ -185,7 +185,7 @@ class HSTestHelper {
if (fake_restart) {
shutdown_homestore(false);
// sisl::GrpcAsyncClientWorker::shutdown_all();
std::this_thread::sleep_for(std::chrono::seconds{5});
std::this_thread::sleep_for(std::chrono::seconds{10});
}

std::vector< homestore::dev_info > device_info;
76 changes: 52 additions & 24 deletions src/tests/test_raft_repl_dev.cpp
@@ -41,7 +41,7 @@
using namespace homestore;

SISL_LOGGING_DEF(test_raft_repl_dev)
SISL_LOGGING_INIT(HOMESTORE_LOG_MODS)
SISL_LOGGING_INIT(HOMESTORE_LOG_MODS, nuraft_mesg)

SISL_OPTION_GROUP(test_raft_repl_dev,
(block_size, "", "block_size", "block size to io",
@@ -331,45 +331,73 @@ TEST_F(RaftReplDevTest, All_Append_Fetch_Remote_Data) {

g_helper->sync_for_verify_start();

// TODO: it seems that with flip and fetch-remote, the data size is not correct;
LOGINFO("Validate all data written so far by reading them");
this->validate_all_data();

g_helper->sync_for_cleanup_start();
}

TEST_F(RaftReplDevTest, All_ReplService) {
// do some io before restart;
TEST_F(RaftReplDevTest, All_restart_one_follower_inc_resync) {
LOGINFO("Homestore replica={} setup completed", g_helper->replica_num());
g_helper->sync_for_test_start();
auto repl_dev = dynamic_pointer_cast< RaftReplDev >(pick_one_db().repl_dev());
auto group_id = repl_dev->group_id();
auto my_id_str = repl_dev->my_replica_id_str();

auto leader = repl_dev->get_leader_id();
ASSERT_TRUE(leader != replica_id_t())
<< "Error getting leader id for group_id=" << boost::uuids::to_string(group_id).c_str();
auto leader_str = boost::uuids::to_string(leader);
LOGINFO("Got raft leader {} for group {}", leader_str, group_id);

// step-0: do some IO before restart one member;
uint64_t exp_entries = 20;
if (g_helper->replica_num() == 0) {
ASSERT_TRUE(leader_str == my_id_str)
<< "Leader id " << leader_str.c_str() << " should equals to my ID " << my_id_str.c_str();
} else {
ASSERT_TRUE(leader_str != my_id_str) << "I am a follower, Leader id " << leader_str.c_str()
<< " should not equals to my ID " << my_id_str.c_str();
g_helper->runner().set_num_tasks(20);
auto block_size = SISL_OPTIONS["block_size"].as< uint32_t >();
LOGINFO("Run on worker threads to schedule append on repldev for {} Bytes.", block_size);
g_helper->runner().set_task([this, block_size]() {
this->generate_writes(block_size /* data_size */, block_size /* max_size_per_iov */);
});
g_helper->runner().execute().get();
}

auto peers_info = repl_dev->get_replication_status();
LOGINFO("Got peers_info size {} for group {}", peers_info.size(), group_id);
// step-1: wait for all writes to be completed
this->wait_for_all_writes(exp_entries);

// step-2: restart one non-leader replica
if (g_helper->replica_num() == 1) {
LOGINFO("Restart homestore: replica_num = 1");
g_helper->restart();
g_helper->sync_for_test_start();
}

exp_entries += SISL_OPTIONS["num_io"].as< uint64_t >();
// step-3: on the leader, wait a while for replica-1 to finish shutdown so that it can be removed from the raft group
// and subsequent I/O issued by the leader won't be pushed to replica-1;
if (g_helper->replica_num() == 0) {
auto const num_replicas = SISL_OPTIONS["replicas"].as< uint32_t >();
EXPECT_TRUE(peers_info.size() == num_replicas)
<< "Expecting peers_info size " << peers_info.size() << " but got " << peers_info.size();
} else {
EXPECT_TRUE(peers_info.size() == 0) << "Expecting zero length on follower, got " << peers_info.size();
LOGINFO("Wait for grpc connection to replica-1 to expire and removed from raft-groups.");
std::this_thread::sleep_for(std::chrono::seconds{5});

g_helper->runner().set_num_tasks(SISL_OPTIONS["num_io"].as< uint64_t >());

// before replica-1 comes back up, issue I/O so that replica-1 lags behind;
auto block_size = SISL_OPTIONS["block_size"].as< uint32_t >();
LOGINFO("Run on worker threads to schedule append on repldev for {} Bytes.", block_size);
g_helper->runner().set_task([this, block_size]() {
this->generate_writes(block_size /* data_size */, block_size /* max_size_per_iov */);
});
g_helper->runner().execute().get();
}

this->wait_for_all_writes(exp_entries);

g_helper->sync_for_verify_start();
LOGINFO("Validate all data written so far by reading them");
this->validate_all_data();
g_helper->sync_for_cleanup_start();
}

// TODO
// double restart:
// 1. restart one follower (F1) while I/O keeps running.
// 2. after F1 reboots and the leader is resyncing with F1 (after sending the appended entries), this leader also restarts.
// 3. F1 should receive an error from grpc saying the originator is not there.
// 4. F2 should then be appending entries to F1, and F1 should be able to catch up with F2 (fetch data from F2).
// (a rough sketch of such a test is given below)
//
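A rough, hypothetical sketch of how that double-restart scenario might be driven with the helpers already used in this file (g_helper, runner(), generate_writes, wait_for_all_writes, validate_all_data); the test name, barrier placement, and exact step ordering are assumptions, not part of this commit.

TEST_F(RaftReplDevTest, Double_restart_follower_then_leader) {
    LOGINFO("Homestore replica={} setup completed", g_helper->replica_num());
    g_helper->sync_for_test_start();

    // step-1: the leader keeps I/O running while follower F1 (replica-1) restarts;
    if (g_helper->replica_num() == 0) {
        auto const block_size = SISL_OPTIONS["block_size"].as< uint32_t >();
        g_helper->runner().set_num_tasks(SISL_OPTIONS["num_io"].as< uint64_t >());
        g_helper->runner().set_task([this, block_size]() {
            this->generate_writes(block_size /* data_size */, block_size /* max_size_per_iov */);
        });
        g_helper->runner().execute().get();
    }

    if (g_helper->replica_num() == 1) {
        LOGINFO("Restart homestore: replica_num = 1");
        g_helper->restart();
        g_helper->sync_for_test_start();
    }

    // step-2: restart the old leader while it is resyncing with F1, so F1's fetch-data calls to the
    // originator fail and it has to catch up from the remaining member (F2) instead;
    if (g_helper->replica_num() == 0) {
        LOGINFO("Restart homestore: replica_num = 0");
        g_helper->restart();
        g_helper->sync_for_test_start();
    }

    // steps-3/4: once a leader is re-elected, entries are re-appended and F1 fetches data from F2;
    this->wait_for_all_writes(SISL_OPTIONS["num_io"].as< uint64_t >());

    g_helper->sync_for_verify_start();
    LOGINFO("Validate all data written so far by reading them");
    this->validate_all_data();
    g_helper->sync_for_cleanup_start();
}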

int main(int argc, char* argv[]) {
int parsed_argc{argc};
char** orig_argv = argv;
