Skip to content

Commit

Permalink
Fix round robin load balancing with service discovery (#882)
Browse files Browse the repository at this point in the history
GrpcNameResolverProvider maps individual ServiceInstances provided by
the discovery client 1-to-1 to instances of EquivalentAddressGroup
instead of creating a single EquivalentAddressGroup instance with a
list of addresses, as the GRPC load balancing algorithms are designed to
balance across these group instances.

A test is added to verify that the load balancing works correctly.

This resolves #818.
  • Loading branch information
jeremyg484 authored Apr 17, 2024
1 parent f42121f commit 6c93290
Show file tree
Hide file tree
Showing 3 changed files with 113 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,6 @@
*/
package io.micronaut.grpc.discovery;

import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.URI;
import java.util.Collections;
import java.util.List;

import io.grpc.Attributes;
import io.grpc.EquivalentAddressGroup;
import io.grpc.ManagedChannelBuilder;
Expand All @@ -44,13 +38,17 @@
import io.micronaut.discovery.ServiceInstanceList;
import io.micronaut.discovery.exceptions.NoAvailableServiceException;
import io.micronaut.grpc.channels.GrpcDefaultManagedChannelConfiguration;

import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import jakarta.inject.Singleton;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;

import java.net.InetSocketAddress;
import java.net.URI;
import java.util.Collections;
import java.util.List;

import static io.micronaut.grpc.discovery.GrpcNameResolverProvider.ENABLED;


Expand Down Expand Up @@ -184,10 +182,10 @@ public void start(Listener listener) {
}

private List<EquivalentAddressGroup> toAddresses(List<ServiceInstance> instances) {
final List<SocketAddress> socketAddresses = instances.stream().map(serviceInstance ->
new InetSocketAddress(serviceInstance.getHost(), serviceInstance.getPort())
).map(SocketAddress.class::cast).toList();
return Collections.singletonList(new EquivalentAddressGroup(socketAddresses));
return instances.stream()
.map(serviceInstance -> new InetSocketAddress(serviceInstance.getHost(), serviceInstance.getPort()))
.map(EquivalentAddressGroup::new)
.toList();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package io.micronaut.grpc.discovery

import io.grpc.ManagedChannel
import io.grpc.examples.helloworld.HelloReply
import io.grpc.examples.helloworld.HelloRequest
import io.grpc.examples.helloworld.MultiNodeGreeterGrpc
import io.grpc.stub.StreamObserver
import io.micronaut.context.ApplicationContext
import io.micronaut.context.annotation.Factory
import io.micronaut.context.annotation.Requires
import io.micronaut.context.annotation.Value
import io.micronaut.core.io.socket.SocketUtils
import io.micronaut.grpc.annotation.GrpcChannel
import io.micronaut.runtime.server.EmbeddedServer
import jakarta.inject.Singleton
import spock.lang.Specification

class GrpcLoadBalancedServiceSpec extends Specification {

void "test GRPC named service discovery with round robin load balancing"() {

given: "A service is run on multiple servers"
def port1 = SocketUtils.findAvailableTcpPort()
def port2 = SocketUtils.findAvailableTcpPort()
def port3 = SocketUtils.findAvailableTcpPort()

EmbeddedServer server1 = ApplicationContext.run(EmbeddedServer, [
'spec.name': 'GrpcLoadBalancedServiceSpec-Server',
'micronaut.application.name': 'greet',
'grpc.server.port' : port1
])

EmbeddedServer server2 = ApplicationContext.run(EmbeddedServer, [
'spec.name': 'GrpcLoadBalancedServiceSpec-Server',
'micronaut.application.name': 'greet',
'grpc.server.port' : port2
])

EmbeddedServer server3 = ApplicationContext.run(EmbeddedServer, [
'spec.name': 'GrpcLoadBalancedServiceSpec-Server',
'micronaut.application.name': 'greet',
'grpc.server.port' : port3
])

and: 'then a client is run that declares the service'
ApplicationContext client = ApplicationContext.run([
'spec.name': 'GrpcLoadBalancedServiceSpec-Client',
(GrpcNameResolverProvider.ENABLED) : true,
'grpc.channels.greet.plaintext' : true,
'grpc.channels.greet.default-load-balancing-policy' : 'round_robin',
'micronaut.http.services.greet.urls[0]': server1.URL.toString(),
'micronaut.http.services.greet.urls[1]': server2.URL.toString(),
'micronaut.http.services.greet.urls[2]': server3.URL.toString()
])

when: 'the service is called many times'
MultiNodeGreeterGrpc.MultiNodeGreeterFutureStub stub = client.getBean(MultiNodeGreeterGrpc.MultiNodeGreeterFutureStub)
Set<String> results = new HashSet<>()
for (int i=0; i<12; i++) {
results.add(stub.sayHello(HelloRequest.newBuilder().setName("test").build()).get().message)
}

then: 'the calls are load balanced across the 3 different servers'
results.size() == 3

cleanup:
client.stop()
server1.stop()
server2.stop()
server3.stop()
}

@Requires(property = "spec.name", value = "GrpcLoadBalancedServiceSpec-Client")
@Factory
static class Clients {
@Singleton
MultiNodeGreeterGrpc.MultiNodeGreeterFutureStub futureStub(@GrpcChannel("greet") ManagedChannel channel) {
MultiNodeGreeterGrpc.newFutureStub(
channel
)
}
}

@Singleton
@Requires(property = "spec.name", value = "GrpcLoadBalancedServiceSpec-Server")
static class MultiNodeGreeterImpl extends MultiNodeGreeterGrpc.MultiNodeGreeterImplBase {

@Value('${grpc.server.port}')
Integer port

@Override
void sayHello(HelloRequest request, StreamObserver<HelloReply> responseObserver) {
HelloReply reply = HelloReply.newBuilder().setMessage("Hello " + request.getName() + " from " + port).build()
responseObserver.onNext(reply)
responseObserver.onCompleted()
}
}
}
7 changes: 6 additions & 1 deletion grpc-client-runtime/src/test/proto/helloworld.proto
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@ service Greeter2 {
rpc SayHello (HelloRequest) returns (HelloReply) {}
}

// Multi-node greeting service definition
service MultiNodeGreeter {
rpc SayHello (HelloRequest) returns (HelloReply) {}
}

// The request message containing the user's name.
message HelloRequest {
string name = 1;
Expand All @@ -40,4 +45,4 @@ message HelloRequest {
// The response message containing the greetings
message HelloReply {
string message = 1;
}
}

0 comments on commit 6c93290

Please sign in to comment.