Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add max_receive_msg_size parameters. #230

Merged
merged 2 commits into from
Mar 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

class SISLConan(ConanFile):
name = "sisl"
version = "12.0.1"
version = "12.1.1"

homepage = "https://github.com/eBay/sisl"
description = "Library for fast data structures, utilities"
Expand Down
6 changes: 4 additions & 2 deletions include/sisl/grpc/rpc_server.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,17 @@ class GrpcServer : private boost::noncopyable {
const std::string& ssl_cert);
GrpcServer(const std::string& listen_addr, uint32_t threads, const std::string& ssl_key,
const std::string& ssl_cert, const std::shared_ptr< sisl::GrpcTokenVerifier >& auth_mgr);
GrpcServer(const std::string& listen_addr, uint32_t threads, int max_receive_msg_size, const std::string& ssl_key,
const std::string& ssl_cert, const std::shared_ptr< sisl::GrpcTokenVerifier >& auth_mgr);
virtual ~GrpcServer();

/**
* Create a new GrpcServer instance and initialize it.
*/
static GrpcServer* make(const std::string& listen_addr, uint32_t threads = 1, const std::string& ssl_key = "",
const std::string& ssl_cert = "");
const std::string& ssl_cert = "", int max_receive_msg_size = 0);
hkadayam marked this conversation as resolved.
Show resolved Hide resolved
static GrpcServer* make(const std::string& listen_addr, const std::shared_ptr< sisl::GrpcTokenVerifier >& auth_mgr,
uint32_t threads = 1, const std::string& ssl_key = "", const std::string& ssl_cert = "");
uint32_t threads = 1, const std::string& ssl_key = "", const std::string& ssl_cert = "", int max_receive_msg_size = 0);

void run(const rpc_thread_start_cb_t& thread_start_cb = nullptr);
void shutdown();
Expand Down
14 changes: 5 additions & 9 deletions src/grpc/rpc_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,21 +33,17 @@ GrpcBaseClient::GrpcBaseClient(const std::string& server_addr,

void GrpcBaseClient::init() {
::grpc::SslCredentialsOptions ssl_opts;
::grpc::ChannelArguments channel_args;
channel_args.SetMaxReceiveMessageSize(-1);
hkadayam marked this conversation as resolved.
Show resolved Hide resolved
if (!m_ssl_cert.empty()) {
if (load_ssl_cert(m_ssl_cert, ssl_opts.pem_root_certs)) {
if (!m_target_domain.empty()) {
::grpc::ChannelArguments channel_args;
channel_args.SetSslTargetNameOverride(m_target_domain);
m_channel = ::grpc::CreateCustomChannel(m_server_addr, ::grpc::SslCredentials(ssl_opts), channel_args);
} else {
m_channel = ::grpc::CreateChannel(m_server_addr, ::grpc::SslCredentials(ssl_opts));
}

if (!m_target_domain.empty()) { channel_args.SetSslTargetNameOverride(m_target_domain); }
m_channel = ::grpc::CreateCustomChannel(m_server_addr, ::grpc::SslCredentials(ssl_opts), channel_args);
} else {
throw std::runtime_error("Unable to load ssl certification for grpc client");
}
} else {
m_channel = ::grpc::CreateChannel(m_server_addr, ::grpc::InsecureChannelCredentials());
m_channel = ::grpc::CreateCustomChannel(m_server_addr, ::grpc::InsecureChannelCredentials(), channel_args);
}
}

Expand Down
18 changes: 12 additions & 6 deletions src/grpc/rpc_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,18 @@ SISL_LOGGING_DEF(grpc_server)
namespace sisl {
GrpcServer::GrpcServer(const std::string& listen_addr, uint32_t threads, const std::string& ssl_key,
const std::string& ssl_cert) :
GrpcServer::GrpcServer(listen_addr, threads, ssl_key, ssl_cert, nullptr) {}

GrpcServer::GrpcServer(listen_addr, threads, 0, ssl_key, ssl_cert, nullptr) {}
GrpcServer::GrpcServer(const std::string& listen_addr, uint32_t threads, const std::string& ssl_key,
const std::string& ssl_cert, const std::shared_ptr< sisl::GrpcTokenVerifier >& auth_mgr) :
GrpcServer::GrpcServer(listen_addr, threads, 0, ssl_key, ssl_cert, auth_mgr) {}
GrpcServer::GrpcServer(const std::string& listen_addr, uint32_t threads, int max_receive_msg_size,
const std::string& ssl_key, const std::string& ssl_cert,
const std::shared_ptr< sisl::GrpcTokenVerifier >& auth_mgr) :
m_num_threads{threads}, m_auth_mgr{auth_mgr} {
if (listen_addr.empty() || threads == 0) { throw std::invalid_argument("Invalid parameter to start grpc server"); }

if (max_receive_msg_size != 0) { m_builder.SetMaxReceiveMessageSize(max_receive_msg_size); }

hkadayam marked this conversation as resolved.
Show resolved Hide resolved
if (!ssl_cert.empty() && !ssl_key.empty()) {
std::string key_contents;
std::string cert_contents;
Expand Down Expand Up @@ -76,13 +81,14 @@ GrpcServer::~GrpcServer() {
}

GrpcServer* GrpcServer::make(const std::string& listen_addr, uint32_t threads, const std::string& ssl_key,
const std::string& ssl_cert) {
return GrpcServer::make(listen_addr, nullptr, threads, ssl_key, ssl_cert);
const std::string& ssl_cert, int max_receive_msg_size) {
return GrpcServer::make(listen_addr, nullptr, threads, ssl_key, ssl_cert, max_receive_msg_size);
}

GrpcServer* GrpcServer::make(const std::string& listen_addr, const std::shared_ptr< sisl::GrpcTokenVerifier >& auth_mgr,
uint32_t threads, const std::string& ssl_key, const std::string& ssl_cert) {
return new GrpcServer(listen_addr, threads, ssl_key, ssl_cert, auth_mgr);
uint32_t threads, const std::string& ssl_key, const std::string& ssl_cert,
int max_receive_msg_size) {
return new GrpcServer(listen_addr, threads, max_receive_msg_size, ssl_key, ssl_cert, auth_mgr);
}

void GrpcServer::run(const rpc_thread_start_cb_t& thread_start_cb) {
Expand Down
46 changes: 39 additions & 7 deletions src/grpc/tests/function/echo_async_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <chrono>
#include <thread>
#include <mutex>
#include <random>

#include <sisl/logging/logging.h>
#include <sisl/options/options.h>
Expand All @@ -32,6 +33,25 @@ using namespace sisl;
using namespace ::grpc_helper_test;
using namespace std::placeholders;

#define MAX_GRPC_RECV_SIZE 64 * 1024 * 1024

static constexpr std::array< const char, 62 > alphanum{
'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K',
'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z', 'a', 'b', 'c', 'd', 'e', 'f',
'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z'};

static std::string gen_random_string(size_t len) {
std::string str;
static thread_local std::random_device rd{};
static thread_local std::default_random_engine re{rd()};
std::uniform_int_distribution< size_t > rand_char{0, alphanum.size() - 1};
for (size_t i{0}; i < len; ++i) {
str += alphanum[rand_char(re)];
}
str += '\0';
return str;
}

struct DataMessage {
int m_seqno;
std::string m_buf;
Expand Down Expand Up @@ -97,7 +117,7 @@ static void SerializeToBlob(sisl::io_blob_list_t& buffer, const DataMessage& msg
buffer.emplace_back(buf);
}

static const std::string GENERIC_CLIENT_MESSAGE{"I am a super client!"};
static const std::string GENERIC_CLIENT_MESSAGE{gen_random_string(MAX_GRPC_RECV_SIZE)};
static const std::string GENERIC_METHOD{"SendData"};

class TestClient {
Expand Down Expand Up @@ -236,8 +256,14 @@ class TestClient {
} else {
// divide all numbers not divisible by 2 and 3 into three equal buckets
static uint32_t j = 0u;
static int mess_size[] = {16, 64, 64 * 1024, 16 * 1024, 16 * 1024 * 1024, 64 * 1024 * 1024 - 1024};
static std::random_device rd;
static std::mt19937 gen(rd());
static std::uniform_int_distribution< int > distrib(0, sizeof(mess_size)/sizeof(mess_size[0]) -1);
if ((j++ % 4) == 0) {
DataMessage req(i, GENERIC_CLIENT_MESSAGE);
int size = mess_size[distrib(gen)];
LOGDEBUGMOD(grpc_server, "Testing call_unary with size {}", size);
DataMessage req(i, GENERIC_CLIENT_MESSAGE.substr(0, size));
grpc::ByteBuffer cli_buf;
SerializeToByteBuffer(cli_buf, req);
generic_stub->call_unary(
Expand All @@ -247,15 +273,19 @@ class TestClient {
},
1);
} else if (((j++ % 4) == 1)) {
DataMessage data_msg(i, GENERIC_CLIENT_MESSAGE);
int size = mess_size[distrib(gen)];
LOGDEBUGMOD(grpc_server, "Testing call_rpc with size {}", size);
DataMessage data_msg(i, GENERIC_CLIENT_MESSAGE.substr(0, size));
generic_stub->call_rpc([data_msg](grpc::ByteBuffer& req) { SerializeToByteBuffer(req, data_msg); },
GENERIC_METHOD,
[data_msg, this](GenericClientRpcData& cd) {
validate_generic_reply(data_msg, cd.reply(), cd.status());
},
1);
} else if (((j++ % 4) == 2)) {
DataMessage req(i, GENERIC_CLIENT_MESSAGE);
int size = mess_size[distrib(gen)];
LOGDEBUGMOD(grpc_server, "Testing call_unary with size {}", size);
DataMessage req(i, GENERIC_CLIENT_MESSAGE.substr(0, size));
grpc::ByteBuffer cli_buf;
SerializeToByteBuffer(cli_buf, req);
generic_stub->call_unary(cli_buf, GENERIC_METHOD, 1)
Expand All @@ -266,8 +296,11 @@ class TestClient {
return folly::Unit();
})
.get();

} else {
DataMessage req(i, GENERIC_CLIENT_MESSAGE);
int size = mess_size[distrib(gen)];
LOGDEBUGMOD(grpc_server, "Testing call_unary with size {}", size);
DataMessage req(i, GENERIC_CLIENT_MESSAGE.substr(0, size));
sisl::io_blob_list_t cli_buf;
SerializeToBlob(cli_buf, req);
generic_stub->call_unary(cli_buf, GENERIC_METHOD, 1)
Expand Down Expand Up @@ -372,7 +405,6 @@ class TestServer {
static void set_response(BufT const& req, grpc::ByteBuffer& resp, bool set_buf) {
DataMessage cli_request;
DeserializeFromBuffer(req, cli_request);
RELEASE_ASSERT((cli_request.m_buf == GENERIC_CLIENT_MESSAGE), "Could not parse response buffer");
hkadayam marked this conversation as resolved.
Show resolved Hide resolved
if (set_buf) { SerializeToByteBuffer(resp, cli_request); }
}

Expand Down Expand Up @@ -417,7 +449,7 @@ class TestServer {

void start(const std::string& server_address) {
LOGINFO("Start echo and ping server on {}...", server_address);
m_grpc_server = GrpcServer::make(server_address, 4, "", "");
m_grpc_server = GrpcServer::make(server_address, 4, "", "", MAX_GRPC_RECV_SIZE);
m_echo_impl = new EchoServiceImpl();
m_echo_impl->register_service(m_grpc_server);

Expand Down
Loading