Skip to content

Commit

Permalink
Configurable max_msg_size
Browse files Browse the repository at this point in the history
Upper layer can pass in
    params.max_message_size_
to specified the max message size the GRPC server will accept.

UT with params.max_message_size_ set to 65MB and SEND_DATA up to 32MB.

SEND_DATA with 64MB is not stable as raft hb timeout can be easily
triggered.

Signed-off-by: Xiaoxi Chen <[email protected]>
  • Loading branch information
xiaoxichen committed Mar 28, 2024
1 parent 009f47c commit 7d2a646
Show file tree
Hide file tree
Showing 7 changed files with 48 additions and 10 deletions.
2 changes: 1 addition & 1 deletion conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

class NuRaftMesgConan(ConanFile):
name = "nuraft_mesg"
version = "3.3.1"
version = "3.3.2"

homepage = "https://github.com/eBay/nuraft_mesg"
description = "A gRPC service for NuRAFT"
Expand Down
1 change: 1 addition & 0 deletions include/nuraft_mesg/nuraft_mesg.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ class Manager {
std::string ssl_cert_;
std::shared_ptr< sisl::GrpcTokenVerifier > token_verifier_{nullptr};
std::shared_ptr< sisl::GrpcTokenClient > token_client_{nullptr};
uint32_t max_message_size_{0};
};
using group_params = nuraft::raft_params;
virtual ~Manager() = default;
Expand Down
2 changes: 1 addition & 1 deletion src/lib/manager_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ void ManagerImpl::restart_server() {
_grpc_server.reset();
_grpc_server = std::unique_ptr< sisl::GrpcServer >(sisl::GrpcServer::make(
listen_address, start_params_.token_verifier_, NURAFT_MESG_CONFIG(grpc_server_thread_cnt),
start_params_.ssl_key_, start_params_.ssl_cert_));
start_params_.ssl_key_, start_params_.ssl_cert_, start_params_.max_message_size_));
_mesg_service->associate(_grpc_server.get());

_grpc_server->run();
Expand Down
28 changes: 25 additions & 3 deletions src/tests/data_service_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ class DataServiceFixture : public MessagingFixtureBase {
void SetUp() override {
MessagingFixtureBase::SetUp();
start(true);
test_state_mgr::fill_data_vec(cli_buf);
test_state_mgr::fill_data_vec(cli_buf, 8);
}

void TearDown() override {
Expand Down Expand Up @@ -94,6 +94,7 @@ TEST_F(DataServiceFixture, BasicTest1) {
return folly::Unit();
}));


auto repl_ctx1 = sm1->get_repl_context();
for (auto svr : repl_ctx1->_server->get_config()->get_servers()) {
if (svr->get_endpoint() == to_string(app_1_->id_)) continue;
Expand All @@ -107,6 +108,27 @@ TEST_F(DataServiceFixture, BasicTest1) {

folly::collectAll(results).via(folly::getGlobalCPUExecutor()).get();

// test big message
LOGINFO("Starting large object write test")
io_blob_list_t big_cli_buf;
test_state_mgr::fill_data_vec_big(big_cli_buf, 2 * 1024 * 1024);
sm1->data_service_request_unidirectional(nuraft_mesg::role_regex::ALL, SEND_DATA, big_cli_buf)
.deferValue([](auto e) -> NullResult {
EXPECT_TRUE(e.hasValue());
return folly::Unit();
}).get();
LOGINFO("End large object write test")
LOGINFO("Starting large object read test")

sm4_1->data_service_request_bidirectional(nuraft_mesg::role_regex::LEADER, REQUEST_DATA, big_cli_buf)
.deferValue([](auto e) -> NullResult {
EXPECT_TRUE(e.hasValue());
test_state_mgr::verify_data(e.value().response_blob());
return folly::Unit();
}).get();
LOGINFO("End large object read test")


// add a new member to data_service_test_group and check if repl_ctx4 sends data to newly added member
auto add_3 = app_4->instance_->add_member(data_group, app_3_->id_);
std::this_thread::sleep_for(std::chrono::seconds(1));
Expand All @@ -119,9 +141,9 @@ TEST_F(DataServiceFixture, BasicTest1) {
.get();

// TODO REVIEW THIS
// test_group: 4 (1 SEND_DATA) + 5 (1 REQUEST_DATA) + 1 (SEND_DATA to a peer) = 10
// test_group: 4 (2 * 1 SEND_DATA) + 6 (1 REQUEST_DATA) + 1 (SEND_DATA to a peer) = 15
// data_service_test_group: 1 (1 REQUEST_DATA) + 4 (1 SEND_DATA) = 5
EXPECT_EQ(test_state_mgr::get_server_counter(), 15);
EXPECT_EQ(test_state_mgr::get_server_counter(), 20);
app_5->instance_->leave_group(data_group);
app_5->instance_->leave_group(group_id_);
app_4->instance_->leave_group(data_group);
Expand Down
1 change: 1 addition & 0 deletions src/tests/test_fixture.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ public:
params.server_uuid_ = id_;
params.mesg_port_ = port_;
params.default_group_type_ = "test_type";
params.max_message_size_ = 65 * 1024 * 1024;
instance_ = init_messaging(params, weak_from_this(), data_svc_enabled);
auto r_params = nuraft::raft_params()
.with_election_timeout_lower(elect_to_low)
Expand Down
21 changes: 17 additions & 4 deletions src/tests/test_state_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -215,12 +215,25 @@ void test_state_mgr::verify_data(sisl::io_blob const& buf) {
}
}

void test_state_mgr::fill_data_vec(nuraft_mesg::io_blob_list_t& cli_buf) {
static int const data_size{8};
for (int i = 0; i < data_size; i++) {
void test_state_mgr::fill_data_vec_big(nuraft_mesg::io_blob_list_t& cli_buf, uint32_t size_bytes) {
auto cnt = size_bytes / sizeof(uint32_t);
sisl::io_blob data(size_bytes);
data_vec.clear();
uint32_t* const write_buf{reinterpret_cast< uint32_t* >(data.bytes())};
for (uint32_t i = 0; i < cnt; i++) {
data_vec.emplace_back(i);
write_buf[i] = data_vec.back();
}
cli_buf.emplace_back(data);
}

void test_state_mgr::fill_data_vec(nuraft_mesg::io_blob_list_t& cli_buf, uint32_t size_bytes) {
auto cnt = size_bytes / sizeof(uint32_t);
data_vec.clear();
for (uint32_t i = 0; i < cnt; i++) {
cli_buf.emplace_back(sizeof(uint32_t));
uint32_t* const write_buf{reinterpret_cast< uint32_t* >(cli_buf[i].bytes())};
data_vec.emplace_back(get_random_num());
data_vec.emplace_back(i);
*write_buf = data_vec.back();
}
}
Expand Down
3 changes: 2 additions & 1 deletion src/tests/test_state_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ class test_state_mgr : public nuraft_mesg::mesg_state_mgr {
nuraft_mesg::io_blob_list_t const& cli_buf);

bool register_data_service_apis(nuraft_mesg::Manager* messaging);
static void fill_data_vec(nuraft_mesg::io_blob_list_t& cli_buf);
static void fill_data_vec(nuraft_mesg::io_blob_list_t& cli_buf, uint32_t size_bytes);
static void fill_data_vec_big(nuraft_mesg::io_blob_list_t& cli_buf, uint32_t size_bytes);
static uint16_t get_random_num();
static uint32_t get_server_counter();
static void verify_data(sisl::io_blob const& buf);
Expand Down

0 comments on commit 7d2a646

Please sign in to comment.