diff --git a/conanfile.py b/conanfile.py index 6d45ef6..306d688 100644 --- a/conanfile.py +++ b/conanfile.py @@ -10,7 +10,7 @@ class NuRaftMesgConan(ConanFile): name = "nuraft_mesg" - version = "3.3.1" + version = "3.3.2" homepage = "https://github.com/eBay/nuraft_mesg" description = "A gRPC service for NuRAFT" diff --git a/include/nuraft_mesg/nuraft_mesg.hpp b/include/nuraft_mesg/nuraft_mesg.hpp index ad942dd..89c3a28 100644 --- a/include/nuraft_mesg/nuraft_mesg.hpp +++ b/include/nuraft_mesg/nuraft_mesg.hpp @@ -64,6 +64,7 @@ class Manager { std::string ssl_cert_; std::shared_ptr< sisl::GrpcTokenVerifier > token_verifier_{nullptr}; std::shared_ptr< sisl::GrpcTokenClient > token_client_{nullptr}; + uint32_t max_message_size_{0}; }; using group_params = nuraft::raft_params; virtual ~Manager() = default; diff --git a/src/lib/manager_impl.cpp b/src/lib/manager_impl.cpp index 8d5da07..d87843a 100644 --- a/src/lib/manager_impl.cpp +++ b/src/lib/manager_impl.cpp @@ -96,7 +96,7 @@ void ManagerImpl::restart_server() { _grpc_server.reset(); _grpc_server = std::unique_ptr< sisl::GrpcServer >(sisl::GrpcServer::make( listen_address, start_params_.token_verifier_, NURAFT_MESG_CONFIG(grpc_server_thread_cnt), - start_params_.ssl_key_, start_params_.ssl_cert_)); + start_params_.ssl_key_, start_params_.ssl_cert_, start_params_.max_message_size_)); _mesg_service->associate(_grpc_server.get()); _grpc_server->run(); diff --git a/src/tests/data_service_tests.cpp b/src/tests/data_service_tests.cpp index 44b0c3b..ff002ad 100644 --- a/src/tests/data_service_tests.cpp +++ b/src/tests/data_service_tests.cpp @@ -6,7 +6,7 @@ class DataServiceFixture : public MessagingFixtureBase { void SetUp() override { MessagingFixtureBase::SetUp(); start(true); - test_state_mgr::fill_data_vec(cli_buf); + test_state_mgr::fill_data_vec(cli_buf, 8); } void TearDown() override { @@ -94,6 +94,7 @@ TEST_F(DataServiceFixture, BasicTest1) { return folly::Unit(); })); + auto repl_ctx1 = sm1->get_repl_context(); for (auto svr : repl_ctx1->_server->get_config()->get_servers()) { if (svr->get_endpoint() == to_string(app_1_->id_)) continue; @@ -107,6 +108,27 @@ TEST_F(DataServiceFixture, BasicTest1) { folly::collectAll(results).via(folly::getGlobalCPUExecutor()).get(); + // test big message + LOGINFO("Starting large object write test") + io_blob_list_t big_cli_buf; + test_state_mgr::fill_data_vec_big(big_cli_buf, 32 * 1024 * 1024); + sm1->data_service_request_unidirectional(nuraft_mesg::role_regex::ALL, SEND_DATA, big_cli_buf) + .deferValue([](auto e) -> NullResult { + EXPECT_TRUE(e.hasValue()); + return folly::Unit(); + }).get(); + LOGINFO("End large object write test") + LOGINFO("Starting large object read test") + + sm4_1->data_service_request_bidirectional(nuraft_mesg::role_regex::LEADER, REQUEST_DATA, big_cli_buf) + .deferValue([](auto e) -> NullResult { + EXPECT_TRUE(e.hasValue()); + test_state_mgr::verify_data(e.value().response_blob()); + return folly::Unit(); + }).get(); + LOGINFO("End large object read test") + + // add a new member to data_service_test_group and check if repl_ctx4 sends data to newly added member auto add_3 = app_4->instance_->add_member(data_group, app_3_->id_); std::this_thread::sleep_for(std::chrono::seconds(1)); @@ -119,9 +141,9 @@ TEST_F(DataServiceFixture, BasicTest1) { .get(); // TODO REVIEW THIS - // test_group: 4 (1 SEND_DATA) + 5 (1 REQUEST_DATA) + 1 (SEND_DATA to a peer) = 10 + // test_group: 4 (2 * 1 SEND_DATA) + 6 (1 REQUEST_DATA) + 1 (SEND_DATA to a peer) = 15 // data_service_test_group: 1 (1 REQUEST_DATA) + 4 (1 SEND_DATA) = 5 - EXPECT_EQ(test_state_mgr::get_server_counter(), 15); + EXPECT_EQ(test_state_mgr::get_server_counter(), 20); app_5->instance_->leave_group(data_group); app_5->instance_->leave_group(group_id_); app_4->instance_->leave_group(data_group); diff --git a/src/tests/test_fixture.ipp b/src/tests/test_fixture.ipp index dc96654..9e3b8c4 100644 --- a/src/tests/test_fixture.ipp +++ b/src/tests/test_fixture.ipp @@ -88,6 +88,7 @@ public: params.server_uuid_ = id_; params.mesg_port_ = port_; params.default_group_type_ = "test_type"; + params.max_message_size_ = 65 * 1024 * 1024; instance_ = init_messaging(params, weak_from_this(), data_svc_enabled); auto r_params = nuraft::raft_params() .with_election_timeout_lower(elect_to_low) diff --git a/src/tests/test_state_manager.cpp b/src/tests/test_state_manager.cpp index fa44755..d0877d5 100644 --- a/src/tests/test_state_manager.cpp +++ b/src/tests/test_state_manager.cpp @@ -215,12 +215,25 @@ void test_state_mgr::verify_data(sisl::io_blob const& buf) { } } -void test_state_mgr::fill_data_vec(nuraft_mesg::io_blob_list_t& cli_buf) { - static int const data_size{8}; - for (int i = 0; i < data_size; i++) { +void test_state_mgr::fill_data_vec_big(nuraft_mesg::io_blob_list_t& cli_buf, uint32_t size_bytes) { + auto cnt = size_bytes / sizeof(uint32_t); + sisl::io_blob data(size_bytes); + data_vec.clear(); + uint32_t* const write_buf{reinterpret_cast< uint32_t* >(data.bytes())}; + for (uint32_t i = 0; i < cnt; i++) { + data_vec.emplace_back(i); + write_buf[i] = data_vec.back(); + } + cli_buf.emplace_back(data); +} + +void test_state_mgr::fill_data_vec(nuraft_mesg::io_blob_list_t& cli_buf, uint32_t size_bytes) { + auto cnt = size_bytes / sizeof(uint32_t); + data_vec.clear(); + for (uint32_t i = 0; i < cnt; i++) { cli_buf.emplace_back(sizeof(uint32_t)); uint32_t* const write_buf{reinterpret_cast< uint32_t* >(cli_buf[i].bytes())}; - data_vec.emplace_back(get_random_num()); + data_vec.emplace_back(i); *write_buf = data_vec.back(); } } diff --git a/src/tests/test_state_manager.h b/src/tests/test_state_manager.h index f967d78..48d142a 100644 --- a/src/tests/test_state_manager.h +++ b/src/tests/test_state_manager.h @@ -54,7 +54,8 @@ class test_state_mgr : public nuraft_mesg::mesg_state_mgr { nuraft_mesg::io_blob_list_t const& cli_buf); bool register_data_service_apis(nuraft_mesg::Manager* messaging); - static void fill_data_vec(nuraft_mesg::io_blob_list_t& cli_buf); + static void fill_data_vec(nuraft_mesg::io_blob_list_t& cli_buf, uint32_t size_bytes); + static void fill_data_vec_big(nuraft_mesg::io_blob_list_t& cli_buf, uint32_t size_bytes); static uint16_t get_random_num(); static uint32_t get_server_counter(); static void verify_data(sisl::io_blob const& buf);