Skip to content

Commit

Permalink
Use sisl v12 and improved log_every_n for msg send errors
Browse files Browse the repository at this point in the history
  • Loading branch information
hkadayam committed Mar 24, 2024
1 parent 1b68fd3 commit ed28c65
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 19 deletions.
93 changes: 75 additions & 18 deletions src/proto/proto_mesg_factory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,61 @@ namespace nuraft_mesg {
std::string group_factory::m_ssl_cert;
using handle_resp = std::function< void(RaftMessage&, ::grpc::Status&) >;

/// Translate a gRPC call status into the nuraft result code handed back to callers.
///
/// @param s  status returned by the gRPC layer for a completed call
/// @return   OK when `s.ok()` is true; otherwise the nuraft code that the gRPC error code is
///           grouped under below. Any error code without an explicit case (and DATA_LOSS)
///           yields FAILED.
static nuraft::cmd_result_code grpc_status_to_nuraft_code(::grpc::Status const& s) {
    using result_code = nuraft::cmd_result_code;

    // Success short-circuits before the error code is inspected at all.
    if (s.ok()) { return result_code::OK; }

    switch (s.error_code()) {
    // The call did not finish within its deadline.
    case ::grpc::StatusCode::DEADLINE_EXCEEDED:
        return result_code::TIMEOUT;

    // The peer could not be reached or located.
    case ::grpc::StatusCode::NOT_FOUND:
    case ::grpc::StatusCode::UNAVAILABLE:
        return result_code::SERVER_NOT_FOUND;

    // The call was abandoned before it completed.
    case ::grpc::StatusCode::ABORTED:
    case ::grpc::StatusCode::CANCELLED:
        return result_code::CANCELLED;

    case ::grpc::StatusCode::FAILED_PRECONDITION:
        return result_code::TERM_MISMATCH;

    case ::grpc::StatusCode::ALREADY_EXISTS:
        return result_code::SERVER_ALREADY_EXISTS;

    // The request itself was rejected by the remote side.
    case ::grpc::StatusCode::INVALID_ARGUMENT:
    case ::grpc::StatusCode::OUT_OF_RANGE:
    case ::grpc::StatusCode::PERMISSION_DENIED:
    case ::grpc::StatusCode::RESOURCE_EXHAUSTED:
    case ::grpc::StatusCode::UNAUTHENTICATED:
    case ::grpc::StatusCode::UNIMPLEMENTED:
        return result_code::BAD_REQUEST;

    // DATA_LOSS is listed explicitly for readability; it shares the catch-all result.
    case ::grpc::StatusCode::DATA_LOSS:
    default:
        return result_code::FAILED;
    }
}

/// Report whether `v` is an exact power of two (1, 2, 4, 8, ...). Zero is not a power of two.
static constexpr bool is_powerof2(uint64_t v) {
    if (v == 0) { return false; }
    // Clearing the lowest set bit leaves zero only when exactly one bit was set.
    return (v & (v - 1)) == 0;
}

static void log_every_nth(std::string const& addr, ::grpc::Status const& status, bool uni_directional) {
static thread_local std::unordered_map< std::string, std::pair< uint64_t, Clock::time_point > > t_errors;
std::string msg = addr + "-" + status.error_message();

int failed_count = 0;
if (auto const it = t_errors.find(msg); it != t_errors.end()) {
++(it->second.first);
if (get_elapsed_time_sec(it->second.second) > 60) {
it->second = std::pair(1u, Clock::now()); // Reset
failed_count = 1;
} else if (is_powerof2(it->second.first)) {
failed_count = it->second.first;
}
} else {
t_errors[msg] = std::pair(1u, Clock::now());
failed_count = 1;
}

if (failed_count) {
LOGE("Failed {} time(s) in the last minute to send {} data_service_request to {}, error: {}", failed_count,
uni_directional ? "unidirectional" : "bidirectional", addr, status.error_message());
}
}

class messaging_client : public grpc_client< Messaging >, public std::enable_shared_from_this< messaging_client > {
public:
messaging_client(std::string const& worker_name, std::string const& addr,
Expand All @@ -56,10 +111,8 @@ class messaging_client : public grpc_client< Messaging >, public std::enable_sha
if (::grpc::INVALID_ARGUMENT == status.error_code()) {
if (auto mc = weak_this.lock(); mc) {
mc->bad_service.fetch_add(1, std::memory_order_relaxed);
LOGE(

"Sent message to wrong service, need to disconnect! Error Message: [{}] Client IP: [{}]",
status.error_message(), mc->_addr);
LOGE("Sent message to wrong service, need to disconnect! Error Message: [{}] Client IP: [{}]",
status.error_message(), mc->_addr);
} else {
LOGE("Sent message to wrong service, need to disconnect! Error Message: [{}]",
status.error_message());
Expand All @@ -77,25 +130,29 @@ class messaging_client : public grpc_client< Messaging >, public std::enable_sha
io_blob_list_t const& cli_buf) {
return _generic_stub
->call_unary(cli_buf, request_name, NURAFT_MESG_CONFIG(mesg_factory_config->data_request_deadline_secs))
.deferValue([this](auto&& response) -> NullResult {
if (response.hasError()) {
LOGE("Failed to send data_service_request to {}, error: {}", _addr,
response.error().error_message());
return folly::makeUnexpected(nuraft::cmd_result_code::CANCELLED);
}
return folly::unit;
});
.deferValue(
[weak_this = std::weak_ptr< messaging_client >(shared_from_this())](auto&& response) -> NullResult {
if (response.hasError()) {
auto mc = weak_this.lock();
std::string addr = mc ? mc->_addr : "unknown";
log_every_nth(addr, response.error(), true /* unidirectional */);
return folly::makeUnexpected(grpc_status_to_nuraft_code(response.error()));
}
return folly::unit;
});
}

AsyncResult< sisl::GenericClientResponse > data_service_request_bidirectional(std::string const& request_name,
io_blob_list_t const& cli_buf) {
return _generic_stub
->call_unary(cli_buf, request_name, NURAFT_MESG_CONFIG(mesg_factory_config->data_request_deadline_secs))
.deferValue([this](auto&& response) -> Result< sisl::GenericClientResponse > {
.deferValue([weak_this = std::weak_ptr< messaging_client >(shared_from_this())](
auto&& response) -> Result< sisl::GenericClientResponse > {
if (response.hasError()) {
LOGE("Failed to send data_service_request to {}, error: {}", _addr,
response.error().error_message());
return folly::makeUnexpected(nuraft::cmd_result_code::CANCELLED);
auto mc = weak_this.lock();
std::string addr = mc ? mc->_addr : "unknown";
log_every_nth(addr, response.error(), false /* unidirectional */);
return folly::makeUnexpected(grpc_status_to_nuraft_code(response.error()));
}
return std::move(response.value());
});
Expand Down Expand Up @@ -196,8 +253,8 @@ NullAsyncResult mesg_factory::data_service_request_unidirectional(std::optional<
g_client->data_service_request_unidirectional(get_generic_method_name(request_name, _group_id), cli_buf));
}
// We ignore the vector of future responses from collectAll and set the value to folly::unit.
// This is because we do not have a use case to handle the errors that happen during the unidirectional call to all
// the peers.
// This is because we do not have a use case to handle the errors that happen during the unidirectional call to
// all the peers.
return folly::collectAll(calls).deferValue([](auto&&) -> NullResult { return folly::unit; });
}

Expand Down
2 changes: 1 addition & 1 deletion src/tests/data_service_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ TEST_F(DataServiceFixture, NegativeTests) {
sm2->data_service_request_bidirectional(nuraft_mesg::role_regex::LEADER, "invalid_request", cli_buf)
.deferValue([](auto e) -> NullResult {
EXPECT_TRUE(e.hasError());
EXPECT_EQ(nuraft::cmd_result_code::CANCELLED, e.error());
EXPECT_EQ(nuraft::cmd_result_code::BAD_REQUEST, e.error());
return folly::Unit();
}));

Expand Down

0 comments on commit ed28c65

Please sign in to comment.