client never gets notified when server goes away #9

Open
apbthere opened this issue Oct 18, 2018 · 2 comments
apbthere commented Oct 18, 2018

The client is never notified when the server goes away; it just waits indefinitely.
The following unit test creates a simple server that streams 200 messages to the client. Once the latch count drops below 20, the client calls Object.wait() on the server socket, which is meant to hang the server, and then waits for the remaining messages (22 in total) or times out after 10 seconds. The client neither throws an exception nor signals onError.

package com.example.demo;

import java.util.Date;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.junit.Test;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

import com.poc.protobuf.UnitRequest;
import com.poc.protobuf.UnitResponse;
import com.poc.protobuf.UnitService;
import com.poc.protobuf.UnitServiceClient;
import com.poc.protobuf.UnitServiceServer;

import io.netty.buffer.ByteBuf;
import io.rsocket.RSocket;
import io.rsocket.RSocketFactory;
import io.rsocket.rpc.rsocket.RequestHandlingRSocket;
import io.rsocket.transport.netty.client.TcpClientTransport;
import io.rsocket.transport.netty.server.NettyContextCloseable;
import io.rsocket.transport.netty.server.TcpServerTransport;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class DefaultSimpleServiceTests {

    class DefaultSimpleService implements UnitService {

        @Override
        public Flux<UnitResponse> requestStream(UnitRequest unitRequest, ByteBuf metadata) {
            String command = unitRequest.getRequestCommandMessage();

            return Flux.range(1, 200)
                    .map(i -> UnitResponse.newBuilder()
                            .setMessageNumber(i)
                            .setResponseMessage(i + " Server is processing " + command + " command")
                            .build());
        }
    }

    @Test
    public void test1() throws Exception {
        UnitServiceServer serviceServer = new UnitServiceServer(new DefaultSimpleService(), Optional.empty(), Optional.empty());

        NettyContextCloseable serverSocket = RSocketFactory.receive()
                .acceptor(
                        (setup, sendingSocket) ->
                                Mono.just(new RequestHandlingRSocket(serviceServer)))
                .transport(TcpServerTransport.create(8801))
                .start()
                .block();

        RSocket rSocket = RSocketFactory.connect().transport(TcpClientTransport.create(8801)).start().block();

        UnitServiceClient client = new UnitServiceClient(rSocket);

        CountDownLatch latch = new CountDownLatch(22);

        client.requestStream(UnitRequest.newBuilder().setRequestCommandMessage("Give me some data!").build())
                .subscribe(new Subscriber<UnitResponse>() {

                    private Subscription subscription;

                    @Override
                    public void onSubscribe(Subscription s) {
                        this.subscription = s;
                        s.request(1);
                    }

                    @Override
                    public void onNext(UnitResponse t) {
                        System.out.println("Received message " + t.getResponseMessage());
                        latch.countDown();

                        if (latch.getCount() < 20) {
                            System.out.println("Killing server now...");
                            try {
                                // intended to halt the thread so that the server disappears;
                                // note that Object.wait() requires the caller to hold the
                                // object's monitor, and there is no synchronized block here
                                serverSocket.wait();
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                        }

                        subscription.request(1);
                    }

                    @Override
                    public void onError(Throwable t) {
                        System.out.println("Error detected! " + t);
                    }

                    @Override
                    public void onComplete() {
                        System.out.println("Stream completed!");
                    }
                });

        latch.await(10, TimeUnit.SECONDS);
        System.out.println("Finished at " + new Date().toString());
    }
}
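
A side note on the test above: `CountDownLatch.await(long, TimeUnit)` returns a boolean that the test discards, so the test finishes the same way whether or not all 22 messages arrived. The following is a minimal stand-alone sketch, using only the JDK, of how the timeout could be made visible. The class name `StalledStreamCheck` and the method `awaitMessages` are hypothetical and are not part of the original test or of RSocket; the producer thread merely stands in for a server that goes silent.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class StalledStreamCheck {

    /**
     * Simulates a stream that delivers `delivered` of `expected` messages and
     * then goes silent. Returns the number of messages still outstanding when
     * the wait ends (0 means everything arrived in time).
     */
    public static long awaitMessages(int delivered, int expected, long timeoutMillis)
            throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(expected);

        // Stand-in for the server: counts down a few times, then stops.
        Thread producer = new Thread(() -> {
            for (int i = 0; i < delivered; i++) {
                latch.countDown();
            }
        });
        producer.start();
        producer.join();

        // await() returns false when the timeout elapses before the count reaches zero.
        boolean completed = latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        return completed ? 0 : latch.getCount();
    }

    public static void main(String[] args) throws InterruptedException {
        long missing = awaitMessages(3, 22, 200);
        if (missing > 0) {
            System.out.println("Timed out with " + missing + " messages missing");
        } else {
            System.out.println("All messages received");
        }
    }
}
```

In a JUnit test, the same boolean could be passed to an assertion so that a stalled stream fails the test instead of passing silently.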

proto file

syntax = "proto3";

package com.harris.atom.poc;

import "google/protobuf/empty.proto";

option java_package = "com.poc.protobuf";
option java_outer_classname = "UnitServiceProto";
option java_multiple_files = true;

service UnitService {

  // Single Request / Streaming Response
  rpc RequestStream (UnitRequest) returns (stream UnitResponse) {}
}

message UnitRequest {
  string requestCommandMessage = 1;
}

message UnitResponse {
  string responseMessage = 1;
  int32 messageNumber = 2;
}

OlegDokuka self-assigned this Oct 23, 2018

@robertroeser (Member)

@mostroverkhov does your fix to rsocket help with this?

@mostroverkhov (Member)

@robertroeser It helps, but reactor/reactor-netty#495 also has to be resolved.
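
Independently of either fix, a client can guard against a silent peer by treating a long gap between messages as an error. The sketch below illustrates that idea with JDK classes only; `IdleWatchdog`, `touch`, and `firedWithin` are hypothetical names and not part of RSocket or Reactor. In a Reactor pipeline, the built-in `Flux.timeout(Duration)` operator plays a similar role by signalling a `TimeoutException` through `onError` when no item arrives in time.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;

/** Hypothetical helper: reports an error if touch() is not called often enough. */
class IdleWatchdog implements AutoCloseable {
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();
    private final long idleMillis;
    private final Consumer<Throwable> onIdle;
    private ScheduledFuture<?> pending;

    IdleWatchdog(long idleMillis, Consumer<Throwable> onIdle) {
        this.idleMillis = idleMillis;
        this.onIdle = onIdle;
        touch(); // start the first countdown immediately
    }

    /** Call from onNext: cancels the pending countdown and starts a new one. */
    public synchronized void touch() {
        if (pending != null) {
            pending.cancel(false);
        }
        pending = scheduler.schedule(
                () -> onIdle.accept(
                        new TimeoutException("no message for " + idleMillis + " ms")),
                idleMillis, TimeUnit.MILLISECONDS);
    }

    /** Stops the watchdog; a countdown that has not fired yet is discarded. */
    @Override
    public synchronized void close() {
        scheduler.shutdownNow();
    }

    /** Demo helper: true if the watchdog fires within waitMillis on a silent stream. */
    public static boolean firedWithin(long idleMillis, long waitMillis)
            throws InterruptedException {
        CountDownLatch fired = new CountDownLatch(1);
        try (IdleWatchdog watchdog = new IdleWatchdog(idleMillis, error -> fired.countDown())) {
            return fired.await(waitMillis, TimeUnit.MILLISECONDS);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("silent stream detected: " + firedWithin(50, 2000));
    }
}
```

A subscriber like the one in the test would call `touch()` at the top of `onNext` and `close()` from `onComplete` and `onError`. The callback runs on the watchdog's own scheduler thread, so it still fires when the thread that delivers messages is blocked.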
