diff --git a/src/include/nuraft_mesg/grpc_factory.hpp b/src/include/nuraft_mesg/grpc_factory.hpp index 6643cf8..5e1f837 100644 --- a/src/include/nuraft_mesg/grpc_factory.hpp +++ b/src/include/nuraft_mesg/grpc_factory.hpp @@ -18,6 +18,7 @@ // inherited rpc_client instances sharing a common worker pool. #pragma once +#include #include #include @@ -45,9 +46,9 @@ class grpc_factory : public nuraft::rpc_client_factory, public std::enable_share nuraft::ptr< nuraft::rpc_client > create_client(const std::string& client) override; nuraft::ptr< nuraft::rpc_client > create_client(peer_id_t const& client); - virtual std::error_condition create_client(peer_id_t const& client, nuraft::ptr< nuraft::rpc_client >&) = 0; + virtual nuraft::cmd_result_code create_client(peer_id_t const& client, nuraft::ptr< nuraft::rpc_client >&) = 0; - virtual std::error_condition reinit_client(peer_id_t const& client, nuraft::ptr< nuraft::rpc_client >&) = 0; + virtual nuraft::cmd_result_code reinit_client(peer_id_t const& client, nuraft::ptr< nuraft::rpc_client >&) = 0; // Construct and send an AddServer message to the cluster NullAsyncResult add_server(uint32_t const srv_id, peer_id_t const& srv_addr, nuraft::srv_config const& dest_cfg); diff --git a/src/include/nuraft_mesg/mesg_factory.hpp b/src/include/nuraft_mesg/mesg_factory.hpp index c5a6f90..7bb3839 100644 --- a/src/include/nuraft_mesg/mesg_factory.hpp +++ b/src/include/nuraft_mesg/mesg_factory.hpp @@ -14,7 +14,7 @@ *********************************************************************************/ #pragma once -#include +#include #include #include #include @@ -41,9 +41,9 @@ class group_factory : public grpc_factory { std::shared_ptr< sisl::GrpcTokenClient > const token_client, std::string const& ssl_cert = ""); using grpc_factory::create_client; - std::error_condition create_client(peer_id_t const& client, nuraft::ptr< nuraft::rpc_client >&) override; - std::error_condition reinit_client(peer_id_t const& client, - std::shared_ptr< nuraft::rpc_client >& raft_client) override; + nuraft::cmd_result_code create_client(peer_id_t const& client, nuraft::ptr< nuraft::rpc_client >&) override; + nuraft::cmd_result_code reinit_client(peer_id_t const& client, + std::shared_ptr< nuraft::rpc_client >& raft_client) override; virtual std::string lookupEndpoint(peer_id_t const& client) = 0; }; @@ -65,10 +65,10 @@ class mesg_factory final : public grpc_factory { group_id_t group_name() const { return _group_name; } - std::error_condition create_client(peer_id_t const& client, nuraft::ptr< nuraft::rpc_client >& rpc_ptr) override; + nuraft::cmd_result_code create_client(peer_id_t const& client, nuraft::ptr< nuraft::rpc_client >& rpc_ptr) override; - std::error_condition reinit_client(peer_id_t const& client, - std::shared_ptr< nuraft::rpc_client >& raft_client) override; + nuraft::cmd_result_code reinit_client(peer_id_t const& client, + std::shared_ptr< nuraft::rpc_client >& raft_client) override; AsyncResult< sisl::io_blob > data_service_request(std::string const& request_name, io_blob_list_t const& cli_buf); }; diff --git a/src/include/nuraft_mesg/nuraft_mesg.hpp b/src/include/nuraft_mesg/nuraft_mesg.hpp index 9d44e97..19becab 100644 --- a/src/include/nuraft_mesg/nuraft_mesg.hpp +++ b/src/include/nuraft_mesg/nuraft_mesg.hpp @@ -17,7 +17,6 @@ #include #include #include -#include #include #include diff --git a/src/lib/grpc_factory.cpp b/src/lib/grpc_factory.cpp index fc3e126..fd0eb69 100644 --- a/src/lib/grpc_factory.cpp +++ b/src/lib/grpc_factory.cpp @@ -156,16 +156,16 @@ nuraft::ptr< nuraft::rpc_client > grpc_factory::create_client(peer_id_t const& c if (_clients.end() != it) { if (!happened) { LOGDEBUGMOD(nuraft_mesg, "Re-creating client for {}", client); - if (auto err = reinit_client(client, it->second); err) { - LOGERROR("Failed to re-initialize client {}: {}", client, err.message()); + if (auto err = reinit_client(client, it->second); nuraft::OK != err) { + LOGERROR("Failed to re-initialize client {}: {}", client, err); new_client = std::make_shared< grpc_error_client >(); } else { new_client = it->second; } } else { LOGDEBUGMOD(nuraft_mesg, "Creating client for {}", client); - if (auto err = create_client(client, it->second); err) { - LOGERROR("Failed to create client for {}: {}", client, err.message()); + if (auto err = create_client(client, it->second); nuraft::OK != err) { + LOGERROR("Failed to create client for {}: {}", client, err); new_client = std::make_shared< grpc_error_client >(); } else { new_client = it->second; diff --git a/src/lib/manager_impl.cpp b/src/lib/manager_impl.cpp index 14909f1..9ea5736 100644 --- a/src/lib/manager_impl.cpp +++ b/src/lib/manager_impl.cpp @@ -13,7 +13,6 @@ #include #include #include -#include #include "service.hpp" #include "nuraft_mesg/mesg_factory.hpp" @@ -28,37 +27,6 @@ constexpr auto grpc_server_threads = 1u; namespace nuraft_mesg { -static std::error_condition convertToError(nuraft::cmd_result_code rc) { - switch (rc) { - case nuraft::OK: - return std::error_condition(); - case nuraft::CANCELLED: - return std::make_error_condition(std::errc::operation_canceled); - case nuraft::TIMEOUT: - return std::make_error_condition(std::errc::timed_out); - case nuraft::NOT_LEADER: - return std::make_error_condition(std::errc::permission_denied); - case nuraft::BAD_REQUEST: - return std::make_error_condition(std::errc::invalid_argument); - case nuraft::SERVER_ALREADY_EXISTS: - return std::make_error_condition(std::errc::file_exists); - case nuraft::CONFIG_CHANGING: - return std::make_error_condition(std::errc::interrupted); - case nuraft::SERVER_IS_JOINING: - return std::make_error_condition(std::errc::device_or_resource_busy); - case nuraft::SERVER_NOT_FOUND: - return std::make_error_condition(std::errc::no_such_device); - case nuraft::CANNOT_REMOVE_LEADER: - return std::make_error_condition(std::errc::not_supported); - case nuraft::SERVER_IS_LEAVING: - return std::make_error_condition(std::errc::owner_dead); - case nuraft::FAILED: - [[fallthrough]]; - default: - return std::make_error_condition(std::errc::io_error); - } -} - int32_t to_server_id(peer_id_t const& server_addr) { boost::hash< boost::uuids::uuid > uuid_hasher; return uuid_hasher(server_addr) >> 33; diff --git a/src/lib/manager_impl.hpp b/src/lib/manager_impl.hpp index 8ad917a..4c721de 100644 --- a/src/lib/manager_impl.hpp +++ b/src/lib/manager_impl.hpp @@ -15,13 +15,11 @@ #pragma once #include -#include #include #include #include #include #include -#include #include diff --git a/src/lib/mesg_client.cpp b/src/lib/mesg_client.cpp index bc7e833..e14487a 100644 --- a/src/lib/mesg_client.cpp +++ b/src/lib/mesg_client.cpp @@ -134,24 +134,24 @@ class group_client : public grpc_base_client { } }; -std::error_condition mesg_factory::create_client(peer_id_t const& client, - nuraft::ptr< nuraft::rpc_client >& raft_client) { +nuraft::cmd_result_code mesg_factory::create_client(peer_id_t const& client, + nuraft::ptr< nuraft::rpc_client >& raft_client) { // Re-direct this call to a global factory so we can re-use clients to the same endpoints LOGDEBUGMOD(nuraft_mesg, "Creating client to {}", client); auto m_client = std::dynamic_pointer_cast< messaging_client >(_group_factory->create_client(to_string(client))); - if (!m_client) { return std::make_error_condition(std::errc::connection_aborted); } + if (!m_client) return nuraft::CANCELLED; raft_client = std::make_shared< group_client >(m_client, client, _group_name, _group_type, _metrics); - return (!raft_client) ? std::make_error_condition(std::errc::invalid_argument) : std::error_condition(); + return (!raft_client) ? nuraft::BAD_REQUEST : nuraft::OK; } -std::error_condition mesg_factory::reinit_client(peer_id_t const& client, - std::shared_ptr< nuraft::rpc_client >& raft_client) { +nuraft::cmd_result_code mesg_factory::reinit_client(peer_id_t const& client, + std::shared_ptr< nuraft::rpc_client >& raft_client) { LOGDEBUGMOD(nuraft_mesg, "Re-init client to {}", client); auto g_client = std::dynamic_pointer_cast< group_client >(raft_client); auto new_raft_client = std::static_pointer_cast< nuraft::rpc_client >(g_client->realClient()); if (auto err = _group_factory->reinit_client(client, new_raft_client); err) { return err; } g_client->setClient(std::dynamic_pointer_cast< messaging_client >(new_raft_client)); - return std::error_condition(); + return nuraft::OK; } AsyncResult< sisl::io_blob > mesg_factory::data_service_request(std::string const& request_name, @@ -171,27 +171,27 @@ group_factory::group_factory(int const cli_thread_count, group_id_t const& name, m_ssl_cert = ssl_cert; } -std::error_condition group_factory::create_client(peer_id_t const& client, - nuraft::ptr< nuraft::rpc_client >& raft_client) { +nuraft::cmd_result_code group_factory::create_client(peer_id_t const& client, + nuraft::ptr< nuraft::rpc_client >& raft_client) { LOGDEBUGMOD(nuraft_mesg, "Creating client to {}", client); auto endpoint = lookupEndpoint(client); - if (endpoint.empty()) { return std::make_error_condition(std::errc::invalid_argument); } + if (endpoint.empty()) nuraft::BAD_REQUEST; LOGDEBUGMOD(nuraft_mesg, "Creating client for [{}] @ [{}]", client, endpoint); raft_client = sisl::GrpcAsyncClient::make< messaging_client >(workerName(), endpoint, m_token_client, "", m_ssl_cert); - return (!raft_client) ? std::make_error_condition(std::errc::connection_aborted) : std::error_condition(); + return (!raft_client) ? nuraft::CANCELLED : nuraft::OK; } -std::error_condition group_factory::reinit_client(peer_id_t const& client, - std::shared_ptr< nuraft::rpc_client >& raft_client) { +nuraft::cmd_result_code group_factory::reinit_client(peer_id_t const& client, + std::shared_ptr< nuraft::rpc_client >& raft_client) { LOGDEBUGMOD(nuraft_mesg, "Re-init client to {}", client); assert(raft_client); auto mesg_client = std::dynamic_pointer_cast< messaging_client >(raft_client); if (!mesg_client->is_connection_ready() || 0 < mesg_client->bad_service.load(std::memory_order_relaxed)) { return create_client(client, raft_client); } - return std::error_condition(); + return nuraft::OK; } } // namespace nuraft_mesg diff --git a/src/tests/test_state_manager.cpp b/src/tests/test_state_manager.cpp index 1244e46..60ce95c 100644 --- a/src/tests/test_state_manager.cpp +++ b/src/tests/test_state_manager.cpp @@ -15,6 +15,7 @@ #include "test_state_manager.h" #include +#include #include "jungle_logstore/jungle_log_store.h" #include diff --git a/test_package/example_client.cpp b/test_package/example_client.cpp index ad0f397..a12ef3e 100644 --- a/test_package/example_client.cpp +++ b/test_package/example_client.cpp @@ -1,7 +1,9 @@ +#include #include #include #include +#include #include #include #include @@ -49,14 +51,13 @@ int send_message(uint32_t leader_id, nuraft_mesg::group_id_t const& group_id, st buf->put(message.c_str()); buf->pos(0); - nuraft::cmd_result_code rc = nuraft::SERVER_IS_JOINING; - while (nuraft::SERVER_IS_JOINING == rc || nuraft::CONFIG_CHANGING == rc) { - rc = factory->client_request(buf, dest_cfg).get(); - if (nuraft::SERVER_IS_JOINING == rc || nuraft::CONFIG_CHANGING == rc) { - std::this_thread::sleep_for(std::chrono::milliseconds(200)); - } + auto result = nuraft_mesg::NullResult(folly::makeUnexpected(nuraft::SERVER_IS_JOINING)); + while (!result && (nuraft::SERVER_IS_JOINING == result.error() || nuraft::CONFIG_CHANGING == result.error())) { + auto sf = factory->client_request(buf, dest_cfg); + std::this_thread::sleep_for(std::chrono::milliseconds(200)); + result = std::move(sf).get(); } - int ret = nuraft::OK == rc ? 0 : -1; + int ret = (!!result) ? 0 : -1; sisl::GrpcAsyncClientWorker::shutdown_all(); return ret; } @@ -66,14 +67,14 @@ int add_new_server(uint32_t leader_id, uint32_t srv_id, nuraft_mesg::group_id_t auto factory = std::make_shared< mesg_factory >(g_factory, group_id, "test_package"); auto const dest_cfg = srv_config(leader_id, uuids[leader_id]); - nuraft::cmd_result_code rc = nuraft::SERVER_IS_JOINING; - while (nuraft::SERVER_IS_JOINING == rc || nuraft::CONFIG_CHANGING == rc) { - rc = factory->add_server(srv_id, uuids[srv_id], dest_cfg).get(); - if (nuraft::SERVER_IS_JOINING == rc || nuraft::CONFIG_CHANGING == rc) { - std::this_thread::sleep_for(std::chrono::milliseconds(200)); - } + auto result = nuraft_mesg::NullResult(folly::makeUnexpected(nuraft::SERVER_IS_JOINING)); + while (!result && (nuraft::SERVER_IS_JOINING == result.error() || nuraft::CONFIG_CHANGING == result.error())) { + auto srv_addr = boost::uuids::string_generator()(uuids[srv_id]); + auto sf = factory->add_server(srv_id, srv_addr, dest_cfg); + std::this_thread::sleep_for(std::chrono::milliseconds(200)); + result = std::move(sf).get(); } - int ret = nuraft::OK == rc ? 0 : -1; + int ret = (!!result) ? 0 : -1; sisl::GrpcAsyncClientWorker::shutdown_all(); return ret; } @@ -83,14 +84,13 @@ int remove_server(uint32_t leader_id, nuraft_mesg::group_id_t const& group_id, u auto factory = std::make_shared< mesg_factory >(g_factory, group_id, "test_package"); auto const dest_cfg = srv_config(leader_id, uuids[leader_id]); - nuraft::cmd_result_code rc = nuraft::SERVER_IS_JOINING; - while (nuraft::SERVER_IS_JOINING == rc || nuraft::CONFIG_CHANGING == rc) { - rc = factory->rem_server(srv_id, dest_cfg).get(); - if (nuraft::SERVER_IS_JOINING == rc || nuraft::CONFIG_CHANGING == rc) { - std::this_thread::sleep_for(std::chrono::milliseconds(200)); - } + auto result = nuraft_mesg::NullResult(folly::makeUnexpected(nuraft::SERVER_IS_JOINING)); + while (!result && (nuraft::SERVER_IS_JOINING == result.error() || nuraft::CONFIG_CHANGING == result.error())) { + auto sf = factory->rem_server(srv_id, dest_cfg); + std::this_thread::sleep_for(std::chrono::milliseconds(200)); + result = std::move(sf).get(); } - int ret = nuraft::OK == rc ? 0 : -1; + int ret = (!!result) ? 0 : -1; sisl::GrpcAsyncClientWorker::shutdown_all(); return ret; } diff --git a/test_package/example_state_manager.cpp b/test_package/example_state_manager.cpp index 942874f..0c0f5c5 100644 --- a/test_package/example_state_manager.cpp +++ b/test_package/example_state_manager.cpp @@ -2,6 +2,7 @@ #include "example_state_machine.h" #include +#include #include