Skip to content

Commit

Permalink
Merge branch streaming (#552)
Browse files Browse the repository at this point in the history
Merge streaming branch
  • Loading branch information
greensky00 authored Nov 5, 2024
2 parents cda7dff + 2b93337 commit fced08b
Show file tree
Hide file tree
Showing 15 changed files with 1,674 additions and 214 deletions.
1 change: 1 addition & 0 deletions .github/workflows/cmake.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ on:
pull_request:
branches:
- master
- streaming

env:
# Customize the CMake build type here (Release, Debug, RelWithDebInfo, etc.)
Expand Down
2 changes: 2 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,8 @@ if (CODE_COVERAGE GREATER 0)
stat_mgr_test
logger_test
new_joiner_test
asio_service_stream_test
stream_functional_test
)

# lcov
Expand Down
8 changes: 8 additions & 0 deletions include/libnuraft/asio_service_options.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ struct asio_service_options {
, crc_on_entire_message_(false)
, crc_on_payload_(false)
, corrupted_msg_handler_(nullptr)
, streaming_mode_(false)
{}

/**
Expand Down Expand Up @@ -276,6 +277,13 @@ struct asio_service_options {
*/
std::function< void( std::shared_ptr<buffer>,
std::shared_ptr<buffer> ) > corrupted_msg_handler_;

/**
* If `true`, NuRaft will use streaming mode, which allows it to send
* subsequent requests without waiting for the response to previous requests.
* The order of responses will be identical to the order of requests.
*/
bool streaming_mode_;
};

}
Expand Down
49 changes: 48 additions & 1 deletion include/libnuraft/peer.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ limitations under the License.
#include "srv_config.hxx"

#include <atomic>
#include <cassert>

namespace nuraft {

Expand Down Expand Up @@ -77,6 +78,8 @@ public:
, lost_by_leader_(false)
, rsv_msg_(nullptr)
, rsv_msg_handler_(nullptr)
, last_streamed_log_idx_(0)
, bytes_in_flight_(0)
, l_(logger)
{
reset_ls_timer();
Expand Down Expand Up @@ -221,7 +224,8 @@ public:

void send_req(ptr<peer> myself,
ptr<req_msg>& req,
rpc_handler& handler);
rpc_handler& handler,
bool streaming = false);

void shutdown();

Expand Down Expand Up @@ -307,6 +311,37 @@ public:
ptr<req_msg> get_rsv_msg() const { return rsv_msg_; }
rpc_handler get_rsv_msg_handler() const { return rsv_msg_handler_; }

ulong get_last_streamed_log_idx() {
return last_streamed_log_idx_.load();
}

void set_last_streamed_log_idx(ulong expected, ulong idx) {
last_streamed_log_idx_.compare_exchange_strong(expected, idx);
}

void reset_stream() {
last_streamed_log_idx_.store(0);
}

int64_t get_bytes_in_flight() {
return bytes_in_flight_.load();
}

void bytes_in_flight_add(size_t req_size_bytes) {
bytes_in_flight_.fetch_add(req_size_bytes);
}

void bytes_in_flight_sub(size_t req_size_bytes) {
bytes_in_flight_.fetch_sub(req_size_bytes);
assert(bytes_in_flight_ >= 0);
}

void reset_bytes_in_flight() {
bytes_in_flight_.store(0);
}

void try_set_free(msg_type type, bool streaming);

bool is_lost() const { return lost_by_leader_; }
void set_lost() { lost_by_leader_ = true; }
void set_recovered() { lost_by_leader_ = false; }
Expand All @@ -316,6 +351,8 @@ private:
ptr<rpc_client> my_rpc_client,
ptr<req_msg>& req,
ptr<rpc_result>& pending_result,
bool streaming,
size_t req_size_bytes,
ptr<resp_msg>& resp,
ptr<rpc_exception>& err);

Expand Down Expand Up @@ -523,6 +560,16 @@ private:
*/
rpc_handler rsv_msg_handler_;

/**
* Last log index sent in stream mode.
*/
std::atomic<ulong> last_streamed_log_idx_;

/**
* Current bytes of in-flight append entry requests.
*/
std::atomic<int64_t> bytes_in_flight_;

/**
* Logger instance.
*/
Expand Down
18 changes: 18 additions & 0 deletions include/libnuraft/raft_params.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@ struct raft_params {
, use_bg_thread_for_snapshot_io_(false)
, use_full_consensus_among_healthy_members_(false)
, parallel_log_appending_(false)
, max_log_gap_in_stream_(0)
, max_bytes_in_flight_in_stream_(0)
{}

/**
Expand Down Expand Up @@ -619,6 +621,22 @@ public:
* before returning the response.
*/
bool parallel_log_appending_;

/**
* If non-zero, streaming mode is enabled and `append_entries` requests are
* dispatched instantly without awaiting the response from the prior request.
*,
* The count of logs in-flight will be capped by this value, allowing it
* to function as a throttling mechanism, in conjunction with
* `max_bytes_in_flight_in_stream_`.
*/
int32 max_log_gap_in_stream_;

/**
* If non-zero, the volume of data in-flight will be restricted to this
* specified byte limit. This limitation is effective only in streaming mode.
*/
int64_t max_bytes_in_flight_in_stream_;
};

}
Expand Down
3 changes: 2 additions & 1 deletion include/libnuraft/raft_server.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -929,6 +929,7 @@ protected:
void request_vote(bool force_vote);
void request_append_entries();
bool request_append_entries(ptr<peer> p);
bool send_request(ptr<peer>& p, ptr<req_msg>& msg, rpc_handler& m_handler, bool streaming = false);
void handle_peer_resp(ptr<resp_msg>& resp, ptr<rpc_exception>& err);
void handle_append_entries_resp(resp_msg& resp);
void handle_install_snapshot_resp(resp_msg& resp);
Expand All @@ -946,7 +947,7 @@ protected:
void handle_join_leave_rpc_err(msg_type t_msg, ptr<peer> p);
void reset_srv_to_join();
void reset_srv_to_leave();
ptr<req_msg> create_append_entries_req(ptr<peer>& pp);
ptr<req_msg> create_append_entries_req(ptr<peer>& pp, ulong custom_last_log_idx = 0);
ptr<req_msg> create_sync_snapshot_req(ptr<peer>& pp,
ulong last_log_idx,
ulong term,
Expand Down
2 changes: 2 additions & 0 deletions scripts/test/runtests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,5 @@ set -e
./tests/new_joiner_test --abort-on-failure
./tests/failure_test --abort-on-failure
./tests/asio_service_test --abort-on-failure
./tests/asio_service_stream_test --abort-on-failure
./tests/stream_functional_test --abort-on-failure
Loading

0 comments on commit fced08b

Please sign in to comment.