issue 259/260: incremental resync #311

Merged: 10 commits, Feb 21, 2024
2 changes: 1 addition & 1 deletion conanfile.py
@@ -5,7 +5,7 @@

class HomestoreConan(ConanFile):
name = "homestore"
version = "5.1.5"
version = "5.1.6"

homepage = "https://github.com/eBay/Homestore"
description = "HomeStore Storage Engine"
3 changes: 3 additions & 0 deletions src/lib/common/homestore_config.fbs
@@ -225,6 +225,9 @@ table Consensus {

// Leadership expiry 120 seconds
leadership_expiry_ms: uint32 = 120000;

// data fetch max size limit in MB
data_fetch_max_size_mb: uint32 = 2;
}

table HomeStoreSettings {
2 changes: 2 additions & 0 deletions src/lib/device/physical_dev.cpp
@@ -360,6 +360,8 @@ void PhysicalDev::load_chunks(std::function< bool(cshared< Chunk >&) >&& chunk_f
cinfo->checksum = info_crc;

auto chunk = std::make_shared< Chunk >(this, *cinfo, cslot);
m_chunk_data_area.insert(
ChunkInterval::right_open(cinfo->chunk_start_offset, cinfo->chunk_start_offset + cinfo->chunk_size));
if (chunk_found_cb(chunk)) { get_stream(chunk).m_chunks_map.insert(std::pair{cinfo->chunk_id, chunk}); }
}
hs_utils::iobuf_free(buf, sisl::buftag::superblk);
132 changes: 95 additions & 37 deletions src/lib/replication/repl_dev/raft_repl_dev.cpp
@@ -148,7 +148,7 @@ void RaftReplDev::push_data_to_all_followers(repl_req_ptr_t rreq) {
flatbuffers::FlatBufferToString(builder.GetBufferPointer() + sizeof(flatbuffers::uoffset_t),
PushDataRequestTypeTable()));*/

LOGINFO("Data Channel: Pushing data to all followers: rreq=[{}]", rreq->to_compact_string());
RD_LOG(DEBUG, "Data Channel: Pushing data to all followers: rreq=[{}]", rreq->to_compact_string());

group_msg_service()
->data_service_request_unidirectional(nuraft_mesg::role_regex::ALL, PUSH_DATA, rreq->pkts)
@@ -184,16 +184,16 @@ void RaftReplDev::on_fetch_data_received(intrusive< sisl::GenericRpcData >& rpc_
ctx->outstanding_read_cnt = fetch_req->request()->entries()->size();
Contributor:
I don't think we need the Context and read count etc., which is reimplementing future/promise, right? Suggest creating a promise, moving the promise into the callback, and doing setValue there. Then we don't need the mutex, cv, and atomic count explicitly.
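
A minimal, self-contained sketch of the suggested alternative (assuming folly futures, which this file already uses; none of this is Homestore API): each outstanding read fulfills its own promise, and a single collectAll continuation replaces the mutex/cv/atomic counter.

#include <folly/executors/InlineExecutor.h>
#include <folly/futures/Future.h>
#include <vector>

int main() {
    // One promise per outstanding read (replaces ctx->outstanding_read_cnt + mutex + cv).
    std::vector< folly::Promise< folly::Unit > > promises(3);
    std::vector< folly::Future< folly::Unit > > futs;
    for (auto& p : promises) { futs.emplace_back(p.getSemiFuture().via(&folly::InlineExecutor::instance())); }

    // Single continuation fires once all reads are done; no cv.wait/notify_one needed.
    auto all_done = folly::collectAllUnsafe(futs).thenValue([](auto&&) {
        // build and send the FetchData response here
    });

    // Each read-completion callback just calls setValue on the promise it was given.
    for (auto& p : promises) { p.setValue(folly::Unit{}); }
    std::move(all_done).get();
    return 0;
}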

Contributor Author (@yamingk, Feb 15, 2024):

We discussed this; somehow I was hitting a hang while using future/promise via collectAll* here (tried a few variations); the receiving side doing async_write is already calling collectAllUnsafe. I think it is because this is the server thread, and collectAllUnsafe() somehow blocks, which causes the hang? We can increase the server thread count to see if that resolves it.

src/lib/manager_impl.cpp, line 27: constexpr auto grpc_server_threads = 1u;

But on the receiving side doing async_write, it is the client thread, and the batch append blocks there anyway if the data write is not completed yet.


for (auto const& req : *(fetch_req->request()->entries())) {
RD_LOG(INFO, "Data Channel: FetchData received: lsn={}", req->lsn());

auto const& lsn = req->lsn();
auto const& term = req->raft_term();
auto const& dsn = req->dsn();
auto const& header = req->user_header();
auto const& key = req->user_key();
auto const& originator = req->blkid_originator();
auto const& remote_blkid = req->remote_blkid();

RD_LOG(DEBUG, "Data Channel: FetchData received: lsn={}", lsn);

// release this assert if in the future we want to fetch from non-originator;
RD_REL_ASSERT(originator == server_id(),
"Not expect to receive fetch data from remote when I am not the originator of this request");

// fetch data based on the remote_blkid
if (originator == server_id()) {
// We are the originator of the blkid, read data locally;
Expand All @@ -220,11 +220,6 @@ void RaftReplDev::on_fetch_data_received(intrusive< sisl::GenericRpcData >& rpc_
}
ctx->cv.notify_one();
});
} else {
// TODO: if we are not the originator, we need to fetch based on lsn;
// To be implemented;
RD_LOG(INFO, "I am not the originaltor for the requested blks, originaltor: {}, server_id: {}.", originator,
server_id());
}
}

@@ -395,6 +390,16 @@ repl_req_ptr_t RaftReplDev::follower_create_req(repl_key const& rkey, sisl::blob
return rreq;
}

auto RaftReplDev::get_max_data_fetch_size() const {
#ifdef _PRERELEASE
if (iomgr_flip::instance()->test_flip("simulate_staging_fetch_data")) {
LOGINFO("Flip simulate_staging_fetch_data is enabled, return max_data_fetch_size: 16K");
return 4 * 4096ull;
}
#endif
return HS_DYNAMIC_CONFIG(consensus.data_fetch_max_size_mb) * 1024 * 1024ull;
}

void RaftReplDev::check_and_fetch_remote_data(std::vector< repl_req_ptr_t >* rreqs) {
// Pop any entries that are already completed - from the entries list as well as from map
rreqs->erase(std::remove_if(
@@ -416,25 +421,51 @@ void RaftReplDev::check_and_fetch_remote_data(std::vector< repl_req_ptr_t >* rre

if (rreqs->size()) {
// Some data not completed yet, let's fetch from remote;
fetch_data_from_remote(rreqs);
auto total_size_to_fetch = 0ul;
std::vector< repl_req_ptr_t > next_batch_rreqs;
const auto max_batch_size = get_max_data_fetch_size();
for (auto const& rreq : *rreqs) {
auto const& size = rreq->remote_blkid.blkid.blk_count() * get_blk_size();
if ((total_size_to_fetch + size) >= max_batch_size) {
fetch_data_from_remote(std::move(next_batch_rreqs));
Collaborator:
nit: if the first rreq is already larger than max_batch_size, we get a no-op fetch_data_from_remote, though there does seem to be special handling for that in fetch_data_from_remote.

Collaborator:

maybe:

for (auto const& rreq : *rreqs) {
    auto const& size = rreq->remote_blkid.blkid.blk_count() * get_blk_size();
    total_size_to_fetch += size;
    next_batch_rreqs.emplace_back(rreq);
    if (total_size_to_fetch >= max_batch_size) {
        fetch_data_from_remote(std::move(next_batch_rreqs));
        ...
    }
}

Contributor Author (@yamingk, Feb 20, 2024):

Normally I tend to update the size only after the corresponding operation has completed, for ease of error handling, etc.

But you have a good point here: if a single rreq's remote_blkid (not necessarily the 1st one) addresses its maximum blk count, which is 65535, it could be as large as 64MB in the nuobject case (1K blk size) and 256MB for a 4KB blk size, and we have to handle this case. This will go away if we use the gRPC streaming API, which we probably also need for baseline resync. Will mark it as TODO for now, as this could become a moot point if we switch to the streaming API.

Will add a new test case (in the next PR) with one rreq's remote_blkid addressing 65535 blks, which can serve as replication's limit test.
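
For reference, the sizes quoted above work out as follows (a back-of-the-envelope check only; 65535 is taken from the comment as the maximum blk_count a single blkid can address):

#include <cstdint>

// Worst case a single rreq's remote_blkid can address (illustrative arithmetic only)
constexpr std::uint64_t max_blk_count = 65535;
constexpr std::uint64_t worst_case_1k = max_blk_count * 1024;   // ~64 MiB with a 1 KiB blk size
constexpr std::uint64_t worst_case_4k = max_blk_count * 4096;   // ~256 MiB with a 4 KiB blk size
// Either value dwarfs data_fetch_max_size_mb (default 2 MiB), hence the TODO above.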

next_batch_rreqs.clear();
total_size_to_fetch = 0;
}

total_size_to_fetch += size;
next_batch_rreqs.emplace_back(rreq);
}

// check if there is any left over not processed;
if (next_batch_rreqs.size()) { fetch_data_from_remote(std::move(next_batch_rreqs)); }
}
}

void RaftReplDev::fetch_data_from_remote(std::vector< repl_req_ptr_t >* rreqs) {
void RaftReplDev::fetch_data_from_remote(std::vector< repl_req_ptr_t > rreqs) {
Contributor:

Nit: we were passing a vector pointer earlier, instead of a vector by value, because down the line we capture this vector in collectAllUnsafe().thenValue(), and a copy would mean expensive extra allocations. You might argue that this function is not in the critical path and the additional capture doesn't matter. My only thought is that it might be helpful if we reuse this function in the future, but it's an optional nit.

Contributor Author (@yamingk, Feb 15, 2024):

We can still do std::move on a vector to avoid the allocation (line 440 also does it), right?

It is a vector by value now because of the staging handling; we can't do start_index/end_index with a pointer to the vector, as that won't work with std::move of the whole vector.
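
A minimal standalone sketch of the point (simplified stand-in types, not the real Homestore signatures): passing the vector by value lets the caller std::move the whole batch in, and the continuation can move-capture it again, so neither hop copies the elements.

#include <memory>
#include <utility>
#include <vector>

struct repl_req {};                                      // simplified stand-in for the real request type
using repl_req_ptr_t = std::shared_ptr< repl_req >;

// By-value parameter: the caller moves the batch in; no per-element copy is made.
void fetch_data_from_remote(std::vector< repl_req_ptr_t > rreqs) {
    // Move-capture into the async continuation (thenValue in the real code),
    // again without copying the vector's contents.
    auto on_response = [batch = std::move(rreqs)]() {
        // ... write the fetched data for each rreq in `batch` ...
    };
    on_response();
}

int main() {
    std::vector< repl_req_ptr_t > next_batch_rreqs{std::make_shared< repl_req >()};
    fetch_data_from_remote(std::move(next_batch_rreqs)); // ownership handed over, no copy
    return 0;
}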

if (rreqs.size() == 0) { return; }

std::vector<::flatbuffers::Offset< RequestEntry > > entries;
entries.reserve(rreqs->size());
entries.reserve(rreqs.size());

shared< flatbuffers::FlatBufferBuilder > builder = std::make_shared< flatbuffers::FlatBufferBuilder >();
RD_LOG(DEBUG, "Data Channel : FetchData from remote: rreq.size={}, my server_id={}", rreqs.size(), server_id());
auto const& originator = rreqs.front()->remote_blkid.server_id;

for (auto const& rreq : *rreqs) {
for (auto const& rreq : rreqs) {
entries.push_back(CreateRequestEntry(*builder, rreq->get_lsn(), rreq->term(), rreq->dsn(),
builder->CreateVector(rreq->header.cbytes(), rreq->header.size()),
builder->CreateVector(rreq->key.cbytes(), rreq->key.size()),
rreq->remote_blkid.server_id /* blkid_originator */,
builder->CreateVector(rreq->remote_blkid.blkid.serialize().cbytes(),
rreq->remote_blkid.blkid.serialized_size())));
LOGINFO("Fetching data from remote: rreq=[{}], remote_blkid={}", rreq->to_compact_string(),
rreq->remote_blkid.blkid.to_string());
// relax this assert if there is a case in same batch originator can be different (can't think of one now)
// but if there were to be such case, we need to group rreqs by originator and send them in separate
// batches;
RD_DBG_ASSERT(rreq->remote_blkid.server_id == originator, "Unexpected originator for rreq={}",
rreq->to_compact_string());

RD_LOG(TRACE, "Fetching data from originator={}, remote: rreq=[{}], remote_blkid={}, my server_id={}",
originator, rreq->to_compact_string(), rreq->remote_blkid.blkid.to_string(), server_id());
}

builder->FinishSizePrefixed(
@@ -444,17 +475,36 @@ void RaftReplDev::fetch_data_from_remote(std::vector< repl_req_ptr_t >* rreqs) {
// blkid;
group_msg_service()
->data_service_request_bidirectional(
nuraft_mesg::role_regex::LEADER, FETCH_DATA,
originator, FETCH_DATA,
sisl::io_blob_list_t{
sisl::io_blob{builder->GetBufferPointer(), builder->GetSize(), false /* is_aligned */}})
.via(&folly::InlineExecutor::instance())
.thenValue([this, builder, rreqs](auto e) {
RD_REL_ASSERT(!!e, "Error in fetching data");
if (!e) {
// if we are here, it means the original who sent the log entries are down.
// we need to handle error and when the other member becomes leader, it will resend the log entries;
RD_LOG(INFO,
"Not able to fetching data from originator={}, error={}, probably originator is down. Will "
"retry when new leader start appending log entries",
rreqs.front()->remote_blkid.server_id, e.error());
for (auto const& rreq : rreqs) {
handle_error(rreq, RaftReplService::to_repl_error(e.error()));
Contributor:
Not sure, if the originator is down, how we switch to the current leader.

Contributor:
@sanebay If the originator is down, those requests time out. But later, when a new leader is elected, that leader will send the new data, and we will create a new rreq, fetch the data, and commit.

Contributor:
If the originator is removed and a new leader is elected and it is sending resync, won't we keep timing out for each entry, because we don't know who the leader is? We won't be able to move forward. One option is to first check with raft who the leader is and, if not found, ask all replicas who the leader is.

Contributor Author (@yamingk):
It is explained at line 485. The new leader will re-send the append entries with the originator set to this new leader. We just need to handle the failure here if the originator goes down.

Collaborator:
Is there any case where we could error out here but not get new append entries? E.g. temporary network flips? Or even a restart before raft times out (so that the leader doesn't change)?

Contributor Author (@yamingk):
Responded elsewhere to this comment (looks like a duplicate?).

}
return;
}

auto raw_data = e.value().response_blob().cbytes();
auto total_size = e.value().response_blob().size();

for (auto const& rreq : *rreqs) {
RD_DBG_ASSERT_GT(total_size, 0, "Empty response from remote");
RD_DBG_ASSERT(raw_data, "Empty response from remote");

RD_LOG(INFO, "Data Channel: FetchData completed for reques.size()={} ", rreqs.size());

thread_local std::vector< folly::Future< std::error_code > > futs; // static is implied
futs.clear();

for (auto const& rreq : rreqs) {
auto const data_size = rreq->remote_blkid.blkid.blk_count() * get_blk_size();
// if data is already received, skip it because someone is already doing the write;
if (rreq->state.load() & uint32_cast(repl_req_state_t::DATA_RECEIVED)) {
Expand Down Expand Up @@ -508,25 +558,36 @@ void RaftReplDev::fetch_data_from_remote(std::vector< repl_req_ptr_t >* rreqs) {
}

// Schedule a write and upon completion, mark the data as written.
data_service()
.async_write(r_cast< const char* >(data), data_size, rreq->local_blkid)
.thenValue([this, rreq](auto&& err) {
RD_REL_ASSERT(!err,
"Error in writing data"); // TODO: Find a way to return error to the Listener
rreq->state.fetch_or(uint32_cast(repl_req_state_t::DATA_WRITTEN));
rreq->data_written_promise.setValue();
RD_LOG(INFO, "Data Channel: Data Write completed rreq=[{}]", rreq->to_compact_string());
});
futs.emplace_back(
data_service().async_write(r_cast< const char* >(data), data_size, rreq->local_blkid));

// move the raw_data pointer to next rreq's data;
raw_data += data_size;
total_size -= data_size;

LOGINFO(
"Data Channel: Data fetched from remote: rreq=[{}], data_size: {}, total_size: {}, local_blkid: {}",
rreq->to_compact_string(), data_size, total_size, rreq->local_blkid.to_string());
RD_LOG(INFO,
"Data Channel: Data fetched from remote: rreq=[{}], data_size: {}, total_size: {}, "
"local_blkid: {}",
rreq->to_compact_string(), data_size, total_size, rreq->local_blkid.to_string());
}

folly::collectAllUnsafe(futs).thenValue([this, rreqs, e = std::move(e)](auto&& vf) {
for (auto const& err_c : vf) {
if (sisl_unlikely(err_c.value())) {
auto ec = err_c.value();
RD_LOG(ERROR, "Error in writing data: {}", ec.value());
// TODO: actually will never arrive here as iomgr will assert (should not assert but
// to raise alert and leave the raft group);
}
}

for (auto const& rreq : rreqs) {
rreq->state.fetch_or(uint32_cast(repl_req_state_t::DATA_WRITTEN));
rreq->data_written_promise.setValue();
RD_LOG(TRACE, "Data Channel: Data Write completed rreq=[{}]", rreq->to_compact_string());
}
});

builder->Release();

RD_DBG_ASSERT_EQ(total_size, 0, "Total size mismatch, some data is not consumed");
@@ -564,17 +625,14 @@ AsyncNotify RaftReplDev::notify_after_data_written(std::vector< repl_req_ptr_t >
// if in resync mode, fetch data from remote immediately;
check_and_fetch_remote_data(rreqs);
} else {
check_and_fetch_remote_data(rreqs);
// some data are not in completed state, let's schedule a timer to check it again;
// we wait for data channel to fill in the data. Still if its not done we trigger a fetch from remote;
#if 0
m_wait_data_timer_hdl = iomanager.schedule_global_timer( // timer wakes up in current thread;
HS_DYNAMIC_CONFIG(consensus.wait_data_write_timer_sec) * 1000 * 1000 * 1000, false /* recurring */,
nullptr /* cookie */, iomgr::reactor_regex::all_worker, [this, rreqs](auto /*cookie*/) {
LOGINFO("Data Channel: Wait data write timer fired, checking if data is written");
RD_LOG(INFO, "Data Channel: Wait data write timer fired, checking if data is written");
check_and_fetch_remote_data(rreqs);
});
#endif
}

// block waiting here until all the futs are ready (data channel filled in and promises are made);
9 changes: 5 additions & 4 deletions src/lib/replication/repl_dev/raft_repl_dev.h
@@ -58,7 +58,8 @@ class RaftReplDev : public ReplDev,

std::atomic< uint64_t > m_next_dsn{0}; // Data Sequence Number that will keep incrementing for each data entry
//
iomgr::timer_handle_t m_wait_data_timer_hdl{iomgr::null_timer_handle};
iomgr::timer_handle_t m_wait_data_timer_hdl{
iomgr::null_timer_handle}; // non-recurring timer doesn't need to be cancelled on shutdown;
bool m_resync_mode{false};

static std::atomic< uint64_t > s_next_group_ordinal;
@@ -81,7 +82,7 @@ class RaftReplDev : public ReplDev,
AsyncReplResult<> become_leader() override;
bool is_leader() const override;
const replica_id_t get_leader_id() const override;
std::vector<peer_info> get_replication_status() const override;
std::vector< peer_info > get_replication_status() const override;
group_id_t group_id() const override { return m_group_id; }
std::string group_id_str() const { return boost::uuids::to_string(m_group_id); }
std::string rdev_name() const { return m_rdev_name; }
@@ -124,8 +125,8 @@ class RaftReplDev : public ReplDev,
void on_push_data_received(intrusive< sisl::GenericRpcData >& rpc_data);
void on_fetch_data_received(intrusive< sisl::GenericRpcData >& rpc_data);
void check_and_fetch_remote_data(std::vector< repl_req_ptr_t >* rreqs);
void fetch_data_from_remote(std::vector< repl_req_ptr_t >* rreqs);

void fetch_data_from_remote(std::vector< repl_req_ptr_t > rreqs);
auto get_max_data_fetch_size() const;
bool is_resync_mode() { return m_resync_mode; }
void handle_error(repl_req_ptr_t const& rreq, ReplServiceError err);
};
4 changes: 2 additions & 2 deletions src/tests/test_common/homestore_test_common.hpp
@@ -175,7 +175,7 @@ class HSTestHelper {

static void start_homestore(const std::string& test_name, std::map< uint32_t, test_params >&& svc_params,
hs_before_services_starting_cb_t cb = nullptr, bool fake_restart = false,
bool init_device = true) {
bool init_device = true, uint32_t shutdown_delay_sec = 5) {
auto const ndevices = SISL_OPTIONS["num_devs"].as< uint32_t >();
auto const dev_size = SISL_OPTIONS["dev_size_mb"].as< uint64_t >() * 1024 * 1024;
auto num_threads = SISL_OPTIONS["num_threads"].as< uint32_t >();
@@ -185,7 +185,7 @@
if (fake_restart) {
shutdown_homestore(false);
// sisl::GrpcAsyncClientWorker::shutdown_all();
std::this_thread::sleep_for(std::chrono::seconds{5});
std::this_thread::sleep_for(std::chrono::seconds{shutdown_delay_sec});
}

std::vector< homestore::dev_info > device_info;
6 changes: 3 additions & 3 deletions src/tests/test_common/hs_repl_test_common.hpp
@@ -186,12 +186,12 @@ class HSReplTestHelper {
setup();
}

void restart() {
void restart(uint32_t shutdown_delay_secs = 5) {
test_common::HSTestHelper::start_homestore(
name_ + std::to_string(replica_num_),
{{HS_SERVICE::REPLICATION, {.repl_app = std::make_unique< TestReplApplication >(*this)}},
{HS_SERVICE::LOG, {}}},
nullptr, true /* restart */);
nullptr, true /* restart */, true /* init_device */, shutdown_delay_secs);
}

void restart_one_by_one() {
@@ -305,4 +305,4 @@ class HSReplTestHelper {

Runner io_runner_;
};
} // namespace test_common
} // namespace test_common