Skip to content

Commit

Permalink
examples: Add pre-serialized-message example (grpc#10112)
Browse files Browse the repository at this point in the history
This came out of the question grpc#9707, and could be useful to others.
  • Loading branch information
ejona86 authored Aug 15, 2023
1 parent fba7835 commit 849186a
Show file tree
Hide file tree
Showing 5 changed files with 319 additions and 0 deletions.
2 changes: 2 additions & 0 deletions examples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,8 @@ $ bazel-bin/hello-world-client
- [JWT-based Authentication](example-jwt-auth)
- [Pre-serialized messages](src/main/java/io/grpc/examples/preserialized)
## Unit test examples
Examples for unit testing gRPC clients and servers are located in [examples/src/test](src/test).
Expand Down
2 changes: 2 additions & 0 deletions examples/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,8 @@ createStartScripts('io.grpc.examples.multiplex.MultiplexingServer')
createStartScripts('io.grpc.examples.multiplex.SharingClient')
createStartScripts('io.grpc.examples.nameresolve.NameResolveClient')
createStartScripts('io.grpc.examples.nameresolve.NameResolveServer')
createStartScripts('io.grpc.examples.preserialized.PreSerializedClient')
createStartScripts('io.grpc.examples.preserialized.PreSerializedServer')
createStartScripts('io.grpc.examples.retrying.RetryingHelloWorldClient')
createStartScripts('io.grpc.examples.retrying.RetryingHelloWorldServer')
createStartScripts('io.grpc.examples.routeguide.RouteGuideClient')
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright 2023 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.grpc.examples.preserialized;

import com.google.common.io.ByteStreams;
import io.grpc.MethodDescriptor;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

/**
* A marshaller that produces a byte[] instead of decoding into typical POJOs. It can be used for
* any message type.
*/
final class ByteArrayMarshaller implements MethodDescriptor.Marshaller<byte[]> {
@Override
public byte[] parse(InputStream stream) {
try {
return ByteStreams.toByteArray(stream);
} catch (IOException ex) {
throw new RuntimeException(ex);
}
}

@Override
public InputStream stream(byte[] b) {
return new ByteArrayInputStream(b);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/*
* Copyright 2023 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.grpc.examples.preserialized;

import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.Grpc;
import io.grpc.InsecureChannelCredentials;
import io.grpc.ManagedChannel;
import io.grpc.MethodDescriptor;
import io.grpc.StatusRuntimeException;
import io.grpc.examples.helloworld.GreeterGrpc;
import io.grpc.examples.helloworld.HelloReply;
import io.grpc.examples.helloworld.HelloRequest;
import io.grpc.stub.ClientCalls;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
* A client that requests a greeting from a hello-world server, but using a pre-serialized request.
* This is a performance optimization that can be useful if you read the request from on-disk or a
* database where it is already serialized, or if you need to send the same complicated message to
* many servers. The same approach can avoid deserializing responses, to be stored in a database.
* This adjustment is client-side only; the server is unable to detect the difference, so this
* client is fully-compatible with the normal {@link HelloWorldServer}.
*/
public class PreSerializedClient {
private static final Logger logger = Logger.getLogger(PreSerializedClient.class.getName());

/**
* Modified sayHello() descriptor with bytes as the request, instead of HelloRequest. By adjusting
* toBuilder() you can choose which of the request and response are bytes.
*/
private static final MethodDescriptor<byte[], HelloReply> SAY_HELLO
= GreeterGrpc.getSayHelloMethod()
.toBuilder(new ByteArrayMarshaller(), GreeterGrpc.getSayHelloMethod().getResponseMarshaller())
.build();

private final Channel channel;

/** Construct client for accessing hello-world server using the existing channel. */
public PreSerializedClient(Channel channel) {
this.channel = channel;
}

/** Say hello to server. */
public void greet(String name) {
logger.info("Will try to greet " + name + " ...");
byte[] request = HelloRequest.newBuilder().setName(name).build().toByteArray();
HelloReply response;
try {
// Stubs use ClientCalls to send RPCs. Since the generated stub won't have byte[] in its
// method signature, this uses ClientCalls directly. It isn't as convenient, but it behaves
// the same as a normal stub.
response = ClientCalls.blockingUnaryCall(channel, SAY_HELLO, CallOptions.DEFAULT, request);
} catch (StatusRuntimeException e) {
logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
return;
}
logger.info("Greeting: " + response.getMessage());
}

/**
* Greet server. If provided, the first element of {@code args} is the name to use in the
* greeting. The second argument is the target server.
*/
public static void main(String[] args) throws Exception {
String user = "world";
String target = "localhost:50051";
if (args.length > 0) {
if ("--help".equals(args[0])) {
System.err.println("Usage: [name [target]]");
System.err.println("");
System.err.println(" name The name you wish to be greeted by. Defaults to " + user);
System.err.println(" target The server to connect to. Defaults to " + target);
System.exit(1);
}
user = args[0];
}
if (args.length > 1) {
target = args[1];
}

ManagedChannel channel = Grpc.newChannelBuilder(target, InsecureChannelCredentials.create())
.build();
try {
PreSerializedClient client = new PreSerializedClient(channel);
client.greet(user);
} finally {
channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
/*
* Copyright 2023 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.grpc.examples.preserialized;

import io.grpc.BindableService;
import io.grpc.Grpc;
import io.grpc.InsecureServerCredentials;
import io.grpc.MethodDescriptor;
import io.grpc.Server;
import io.grpc.ServerCallHandler;
import io.grpc.ServerMethodDefinition;
import io.grpc.ServerServiceDefinition;
import io.grpc.ServiceDescriptor;
import io.grpc.examples.helloworld.GreeterGrpc;
import io.grpc.examples.helloworld.HelloReply;
import io.grpc.examples.helloworld.HelloRequest;
import io.grpc.stub.ServerCalls;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;

/**
* Server that provides a {@code Greeter} service, but that uses a pre-serialized response. This is
* a performance optimization that can be useful if you read the response from on-disk or a database
* where it is already serialized, or if you need to send the same complicated message to many
* clients. The same approach can avoid deserializing requests, to be stored in a database. This
* adjustment is server-side only; the client is unable to detect the differences, so this server is
* fully-compatible with the normal {@link HelloWorldClient}.
*/
public class PreSerializedServer {
private static final Logger logger = Logger.getLogger(PreSerializedServer.class.getName());

private Server server;

private void start() throws IOException {
int port = 50051;
server = Grpc.newServerBuilderForPort(port, InsecureServerCredentials.create())
.addService(new GreeterImpl())
.build()
.start();
logger.info("Server started, listening on " + port);
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
// Use stderr here since the logger may have been reset by its JVM shutdown hook.
System.err.println("*** shutting down gRPC server since JVM is shutting down");
try {
PreSerializedServer.this.stop();
} catch (InterruptedException e) {
e.printStackTrace(System.err);
}
System.err.println("*** server shut down");
}
});
}

private void stop() throws InterruptedException {
if (server != null) {
server.shutdown().awaitTermination(30, TimeUnit.SECONDS);
}
}

/**
* Await termination on the main thread since the grpc library uses daemon threads.
*/
private void blockUntilShutdown() throws InterruptedException {
if (server != null) {
server.awaitTermination();
}
}

/**
* Main launches the server from the command line.
*/
public static void main(String[] args) throws IOException, InterruptedException {
final PreSerializedServer server = new PreSerializedServer();
server.start();
server.blockUntilShutdown();
}

static class GreeterImpl implements GreeterGrpc.AsyncService, BindableService {

public void byteSayHello(HelloRequest req, StreamObserver<byte[]> responseObserver) {
HelloReply reply = HelloReply.newBuilder().setMessage("Hello " + req.getName()).build();
responseObserver.onNext(reply.toByteArray());
responseObserver.onCompleted();
}

@Override
public ServerServiceDefinition bindService() {
MethodDescriptor<HelloRequest, HelloReply> sayHello = GreeterGrpc.getSayHelloMethod();
// Modifying the method descriptor to use bytes as the response, instead of HelloReply. By
// adjusting toBuilder() you can choose which of the request and response are bytes.
MethodDescriptor<HelloRequest, byte[]> byteSayHello = sayHello
.toBuilder(sayHello.getRequestMarshaller(), new ByteArrayMarshaller())
.build();
// GreeterGrpc.bindService() will bind every service method, including sayHello(). (Although
// Greeter only has one method, this approach would work for any service.) AsyncService
// provides a default implementation of sayHello() that returns UNIMPLEMENTED, and that
// implementation will be used by bindService(). replaceMethod() will rewrite that method to
// use our byte-based method instead.
//
// The generated bindService() uses ServerCalls to make RPC handlers. Since the generated
// bindService() won't expect byte[] in the AsyncService, this uses ServerCalls directly. It
// isn't as convenient, but it behaves the same as a normal RPC handler.
return replaceMethod(
GreeterGrpc.bindService(this),
byteSayHello,
ServerCalls.asyncUnaryCall(this::byteSayHello));
}

/** Rewrites the ServerServiceDefinition replacing one method's definition. */
private static <ReqT, RespT> ServerServiceDefinition replaceMethod(
ServerServiceDefinition def,
MethodDescriptor<ReqT, RespT> newDesc,
ServerCallHandler<ReqT, RespT> newHandler) {
// There are two data structures involved. The first is the "descriptor" which describes the
// service and methods as a schema. This is the same on client and server. The second is the
// "definition" which includes the handlers to execute methods. This is specific to the server
// and is generated by "bind." This adjusts both the descriptor and definition.

// Descriptor
ServiceDescriptor desc = def.getServiceDescriptor();
ServiceDescriptor.Builder descBuilder = ServiceDescriptor.newBuilder(desc.getName())
.setSchemaDescriptor(desc.getSchemaDescriptor())
.addMethod(newDesc); // Add the modified method
// Copy methods other than the modified one
for (MethodDescriptor<?,?> md : desc.getMethods()) {
if (newDesc.getFullMethodName().equals(md.getFullMethodName())) {
continue;
}
descBuilder.addMethod(md);
}

// Definition
ServerServiceDefinition.Builder defBuilder =
ServerServiceDefinition.builder(descBuilder.build())
.addMethod(newDesc, newHandler); // Add the modified method
// Copy methods other than the modified one
for (ServerMethodDefinition<?,?> smd : def.getMethods()) {
if (newDesc.getFullMethodName().equals(smd.getMethodDescriptor().getFullMethodName())) {
continue;
}
defBuilder.addMethod(smd);
}
return defBuilder.build();
}
}
}

0 comments on commit 849186a

Please sign in to comment.