diff --git a/src/proto/proto_mesg_factory.cpp b/src/proto/proto_mesg_factory.cpp index 7438280..5354878 100644 --- a/src/proto/proto_mesg_factory.cpp +++ b/src/proto/proto_mesg_factory.cpp @@ -30,6 +30,61 @@ namespace nuraft_mesg { std::string group_factory::m_ssl_cert; using handle_resp = std::function< void(RaftMessage&, ::grpc::Status&) >; +static nuraft::cmd_result_code grpc_status_to_nuraft_code(::grpc::Status const& s) { + if (s.ok()) { return nuraft::cmd_result_code::OK; } + auto const ec = s.error_code(); + switch (ec) { + case ::grpc::StatusCode::DEADLINE_EXCEEDED: + return nuraft::cmd_result_code::TIMEOUT; + case ::grpc::StatusCode::UNAVAILABLE: + case ::grpc::StatusCode::NOT_FOUND: + return nuraft::cmd_result_code::SERVER_NOT_FOUND; + case ::grpc::StatusCode::CANCELLED: + case ::grpc::StatusCode::ABORTED: + return nuraft::cmd_result_code::CANCELLED; + case ::grpc::StatusCode::FAILED_PRECONDITION: + return nuraft::cmd_result_code::TERM_MISMATCH; + case ::grpc::StatusCode::ALREADY_EXISTS: + return nuraft::cmd_result_code::SERVER_ALREADY_EXISTS; + case ::grpc::StatusCode::INVALID_ARGUMENT: + case ::grpc::StatusCode::UNIMPLEMENTED: + case ::grpc::StatusCode::UNAUTHENTICATED: + case ::grpc::StatusCode::PERMISSION_DENIED: + case ::grpc::StatusCode::RESOURCE_EXHAUSTED: + case ::grpc::StatusCode::OUT_OF_RANGE: + return nuraft::cmd_result_code::BAD_REQUEST; + case ::grpc::StatusCode::DATA_LOSS: + default: + return nuraft::cmd_result_code::FAILED; + } +} + +static constexpr bool is_powerof2(uint64_t v) { return v && ((v & (v - 1)) == 0); } + +static void log_every_nth(std::string const& addr, ::grpc::Status const& status, bool uni_directional) { + static thread_local std::unordered_map< std::string, std::pair< uint64_t, Clock::time_point > > t_errors; + std::string msg = addr + "-" + status.error_message(); + + int failed_count = 0; + if (auto const it = t_errors.find(msg); it != t_errors.end()) { + ++(it->second.first); + if (get_elapsed_time_sec(it->second.second) > 60) { + it->second = std::pair(1u, Clock::now()); // Reset + failed_count = 1; + } else if (is_powerof2(it->second.first)) { + failed_count = it->second.first; + } + } else { + t_errors[msg] = std::pair(1u, Clock::now()); + failed_count = 1; + } + + if (failed_count) { + LOGE("Failed {} time(s) in the last minute to send {} data_service_request to {}, error: {}", failed_count, + uni_directional ? "unidirectional" : "bidirectional", addr, status.error_message()); + } +} + class messaging_client : public grpc_client< Messaging >, public std::enable_shared_from_this< messaging_client > { public: messaging_client(std::string const& worker_name, std::string const& addr, @@ -56,10 +111,8 @@ class messaging_client : public grpc_client< Messaging >, public std::enable_sha if (::grpc::INVALID_ARGUMENT == status.error_code()) { if (auto mc = weak_this.lock(); mc) { mc->bad_service.fetch_add(1, std::memory_order_relaxed); - LOGE( - - "Sent message to wrong service, need to disconnect! Error Message: [{}] Client IP: [{}]", - status.error_message(), mc->_addr); + LOGE("Sent message to wrong service, need to disconnect! Error Message: [{}] Client IP: [{}]", + status.error_message(), mc->_addr); } else { LOGE("Sent message to wrong service, need to disconnect! Error Message: [{}]", status.error_message()); @@ -77,25 +130,29 @@ class messaging_client : public grpc_client< Messaging >, public std::enable_sha io_blob_list_t const& cli_buf) { return _generic_stub ->call_unary(cli_buf, request_name, NURAFT_MESG_CONFIG(mesg_factory_config->data_request_deadline_secs)) - .deferValue([this](auto&& response) -> NullResult { - if (response.hasError()) { - LOGE("Failed to send data_service_request to {}, error: {}", _addr, - response.error().error_message()); - return folly::makeUnexpected(nuraft::cmd_result_code::CANCELLED); - } - return folly::unit; - }); + .deferValue( + [weak_this = std::weak_ptr< messaging_client >(shared_from_this())](auto&& response) -> NullResult { + if (response.hasError()) { + auto mc = weak_this.lock(); + std::string addr = mc ? mc->_addr : "unknown"; + log_every_nth(addr, response.error(), true /* unidirectional */); + return folly::makeUnexpected(grpc_status_to_nuraft_code(response.error())); + } + return folly::unit; + }); } AsyncResult< sisl::GenericClientResponse > data_service_request_bidirectional(std::string const& request_name, io_blob_list_t const& cli_buf) { return _generic_stub ->call_unary(cli_buf, request_name, NURAFT_MESG_CONFIG(mesg_factory_config->data_request_deadline_secs)) - .deferValue([this](auto&& response) -> Result< sisl::GenericClientResponse > { + .deferValue([weak_this = std::weak_ptr< messaging_client >(shared_from_this())]( + auto&& response) -> Result< sisl::GenericClientResponse > { if (response.hasError()) { - LOGE("Failed to send data_service_request to {}, error: {}", _addr, - response.error().error_message()); - return folly::makeUnexpected(nuraft::cmd_result_code::CANCELLED); + auto mc = weak_this.lock(); + std::string addr = mc ? mc->_addr : "unknown"; + log_every_nth(addr, response.error(), false /* unidirectional */); + return folly::makeUnexpected(grpc_status_to_nuraft_code(response.error())); } return std::move(response.value()); }); @@ -196,8 +253,8 @@ NullAsyncResult mesg_factory::data_service_request_unidirectional(std::optional< g_client->data_service_request_unidirectional(get_generic_method_name(request_name, _group_id), cli_buf)); } // We ignore the vector of future response from collect all and st the value as folly::unit. - // This is because we do not have a use case to handle the errors that happen during the unidirectional call to all - // the peers. + // This is because we do not have a use case to handle the errors that happen during the unidirectional call to + // all the peers. return folly::collectAll(calls).deferValue([](auto&&) -> NullResult { return folly::unit; }); } diff --git a/src/tests/data_service_tests.cpp b/src/tests/data_service_tests.cpp index d04da5c..44b0c3b 100644 --- a/src/tests/data_service_tests.cpp +++ b/src/tests/data_service_tests.cpp @@ -191,7 +191,7 @@ TEST_F(DataServiceFixture, NegativeTests) { sm2->data_service_request_bidirectional(nuraft_mesg::role_regex::LEADER, "invalid_request", cli_buf) .deferValue([](auto e) -> NullResult { EXPECT_TRUE(e.hasError()); - EXPECT_EQ(nuraft::cmd_result_code::CANCELLED, e.error()); + EXPECT_EQ(nuraft::cmd_result_code::BAD_REQUEST, e.error()); return folly::Unit(); }));