Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add the option to get the request as sisl::io_blob from the generic service #194

Merged
merged 1 commit into from
Dec 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

class SISLConan(ConanFile):
name = "sisl"
version = "10.2.5"
version = "10.3.1"

homepage = "https://github.com/eBay/sisl"
description = "Library for fast data structures, utilities"
Expand Down
32 changes: 29 additions & 3 deletions include/sisl/grpc/generic_service.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#pragma once

#include <grpcpp/generic/async_generic_service.h>
#include "sisl/fds/buffer.hpp"
#include "rpc_call.hpp"

namespace sisl {
Expand Down Expand Up @@ -49,12 +50,35 @@ class GenericRpcData : public RpcDataAbstract, sisl::ObjLifeCounter< GenericRpcD
RpcDataAbstract* create_new() override { return new GenericRpcData(m_rpc_info, m_queue_idx); }
void set_status(grpc::Status& status) { m_retstatus = status; }

~GenericRpcData() override = default;
~GenericRpcData() override {
if (m_request_blob_allocated) { m_request_blob.buf_free(); }
}

// There is only one generic static rpc data for all rpcs.
size_t get_rpc_idx() const override { return 0; }

const grpc::ByteBuffer& request() const { return m_request; }
sisl::io_blob& request_blob() {
if (!m_request_blob.bytes) {
grpc::Slice slice;
auto status = m_request.TrySingleSlice(&slice);
if (status.ok()) {
m_request_blob.bytes = const_cast< uint8_t* >(slice.begin());
m_request_blob.size = slice.size();
} else if (status.error_code() == grpc::StatusCode::FAILED_PRECONDITION) {
// If the ByteBuffer is not made up of single slice, TrySingleSlice() will fail.
// DumpSingleSlice() should work in those cases but will incur a copy.
if (status = m_request.DumpToSingleSlice(&slice); status.ok()) {
m_request_blob.buf_alloc(slice.size());
m_request_blob_allocated = true;
std::memcpy(static_cast< void* >(m_request_blob.bytes), static_cast< const void* >(slice.begin()),
slice.size());
}
}
}
return m_request_blob;
}

grpc::ByteBuffer& response() { return m_response; }

void enqueue_call_request(::grpc::ServerCompletionQueue& cq) override {
Expand All @@ -78,6 +102,8 @@ class GenericRpcData : public RpcDataAbstract, sisl::ObjLifeCounter< GenericRpcD
grpc::GenericServerContext m_ctx;
grpc::ByteBuffer m_request;
grpc::ByteBuffer m_response;
sisl::io_blob m_request_blob;
bool m_request_blob_allocated{false};
grpc::Status m_retstatus{grpc::Status::OK};
// user can set and retrieve the context. Its life cycle is tied to that of rpc data.
generic_rpc_ctx_ptr m_rpc_context;
Expand All @@ -104,7 +130,7 @@ class GenericRpcData : public RpcDataAbstract, sisl::ObjLifeCounter< GenericRpcD
return in_shutdown ? nullptr : create_new();
}

RpcDataAbstract* on_buf_read(bool ) {
RpcDataAbstract* on_buf_read(bool) {
auto this_rpc_data = boost::intrusive_ptr< GenericRpcData >{this};
// take a ref before the handler cb is called.
// unref is called in send_response which is handled by us (in case of sync calls)
Expand All @@ -114,7 +140,7 @@ class GenericRpcData : public RpcDataAbstract, sisl::ObjLifeCounter< GenericRpcD
return nullptr;
}

RpcDataAbstract* on_buf_write(bool ) {
RpcDataAbstract* on_buf_write(bool) {
m_stream.Finish(m_retstatus, static_cast< void* >(m_request_completed_tag.ref()));
unref();
return nullptr;
Expand Down
19 changes: 14 additions & 5 deletions src/grpc/tests/function/echo_async_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ struct DataMessage {
}
};

static void DeserializeFromByteBuffer(const grpc::ByteBuffer& buffer, DataMessage& msg) {
static void DeserializeFromBuffer(const grpc::ByteBuffer& buffer, DataMessage& msg) {
std::vector< grpc::Slice > slices;
(void)buffer.Dump(&slices);
std::string buf;
Expand All @@ -72,6 +72,14 @@ static void DeserializeFromByteBuffer(const grpc::ByteBuffer& buffer, DataMessag
}
msg.DeserializeFromString(buf);
}

static void DeserializeFromBuffer(sisl::io_blob const& buffer, DataMessage& msg) {
std::string buf;
buf.reserve(buffer.size);
buf.append(reinterpret_cast< const char* >(buffer.bytes), buffer.size);
msg.DeserializeFromString(buf);
}

static void SerializeToByteBuffer(grpc::ByteBuffer& buffer, const DataMessage& msg) {
std::string buf;
msg.SerializeToString(buf);
Expand Down Expand Up @@ -115,7 +123,7 @@ class TestClient {
RELEASE_ASSERT_EQ(status.ok(), true, "generic request {} failed, status {}: {}", req.m_seqno,
status.error_code(), status.error_message());
DataMessage svr_msg;
DeserializeFromByteBuffer(reply, svr_msg);
DeserializeFromBuffer(reply, svr_msg);
RELEASE_ASSERT_EQ(req.m_seqno, svr_msg.m_seqno);
RELEASE_ASSERT_EQ(req.m_buf, svr_msg.m_buf);
{
Expand Down Expand Up @@ -323,9 +331,10 @@ class TestServer {
std::atomic< uint32_t > num_calls = 0ul;
std::atomic< uint32_t > num_completions = 0ul;

static void set_response(const grpc::ByteBuffer& req, grpc::ByteBuffer& resp) {
template < typename BufT >
static void set_response(BufT const& req, grpc::ByteBuffer& resp) {
DataMessage cli_request;
DeserializeFromByteBuffer(req, cli_request);
DeserializeFromBuffer(req, cli_request);
RELEASE_ASSERT((cli_request.m_buf == GENERIC_CLIENT_MESSAGE), "Could not parse response buffer");
SerializeToByteBuffer(resp, cli_request);
}
Expand All @@ -344,7 +353,7 @@ class TestServer {
if ((++num_calls % 2) == 0) {
LOGDEBUGMOD(grpc_server, "respond async generic request, call_num {}", num_calls.load());
std::thread([this, rpc = rpc_data] {
set_response(rpc->request(), rpc->response());
set_response(rpc->request_blob(), rpc->response());
rpc->send_response();
}).detach();
return false;
Expand Down