Skip to content

Commit

Permalink
Merge pull request #78 from hkadayam/use_sisl12
Browse files Browse the repository at this point in the history
Use sisl 12.x and improved logging for repeated events
  • Loading branch information
hkadayam authored Mar 25, 2024
2 parents f32857f + df9a4b2 commit 009f47c
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 20 deletions.
4 changes: 2 additions & 2 deletions conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

class NuRaftMesgConan(ConanFile):
name = "nuraft_mesg"
version = "3.2.1"
version = "3.3.1"

homepage = "https://github.com/eBay/nuraft_mesg"
description = "A gRPC service for NuRAFT"
Expand Down Expand Up @@ -66,7 +66,7 @@ def build_requirements(self):

def requirements(self):
self.requires("boost/1.83.0", transitive_headers=True)
self.requires("sisl/[>=11.1, include_prerelease=True]@oss/master", transitive_headers=True)
self.requires("sisl/[>=12.0, include_prerelease=True]@oss/master", transitive_headers=True)
self.requires("nuraft/2.3.0", transitive_headers=True)

def layout(self):
Expand Down
91 changes: 74 additions & 17 deletions src/proto/proto_mesg_factory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,59 @@ namespace nuraft_mesg {
std::string group_factory::m_ssl_cert;
using handle_resp = std::function< void(RaftMessage&, ::grpc::Status&) >;

static nuraft::cmd_result_code grpc_status_to_nuraft_code(::grpc::Status const& s) {
if (s.ok()) { return nuraft::cmd_result_code::OK; }
auto const ec = s.error_code();
switch (ec) {
case ::grpc::StatusCode::DEADLINE_EXCEEDED:
return nuraft::cmd_result_code::TIMEOUT;
case ::grpc::StatusCode::UNAVAILABLE:
case ::grpc::StatusCode::NOT_FOUND:
return nuraft::cmd_result_code::SERVER_NOT_FOUND;
case ::grpc::StatusCode::CANCELLED:
case ::grpc::StatusCode::ABORTED:
return nuraft::cmd_result_code::CANCELLED;
case ::grpc::StatusCode::FAILED_PRECONDITION:
return nuraft::cmd_result_code::TERM_MISMATCH;
case ::grpc::StatusCode::ALREADY_EXISTS:
return nuraft::cmd_result_code::SERVER_ALREADY_EXISTS;
case ::grpc::StatusCode::INVALID_ARGUMENT:
case ::grpc::StatusCode::UNIMPLEMENTED:
case ::grpc::StatusCode::UNAUTHENTICATED:
case ::grpc::StatusCode::PERMISSION_DENIED:
case ::grpc::StatusCode::RESOURCE_EXHAUSTED:
case ::grpc::StatusCode::OUT_OF_RANGE:
return nuraft::cmd_result_code::BAD_REQUEST;
case ::grpc::StatusCode::DATA_LOSS:
default:
return nuraft::cmd_result_code::FAILED;
}
}

static constexpr bool is_powerof2(uint64_t v) { return v && ((v & (v - 1)) == 0); }

static void log_every_nth(std::string const& addr, ::grpc::Status const& status, std::string const& msg_type) {
static thread_local std::unordered_map< std::string, std::pair< uint64_t, Clock::time_point > > t_errors;
static constexpr uint64_t every_nth_sec = 60;
std::string msg = addr + "-" + status.error_message();

uint64_t failed_count{1ul};
if (auto const it = t_errors.find(msg); it != t_errors.end()) {
if (get_elapsed_time_sec(it->second.second) > every_nth_sec) {
it->second = std::pair(1ul, Clock::now()); // Reset
} else {
failed_count = ++(it->second.first);
}
} else {
t_errors[msg] = std::pair(1ul, Clock::now());
}

if (is_powerof2(failed_count)) {
LOGE("Failed {} time(s) in the last {} seconds to send {} data_service_request to {}, error: {}", failed_count,
every_nth_sec, msg_type, addr, status.error_message());
}
}

class messaging_client : public grpc_client< Messaging >, public std::enable_shared_from_this< messaging_client > {
public:
messaging_client(std::string const& worker_name, std::string const& addr,
Expand All @@ -56,10 +109,8 @@ class messaging_client : public grpc_client< Messaging >, public std::enable_sha
if (::grpc::INVALID_ARGUMENT == status.error_code()) {
if (auto mc = weak_this.lock(); mc) {
mc->bad_service.fetch_add(1, std::memory_order_relaxed);
LOGE(

"Sent message to wrong service, need to disconnect! Error Message: [{}] Client IP: [{}]",
status.error_message(), mc->_addr);
LOGE("Sent message to wrong service, need to disconnect! Error Message: [{}] Client IP: [{}]",
status.error_message(), mc->_addr);
} else {
LOGE("Sent message to wrong service, need to disconnect! Error Message: [{}]",
status.error_message());
Expand All @@ -77,23 +128,29 @@ class messaging_client : public grpc_client< Messaging >, public std::enable_sha
io_blob_list_t const& cli_buf) {
return _generic_stub
->call_unary(cli_buf, request_name, NURAFT_MESG_CONFIG(mesg_factory_config->data_request_deadline_secs))
.deferValue([](auto&& response) -> NullResult {
if (response.hasError()) {
LOGE("Failed to send data_service_request, error: {}", response.error().error_message());
return folly::makeUnexpected(nuraft::cmd_result_code::CANCELLED);
}
return folly::unit;
});
.deferValue(
[weak_this = std::weak_ptr< messaging_client >(shared_from_this())](auto&& response) -> NullResult {
if (response.hasError()) {
auto mc = weak_this.lock();
std::string addr = mc ? mc->_addr : "unknown";
log_every_nth(addr, response.error(), "unidirectional");
return folly::makeUnexpected(grpc_status_to_nuraft_code(response.error()));
}
return folly::unit;
});
}

AsyncResult< sisl::GenericClientResponse > data_service_request_bidirectional(std::string const& request_name,
io_blob_list_t const& cli_buf) {
return _generic_stub
->call_unary(cli_buf, request_name, NURAFT_MESG_CONFIG(mesg_factory_config->data_request_deadline_secs))
.deferValue([](auto&& response) -> Result< sisl::GenericClientResponse > {
.deferValue([weak_this = std::weak_ptr< messaging_client >(shared_from_this())](
auto&& response) -> Result< sisl::GenericClientResponse > {
if (response.hasError()) {
LOGE("Failed to send data_service_request, error: {}", response.error().error_message());
return folly::makeUnexpected(nuraft::cmd_result_code::CANCELLED);
auto mc = weak_this.lock();
std::string addr = mc ? mc->_addr : "unknown";
log_every_nth(addr, response.error(), "bidirectional");
return folly::makeUnexpected(grpc_status_to_nuraft_code(response.error()));
}
return std::move(response.value());
});
Expand Down Expand Up @@ -194,9 +251,9 @@ NullAsyncResult mesg_factory::data_service_request_unidirectional(std::optional<
g_client->data_service_request_unidirectional(get_generic_method_name(request_name, _group_id), cli_buf));
}
// We ignore the vector of future response from collect all and st the value as folly::unit.
// This is because we do not have a use case to handle the errors that happen during the unidirectional call to all
// the peers.
return folly::collectAll(calls).deferValue([](auto &&) -> NullResult { return folly::unit; });
// This is because we do not have a use case to handle the errors that happen during the unidirectional call to
// all the peers.
return folly::collectAll(calls).deferValue([](auto&&) -> NullResult { return folly::unit; });
}

AsyncResult< sisl::GenericClientResponse >
Expand Down
2 changes: 1 addition & 1 deletion src/tests/data_service_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ TEST_F(DataServiceFixture, NegativeTests) {
sm2->data_service_request_bidirectional(nuraft_mesg::role_regex::LEADER, "invalid_request", cli_buf)
.deferValue([](auto e) -> NullResult {
EXPECT_TRUE(e.hasError());
EXPECT_EQ(nuraft::cmd_result_code::CANCELLED, e.error());
EXPECT_EQ(nuraft::cmd_result_code::BAD_REQUEST, e.error());
return folly::Unit();
}));

Expand Down

0 comments on commit 009f47c

Please sign in to comment.