Skip to content

Commit

Permalink
implement futures based grpc client API (#185)
Browse files Browse the repository at this point in the history
* implement futures based grpc client API

* review_comments: reuse common logic in the call_unary method

---------

Co-authored-by: Ravi Akella email = [email protected] <raakella@sdsbuild07>
  • Loading branch information
raakella1 and Ravi Akella email = [email protected] authored Oct 17, 2023
1 parent bd9b008 commit 4ce39a9
Show file tree
Hide file tree
Showing 6 changed files with 183 additions and 74 deletions.
2 changes: 1 addition & 1 deletion conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

class SISLConan(ConanFile):
name = "sisl"
version = "10.1.4"
version = "10.2.1"

homepage = "https://github.com/eBay/sisl"
description = "Library for fast data structures, utilities"
Expand Down
152 changes: 107 additions & 45 deletions include/sisl/grpc/rpc_client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include <grpcpp/impl/codegen/async_unary_call.h>
#include <grpcpp/generic/generic_stub.h>
#include <grpc/support/log.h>
#include <folly/futures/Future.h>

#include <sisl/logging/logging.h>
#include <sisl/utility/obj_life_counter.hpp>
Expand Down Expand Up @@ -63,23 +64,35 @@ template < typename RespT >
using unary_callback_t = std::function< void(RespT&, ::grpc::Status& status) >;

template < typename ReqT, typename RespT >
class ClientRpcDataInternal;
class ClientRpcDataCallback;

template < typename ReqT, typename RespT >
class ClientRpcDataFuture;

template < typename T >
using Result = folly::Expected< T, ::grpc::Status >;

template < typename T >
using AsyncResult = folly::SemiFuture< Result< T > >;

using GenericClientRpcData = ClientRpcData< grpc::ByteBuffer, grpc::ByteBuffer >;
using generic_rpc_comp_cb_t = rpc_comp_cb_t< grpc::ByteBuffer, grpc::ByteBuffer >;
using generic_req_builder_cb_t = req_builder_cb_t< grpc::ByteBuffer >;
using generic_unary_callback_t = unary_callback_t< grpc::ByteBuffer >;
using GenericClientRpcDataInternal = ClientRpcDataInternal< grpc::ByteBuffer, grpc::ByteBuffer >;
using GenericClientRpcDataCallback = ClientRpcDataCallback< grpc::ByteBuffer, grpc::ByteBuffer >;
using GenericClientRpcDataFuture = ClientRpcDataFuture< grpc::ByteBuffer, grpc::ByteBuffer >;
using generic_result_t = Result< grpc::ByteBuffer >;
using generic_async_result_t = AsyncResult< grpc::ByteBuffer >;

/**
* The specialized 'ClientRpcDataInternal' per gRPC call, it stores
* the response handler function
*
* The specialized 'ClientRpcDataInternal' per gRPC call,
* Derive from this class to create Rpc Data that can hold
* the response handler function or a promise
*/
template < typename ReqT, typename RespT >
class ClientRpcDataInternal : public ClientRpcDataAbstract {
public:
using ResponseReaderPtr = std::unique_ptr< ::grpc::ClientAsyncResponseReaderInterface< RespT > >;
using ResponseReaderPtr = std::unique_ptr<::grpc::ClientAsyncResponseReaderInterface< RespT > >;
using GenericResponseReaderPtr = std::unique_ptr< grpc::GenericClientAsyncResponseReader >;

/* Allow GrpcAsyncClient and its inner classes to use
Expand All @@ -88,7 +101,6 @@ class ClientRpcDataInternal : public ClientRpcDataAbstract {
friend class GrpcAsyncClient;

ClientRpcDataInternal() = default;
ClientRpcDataInternal(const unary_callback_t< RespT >& cb) : m_cb{cb} {}
virtual ~ClientRpcDataInternal() = default;

// TODO: support time in any time unit -- lhuang8
Expand All @@ -103,23 +115,55 @@ class ClientRpcDataInternal : public ClientRpcDataAbstract {
RespT& reply() { return m_reply; }
::grpc::ClientContext& context() { return m_context; }

virtual void handle_response([[maybe_unused]] bool ok = true) override {
// For unary call, ok is always true, `status_` will indicate error if there are any.
m_cb(m_reply, m_status);
}
virtual void handle_response(bool ok = true) = 0;

void add_metadata(const std::string& meta_key, const std::string& meta_value) {
m_context.AddMetadata(meta_key, meta_value);
}

unary_callback_t< RespT > m_cb;
RespT m_reply;
::grpc::ClientContext m_context;
::grpc::Status m_status;
ResponseReaderPtr m_resp_reader_ptr;
GenericResponseReaderPtr m_generic_resp_reader_ptr;
};

/**
* callback version of ClientRpcDataInternal
*/
template < typename ReqT, typename RespT >
class ClientRpcDataCallback : public ClientRpcDataInternal< ReqT, RespT > {
public:
ClientRpcDataCallback(const unary_callback_t< RespT >& cb) : m_cb{cb} {}

virtual void handle_response([[maybe_unused]] bool ok = true) override {
// For unary call, ok is always true, `status_` will indicate error if there are any.
if (m_cb) { m_cb(this->m_reply, this->m_status); }
}

unary_callback_t< RespT > m_cb;
};

/**
* futures version of ClientRpcDataInternal
*/
template < typename ReqT, typename RespT >
class ClientRpcDataFuture : public ClientRpcDataInternal< ReqT, RespT > {
public:
ClientRpcDataFuture(folly::Promise< Result< RespT > >&& promise) : m_promise{std::move(promise)} {}

virtual void handle_response([[maybe_unused]] bool ok = true) override {
// For unary call, ok is always true, `status_` will indicate error if there are any.
if (this->m_status.ok()) {
m_promise.setValue(this->m_reply);
} else {
m_promise.setValue(folly::makeUnexpected(this->m_status));
}
}

folly::Promise< Result< RespT > > m_promise;
};

template < typename ReqT, typename RespT >
class ClientRpcData : public ClientRpcDataInternal< ReqT, RespT > {
public:
Expand Down Expand Up @@ -150,7 +194,7 @@ class GrpcBaseClient {
const std::string m_target_domain;
const std::string m_ssl_cert;

std::shared_ptr< ::grpc::ChannelInterface > m_channel;
std::shared_ptr<::grpc::ChannelInterface > m_channel;
std::shared_ptr< sisl::GrpcTokenClient > m_token_client;

public:
Expand Down Expand Up @@ -266,12 +310,29 @@ class GrpcAsyncClient : public GrpcBaseClient {

/* unary call helper */
template < typename RespT >
using unary_call_return_t = std::unique_ptr< ::grpc::ClientAsyncResponseReaderInterface< RespT > >;
using unary_call_return_t = std::unique_ptr<::grpc::ClientAsyncResponseReaderInterface< RespT > >;

template < typename ReqT, typename RespT >
using unary_call_t = unary_call_return_t< RespT > (stub_t::*)(::grpc::ClientContext*, const ReqT&,
::grpc::CompletionQueue*);

template < typename ReqT, typename RespT >
void prepare_and_send_unary(ClientRpcDataInternal< ReqT, RespT >* data, const ReqT& request,
const unary_call_t< ReqT, RespT >& method, uint32_t deadline,
const std::vector< std::pair< std::string, std::string > >& metadata) {
data->set_deadline(deadline);
for (auto const& [key, value] : metadata) {
data->add_metadata(key, value);
}
if (m_token_client) {
data->add_metadata(m_token_client->get_auth_header_key(), m_token_client->get_token());
}
// Note that async unary RPCs don't post a CQ tag in call
data->m_resp_reader_ptr = (m_stub.get()->*method)(&data->m_context, request, cq());
// CQ tag posted here
data->m_resp_reader_ptr->Finish(&data->reply(), &data->status(), (void*)data);
}

// using unary_callback_t = std::function< void(RespT&, ::grpc::Status& status) >;

/**
Expand All @@ -292,52 +353,46 @@ class GrpcAsyncClient : public GrpcBaseClient {
* OK before handling the response. If call failed, `::grpc::Status`
* indicates the error code and error message.
* @param deadline - deadline in seconds
* @param metadata - key value pair of the metadata to be sent with the request
*
*/
template < typename ReqT, typename RespT >
void call_unary(const ReqT& request, const unary_call_t< ReqT, RespT >& method,
const unary_callback_t< RespT >& callback, uint32_t deadline,
const std::vector< std::pair< std::string, std::string > >& metadata) {
auto data = new ClientRpcDataCallback< ReqT, RespT >(callback);
prepare_and_send_unary(data, request, method, deadline, metadata);
}

template < typename ReqT, typename RespT >
void call_unary(const ReqT& request, const unary_call_t< ReqT, RespT >& method,
const unary_callback_t< RespT >& callback, uint32_t deadline) {
auto data = new ClientRpcDataInternal< ReqT, RespT >(callback);
data->set_deadline(deadline);
if (m_token_client) {
data->add_metadata(m_token_client->get_auth_header_key(), m_token_client->get_token());
}
// Note that async unary RPCs don't post a CQ tag in call
data->m_resp_reader_ptr = (m_stub.get()->*method)(&data->context(), request, cq());
// CQ tag posted here
data->m_resp_reader_ptr->Finish(&data->reply(), &data->status(), (void*)data);
return;
call_unary(request, method, callback, deadline, {});
}

template < typename ReqT, typename RespT >
void call_rpc(const req_builder_cb_t< ReqT >& builder_cb, const unary_call_t< ReqT, RespT >& method,
const rpc_comp_cb_t< ReqT, RespT >& done_cb, uint32_t deadline) {
auto cd = new ClientRpcData< ReqT, RespT >(done_cb);
builder_cb(cd->m_req);
cd->set_deadline(deadline);
if (m_token_client) {
cd->add_metadata(m_token_client->get_auth_header_key(), m_token_client->get_token());
}
cd->m_resp_reader_ptr = (m_stub.get()->*method)(&cd->context(), cd->m_req, cq());
cd->m_resp_reader_ptr->Finish(&cd->reply(), &cd->status(), (void*)cd);
prepare_and_send_unary(cd, cd->m_req, method, deadline, {});
}

// Futures version of call_unary
template < typename ReqT, typename RespT >
void call_unary(const ReqT& request, const unary_call_t< ReqT, RespT >& method,
const unary_callback_t< RespT >& callback, uint32_t deadline,
const std::vector< std::pair< std::string, std::string > >& metadata) {
auto data = new ClientRpcDataInternal< ReqT, RespT >(callback);
data->set_deadline(deadline);
for (auto const& [key, value] : metadata) {
data->add_metadata(key, value);
}
if (m_token_client) {
data->add_metadata(m_token_client->get_auth_header_key(), m_token_client->get_token());
}
// Note that async unary RPCs don't post a CQ tag in call
data->m_resp_reader_ptr = (m_stub.get()->*method)(&data->context(), request, cq());
// CQ tag posted here
data->m_resp_reader_ptr->Finish(&data->reply(), &data->status(), (void*)data);
AsyncResult< RespT > call_unary(const ReqT& request, const unary_call_t< ReqT, RespT >& method,
uint32_t deadline,
const std::vector< std::pair< std::string, std::string > >& metadata) {
auto [p, sf] = folly::makePromiseContract< Result< RespT > >();
auto data = new ClientRpcDataFuture< ReqT, RespT >(std::move(p));
prepare_and_send_unary(data, request, method, deadline, metadata);
return std::move(sf);
}

template < typename ReqT, typename RespT >
AsyncResult< RespT > call_unary(const ReqT& request, const unary_call_t< ReqT, RespT >& method,
uint32_t deadline) {
return call_unary(request, method, deadline, {});
}

StubPtr< ServiceT > m_stub;
Expand All @@ -362,12 +417,19 @@ class GrpcAsyncClient : public GrpcBaseClient {
std::shared_ptr< sisl::GrpcTokenClient > token_client) :
m_generic_stub(std::move(stub)), m_worker(worker), m_token_client(token_client) {}

void prepare_and_send_unary_generic(ClientRpcDataInternal< grpc::ByteBuffer, grpc::ByteBuffer >* data,
const grpc::ByteBuffer& request, const std::string& method, uint32_t deadline);

void call_unary(const grpc::ByteBuffer& request, const std::string& method,
const generic_unary_callback_t& callback, uint32_t deadline);

void call_rpc(const generic_req_builder_cb_t& builder_cb, const std::string& method,
const generic_rpc_comp_cb_t& done_cb, uint32_t deadline);

// futures version of call_unary
generic_async_result_t call_unary(const grpc::ByteBuffer& request, const std::string& method,
uint32_t deadline);

std::unique_ptr< grpc::GenericStub > m_generic_stub;
GrpcAsyncClientWorker* m_worker;
std::shared_ptr< sisl::GrpcTokenClient > m_token_client;
Expand Down
13 changes: 7 additions & 6 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,14 @@ add_subdirectory (sobject)
# on Folly and pistache. It is unknown if Windows is supported...
list(APPEND POSIX_LIBRARIES )
list(APPEND SISL_DEPS )
if(${userspace-rcu_FOUND})
add_subdirectory (grpc)
list(APPEND POSIX_LIBRARIES
$<TARGET_OBJECTS:sisl_grpc>
)
endif()

if(${folly_FOUND})
if(${userspace-rcu_FOUND})
add_subdirectory (grpc)
list(APPEND POSIX_LIBRARIES
$<TARGET_OBJECTS:sisl_grpc>
)
endif()
add_subdirectory (cache)
add_subdirectory (fds)
add_subdirectory (file_watcher)
Expand Down
1 change: 1 addition & 0 deletions src/grpc/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ target_sources(sisl_grpc PRIVATE
target_link_libraries(sisl_grpc
gRPC::grpc++
flatbuffers::flatbuffers
Folly::Folly
${COMMON_DEPS}
)

Expand Down
31 changes: 20 additions & 11 deletions src/grpc/rpc_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -132,28 +132,37 @@ void GrpcAsyncClientWorker::shutdown_all() {
s_workers.clear();
}

void GrpcAsyncClient::GenericAsyncStub::call_unary(const grpc::ByteBuffer& request, const std::string& method,
const generic_unary_callback_t& callback, uint32_t deadline) {
auto data = new GenericClientRpcDataInternal(callback);
void GrpcAsyncClient::GenericAsyncStub::prepare_and_send_unary_generic(
ClientRpcDataInternal< grpc::ByteBuffer, grpc::ByteBuffer >* data, const grpc::ByteBuffer& request,
const std::string& method, uint32_t deadline) {
data->set_deadline(deadline);
if (m_token_client) { data->add_metadata(m_token_client->get_auth_header_key(), m_token_client->get_token()); }
// Note that async unary RPCs don't post a CQ tag in call
data->m_generic_resp_reader_ptr = m_generic_stub->PrepareUnaryCall(&data->context(), method, request, cq());
data->m_generic_resp_reader_ptr = m_generic_stub->PrepareUnaryCall(&data->m_context, method, request, cq());
data->m_generic_resp_reader_ptr->StartCall();
// CQ tag posted here
data->m_generic_resp_reader_ptr->Finish(&data->reply(), &data->status(), (void*)data);
return;
}

void GrpcAsyncClient::GenericAsyncStub::call_unary(const grpc::ByteBuffer& request, const std::string& method,
const generic_unary_callback_t& callback, uint32_t deadline) {
auto data = new GenericClientRpcDataCallback(callback);
prepare_and_send_unary_generic(data, request, method, deadline);
}

void GrpcAsyncClient::GenericAsyncStub::call_rpc(const generic_req_builder_cb_t& builder_cb, const std::string& method,
const generic_rpc_comp_cb_t& done_cb, uint32_t deadline) {
auto cd = new GenericClientRpcData(done_cb);
builder_cb(cd->m_req);
cd->set_deadline(deadline);
if (m_token_client) { cd->add_metadata(m_token_client->get_auth_header_key(), m_token_client->get_token()); }
cd->m_generic_resp_reader_ptr = m_generic_stub->PrepareUnaryCall(&cd->context(), method, cd->m_req, cq());
cd->m_generic_resp_reader_ptr->StartCall();
cd->m_generic_resp_reader_ptr->Finish(&cd->reply(), &cd->status(), (void*)cd);
prepare_and_send_unary_generic(cd, cd->m_req, method, deadline);
}

generic_async_result_t GrpcAsyncClient::GenericAsyncStub::call_unary(const grpc::ByteBuffer& request,
const std::string& method, uint32_t deadline) {
auto [p, sf] = folly::makePromiseContract< generic_result_t >();
auto data = new GenericClientRpcDataFuture(std::move(p));
prepare_and_send_unary_generic(data, request, method, deadline);
return std::move(sf);
}

std::unique_ptr< GrpcAsyncClient::GenericAsyncStub > GrpcAsyncClient::make_generic_stub(const std::string& worker) {
Expand All @@ -163,4 +172,4 @@ std::unique_ptr< GrpcAsyncClient::GenericAsyncStub > GrpcAsyncClient::make_gener
return std::make_unique< GrpcAsyncClient::GenericAsyncStub >(std::make_unique< grpc::GenericStub >(m_channel), w,
m_token_client);
}
} // namespace sisl::grpc
} // namespace sisl
Loading

0 comments on commit 4ce39a9

Please sign in to comment.