
issue 259/260: incremental resync #311

Merged: 10 commits, Feb 21, 2024
8 changes: 5 additions & 3 deletions conanfile.py
@@ -5,7 +5,7 @@

class HomestoreConan(ConanFile):
name = "homestore"
version = "5.1.3"
version = "5.1.4"

homepage = "https://github.com/eBay/Homestore"
description = "HomeStore Storage Engine"
@@ -56,8 +56,10 @@ def build_requirements(self):

def requirements(self):
self.requires("iomgr/[~=11, include_prerelease=True]@oss/master")
self.requires("sisl/[~=11, include_prerelease=True]@oss/master")
self.requires("nuraft_mesg/[~=2, include_prerelease=True]@oss/main")
#self.requires("sisl/[~=11, include_prerelease=True]@oss/master")
self.requires("sisl/11.1.1@arjun/oss")
#self.requires("nuraft_mesg/[~=2, include_prerelease=True]@oss/main")
self.requires("nuraft_mesg/3.0.1@arjun/oss")

self.requires("farmhash/cci.20190513@")
if self.settings.arch in ['x86', 'x86_64']:
88 changes: 65 additions & 23 deletions src/lib/replication/repl_dev/raft_repl_dev.cpp
@@ -6,6 +6,7 @@

#include <sisl/fds/buffer.hpp>
#include <sisl/grpc/generic_service.hpp>
#include <sisl/grpc/rpc_client.hpp>
#include <homestore/blkdata_service.hpp>
#include <homestore/logstore_service.hpp>
#include <homestore/superblk_handler.hpp>
@@ -147,7 +148,7 @@ void RaftReplDev::push_data_to_all_followers(repl_req_ptr_t rreq) {
flatbuffers::FlatBufferToString(builder.GetBufferPointer() + sizeof(flatbuffers::uoffset_t),
PushDataRequestTypeTable()));*/

LOGINFO("Data Channel: Pushing data to all followers: rreq=[{}]", rreq->to_compact_string());
RD_LOG(INFO, "Data Channel: Pushing data to all followers: rreq=[{}]", rreq->to_compact_string());

group_msg_service()
->data_service_request_unidirectional(nuraft_mesg::role_regex::ALL, PUSH_DATA, rreq->pkts)
@@ -193,6 +194,10 @@ void RaftReplDev::on_fetch_data_received(intrusive< sisl::GenericRpcData >& rpc_data) {
auto const& originator = req->blkid_originator();
auto const& remote_blkid = req->remote_blkid();

// relax this assert if in the future we want to fetch from a non-originator;
RD_REL_ASSERT(originator == server_id(),
"Did not expect to receive fetch data from remote when I am not the originator of this request");

// fetch data based on the remote_blkid
if (originator == server_id()) {
// We are the originator of the blkid, read data locally;
@@ -420,10 +425,14 @@ void RaftReplDev::check_and_fetch_remote_data(std::vector< repl_req_ptr_t >* rreqs) {
}

void RaftReplDev::fetch_data_from_remote(std::vector< repl_req_ptr_t >* rreqs) {
if (rreqs->size() == 0) { return; }

std::vector<::flatbuffers::Offset< RequestEntry > > entries;
entries.reserve(rreqs->size());

shared< flatbuffers::FlatBufferBuilder > builder = std::make_shared< flatbuffers::FlatBufferBuilder >();
RD_LOG(INFO, "Data Channel : FetchData from remote: rreq.size={}, my server_id={}", rreqs->size(), server_id());
auto const& originator = rreqs->front()->remote_blkid.server_id;

for (auto const& rreq : *rreqs) {
entries.push_back(CreateRequestEntry(*builder, rreq->get_lsn(), rreq->term(), rreq->dsn(),
@@ -432,8 +441,14 @@ void RaftReplDev::fetch_data_from_remote(std::vector< repl_req_ptr_t >* rreqs) {
rreq->remote_blkid.server_id /* blkid_originator */,
builder->CreateVector(rreq->remote_blkid.blkid.serialize().cbytes(),
rreq->remote_blkid.blkid.serialized_size())));
LOGINFO("Fetching data from remote: rreq=[{}], remote_blkid={}", rreq->to_compact_string(),
rreq->remote_blkid.blkid.to_string());
// relax this assert if there is a case where the originator can differ within the same batch (can't think of one now);
// but if there were such a case, we would need to group rreqs by originator and send them in separate
// batches;
RD_DBG_ASSERT(rreq->remote_blkid.server_id == originator, "Unexpected originator for rreq={}",
rreq->to_compact_string());
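// Illustrative only, not part of this PR: the grouping described above would amount to
// bucketing rreqs into a map keyed by rreq->remote_blkid.server_id, then issuing one
// fetch_data_from_remote() call (i.e. one bidirectional data-service request) per bucket.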

RD_LOG(TRACE, "Fetching data from originator={}, remote: rreq=[{}], remote_blkid={}, my server_id={}",
originator, rreq->to_compact_string(), rreq->remote_blkid.blkid.to_string(), server_id());
}

builder->FinishSizePrefixed(
@@ -443,15 +458,34 @@ void RaftReplDev::fetch_data_from_remote(std::vector< repl_req_ptr_t >* rreqs) {
// blkid;
group_msg_service()
->data_service_request_bidirectional(
nuraft_mesg::role_regex::LEADER, FETCH_DATA,
originator, FETCH_DATA,
sisl::io_blob_list_t{
sisl::io_blob{builder->GetBufferPointer(), builder->GetSize(), false /* is_aligned */}})
.via(&folly::InlineExecutor::instance())
.thenValue([this, builder, rreqs](auto e) {
RD_REL_ASSERT(!!e, "Error in fetching data");
if (!e) {
// if we are here, it means the originator who sent the log entries is down.
// we need to handle the error; when another member becomes leader, it will resend the log entries;
RD_LOG(INFO,
"Not able to fetch data from originator={}, error={}, probably the originator is down. Will "
"retry when the new leader starts appending log entries",
rreqs->front()->remote_blkid.server_id, e.error());
for (auto const& rreq : *rreqs) {
handle_error(rreq, RaftReplService::to_repl_error(e.error()));
Contributor: not sure, if the originator is down, how we switch to the current leader.

Contributor: @sanebay If the originator is down, those requests are timed out. But later, when a new leader is elected, that leader will send the new data, create a new rreq, fetch the data, and commit.

Contributor: If the originator is removed and a new leader is elected and it is sending resync, won't we keep timing out for each entry, because we don't know who the leader is? We won't be able to move forward. One option is to first check with raft who the leader is and, if not found, ask all replicas who the leader is.

Contributor Author: It is explained at line 485. The new leader will re-send the append entries with the originator set to this new leader. We just need to handle failure here if the originator goes down.

Collaborator: Is there any case where we can error out here but not get new append entries? e.g. temporary network flips? Or even a restart before raft times out (so that the leader doesn't change)?

Contributor Author: Responded in another place for this comment (looks like a duplicate).

}
return;
}

auto raw_data = e.value()->response_blob().cbytes();
auto total_size = e.value()->response_blob().size();

RD_DBG_ASSERT(total_size > 0, "Empty response from remote");
RD_DBG_ASSERT(raw_data, "Empty response from remote");

RD_LOG(INFO, "Data Channel: FetchData completed for reques.size()={} ", rreqs->size());

auto raw_data = e.value().cbytes();
auto total_size = e.value().size();
thread_local std::vector< folly::Future< std::error_code > > futs; // static is implied
futs.clear();

for (auto const& rreq : *rreqs) {
auto const data_size = rreq->remote_blkid.blkid.blk_count() * get_blk_size();
@@ -507,25 +541,36 @@ void RaftReplDev::fetch_data_from_remote(std::vector< repl_req_ptr_t >* rreqs) {
}

// Schedule a write and upon completion, mark the data as written.
data_service()
.async_write(r_cast< const char* >(data), data_size, rreq->local_blkid)
.thenValue([this, rreq](auto&& err) {
RD_REL_ASSERT(!err,
"Error in writing data"); // TODO: Find a way to return error to the Listener
rreq->state.fetch_or(uint32_cast(repl_req_state_t::DATA_WRITTEN));
rreq->data_written_promise.setValue();
RD_LOG(INFO, "Data Channel: Data Write completed rreq=[{}]", rreq->to_compact_string());
});
futs.emplace_back(
data_service().async_write(r_cast< const char* >(data), data_size, rreq->local_blkid));

// move the raw_data pointer to next rreq's data;
raw_data += data_size;
total_size -= data_size;

LOGINFO(
"Data Channel: Data fetched from remote: rreq=[{}], data_size: {}, total_size: {}, local_blkid: {}",
rreq->to_compact_string(), data_size, total_size, rreq->local_blkid.to_string());
RD_LOG(INFO,
"Data Channel: Data fetched from remote: rreq=[{}], data_size: {}, total_size: {}, "
"local_blkid: {}",
rreq->to_compact_string(), data_size, total_size, rreq->local_blkid.to_string());
}

folly::collectAllUnsafe(futs).thenValue([this, rreqs, e = std::move(e)](auto&& vf) {
for (auto const& err_c : vf) {
if (sisl_unlikely(err_c.value())) {
auto ec = err_c.value();
RD_LOG(ERROR, "Error in writing data: {}", ec.value());
// TODO: we will actually never arrive here, as iomgr asserts on write failure (it should not
// assert, but rather raise an alert and leave the raft group);
}
}

for (auto const& rreq : *rreqs) {
rreq->state.fetch_or(uint32_cast(repl_req_state_t::DATA_WRITTEN));
rreq->data_written_promise.setValue();
RD_LOG(TRACE, "Data Channel: Data Write completed rreq=[{}]", rreq->to_compact_string());
}
});

builder->Release();

RD_DBG_ASSERT_EQ(total_size, 0, "Total size mismatch, some data is not consumed");
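The rewritten completion path above collects all per-rreq write futures and attaches a single folly::collectAllUnsafe continuation, so the DATA_WRITTEN flags and data_written_promise are only touched after the whole batch has landed. Below is a minimal, self-contained sketch of that pattern, assuming only folly; fake_async_write is a hypothetical stand-in for data_service().async_write, not HomeStore code.

#include <folly/futures/Future.h>

#include <cstdio>
#include <system_error>
#include <vector>

// Hypothetical stand-in for data_service().async_write(...); completes immediately here.
static folly::Future< std::error_code > fake_async_write(size_t /*id*/) {
    return folly::makeFuture(std::error_code{});
}

int main() {
    std::vector< folly::Future< std::error_code > > futs;
    for (size_t i = 0; i < 4; ++i) {
        futs.emplace_back(fake_async_write(i));
    }

    // One continuation for the whole batch instead of one per write: per-request
    // completion state is only updated after every write has finished.
    folly::collectAllUnsafe(std::move(futs))
        .thenValue([](auto&& results) {
            for (auto const& r : results) {
                if (r.value()) { std::printf("write failed: %d\n", r.value().value()); }
            }
            std::printf("all writes done; safe to mark DATA_WRITTEN and fulfill promises\n");
        })
        .wait();
    return 0;
}

The Unsafe suffix means the continuation runs inline on the completing thread rather than on an explicitly supplied executor, which matches the folly::InlineExecutor usage earlier in this function.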
@@ -563,17 +608,14 @@ AsyncNotify RaftReplDev::notify_after_data_written(std::vector< repl_req_ptr_t >* rreqs) {
// if in resync mode, fetch data from remote immediately;
check_and_fetch_remote_data(rreqs);
} else {
check_and_fetch_remote_data(rreqs);
// some data are not in completed state, let's schedule a timer to check it again;
// we wait for the data channel to fill in the data; if it's still not done, we trigger a fetch from remote;
#if 0
m_wait_data_timer_hdl = iomanager.schedule_global_timer( // timer wakes up in current thread;
HS_DYNAMIC_CONFIG(consensus.wait_data_write_timer_sec) * 1000 * 1000 * 1000, false /* recurring */,
nullptr /* cookie */, iomgr::reactor_regex::all_worker, [this, rreqs](auto /*cookie*/) {
LOGINFO("Data Channel: Wait data write timer fired, checking if data is written");
RD_LOG(INFO, "Data Channel: Wait data write timer fired, checking if data is written");
check_and_fetch_remote_data(rreqs);
});
#endif
}

// block waiting here until all the futs are ready (data channel filled in and promises are made);
5 changes: 3 additions & 2 deletions src/lib/replication/repl_dev/raft_repl_dev.h
@@ -58,7 +58,8 @@ class RaftReplDev : public ReplDev,

std::atomic< uint64_t > m_next_dsn{0}; // Data Sequence Number that will keep incrementing for each data entry
//
iomgr::timer_handle_t m_wait_data_timer_hdl{iomgr::null_timer_handle};
iomgr::timer_handle_t m_wait_data_timer_hdl{
iomgr::null_timer_handle}; // non-recurring timer doesn't need to be cancelled on shutdown;
bool m_resync_mode{false};

static std::atomic< uint64_t > s_next_group_ordinal;
@@ -81,7 +82,7 @@ class RaftReplDev : public ReplDev,
AsyncReplResult<> become_leader() override;
bool is_leader() const override;
const replica_id_t get_leader_id() const override;
std::vector<peer_info> get_replication_status() const override;
std::vector< peer_info > get_replication_status() const override;
group_id_t group_id() const override { return m_group_id; }
std::string group_id_str() const { return boost::uuids::to_string(m_group_id); }
std::string rdev_name() const { return m_rdev_name; }
2 changes: 1 addition & 1 deletion src/tests/test_common/homestore_test_common.hpp
@@ -185,7 +185,7 @@ class HSTestHelper {
if (fake_restart) {
shutdown_homestore(false);
// sisl::GrpcAsyncClientWorker::shutdown_all();
std::this_thread::sleep_for(std::chrono::seconds{5});
std::this_thread::sleep_for(std::chrono::seconds{10});
}

std::vector< homestore::dev_info > device_info;
76 changes: 52 additions & 24 deletions src/tests/test_raft_repl_dev.cpp
@@ -41,7 +41,7 @@
using namespace homestore;

SISL_LOGGING_DEF(test_raft_repl_dev)
SISL_LOGGING_INIT(HOMESTORE_LOG_MODS)
SISL_LOGGING_INIT(HOMESTORE_LOG_MODS, nuraft_mesg)

SISL_OPTION_GROUP(test_raft_repl_dev,
(block_size, "", "block_size", "block size to io",
@@ -331,45 +331,73 @@ TEST_F(RaftReplDevTest, All_Append_Fetch_Remote_Data) {

g_helper->sync_for_verify_start();

// TODO: seems with filip and fetch remote, the data size is not correct;
LOGINFO("Validate all data written so far by reading them");
this->validate_all_data();

g_helper->sync_for_cleanup_start();
}

TEST_F(RaftReplDevTest, All_ReplService) {
// do some io before restart;
TEST_F(RaftReplDevTest, All_restart_one_follower_inc_resync) {
LOGINFO("Homestore replica={} setup completed", g_helper->replica_num());
g_helper->sync_for_test_start();
auto repl_dev = dynamic_pointer_cast< RaftReplDev >(pick_one_db().repl_dev());
auto group_id = repl_dev->group_id();
auto my_id_str = repl_dev->my_replica_id_str();

auto leader = repl_dev->get_leader_id();
ASSERT_TRUE(leader != replica_id_t())
<< "Error getting leader id for group_id=" << boost::uuids::to_string(group_id).c_str();
auto leader_str = boost::uuids::to_string(leader);
LOGINFO("Got raft leader {} for group {}", leader_str, group_id);

// step-0: do some IO before restart one member;
uint64_t exp_entries = 20;
if (g_helper->replica_num() == 0) {
ASSERT_TRUE(leader_str == my_id_str)
<< "Leader id " << leader_str.c_str() << " should equals to my ID " << my_id_str.c_str();
} else {
ASSERT_TRUE(leader_str != my_id_str) << "I am a follower, Leader id " << leader_str.c_str()
<< " should not equals to my ID " << my_id_str.c_str();
g_helper->runner().set_num_tasks(20);
Collaborator: nit: use exp_entries

Contributor Author: The data channel and log channel actually use the same grpc connection, and if there is any network issue, a new leader should be elected. As for the network flip, I also discussed this with Sanel the other day; we probably should do retry. This will complicate the logic, but if a network flip (network failure while the leader doesn't change) is a real thing, we have no choice but to handle it. I will mark it as a TODO as well (to avoid complicating things before we are 100% sure it's needed).

auto block_size = SISL_OPTIONS["block_size"].as< uint32_t >();
LOGINFO("Run on worker threads to schedule append on repldev for {} Bytes.", block_size);
g_helper->runner().set_task([this, block_size]() {
this->generate_writes(block_size /* data_size */, block_size /* max_size_per_iov */);
});
g_helper->runner().execute().get();
}

auto peers_info = repl_dev->get_replication_status();
LOGINFO("Got peers_info size {} for group {}", peers_info.size(), group_id);
// step-1: wait for all writes to be completed
this->wait_for_all_writes(exp_entries);

// step-2: restart one non-leader replica
if (g_helper->replica_num() == 1) {
LOGINFO("Restart homestore: replica_num = 1");
g_helper->restart();
g_helper->sync_for_test_start();
}

exp_entries += SISL_OPTIONS["num_io"].as< uint64_t >();
// step-3: on leader, wait for a while for replica-1 to finish shutdown so that it can be removed from raft-groups
// and subsequent I/O issued by the leader won't be pushed to replica-1;
if (g_helper->replica_num() == 0) {
auto const num_replicas = SISL_OPTIONS["replicas"].as< uint32_t >();
EXPECT_TRUE(peers_info.size() == num_replicas)
<< "Expecting peers_info size " << peers_info.size() << " but got " << peers_info.size();
} else {
EXPECT_TRUE(peers_info.size() == 0) << "Expecting zero length on follower, got " << peers_info.size();
LOGINFO("Wait for grpc connection to replica-1 to expire and removed from raft-groups.");
std::this_thread::sleep_for(std::chrono::seconds{5});

g_helper->runner().set_num_tasks(SISL_OPTIONS["num_io"].as< uint64_t >());

// before replica-1 comes back, issue I/O so that replica-1 lags behind;
auto block_size = SISL_OPTIONS["block_size"].as< uint32_t >();
LOGINFO("Run on worker threads to schedule append on repldev for {} Bytes.", block_size);
g_helper->runner().set_task([this, block_size]() {
this->generate_writes(block_size /* data_size */, block_size /* max_size_per_iov */);
});
g_helper->runner().execute().get();
}

this->wait_for_all_writes(exp_entries);

g_helper->sync_for_verify_start();
LOGINFO("Validate all data written so far by reading them");
this->validate_all_data();
g_helper->sync_for_cleanup_start();
}

// TODO
// double restart:
// 1. restart one follower(F1) while I/O keep running.
// 2. after F1 reboots and the leader is resyncing with F1 (after sending the appended entries), this leader also restarts.
// 3. F1 should receive error from grpc saying originator not there.
// 4. F2 should be appending entries to F1 and F1 should be able to catch up with F2 (fetch data from F2).
//
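A hypothetical gtest skeleton for this TODO, reusing only the helpers already used by the tests above (g_helper, wait_for_all_writes, validate_all_data); the sleep duration, step sequencing, and I/O generation are illustrative assumptions, not part of this PR.

// Hypothetical sketch only; timing and I/O generation are elided/simplified.
TEST_F(RaftReplDevTest, DISABLED_Restart_follower_then_leader) {
    LOGINFO("Homestore replica={} setup completed", g_helper->replica_num());
    g_helper->sync_for_test_start();

    // 1. restart one follower (F1 = replica-1) while I/O keeps running on the leader;
    if (g_helper->replica_num() == 1) { g_helper->restart(); }

    // 2. while the old leader (replica-0) is resyncing F1, restart that leader as well;
    if (g_helper->replica_num() == 0) {
        std::this_thread::sleep_for(std::chrono::seconds{2});
        g_helper->restart();
    }

    // 3. F1's in-flight fetches to the old originator should fail with a grpc error;
    // 4. once F2 becomes leader and re-appends entries, F1 catches up by fetching from F2.
    this->wait_for_all_writes(SISL_OPTIONS["num_io"].as< uint64_t >());

    g_helper->sync_for_verify_start();
    this->validate_all_data();
    g_helper->sync_for_cleanup_start();
}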

int main(int argc, char* argv[]) {
int parsed_argc{argc};
char** orig_argv = argv;
Expand Down