diff --git a/src/tests/data_service_tests.cpp b/src/tests/data_service_tests.cpp index ba8136e..21f0dd4 100644 --- a/src/tests/data_service_tests.cpp +++ b/src/tests/data_service_tests.cpp @@ -1,6 +1,9 @@ #include "test_fixture.ipp" #include + + + class DataServiceFixture : public MessagingFixtureBase { protected: void SetUp() override { @@ -21,6 +24,75 @@ class DataServiceFixture : public MessagingFixtureBase { std::string REQUEST_DATA{"request_data"}; }; +TEST_F(DataServiceFixture, LongRunning) { + auto sm1 = app_1_->state_mgr_map_[group_id_]; + auto sm2 = app_2_->state_mgr_map_[group_id_]; + auto sm3 = app_3_->state_mgr_map_[group_id_]; + auto repl_ctx = sm1->get_repl_context(); + + EXPECT_TRUE(repl_ctx && repl_ctx->is_raft_leader()); + EXPECT_TRUE(repl_ctx && repl_ctx->raft_leader_id() == to_string(app_1_->id_)); + auto peer_info = repl_ctx->get_raft_status(); + EXPECT_TRUE(peer_info.size() == 3); + for (auto const& peer : peer_info) { + std::cout << "Peer ID: " << peer.id_ << " Last Log Idx: " << peer.last_log_idx_ + << " Last Succ Resp Us: " << peer.last_succ_resp_us_ << std::endl; + EXPECT_TRUE(peer.id_ == to_string(app_1_->id_) || peer.id_ == to_string(app_2_->id_) || + peer.id_ == to_string(app_3_->id_)); + EXPECT_TRUE(peer.last_log_idx_ == 3); + if (peer.id_ == to_string(app_1_->id_)) { + EXPECT_TRUE(peer.last_succ_resp_us_ == 0); + } else { + EXPECT_TRUE(peer.last_succ_resp_us_ > 0); + } + } + + io_blob_list_t big_cli_buf; + test_state_mgr::fill_data_vec_big(big_cli_buf, 1024*1024); + auto num_iters = SISL_OPTIONS["num_iters"].as< uint32_t >(); + + LOGINFO("Starting long running test") + for (uint32_t i=0 ; i < num_iters; i++) { + auto size = 4 * 1024 * (i % 16 + 1); + LOGINFO("Iter {}, size {}", i, size) + auto ptr = big_cli_buf[0].bytes(); + sisl::io_blob iob(ptr, size, true); + io_blob_list_t buf; + buf.emplace_back(iob); + + std::vector< NullAsyncResult > results; + LOGINFO("Iter {}, SEND_DATA, size {}", i, size) + results.push_back(sm1->data_service_request_unidirectional(nuraft_mesg::role_regex::ALL, SEND_DATA, buf) + .deferValue([](auto e) -> NullResult { + EXPECT_TRUE(e.hasValue()); + return folly::Unit(); + })); + + LOGINFO("Iter {}, REQUEST_DATA from app_2, size {}", i, size) + results.push_back(sm2->data_service_request_bidirectional(nuraft_mesg::role_regex::LEADER, REQUEST_DATA, buf) + .deferValue([](auto e) -> NullResult { + EXPECT_TRUE(e.hasValue()); + test_state_mgr::verify_data(e.value().response_blob()); + return folly::Unit(); + })); + + LOGINFO("Iter {}, REQUEST_DATA from app_3, size {}", i, size) + results.push_back(sm3->data_service_request_bidirectional(nuraft_mesg::role_regex::LEADER, REQUEST_DATA, buf) + .deferValue([](auto e) -> NullResult { + EXPECT_TRUE(e.hasValue()); + test_state_mgr::verify_data(e.value().response_blob()); + return folly::Unit(); + })); + folly::collectAll(results).via(folly::getGlobalCPUExecutor()).get(); + } + + for (auto& buf : big_cli_buf) { + buf.buf_free(); + } + LOGINFO("End long running test") + +} + TEST_F(DataServiceFixture, BasicTest1) { get_random_ports(2u); // create new servers diff --git a/src/tests/test_fixture.ipp b/src/tests/test_fixture.ipp index 9e3b8c4..a4d3bd3 100644 --- a/src/tests/test_fixture.ipp +++ b/src/tests/test_fixture.ipp @@ -36,9 +36,12 @@ #include "test_state_manager.h" +SISL_OPTION_GROUP(data_service_test, (num_iters, "", "num_iters", "iterations in long running test", + ::cxxopts::value< uint32_t >()->default_value("100"), "number")); + SISL_LOGGING_INIT(NURAFTMESG_LOG_MODS) -SISL_OPTIONS_ENABLE(logging) +SISL_OPTIONS_ENABLE(logging, data_service_test) constexpr auto rpc_backoff = 50; constexpr auto heartbeat_period = 100; @@ -205,7 +208,7 @@ protected: int main(int argc, char* argv[]) { int parsed_argc = argc; ::testing::InitGoogleTest(&parsed_argc, argv); - SISL_OPTIONS_LOAD(parsed_argc, argv, logging); + SISL_OPTIONS_LOAD(parsed_argc, argv, logging, data_service_test); sisl::logging::SetLogger(std::string(argv[0])); spdlog::set_pattern("[%D %T.%e] [%n] [%^%l%$] [%t] %v"); parsed_argc = 1; diff --git a/src/tests/test_state_manager.cpp b/src/tests/test_state_manager.cpp index d0877d5..9800f2d 100644 --- a/src/tests/test_state_manager.cpp +++ b/src/tests/test_state_manager.cpp @@ -195,6 +195,7 @@ bool test_state_mgr::register_data_service_apis(nuraft_mesg::Manager* messaging) return messaging->bind_data_service_request( SEND_DATA, _group_id, [this](boost::intrusive_ptr< sisl::GenericRpcData >& rpc_data) { + LOGINFO("I am {}, SEND_DATA", _srv_addr); rpc_data->set_comp_cb([this](boost::intrusive_ptr< sisl::GenericRpcData >&) { server_counter++; }); verify_data(rpc_data->request_blob()); m_repl_svc_ctx->send_data_service_response(nuraft_mesg::io_blob_list_t{rpc_data->request_blob()}, @@ -202,6 +203,7 @@ bool test_state_mgr::register_data_service_apis(nuraft_mesg::Manager* messaging) }) && messaging->bind_data_service_request( REQUEST_DATA, _group_id, [this](boost::intrusive_ptr< sisl::GenericRpcData >& rpc_data) { + LOGINFO("I am {}, REQUEST_DATA", _srv_addr); rpc_data->set_comp_cb([this](boost::intrusive_ptr< sisl::GenericRpcData >&) { server_counter++; }); m_repl_svc_ctx->send_data_service_response(nuraft_mesg::io_blob_list_t{rpc_data->request_blob()}, rpc_data);