Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

implement futures based grpc client API #185

Merged
merged 2 commits into from
Oct 17, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

class SISLConan(ConanFile):
name = "sisl"
version = "10.1.4"
version = "10.2.1"

homepage = "https://github.com/eBay/sisl"
description = "Library for fast data structures, utilities"
Expand Down
111 changes: 98 additions & 13 deletions include/sisl/grpc/rpc_client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include <grpcpp/impl/codegen/async_unary_call.h>
#include <grpcpp/generic/generic_stub.h>
#include <grpc/support/log.h>
#include <folly/futures/Future.h>

#include <sisl/logging/logging.h>
#include <sisl/utility/obj_life_counter.hpp>
Expand Down Expand Up @@ -63,18 +64,30 @@ template < typename RespT >
using unary_callback_t = std::function< void(RespT&, ::grpc::Status& status) >;

template < typename ReqT, typename RespT >
class ClientRpcDataInternal;
class ClientRpcDataCallback;

template < typename ReqT, typename RespT >
class ClientRpcDataFuture;

template < typename T >
using Result = folly::Expected< T, ::grpc::Status >;

template < typename T >
using AsyncResult = folly::SemiFuture< Result< T > >;

using GenericClientRpcData = ClientRpcData< grpc::ByteBuffer, grpc::ByteBuffer >;
using generic_rpc_comp_cb_t = rpc_comp_cb_t< grpc::ByteBuffer, grpc::ByteBuffer >;
using generic_req_builder_cb_t = req_builder_cb_t< grpc::ByteBuffer >;
using generic_unary_callback_t = unary_callback_t< grpc::ByteBuffer >;
using GenericClientRpcDataInternal = ClientRpcDataInternal< grpc::ByteBuffer, grpc::ByteBuffer >;
using GenericClientRpcDataCallback = ClientRpcDataCallback< grpc::ByteBuffer, grpc::ByteBuffer >;
using GenericClientRpcDataFuture = ClientRpcDataFuture< grpc::ByteBuffer, grpc::ByteBuffer >;
using generic_result_t = Result< grpc::ByteBuffer >;
using generic_async_result_t = AsyncResult< grpc::ByteBuffer >;

/**
* The specialized 'ClientRpcDataInternal' per gRPC call, it stores
* the response handler function
*
* The specialized 'ClientRpcDataInternal' per gRPC call,
* Derive from this class to create Rpc Data that can hold
* the response handler function or a promise
*/
template < typename ReqT, typename RespT >
class ClientRpcDataInternal : public ClientRpcDataAbstract {
Expand All @@ -88,7 +101,6 @@ class ClientRpcDataInternal : public ClientRpcDataAbstract {
friend class GrpcAsyncClient;

ClientRpcDataInternal() = default;
ClientRpcDataInternal(const unary_callback_t< RespT >& cb) : m_cb{cb} {}
virtual ~ClientRpcDataInternal() = default;

// TODO: support time in any time unit -- lhuang8
Expand All @@ -103,23 +115,55 @@ class ClientRpcDataInternal : public ClientRpcDataAbstract {
RespT& reply() { return m_reply; }
::grpc::ClientContext& context() { return m_context; }

virtual void handle_response([[maybe_unused]] bool ok = true) override {
// For unary call, ok is always true, `status_` will indicate error if there are any.
m_cb(m_reply, m_status);
}
virtual void handle_response(bool ok = true) = 0;

void add_metadata(const std::string& meta_key, const std::string& meta_value) {
m_context.AddMetadata(meta_key, meta_value);
}

unary_callback_t< RespT > m_cb;
RespT m_reply;
::grpc::ClientContext m_context;
::grpc::Status m_status;
ResponseReaderPtr m_resp_reader_ptr;
GenericResponseReaderPtr m_generic_resp_reader_ptr;
};

/**
* callback version of ClientRpcDataInternal
*/
template < typename ReqT, typename RespT >
class ClientRpcDataCallback : public ClientRpcDataInternal< ReqT, RespT > {
public:
ClientRpcDataCallback(const unary_callback_t< RespT >& cb) : m_cb{cb} {}

virtual void handle_response([[maybe_unused]] bool ok = true) override {
// For unary call, ok is always true, `status_` will indicate error if there are any.
if (m_cb) { m_cb(this->m_reply, this->m_status); }
}

unary_callback_t< RespT > m_cb;
};

/**
* futures version of ClientRpcDataInternal
*/
template < typename ReqT, typename RespT >
class ClientRpcDataFuture : public ClientRpcDataInternal< ReqT, RespT > {
public:
ClientRpcDataFuture(folly::Promise< Result< RespT > >&& promise) : m_promise{std::move(promise)} {}

virtual void handle_response([[maybe_unused]] bool ok = true) override {
// For unary call, ok is always true, `status_` will indicate error if there are any.
if (this->m_status.ok()) {
m_promise.setValue(this->m_reply);
} else {
m_promise.setValue(folly::makeUnexpected(this->m_status));
}
}

folly::Promise< Result< RespT > > m_promise;
};

template < typename ReqT, typename RespT >
class ClientRpcData : public ClientRpcDataInternal< ReqT, RespT > {
public:
Expand Down Expand Up @@ -297,7 +341,7 @@ class GrpcAsyncClient : public GrpcBaseClient {
template < typename ReqT, typename RespT >
void call_unary(const ReqT& request, const unary_call_t< ReqT, RespT >& method,
const unary_callback_t< RespT >& callback, uint32_t deadline) {
auto data = new ClientRpcDataInternal< ReqT, RespT >(callback);
auto data = new ClientRpcDataCallback< ReqT, RespT >(callback);
data->set_deadline(deadline);
if (m_token_client) {
data->add_metadata(m_token_client->get_auth_header_key(), m_token_client->get_token());
Expand Down Expand Up @@ -326,7 +370,43 @@ class GrpcAsyncClient : public GrpcBaseClient {
void call_unary(const ReqT& request, const unary_call_t< ReqT, RespT >& method,
const unary_callback_t< RespT >& callback, uint32_t deadline,
const std::vector< std::pair< std::string, std::string > >& metadata) {
auto data = new ClientRpcDataInternal< ReqT, RespT >(callback);
auto data = new ClientRpcDataCallback< ReqT, RespT >(callback);
data->set_deadline(deadline);
for (auto const& [key, value] : metadata) {
data->add_metadata(key, value);
}
if (m_token_client) {
data->add_metadata(m_token_client->get_auth_header_key(), m_token_client->get_token());
}
// Note that async unary RPCs don't post a CQ tag in call
data->m_resp_reader_ptr = (m_stub.get()->*method)(&data->context(), request, cq());
// CQ tag posted here
data->m_resp_reader_ptr->Finish(&data->reply(), &data->status(), (void*)data);
}

// Futures version of call_unary
template < typename ReqT, typename RespT >
AsyncResult< RespT > call_unary(const ReqT& request, const unary_call_t< ReqT, RespT >& method,
uint32_t deadline) {
auto [p, sf] = folly::makePromiseContract< Result< RespT > >();
auto data = new ClientRpcDataFuture< ReqT, RespT >(std::move(p));
data->set_deadline(deadline);
if (m_token_client) {
data->add_metadata(m_token_client->get_auth_header_key(), m_token_client->get_token());
}
// Note that async unary RPCs don't post a CQ tag in call
data->m_resp_reader_ptr = (m_stub.get()->*method)(&data->context(), request, cq());
// CQ tag posted here
data->m_resp_reader_ptr->Finish(&data->reply(), &data->status(), (void*)data);
return std::move(sf);
}

template < typename ReqT, typename RespT >
AsyncResult< RespT > call_unary(const ReqT& request, const unary_call_t< ReqT, RespT >& method,
uint32_t deadline,
const std::vector< std::pair< std::string, std::string > >& metadata) {
auto [p, sf] = folly::makePromiseContract< Result< RespT > >();
auto data = new ClientRpcDataFuture< ReqT, RespT >(std::move(p));
data->set_deadline(deadline);
for (auto const& [key, value] : metadata) {
data->add_metadata(key, value);
Expand All @@ -338,6 +418,7 @@ class GrpcAsyncClient : public GrpcBaseClient {
data->m_resp_reader_ptr = (m_stub.get()->*method)(&data->context(), request, cq());
// CQ tag posted here
data->m_resp_reader_ptr->Finish(&data->reply(), &data->status(), (void*)data);
raakella1 marked this conversation as resolved.
Show resolved Hide resolved
return std::move(sf);
}

StubPtr< ServiceT > m_stub;
Expand Down Expand Up @@ -368,6 +449,10 @@ class GrpcAsyncClient : public GrpcBaseClient {
void call_rpc(const generic_req_builder_cb_t& builder_cb, const std::string& method,
const generic_rpc_comp_cb_t& done_cb, uint32_t deadline);

// futures version of call_unary
generic_async_result_t call_unary(const grpc::ByteBuffer& request, const std::string& method,
uint32_t deadline);

std::unique_ptr< grpc::GenericStub > m_generic_stub;
GrpcAsyncClientWorker* m_worker;
std::shared_ptr< sisl::GrpcTokenClient > m_token_client;
Expand Down
13 changes: 7 additions & 6 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,14 @@ add_subdirectory (sobject)
# on Folly and pistache. It is unknown if Windows is supported...
list(APPEND POSIX_LIBRARIES )
list(APPEND SISL_DEPS )
if(${userspace-rcu_FOUND})
add_subdirectory (grpc)
list(APPEND POSIX_LIBRARIES
$<TARGET_OBJECTS:sisl_grpc>
)
endif()

if(${folly_FOUND})
if(${userspace-rcu_FOUND})
add_subdirectory (grpc)
list(APPEND POSIX_LIBRARIES
$<TARGET_OBJECTS:sisl_grpc>
)
endif()
add_subdirectory (cache)
add_subdirectory (fds)
add_subdirectory (file_watcher)
Expand Down
1 change: 1 addition & 0 deletions src/grpc/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ target_sources(sisl_grpc PRIVATE
target_link_libraries(sisl_grpc
gRPC::grpc++
flatbuffers::flatbuffers
Folly::Folly
${COMMON_DEPS}
)

Expand Down
18 changes: 16 additions & 2 deletions src/grpc/rpc_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ void GrpcAsyncClientWorker::shutdown_all() {

void GrpcAsyncClient::GenericAsyncStub::call_unary(const grpc::ByteBuffer& request, const std::string& method,
const generic_unary_callback_t& callback, uint32_t deadline) {
auto data = new GenericClientRpcDataInternal(callback);
auto data = new GenericClientRpcDataCallback(callback);
data->set_deadline(deadline);
if (m_token_client) { data->add_metadata(m_token_client->get_auth_header_key(), m_token_client->get_token()); }
// Note that async unary RPCs don't post a CQ tag in call
Expand All @@ -156,11 +156,25 @@ void GrpcAsyncClient::GenericAsyncStub::call_rpc(const generic_req_builder_cb_t&
cd->m_generic_resp_reader_ptr->Finish(&cd->reply(), &cd->status(), (void*)cd);
}

generic_async_result_t GrpcAsyncClient::GenericAsyncStub::call_unary(const grpc::ByteBuffer& request,
const std::string& method, uint32_t deadline) {
auto [p, sf] = folly::makePromiseContract< generic_result_t >();
auto data = new GenericClientRpcDataFuture(std::move(p));
data->set_deadline(deadline);
if (m_token_client) { data->add_metadata(m_token_client->get_auth_header_key(), m_token_client->get_token()); }
// Note that async unary RPCs don't post a CQ tag in call
data->m_generic_resp_reader_ptr = m_generic_stub->PrepareUnaryCall(&data->context(), method, request, cq());
data->m_generic_resp_reader_ptr->StartCall();
// CQ tag posted here
data->m_generic_resp_reader_ptr->Finish(&data->reply(), &data->status(), (void*)data);
return std::move(sf);
}

std::unique_ptr< GrpcAsyncClient::GenericAsyncStub > GrpcAsyncClient::make_generic_stub(const std::string& worker) {
auto w = GrpcAsyncClientWorker::get_worker(worker);
if (w == nullptr) { throw std::runtime_error("worker thread not available"); }

return std::make_unique< GrpcAsyncClient::GenericAsyncStub >(std::make_unique< grpc::GenericStub >(m_channel), w,
m_token_client);
}
} // namespace sisl::grpc
} // namespace sisl
58 changes: 47 additions & 11 deletions src/grpc/tests/function/echo_async_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ class TestClient {
static constexpr int GRPC_CALL_COUNT = 400;
const std::string WORKER_NAME{"Worker-1"};

void validate_echo_reply(const EchoRequest& req, EchoReply& reply, ::grpc::Status& status) {
void validate_echo_reply(const EchoRequest& req, EchoReply& reply, ::grpc::Status const& status) {
RELEASE_ASSERT_EQ(status.ok(), true, "echo request {} failed, status {}: {}", req.message(),
status.error_code(), status.error_message());
LOGDEBUGMOD(grpc_server, "echo request {} reply {}", req.message(), reply.message());
Expand All @@ -100,7 +100,7 @@ class TestClient {
}
}

void validate_ping_reply(const PingRequest& req, PingReply& reply, ::grpc::Status& status) {
void validate_ping_reply(const PingRequest& req, PingReply& reply, ::grpc::Status const& status) {
RELEASE_ASSERT_EQ(status.ok(), true, "ping request {} failed, status {}: {}", req.seqno(), status.error_code(),
status.error_message());
LOGDEBUGMOD(grpc_server, "ping request {} reply {}", req.seqno(), reply.seqno());
Expand All @@ -111,7 +111,7 @@ class TestClient {
}
}

void validate_generic_reply(const DataMessage& req, grpc::ByteBuffer& reply, ::grpc::Status& status) {
void validate_generic_reply(const DataMessage& req, grpc::ByteBuffer& reply, ::grpc::Status const& status) {
RELEASE_ASSERT_EQ(status.ok(), true, "generic request {} failed, status {}: {}", req.m_seqno,
status.error_code(), status.error_message());
DataMessage svr_msg;
Expand Down Expand Up @@ -140,7 +140,7 @@ class TestClient {

for (int i = 1; i <= GRPC_CALL_COUNT; ++i) {
if ((i % 2) == 0) {
if ((i % 4) == 0) {
if ((i % 3) == 0) {
EchoRequest req;
req.set_message(std::to_string(i));
echo_stub->call_unary< EchoRequest, EchoReply >(
Expand All @@ -149,18 +149,30 @@ class TestClient {
validate_echo_reply(req, reply, status);
},
1);
} else {
} else if (i % 3 == 1) {
echo_stub->call_rpc< EchoRequest, EchoReply >(
[i](EchoRequest& req) { req.set_message(std::to_string(i)); },
&EchoService::StubInterface::AsyncEcho,
[this](ClientRpcData< EchoRequest, EchoReply >& cd) {
validate_echo_reply(cd.req(), cd.reply(), cd.status());
},
1);
} else {
EchoRequest req;
req.set_message(std::to_string(i));
echo_stub->call_unary< EchoRequest, EchoReply >(req, &EchoService::StubInterface::AsyncEcho, 1)
.deferValue([req, this](auto e) {
RELEASE_ASSERT(e.hasValue(), "echo request {} failed, status {}: {}", req.message(),
e.error().error_code(), e.error().error_message());
validate_echo_reply(req, e.value(), grpc::Status::OK);
return folly::Unit();
})
.get();
}
} else if ((i % 3) == 0) {
// divide all numbers divisible by 3 and not by 2 into two equal buckets
if ((((i + 3) / 6) % 2) == 0) {
// divide all numbers divisible by 3 and not by 2 into three equal buckets
auto const j = (i + 3) / 6;
if (j % 3 == 0) {
PingRequest req;
req.set_seqno(i);
ping_stub->call_unary< PingRequest, PingReply >(
Expand All @@ -169,17 +181,29 @@ class TestClient {
validate_ping_reply(req, reply, status);
},
1);
} else {
} else if (j % 3 == 1) {
ping_stub->call_rpc< PingRequest, PingReply >(
[i](PingRequest& req) { req.set_seqno(i); }, &PingService::StubInterface::AsyncPing,
[this](ClientRpcData< PingRequest, PingReply >& cd) {
validate_ping_reply(cd.req(), cd.reply(), cd.status());
},
1);
} else {
PingRequest req;
req.set_seqno(i);
ping_stub->call_unary< PingRequest, PingReply >(req, &PingService::StubInterface::AsyncPing, 1)
.deferValue([req, this](auto e) {
RELEASE_ASSERT(e.hasValue(), "ping request {} failed, status {}: {}", req.seqno(),
e.error().error_code(), e.error().error_message());
validate_ping_reply(req, e.value(), grpc::Status::OK);
return folly::Unit();
})
.get();
}
} else {
// divide all numbers not divisible by 2 and 3 into two equal buckets
if (((i + 1) % 6) == 0) {
// divide all numbers not divisible by 2 and 3 into three equal buckets
static uint32_t j = 0u;
if ((j++ % 3) == 0) {
DataMessage req(i, GENERIC_CLIENT_MESSAGE);
grpc::ByteBuffer cli_buf;
SerializeToByteBuffer(cli_buf, req);
Expand All @@ -189,14 +213,26 @@ class TestClient {
validate_generic_reply(req, reply, status);
},
1);
} else {
} else if (((j++ % 3) == 1)) {
DataMessage data_msg(i, GENERIC_CLIENT_MESSAGE);
generic_stub->call_rpc([data_msg](grpc::ByteBuffer& req) { SerializeToByteBuffer(req, data_msg); },
GENERIC_METHOD,
[data_msg, this](GenericClientRpcData& cd) {
validate_generic_reply(data_msg, cd.reply(), cd.status());
},
1);
} else {
DataMessage req(i, GENERIC_CLIENT_MESSAGE);
grpc::ByteBuffer cli_buf;
SerializeToByteBuffer(cli_buf, req);
generic_stub->call_unary(cli_buf, GENERIC_METHOD, 1)
.deferValue([req, this](auto e) {
RELEASE_ASSERT(e.hasValue(), "generic request {} failed, status {}: {}", req.m_seqno,
e.error().error_code(), e.error().error_message());
validate_generic_reply(req, e.value(), grpc::Status::OK);
return folly::Unit();
})
.get();
}
}
}
Expand Down