diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 7035333..0bfa2f0 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -15,7 +15,7 @@ target_sources(${PROJECT_NAME} PRIVATE lib/mesg_client.cpp lib/service.cpp lib/data_service_grpc.cpp - lib/messaging.cpp + lib/manager_impl.cpp $ ) settings_gen_cpp($ ${CMAKE_CURRENT_BINARY_DIR}/generated/ ${PROJECT_NAME} lib/nuraft_mesg_config.fbs) diff --git a/src/lib/messaging.cpp b/src/lib/manager_impl.cpp similarity index 88% rename from src/lib/messaging.cpp rename to src/lib/manager_impl.cpp index b54e836..3110b2e 100644 --- a/src/lib/messaging.cpp +++ b/src/lib/manager_impl.cpp @@ -1,6 +1,6 @@ /// Copyright 2018 (c) eBay Corporation // -#include "messaging.hpp" +#include "manager_impl.hpp" #include @@ -83,14 +83,15 @@ class engine_factory : public group_factory { } }; -service::~service() { +ManagerImpl::~ManagerImpl() { if (_mesg_service) { _grpc_server->shutdown(); _mesg_service->shutdown(); } } -service::service(Manager::Params const& start_params, std::weak_ptr< MessagingApplication > app, bool and_data_svc) : +ManagerImpl::ManagerImpl(Manager::Params const& start_params, std::weak_ptr< MessagingApplication > app, + bool and_data_svc) : start_params_(start_params), _srv_id(to_server_id(start_params_.server_uuid_)), application_(app) { _g_factory = std::make_shared< engine_factory >(grpc_client_threads, start_params_, app); auto logger_name = fmt::format("nuraft_{}", start_params_.server_uuid_); @@ -124,7 +125,7 @@ service::service(Manager::Params const& start_params, std::weak_ptr< MessagingAp restart_server(); } -void service::restart_server() { +void ManagerImpl::restart_server() { auto listen_address = fmt::format(FMT_STRING("0.0.0.0:{}"), start_params_.mesg_port_); LOGINFO("Starting Messaging Service on http://{}", listen_address); @@ -139,7 +140,7 @@ void service::restart_server() { _mesg_service->bind(_grpc_server.get()); } -void service::register_mgr_type(std::string const& group_type, group_params const& params) { +void ManagerImpl::register_mgr_type(std::string const& group_type, group_params const& params) { std::lock_guard< std::mutex > lg(_manager_lock); auto [it, happened] = _state_mgr_types.emplace(std::make_pair(group_type, params)); DEBUG_ASSERT(_state_mgr_types.end() != it, "Out of memory?"); @@ -147,8 +148,8 @@ void service::register_mgr_type(std::string const& group_type, group_params cons if (_state_mgr_types.end() == it) { LOGERROR("Could not register group type: {}", group_type); } } -nuraft::cb_func::ReturnCode service::callback_handler(std::string const& group_id, nuraft::cb_func::Type type, - nuraft::cb_func::Param* param) { +nuraft::cb_func::ReturnCode ManagerImpl::callback_handler(std::string const& group_id, nuraft::cb_func::Type type, + nuraft::cb_func::Param* param) { switch (type) { case nuraft::cb_func::RemovedFromCluster: { LOGINFO("Removed from cluster {}", group_id); @@ -188,7 +189,7 @@ nuraft::cb_func::ReturnCode service::callback_handler(std::string const& group_i return nuraft::cb_func::Ok; } -void service::exit_group(std::string const& group_id) { +void ManagerImpl::exit_group(std::string const& group_id) { std::shared_ptr< mesg_state_mgr > mgr; { std::lock_guard< std::mutex > lg(_manager_lock); @@ -197,9 +198,9 @@ void service::exit_group(std::string const& group_id) { if (mgr) mgr->leave(); } -std::error_condition service::group_init(int32_t const srv_id, std::string const& group_id, - std::string const& group_type, nuraft::context*& ctx, - std::shared_ptr< nuraft_mesg::group_metrics > metrics) { +std::error_condition ManagerImpl::group_init(int32_t const srv_id, std::string const& group_id, + std::string const& group_type, nuraft::context*& ctx, + std::shared_ptr< nuraft_mesg::group_metrics > metrics) { LOGDEBUGMOD(nuraft_mesg, "Creating context for Group: {} as Member: {}", group_id, srv_id); // State manager (RAFT log store, config) @@ -245,7 +246,7 @@ std::error_condition service::group_init(int32_t const srv_id, std::string const return std::error_condition(); } -NullAsyncResult service::add_member(std::string const& group_id, std::string const& new_id) { +NullAsyncResult ManagerImpl::add_member(std::string const& group_id, std::string const& new_id) { return _mesg_service->add_srv(group_id, nuraft::srv_config(to_server_id(new_id), new_id)) .deferValue([this, g_id = group_id, n_id = new_id](auto cmd_result) mutable -> NullResult { auto result = cmd_result.value(); @@ -267,7 +268,7 @@ NullAsyncResult service::add_member(std::string const& group_id, std::string con }); } -NullAsyncResult service::rem_member(std::string const& group_id, std::string const& old_id) { +NullAsyncResult ManagerImpl::rem_member(std::string const& group_id, std::string const& old_id) { return _mesg_service->rm_srv(group_id, to_server_id(old_id)) .deferValue([this, group_id](auto cmd_result) mutable -> NullResult { auto result = cmd_result.value(); @@ -282,13 +283,13 @@ NullAsyncResult service::rem_member(std::string const& group_id, std::string con }); } -std::shared_ptr< mesg_state_mgr > service::lookup_state_manager(std::string const& group_id) const { +std::shared_ptr< mesg_state_mgr > ManagerImpl::lookup_state_manager(std::string const& group_id) const { std::lock_guard< std::mutex > lg(_manager_lock); if (auto it = _state_managers.find(group_id); _state_managers.end() != it) return it->second; return nullptr; } -NullAsyncResult service::create_group(std::string const& group_id, std::string const& group_type_name) { +NullAsyncResult ManagerImpl::create_group(std::string const& group_id, std::string const& group_type_name) { { std::lock_guard< std::mutex > lg(_manager_lock); _is_leader.insert(std::make_pair(group_id, false)); @@ -309,8 +310,8 @@ NullAsyncResult service::create_group(std::string const& group_id, std::string c }); } -NullResult service::join_group(std::string const& group_id, std::string const& group_type, - std::shared_ptr< mesg_state_mgr > smgr) { +NullResult ManagerImpl::join_group(std::string const& group_id, std::string const& group_type, + std::shared_ptr< mesg_state_mgr > smgr) { { std::lock_guard< std::mutex > lg(_manager_lock); auto [it, happened] = _state_managers.emplace(group_id, smgr); @@ -326,7 +327,7 @@ NullResult service::join_group(std::string const& group_id, std::string const& g return folly::Unit(); } -void service::append_peers(std::string const& group_id, std::list< std::string >& servers) const { +void ManagerImpl::append_peers(std::string const& group_id, std::list< std::string >& servers) const { std::lock_guard< std::mutex > lg(_manager_lock); if (auto it = _state_managers.find(group_id); _state_managers.end() != it) { if (auto config = it->second->load_config(); config) { @@ -337,7 +338,7 @@ void service::append_peers(std::string const& group_id, std::list< std::string > } } -NullAsyncResult service::become_leader(std::string const& group_id) { +NullAsyncResult ManagerImpl::become_leader(std::string const& group_id) { { auto lk = std::unique_lock< std::mutex >(_manager_lock); if (_is_leader[group_id]) { return folly::Unit(); } @@ -363,7 +364,7 @@ NullAsyncResult service::become_leader(std::string const& group_id) { }); } -void service::leave_group(std::string const& group_id) { +void ManagerImpl::leave_group(std::string const& group_id) { LOGINFO("Leaving group [vol={}]", group_id); { std::lock_guard< std::mutex > lg(_manager_lock); @@ -385,26 +386,26 @@ void service::leave_group(std::string const& group_id) { LOGINFO("Finished leaving: [vol={}]", group_id); } -NullAsyncResult service::client_request(std::string const& group_id, std::shared_ptr< nuraft::buffer >& buf) { +NullAsyncResult ManagerImpl::client_request(std::string const& group_id, std::shared_ptr< nuraft::buffer >& buf) { return _mesg_service->append_entries(group_id, {buf}).deferValue([](auto cmd_result) -> NullResult { auto result = cmd_result.value(); if (nuraft::OK != result) return folly::makeUnexpected(convertToError(result)); return folly::Unit(); }); } -uint32_t service::logstore_id(std::string const& group_id) const { +uint32_t ManagerImpl::logstore_id(std::string const& group_id) const { std::lock_guard< std::mutex > lg(_manager_lock); if (auto it = _state_managers.find(group_id); _state_managers.end() != it) { return it->second->get_logstore_id(); } return UINT32_MAX; } -void service::get_srv_config_all(std::string const& group_name, - std::vector< std::shared_ptr< nuraft::srv_config > >& configs_out) { +void ManagerImpl::get_srv_config_all(std::string const& group_name, + std::vector< std::shared_ptr< nuraft::srv_config > >& configs_out) { _mesg_service->get_srv_config_all(group_name, configs_out); } -bool service::bind_data_service_request(std::string const& request_name, std::string const& group_id, - data_service_request_handler_t const& request_handler) { +bool ManagerImpl::bind_data_service_request(std::string const& request_name, std::string const& group_id, + data_service_request_handler_t const& request_handler) { return _mesg_service->bind_data_service_request(request_name, group_id, request_handler); } @@ -435,7 +436,7 @@ void mesg_state_mgr::make_repl_ctx(grpc_server* server, std::shared_ptr< mesg_fa std::shared_ptr< Manager > init_messaging(Manager::Params const& p, std::weak_ptr< MessagingApplication > w, bool with_data_svc) { RELEASE_ASSERT(w.lock(), "Could not acquire application!"); - return std::make_shared< service >(p, w, with_data_svc); + return std::make_shared< ManagerImpl >(p, w, with_data_svc); } } // namespace nuraft_mesg diff --git a/src/lib/messaging.hpp b/src/lib/manager_impl.hpp similarity index 96% rename from src/lib/messaging.hpp rename to src/lib/manager_impl.hpp index 8637ab7..9272996 100644 --- a/src/lib/messaging.hpp +++ b/src/lib/manager_impl.hpp @@ -35,7 +35,7 @@ class group_factory; class msg_service; class group_metrics; -class service : public Manager { +class ManagerImpl : public Manager { Manager::Params start_params_; int32_t _srv_id; @@ -62,8 +62,8 @@ class service : public Manager { void exit_group(std::string const& group_id); public: - service(Manager::Params const&, std::weak_ptr< MessagingApplication >, bool and_data_svc = false); - ~service() override; + ManagerImpl(Manager::Params const&, std::weak_ptr< MessagingApplication >, bool and_data_svc = false); + ~ManagerImpl() override; int32_t server_id() const override { return _srv_id; } diff --git a/src/lib/service.hpp b/src/lib/service.hpp index 3e6d3cd..5df730b 100644 --- a/src/lib/service.hpp +++ b/src/lib/service.hpp @@ -12,7 +12,7 @@ #include #include "proto/messaging_service.grpc.pb.h" -#include "messaging.hpp" +#include "manager_impl.hpp" #include "data_service_grpc.hpp" namespace nuraft_mesg { diff --git a/src/tests/MessagingTest.cpp b/src/tests/MessagingTest.cpp index ec74dc5..7ca3092 100644 --- a/src/tests/MessagingTest.cpp +++ b/src/tests/MessagingTest.cpp @@ -270,103 +270,103 @@ TEST_F(MessagingFixture, SyncAddMember) { EXPECT_EQ(srv_list.size(), 4u); } -// class DataServiceFixture : public MessagingFixtureBase { -// protected: -// void SetUp() override { -// MessagingFixtureBase::SetUp(); -// start(true); -// } -// }; -// -// TEST_F(DataServiceFixture, DataServiceBasic) { -// get_random_ports(2u); -// // create new servers -// auto app_4 = std::make_shared< TestApplication >("sm4", ports[3]); -// lookup_map.emplace(app_4->id_, fmt::format("127.0.0.1:{}", ports[3])); -// app_1_->map_peers(lookup_map); -// app_2_->map_peers(lookup_map); -// app_3_->map_peers(lookup_map); -// app_4->map_peers(lookup_map); -// app_4->start(true); -// auto add4 = app_1_->instance_->add_member("test_group", app_4->id_); -// std::this_thread::sleep_for(std::chrono::seconds(1)); -// EXPECT_TRUE(std::move(add4).get()); -// -// auto app_5 = std::make_shared< TestApplication >("sm5", ports[4]); -// lookup_map.emplace(app_5->id_, fmt::format("127.0.0.1:{}", ports[4])); -// app_1_->map_peers(lookup_map); -// app_2_->map_peers(lookup_map); -// app_3_->map_peers(lookup_map); -// app_4->map_peers(lookup_map); -// app_5->map_peers(lookup_map); -// app_5->start(true); -// auto add5 = app_1_->instance_->add_member("test_group", app_5->id_); -// std::this_thread::sleep_for(std::chrono::seconds(1)); -// EXPECT_TRUE(std::move(add5).get()); -// -// // create new group -// app_4->instance_->create_group("data_service_test_group", "test_type"); -// std::this_thread::sleep_for(std::chrono::seconds(1)); -// auto add1 = app_4->instance_->add_member("data_service_test_group", app_1_->id_); -// std::this_thread::sleep_for(std::chrono::seconds(1)); -// EXPECT_TRUE(std::move(add1).get()); -// auto add2 = app_4->instance_->add_member("data_service_test_group", app_2_->id_); -// std::this_thread::sleep_for(std::chrono::seconds(1)); -// EXPECT_TRUE(std::move(add2).get()); -// add5 = app_4->instance_->add_member("data_service_test_group", app_5->id_); -// std::this_thread::sleep_for(std::chrono::seconds(1)); -// EXPECT_TRUE(std::move(add5).get()); -// -// for (auto& [key, smgr] : state_mgr_map) { -// smgr.first->register_data_service_apis(smgr.second); -// } -// -// io_blob_list_t cli_buf; -// test_state_mgr::fill_data_vec(cli_buf); -// -// auto sm1 = state_mgr_map["test_group_sm1"].first; -// auto sm4 = state_mgr_map["data_service_test_group_sm4"].first; -// -// std::string const SEND_DATA{"send_data"}; -// std::string const REQUEST_DATA{"request_data"}; -// -// std::vector< NullAsyncResult > results; -// results.push_back(sm1->data_service_request(SEND_DATA, cli_buf).deferValue([](auto e) -> NullResult { -// test_state_mgr::verify_data(e.value()); -// return folly::Unit(); -// })); -// results.push_back(sm4->data_service_request(SEND_DATA, cli_buf).deferValue([](auto e) -> NullResult { -// test_state_mgr::verify_data(e.value()); -// return folly::Unit(); -// })); -// -// results.push_back(sm1->data_service_request(REQUEST_DATA, cli_buf).deferValue([](auto e) -> NullResult { -// test_state_mgr::verify_data(e.value()); -// return folly::Unit(); -// })); -// folly::collectAll(results).via(folly::getGlobalCPUExecutor()).get(); -// -// // add a new member to data_service_test_group and check if repl_ctx4 sends data to newly added member -// auto add_3 = app_4->instance_->add_member("data_service_test_group", app_3_->id_); -// std::this_thread::sleep_for(std::chrono::seconds(1)); -// EXPECT_TRUE(std::move(add_3).get()); -// auto sm3 = state_mgr_map["data_service_test_group_sm3"].first; -// sm3->register_data_service_apis(app_3_->instance_.get()); -// sm4->data_service_request(SEND_DATA, cli_buf) -// .deferValue([](auto e) -> folly::Unit { -// test_state_mgr::verify_data(e.value()); -// return folly::Unit(); -// }) -// .get(); -// -// // the count is 4 (2 methods from group test_group) + 7 (from data_service_test_group) -// EXPECT_EQ(test_state_mgr::get_server_counter(), 11); -// -// // free client buf -// for (auto& buf : cli_buf) { -// buf.buf_free(); -// } -// } +class DataServiceFixture : public MessagingFixtureBase { +protected: + void SetUp() override { + MessagingFixtureBase::SetUp(); + start(true); + } +}; + +TEST_F(DataServiceFixture, DataServiceBasic) { + get_random_ports(2u); + // create new servers + auto app_4 = std::make_shared< TestApplication >("sm4", ports[3]); + lookup_map.emplace(app_4->id_, fmt::format("127.0.0.1:{}", ports[3])); + app_1_->map_peers(lookup_map); + app_2_->map_peers(lookup_map); + app_3_->map_peers(lookup_map); + app_4->map_peers(lookup_map); + app_4->start(true); + auto add4 = app_1_->instance_->add_member("test_group", app_4->id_); + std::this_thread::sleep_for(std::chrono::seconds(1)); + EXPECT_TRUE(std::move(add4).get()); + + auto app_5 = std::make_shared< TestApplication >("sm5", ports[4]); + lookup_map.emplace(app_5->id_, fmt::format("127.0.0.1:{}", ports[4])); + app_1_->map_peers(lookup_map); + app_2_->map_peers(lookup_map); + app_3_->map_peers(lookup_map); + app_4->map_peers(lookup_map); + app_5->map_peers(lookup_map); + app_5->start(true); + auto add5 = app_1_->instance_->add_member("test_group", app_5->id_); + std::this_thread::sleep_for(std::chrono::seconds(1)); + EXPECT_TRUE(std::move(add5).get()); + + // create new group + app_4->instance_->create_group("data_service_test_group", "test_type"); + std::this_thread::sleep_for(std::chrono::seconds(1)); + auto add1 = app_4->instance_->add_member("data_service_test_group", app_1_->id_); + std::this_thread::sleep_for(std::chrono::seconds(1)); + EXPECT_TRUE(std::move(add1).get()); + auto add2 = app_4->instance_->add_member("data_service_test_group", app_2_->id_); + std::this_thread::sleep_for(std::chrono::seconds(1)); + EXPECT_TRUE(std::move(add2).get()); + add5 = app_4->instance_->add_member("data_service_test_group", app_5->id_); + std::this_thread::sleep_for(std::chrono::seconds(1)); + EXPECT_TRUE(std::move(add5).get()); + + for (auto& [key, smgr] : state_mgr_map) { + smgr.first->register_data_service_apis(smgr.second); + } + + io_blob_list_t cli_buf; + test_state_mgr::fill_data_vec(cli_buf); + + auto sm1 = state_mgr_map["test_group_sm1"].first; + auto sm4 = state_mgr_map["data_service_test_group_sm4"].first; + + std::string const SEND_DATA{"send_data"}; + std::string const REQUEST_DATA{"request_data"}; + + std::vector< NullAsyncResult > results; + results.push_back(sm1->data_service_request(SEND_DATA, cli_buf).deferValue([](auto e) -> NullResult { + test_state_mgr::verify_data(e.value()); + return folly::Unit(); + })); + results.push_back(sm4->data_service_request(SEND_DATA, cli_buf).deferValue([](auto e) -> NullResult { + test_state_mgr::verify_data(e.value()); + return folly::Unit(); + })); + + results.push_back(sm1->data_service_request(REQUEST_DATA, cli_buf).deferValue([](auto e) -> NullResult { + test_state_mgr::verify_data(e.value()); + return folly::Unit(); + })); + folly::collectAll(results).via(folly::getGlobalCPUExecutor()).get(); + + // add a new member to data_service_test_group and check if repl_ctx4 sends data to newly added member + auto add_3 = app_4->instance_->add_member("data_service_test_group", app_3_->id_); + std::this_thread::sleep_for(std::chrono::seconds(1)); + EXPECT_TRUE(std::move(add_3).get()); + auto sm3 = state_mgr_map["data_service_test_group_sm3"].first; + sm3->register_data_service_apis(app_3_->instance_.get()); + sm4->data_service_request(SEND_DATA, cli_buf) + .deferValue([](auto e) -> folly::Unit { + test_state_mgr::verify_data(e.value()); + return folly::Unit(); + }) + .get(); + + // the count is 4 (2 methods from group test_group) + 7 (from data_service_test_group) + EXPECT_EQ(test_state_mgr::get_server_counter(), 11); + + // free client buf + for (auto& buf : cli_buf) { + buf.buf_free(); + } +} int main(int argc, char* argv[]) { ::testing::InitGoogleTest(&argc, argv);