diff --git a/conanfile.py b/conanfile.py index c569af07..321ea4cb 100644 --- a/conanfile.py +++ b/conanfile.py @@ -8,7 +8,7 @@ class SISLConan(ConanFile): name = "sisl" - version = "10.1.4" + version = "10.2.1" homepage = "https://github.com/eBay/sisl" description = "Library for fast data structures, utilities" diff --git a/include/sisl/grpc/rpc_client.hpp b/include/sisl/grpc/rpc_client.hpp index b06c1470..5b54190d 100644 --- a/include/sisl/grpc/rpc_client.hpp +++ b/include/sisl/grpc/rpc_client.hpp @@ -27,6 +27,7 @@ #include #include #include +#include #include #include @@ -63,23 +64,35 @@ template < typename RespT > using unary_callback_t = std::function< void(RespT&, ::grpc::Status& status) >; template < typename ReqT, typename RespT > -class ClientRpcDataInternal; +class ClientRpcDataCallback; + +template < typename ReqT, typename RespT > +class ClientRpcDataFuture; + +template < typename T > +using Result = folly::Expected< T, ::grpc::Status >; + +template < typename T > +using AsyncResult = folly::SemiFuture< Result< T > >; using GenericClientRpcData = ClientRpcData< grpc::ByteBuffer, grpc::ByteBuffer >; using generic_rpc_comp_cb_t = rpc_comp_cb_t< grpc::ByteBuffer, grpc::ByteBuffer >; using generic_req_builder_cb_t = req_builder_cb_t< grpc::ByteBuffer >; using generic_unary_callback_t = unary_callback_t< grpc::ByteBuffer >; -using GenericClientRpcDataInternal = ClientRpcDataInternal< grpc::ByteBuffer, grpc::ByteBuffer >; +using GenericClientRpcDataCallback = ClientRpcDataCallback< grpc::ByteBuffer, grpc::ByteBuffer >; +using GenericClientRpcDataFuture = ClientRpcDataFuture< grpc::ByteBuffer, grpc::ByteBuffer >; +using generic_result_t = Result< grpc::ByteBuffer >; +using generic_async_result_t = AsyncResult< grpc::ByteBuffer >; /** - * The specialized 'ClientRpcDataInternal' per gRPC call, it stores - * the response handler function - * + * The specialized 'ClientRpcDataInternal' per gRPC call, + * Derive from this class to create Rpc Data that can hold + * the response handler function or a promise */ template < typename ReqT, typename RespT > class ClientRpcDataInternal : public ClientRpcDataAbstract { public: - using ResponseReaderPtr = std::unique_ptr< ::grpc::ClientAsyncResponseReaderInterface< RespT > >; + using ResponseReaderPtr = std::unique_ptr<::grpc::ClientAsyncResponseReaderInterface< RespT > >; using GenericResponseReaderPtr = std::unique_ptr< grpc::GenericClientAsyncResponseReader >; /* Allow GrpcAsyncClient and its inner classes to use @@ -88,7 +101,6 @@ class ClientRpcDataInternal : public ClientRpcDataAbstract { friend class GrpcAsyncClient; ClientRpcDataInternal() = default; - ClientRpcDataInternal(const unary_callback_t< RespT >& cb) : m_cb{cb} {} virtual ~ClientRpcDataInternal() = default; // TODO: support time in any time unit -- lhuang8 @@ -103,16 +115,12 @@ class ClientRpcDataInternal : public ClientRpcDataAbstract { RespT& reply() { return m_reply; } ::grpc::ClientContext& context() { return m_context; } - virtual void handle_response([[maybe_unused]] bool ok = true) override { - // For unary call, ok is always true, `status_` will indicate error if there are any. - m_cb(m_reply, m_status); - } + virtual void handle_response(bool ok = true) = 0; void add_metadata(const std::string& meta_key, const std::string& meta_value) { m_context.AddMetadata(meta_key, meta_value); } - unary_callback_t< RespT > m_cb; RespT m_reply; ::grpc::ClientContext m_context; ::grpc::Status m_status; @@ -120,6 +128,42 @@ class ClientRpcDataInternal : public ClientRpcDataAbstract { GenericResponseReaderPtr m_generic_resp_reader_ptr; }; +/** + * callback version of ClientRpcDataInternal + */ +template < typename ReqT, typename RespT > +class ClientRpcDataCallback : public ClientRpcDataInternal< ReqT, RespT > { +public: + ClientRpcDataCallback(const unary_callback_t< RespT >& cb) : m_cb{cb} {} + + virtual void handle_response([[maybe_unused]] bool ok = true) override { + // For unary call, ok is always true, `status_` will indicate error if there are any. + if (m_cb) { m_cb(this->m_reply, this->m_status); } + } + + unary_callback_t< RespT > m_cb; +}; + +/** + * futures version of ClientRpcDataInternal + */ +template < typename ReqT, typename RespT > +class ClientRpcDataFuture : public ClientRpcDataInternal< ReqT, RespT > { +public: + ClientRpcDataFuture(folly::Promise< Result< RespT > >&& promise) : m_promise{std::move(promise)} {} + + virtual void handle_response([[maybe_unused]] bool ok = true) override { + // For unary call, ok is always true, `status_` will indicate error if there are any. + if (this->m_status.ok()) { + m_promise.setValue(this->m_reply); + } else { + m_promise.setValue(folly::makeUnexpected(this->m_status)); + } + } + + folly::Promise< Result< RespT > > m_promise; +}; + template < typename ReqT, typename RespT > class ClientRpcData : public ClientRpcDataInternal< ReqT, RespT > { public: @@ -150,7 +194,7 @@ class GrpcBaseClient { const std::string m_target_domain; const std::string m_ssl_cert; - std::shared_ptr< ::grpc::ChannelInterface > m_channel; + std::shared_ptr<::grpc::ChannelInterface > m_channel; std::shared_ptr< sisl::GrpcTokenClient > m_token_client; public: @@ -266,12 +310,29 @@ class GrpcAsyncClient : public GrpcBaseClient { /* unary call helper */ template < typename RespT > - using unary_call_return_t = std::unique_ptr< ::grpc::ClientAsyncResponseReaderInterface< RespT > >; + using unary_call_return_t = std::unique_ptr<::grpc::ClientAsyncResponseReaderInterface< RespT > >; template < typename ReqT, typename RespT > using unary_call_t = unary_call_return_t< RespT > (stub_t::*)(::grpc::ClientContext*, const ReqT&, ::grpc::CompletionQueue*); + template < typename ReqT, typename RespT > + void prepare_and_send_unary(ClientRpcDataInternal< ReqT, RespT >* data, const ReqT& request, + const unary_call_t< ReqT, RespT >& method, uint32_t deadline, + const std::vector< std::pair< std::string, std::string > >& metadata) { + data->set_deadline(deadline); + for (auto const& [key, value] : metadata) { + data->add_metadata(key, value); + } + if (m_token_client) { + data->add_metadata(m_token_client->get_auth_header_key(), m_token_client->get_token()); + } + // Note that async unary RPCs don't post a CQ tag in call + data->m_resp_reader_ptr = (m_stub.get()->*method)(&data->m_context, request, cq()); + // CQ tag posted here + data->m_resp_reader_ptr->Finish(&data->reply(), &data->status(), (void*)data); + } + // using unary_callback_t = std::function< void(RespT&, ::grpc::Status& status) >; /** @@ -292,21 +353,21 @@ class GrpcAsyncClient : public GrpcBaseClient { * OK before handling the response. If call failed, `::grpc::Status` * indicates the error code and error message. * @param deadline - deadline in seconds + * @param metadata - key value pair of the metadata to be sent with the request * */ + template < typename ReqT, typename RespT > + void call_unary(const ReqT& request, const unary_call_t< ReqT, RespT >& method, + const unary_callback_t< RespT >& callback, uint32_t deadline, + const std::vector< std::pair< std::string, std::string > >& metadata) { + auto data = new ClientRpcDataCallback< ReqT, RespT >(callback); + prepare_and_send_unary(data, request, method, deadline, metadata); + } + template < typename ReqT, typename RespT > void call_unary(const ReqT& request, const unary_call_t< ReqT, RespT >& method, const unary_callback_t< RespT >& callback, uint32_t deadline) { - auto data = new ClientRpcDataInternal< ReqT, RespT >(callback); - data->set_deadline(deadline); - if (m_token_client) { - data->add_metadata(m_token_client->get_auth_header_key(), m_token_client->get_token()); - } - // Note that async unary RPCs don't post a CQ tag in call - data->m_resp_reader_ptr = (m_stub.get()->*method)(&data->context(), request, cq()); - // CQ tag posted here - data->m_resp_reader_ptr->Finish(&data->reply(), &data->status(), (void*)data); - return; + call_unary(request, method, callback, deadline, {}); } template < typename ReqT, typename RespT > @@ -314,30 +375,24 @@ class GrpcAsyncClient : public GrpcBaseClient { const rpc_comp_cb_t< ReqT, RespT >& done_cb, uint32_t deadline) { auto cd = new ClientRpcData< ReqT, RespT >(done_cb); builder_cb(cd->m_req); - cd->set_deadline(deadline); - if (m_token_client) { - cd->add_metadata(m_token_client->get_auth_header_key(), m_token_client->get_token()); - } - cd->m_resp_reader_ptr = (m_stub.get()->*method)(&cd->context(), cd->m_req, cq()); - cd->m_resp_reader_ptr->Finish(&cd->reply(), &cd->status(), (void*)cd); + prepare_and_send_unary(cd, cd->m_req, method, deadline, {}); } + // Futures version of call_unary template < typename ReqT, typename RespT > - void call_unary(const ReqT& request, const unary_call_t< ReqT, RespT >& method, - const unary_callback_t< RespT >& callback, uint32_t deadline, - const std::vector< std::pair< std::string, std::string > >& metadata) { - auto data = new ClientRpcDataInternal< ReqT, RespT >(callback); - data->set_deadline(deadline); - for (auto const& [key, value] : metadata) { - data->add_metadata(key, value); - } - if (m_token_client) { - data->add_metadata(m_token_client->get_auth_header_key(), m_token_client->get_token()); - } - // Note that async unary RPCs don't post a CQ tag in call - data->m_resp_reader_ptr = (m_stub.get()->*method)(&data->context(), request, cq()); - // CQ tag posted here - data->m_resp_reader_ptr->Finish(&data->reply(), &data->status(), (void*)data); + AsyncResult< RespT > call_unary(const ReqT& request, const unary_call_t< ReqT, RespT >& method, + uint32_t deadline, + const std::vector< std::pair< std::string, std::string > >& metadata) { + auto [p, sf] = folly::makePromiseContract< Result< RespT > >(); + auto data = new ClientRpcDataFuture< ReqT, RespT >(std::move(p)); + prepare_and_send_unary(data, request, method, deadline, metadata); + return std::move(sf); + } + + template < typename ReqT, typename RespT > + AsyncResult< RespT > call_unary(const ReqT& request, const unary_call_t< ReqT, RespT >& method, + uint32_t deadline) { + return call_unary(request, method, deadline, {}); } StubPtr< ServiceT > m_stub; @@ -362,12 +417,19 @@ class GrpcAsyncClient : public GrpcBaseClient { std::shared_ptr< sisl::GrpcTokenClient > token_client) : m_generic_stub(std::move(stub)), m_worker(worker), m_token_client(token_client) {} + void prepare_and_send_unary_generic(ClientRpcDataInternal< grpc::ByteBuffer, grpc::ByteBuffer >* data, + const grpc::ByteBuffer& request, const std::string& method, uint32_t deadline); + void call_unary(const grpc::ByteBuffer& request, const std::string& method, const generic_unary_callback_t& callback, uint32_t deadline); void call_rpc(const generic_req_builder_cb_t& builder_cb, const std::string& method, const generic_rpc_comp_cb_t& done_cb, uint32_t deadline); + // futures version of call_unary + generic_async_result_t call_unary(const grpc::ByteBuffer& request, const std::string& method, + uint32_t deadline); + std::unique_ptr< grpc::GenericStub > m_generic_stub; GrpcAsyncClientWorker* m_worker; std::shared_ptr< sisl::GrpcTokenClient > m_token_client; diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 768116bf..ec2bd709 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -9,13 +9,14 @@ add_subdirectory (sobject) # on Folly and pistache. It is unknown if Windows is supported... list(APPEND POSIX_LIBRARIES ) list(APPEND SISL_DEPS ) -if(${userspace-rcu_FOUND}) - add_subdirectory (grpc) - list(APPEND POSIX_LIBRARIES - $ - ) -endif() + if(${folly_FOUND}) + if(${userspace-rcu_FOUND}) + add_subdirectory (grpc) + list(APPEND POSIX_LIBRARIES + $ + ) + endif() add_subdirectory (cache) add_subdirectory (fds) add_subdirectory (file_watcher) diff --git a/src/grpc/CMakeLists.txt b/src/grpc/CMakeLists.txt index c5e31fac..7ca0105b 100644 --- a/src/grpc/CMakeLists.txt +++ b/src/grpc/CMakeLists.txt @@ -13,6 +13,7 @@ target_sources(sisl_grpc PRIVATE target_link_libraries(sisl_grpc gRPC::grpc++ flatbuffers::flatbuffers + Folly::Folly ${COMMON_DEPS} ) diff --git a/src/grpc/rpc_client.cpp b/src/grpc/rpc_client.cpp index 6f36c3eb..c6f85439 100644 --- a/src/grpc/rpc_client.cpp +++ b/src/grpc/rpc_client.cpp @@ -132,28 +132,37 @@ void GrpcAsyncClientWorker::shutdown_all() { s_workers.clear(); } -void GrpcAsyncClient::GenericAsyncStub::call_unary(const grpc::ByteBuffer& request, const std::string& method, - const generic_unary_callback_t& callback, uint32_t deadline) { - auto data = new GenericClientRpcDataInternal(callback); +void GrpcAsyncClient::GenericAsyncStub::prepare_and_send_unary_generic( + ClientRpcDataInternal< grpc::ByteBuffer, grpc::ByteBuffer >* data, const grpc::ByteBuffer& request, + const std::string& method, uint32_t deadline) { data->set_deadline(deadline); if (m_token_client) { data->add_metadata(m_token_client->get_auth_header_key(), m_token_client->get_token()); } // Note that async unary RPCs don't post a CQ tag in call - data->m_generic_resp_reader_ptr = m_generic_stub->PrepareUnaryCall(&data->context(), method, request, cq()); + data->m_generic_resp_reader_ptr = m_generic_stub->PrepareUnaryCall(&data->m_context, method, request, cq()); data->m_generic_resp_reader_ptr->StartCall(); // CQ tag posted here data->m_generic_resp_reader_ptr->Finish(&data->reply(), &data->status(), (void*)data); - return; +} + +void GrpcAsyncClient::GenericAsyncStub::call_unary(const grpc::ByteBuffer& request, const std::string& method, + const generic_unary_callback_t& callback, uint32_t deadline) { + auto data = new GenericClientRpcDataCallback(callback); + prepare_and_send_unary_generic(data, request, method, deadline); } void GrpcAsyncClient::GenericAsyncStub::call_rpc(const generic_req_builder_cb_t& builder_cb, const std::string& method, const generic_rpc_comp_cb_t& done_cb, uint32_t deadline) { auto cd = new GenericClientRpcData(done_cb); builder_cb(cd->m_req); - cd->set_deadline(deadline); - if (m_token_client) { cd->add_metadata(m_token_client->get_auth_header_key(), m_token_client->get_token()); } - cd->m_generic_resp_reader_ptr = m_generic_stub->PrepareUnaryCall(&cd->context(), method, cd->m_req, cq()); - cd->m_generic_resp_reader_ptr->StartCall(); - cd->m_generic_resp_reader_ptr->Finish(&cd->reply(), &cd->status(), (void*)cd); + prepare_and_send_unary_generic(cd, cd->m_req, method, deadline); +} + +generic_async_result_t GrpcAsyncClient::GenericAsyncStub::call_unary(const grpc::ByteBuffer& request, + const std::string& method, uint32_t deadline) { + auto [p, sf] = folly::makePromiseContract< generic_result_t >(); + auto data = new GenericClientRpcDataFuture(std::move(p)); + prepare_and_send_unary_generic(data, request, method, deadline); + return std::move(sf); } std::unique_ptr< GrpcAsyncClient::GenericAsyncStub > GrpcAsyncClient::make_generic_stub(const std::string& worker) { @@ -163,4 +172,4 @@ std::unique_ptr< GrpcAsyncClient::GenericAsyncStub > GrpcAsyncClient::make_gener return std::make_unique< GrpcAsyncClient::GenericAsyncStub >(std::make_unique< grpc::GenericStub >(m_channel), w, m_token_client); } -} // namespace sisl::grpc +} // namespace sisl diff --git a/src/grpc/tests/function/echo_async_client.cpp b/src/grpc/tests/function/echo_async_client.cpp index 064b0042..1a92b9a2 100644 --- a/src/grpc/tests/function/echo_async_client.cpp +++ b/src/grpc/tests/function/echo_async_client.cpp @@ -89,7 +89,7 @@ class TestClient { static constexpr int GRPC_CALL_COUNT = 400; const std::string WORKER_NAME{"Worker-1"}; - void validate_echo_reply(const EchoRequest& req, EchoReply& reply, ::grpc::Status& status) { + void validate_echo_reply(const EchoRequest& req, EchoReply& reply, ::grpc::Status const& status) { RELEASE_ASSERT_EQ(status.ok(), true, "echo request {} failed, status {}: {}", req.message(), status.error_code(), status.error_message()); LOGDEBUGMOD(grpc_server, "echo request {} reply {}", req.message(), reply.message()); @@ -100,7 +100,7 @@ class TestClient { } } - void validate_ping_reply(const PingRequest& req, PingReply& reply, ::grpc::Status& status) { + void validate_ping_reply(const PingRequest& req, PingReply& reply, ::grpc::Status const& status) { RELEASE_ASSERT_EQ(status.ok(), true, "ping request {} failed, status {}: {}", req.seqno(), status.error_code(), status.error_message()); LOGDEBUGMOD(grpc_server, "ping request {} reply {}", req.seqno(), reply.seqno()); @@ -111,7 +111,7 @@ class TestClient { } } - void validate_generic_reply(const DataMessage& req, grpc::ByteBuffer& reply, ::grpc::Status& status) { + void validate_generic_reply(const DataMessage& req, grpc::ByteBuffer& reply, ::grpc::Status const& status) { RELEASE_ASSERT_EQ(status.ok(), true, "generic request {} failed, status {}: {}", req.m_seqno, status.error_code(), status.error_message()); DataMessage svr_msg; @@ -140,7 +140,7 @@ class TestClient { for (int i = 1; i <= GRPC_CALL_COUNT; ++i) { if ((i % 2) == 0) { - if ((i % 4) == 0) { + if ((i % 3) == 0) { EchoRequest req; req.set_message(std::to_string(i)); echo_stub->call_unary< EchoRequest, EchoReply >( @@ -149,7 +149,7 @@ class TestClient { validate_echo_reply(req, reply, status); }, 1); - } else { + } else if (i % 3 == 1) { echo_stub->call_rpc< EchoRequest, EchoReply >( [i](EchoRequest& req) { req.set_message(std::to_string(i)); }, &EchoService::StubInterface::AsyncEcho, @@ -157,10 +157,22 @@ class TestClient { validate_echo_reply(cd.req(), cd.reply(), cd.status()); }, 1); + } else { + EchoRequest req; + req.set_message(std::to_string(i)); + echo_stub->call_unary< EchoRequest, EchoReply >(req, &EchoService::StubInterface::AsyncEcho, 1) + .deferValue([req, this](auto e) { + RELEASE_ASSERT(e.hasValue(), "echo request {} failed, status {}: {}", req.message(), + e.error().error_code(), e.error().error_message()); + validate_echo_reply(req, e.value(), grpc::Status::OK); + return folly::Unit(); + }) + .get(); } } else if ((i % 3) == 0) { - // divide all numbers divisible by 3 and not by 2 into two equal buckets - if ((((i + 3) / 6) % 2) == 0) { + // divide all numbers divisible by 3 and not by 2 into three equal buckets + auto const j = (i + 3) / 6; + if (j % 3 == 0) { PingRequest req; req.set_seqno(i); ping_stub->call_unary< PingRequest, PingReply >( @@ -169,17 +181,29 @@ class TestClient { validate_ping_reply(req, reply, status); }, 1); - } else { + } else if (j % 3 == 1) { ping_stub->call_rpc< PingRequest, PingReply >( [i](PingRequest& req) { req.set_seqno(i); }, &PingService::StubInterface::AsyncPing, [this](ClientRpcData< PingRequest, PingReply >& cd) { validate_ping_reply(cd.req(), cd.reply(), cd.status()); }, 1); + } else { + PingRequest req; + req.set_seqno(i); + ping_stub->call_unary< PingRequest, PingReply >(req, &PingService::StubInterface::AsyncPing, 1) + .deferValue([req, this](auto e) { + RELEASE_ASSERT(e.hasValue(), "ping request {} failed, status {}: {}", req.seqno(), + e.error().error_code(), e.error().error_message()); + validate_ping_reply(req, e.value(), grpc::Status::OK); + return folly::Unit(); + }) + .get(); } } else { - // divide all numbers not divisible by 2 and 3 into two equal buckets - if (((i + 1) % 6) == 0) { + // divide all numbers not divisible by 2 and 3 into three equal buckets + static uint32_t j = 0u; + if ((j++ % 3) == 0) { DataMessage req(i, GENERIC_CLIENT_MESSAGE); grpc::ByteBuffer cli_buf; SerializeToByteBuffer(cli_buf, req); @@ -189,7 +213,7 @@ class TestClient { validate_generic_reply(req, reply, status); }, 1); - } else { + } else if (((j++ % 3) == 1)) { DataMessage data_msg(i, GENERIC_CLIENT_MESSAGE); generic_stub->call_rpc([data_msg](grpc::ByteBuffer& req) { SerializeToByteBuffer(req, data_msg); }, GENERIC_METHOD, @@ -197,6 +221,18 @@ class TestClient { validate_generic_reply(data_msg, cd.reply(), cd.status()); }, 1); + } else { + DataMessage req(i, GENERIC_CLIENT_MESSAGE); + grpc::ByteBuffer cli_buf; + SerializeToByteBuffer(cli_buf, req); + generic_stub->call_unary(cli_buf, GENERIC_METHOD, 1) + .deferValue([req, this](auto e) { + RELEASE_ASSERT(e.hasValue(), "generic request {} failed, status {}: {}", req.m_seqno, + e.error().error_code(), e.error().error_message()); + validate_generic_reply(req, e.value(), grpc::Status::OK); + return folly::Unit(); + }) + .get(); } } }