From 6c932903057578b92bb22563203f1ada6d65b5ba Mon Sep 17 00:00:00 2001 From: Jeremy Grelle Date: Wed, 17 Apr 2024 13:51:51 -0400 Subject: [PATCH] Fix round robin load balancing with service discovery (#882) GrpcNameResolverProvider maps individual ServiceInstances provided by the discovery client 1-to-1 to instances of EquivalentAddressGroup instead of creating a single EquivalentAddressGroup instance with a list of addresses, as the GRPC load balancing algorithms are designed to balance across these group instances. A test is added to verify that the load balancing works correctly. This resolves #818. --- .../discovery/GrpcNameResolverProvider.java | 20 ++-- .../GrpcLoadBalancedServiceSpec.groovy | 98 +++++++++++++++++++ .../src/test/proto/helloworld.proto | 7 +- 3 files changed, 113 insertions(+), 12 deletions(-) create mode 100644 grpc-client-runtime/src/test/groovy/io/micronaut/grpc/discovery/GrpcLoadBalancedServiceSpec.groovy diff --git a/grpc-client-runtime/src/main/java/io/micronaut/grpc/discovery/GrpcNameResolverProvider.java b/grpc-client-runtime/src/main/java/io/micronaut/grpc/discovery/GrpcNameResolverProvider.java index d5f001b9f..7aef25c0f 100644 --- a/grpc-client-runtime/src/main/java/io/micronaut/grpc/discovery/GrpcNameResolverProvider.java +++ b/grpc-client-runtime/src/main/java/io/micronaut/grpc/discovery/GrpcNameResolverProvider.java @@ -15,12 +15,6 @@ */ package io.micronaut.grpc.discovery; -import java.net.InetSocketAddress; -import java.net.SocketAddress; -import java.net.URI; -import java.util.Collections; -import java.util.List; - import io.grpc.Attributes; import io.grpc.EquivalentAddressGroup; import io.grpc.ManagedChannelBuilder; @@ -44,13 +38,17 @@ import io.micronaut.discovery.ServiceInstanceList; import io.micronaut.discovery.exceptions.NoAvailableServiceException; import io.micronaut.grpc.channels.GrpcDefaultManagedChannelConfiguration; - import jakarta.annotation.PostConstruct; import jakarta.annotation.PreDestroy; import jakarta.inject.Singleton; import reactor.core.Disposable; import reactor.core.publisher.Flux; +import java.net.InetSocketAddress; +import java.net.URI; +import java.util.Collections; +import java.util.List; + import static io.micronaut.grpc.discovery.GrpcNameResolverProvider.ENABLED; @@ -184,10 +182,10 @@ public void start(Listener listener) { } private List toAddresses(List instances) { - final List socketAddresses = instances.stream().map(serviceInstance -> - new InetSocketAddress(serviceInstance.getHost(), serviceInstance.getPort()) - ).map(SocketAddress.class::cast).toList(); - return Collections.singletonList(new EquivalentAddressGroup(socketAddresses)); + return instances.stream() + .map(serviceInstance -> new InetSocketAddress(serviceInstance.getHost(), serviceInstance.getPort())) + .map(EquivalentAddressGroup::new) + .toList(); } @Override diff --git a/grpc-client-runtime/src/test/groovy/io/micronaut/grpc/discovery/GrpcLoadBalancedServiceSpec.groovy b/grpc-client-runtime/src/test/groovy/io/micronaut/grpc/discovery/GrpcLoadBalancedServiceSpec.groovy new file mode 100644 index 000000000..45e2a6aee --- /dev/null +++ b/grpc-client-runtime/src/test/groovy/io/micronaut/grpc/discovery/GrpcLoadBalancedServiceSpec.groovy @@ -0,0 +1,98 @@ +package io.micronaut.grpc.discovery + +import io.grpc.ManagedChannel +import io.grpc.examples.helloworld.HelloReply +import io.grpc.examples.helloworld.HelloRequest +import io.grpc.examples.helloworld.MultiNodeGreeterGrpc +import io.grpc.stub.StreamObserver +import io.micronaut.context.ApplicationContext +import io.micronaut.context.annotation.Factory +import io.micronaut.context.annotation.Requires +import io.micronaut.context.annotation.Value +import io.micronaut.core.io.socket.SocketUtils +import io.micronaut.grpc.annotation.GrpcChannel +import io.micronaut.runtime.server.EmbeddedServer +import jakarta.inject.Singleton +import spock.lang.Specification + +class GrpcLoadBalancedServiceSpec extends Specification { + + void "test GRPC named service discovery with round robin load balancing"() { + + given: "A service is run on multiple servers" + def port1 = SocketUtils.findAvailableTcpPort() + def port2 = SocketUtils.findAvailableTcpPort() + def port3 = SocketUtils.findAvailableTcpPort() + + EmbeddedServer server1 = ApplicationContext.run(EmbeddedServer, [ + 'spec.name': 'GrpcLoadBalancedServiceSpec-Server', + 'micronaut.application.name': 'greet', + 'grpc.server.port' : port1 + ]) + + EmbeddedServer server2 = ApplicationContext.run(EmbeddedServer, [ + 'spec.name': 'GrpcLoadBalancedServiceSpec-Server', + 'micronaut.application.name': 'greet', + 'grpc.server.port' : port2 + ]) + + EmbeddedServer server3 = ApplicationContext.run(EmbeddedServer, [ + 'spec.name': 'GrpcLoadBalancedServiceSpec-Server', + 'micronaut.application.name': 'greet', + 'grpc.server.port' : port3 + ]) + + and: 'then a client is run that declares the service' + ApplicationContext client = ApplicationContext.run([ + 'spec.name': 'GrpcLoadBalancedServiceSpec-Client', + (GrpcNameResolverProvider.ENABLED) : true, + 'grpc.channels.greet.plaintext' : true, + 'grpc.channels.greet.default-load-balancing-policy' : 'round_robin', + 'micronaut.http.services.greet.urls[0]': server1.URL.toString(), + 'micronaut.http.services.greet.urls[1]': server2.URL.toString(), + 'micronaut.http.services.greet.urls[2]': server3.URL.toString() + ]) + + when: 'the service is called many times' + MultiNodeGreeterGrpc.MultiNodeGreeterFutureStub stub = client.getBean(MultiNodeGreeterGrpc.MultiNodeGreeterFutureStub) + Set results = new HashSet<>() + for (int i=0; i<12; i++) { + results.add(stub.sayHello(HelloRequest.newBuilder().setName("test").build()).get().message) + } + + then: 'the calls are load balanced across the 3 different servers' + results.size() == 3 + + cleanup: + client.stop() + server1.stop() + server2.stop() + server3.stop() + } + + @Requires(property = "spec.name", value = "GrpcLoadBalancedServiceSpec-Client") + @Factory + static class Clients { + @Singleton + MultiNodeGreeterGrpc.MultiNodeGreeterFutureStub futureStub(@GrpcChannel("greet") ManagedChannel channel) { + MultiNodeGreeterGrpc.newFutureStub( + channel + ) + } + } + + @Singleton + @Requires(property = "spec.name", value = "GrpcLoadBalancedServiceSpec-Server") + static class MultiNodeGreeterImpl extends MultiNodeGreeterGrpc.MultiNodeGreeterImplBase { + + @Value('${grpc.server.port}') + Integer port + + @Override + void sayHello(HelloRequest request, StreamObserver responseObserver) { + HelloReply reply = HelloReply.newBuilder().setMessage("Hello " + request.getName() + " from " + port).build() + responseObserver.onNext(reply) + responseObserver.onCompleted() + } + } +} diff --git a/grpc-client-runtime/src/test/proto/helloworld.proto b/grpc-client-runtime/src/test/proto/helloworld.proto index 38a3d17fe..fac0aee21 100644 --- a/grpc-client-runtime/src/test/proto/helloworld.proto +++ b/grpc-client-runtime/src/test/proto/helloworld.proto @@ -32,6 +32,11 @@ service Greeter2 { rpc SayHello (HelloRequest) returns (HelloReply) {} } +// Multi-node greeting service definition +service MultiNodeGreeter { + rpc SayHello (HelloRequest) returns (HelloReply) {} +} + // The request message containing the user's name. message HelloRequest { string name = 1; @@ -40,4 +45,4 @@ message HelloRequest { // The response message containing the greetings message HelloReply { string message = 1; -} \ No newline at end of file +}