From 5d37d867b51581b7b3b58351a03fd71e57d7f87f Mon Sep 17 00:00:00 2001 From: Jonas Fredin Date: Mon, 14 Jan 2019 16:31:22 +0100 Subject: [PATCH] moving the onNext into the synchronized block to avoid onComplete to be called before onNext --- .../pool/PooledConnectionProviderImpl.java | 2 +- .../client/PoolingWithRealChannelTest.java | 85 +++++++++++++++++++ .../protocol/tcp/client/TcpClientRule.java | 46 ++++++++++ 3 files changed, 132 insertions(+), 1 deletion(-) diff --git a/rxnetty-common/src/main/java/io/reactivex/netty/client/pool/PooledConnectionProviderImpl.java b/rxnetty-common/src/main/java/io/reactivex/netty/client/pool/PooledConnectionProviderImpl.java index fcbb9508..60937e1f 100644 --- a/rxnetty-common/src/main/java/io/reactivex/netty/client/pool/PooledConnectionProviderImpl.java +++ b/rxnetty-common/src/main/java/io/reactivex/netty/client/pool/PooledConnectionProviderImpl.java @@ -421,9 +421,9 @@ public void onNext(PooledConnection conn) { onNextArrived = true; _terminated = terminated; _error = error; + delegate.onNext(conn); } - delegate.onNext(conn); if (_terminated) { if (null != error) { diff --git a/rxnetty-tcp/src/test/java/io/reactivex/netty/protocol/tcp/client/PoolingWithRealChannelTest.java b/rxnetty-tcp/src/test/java/io/reactivex/netty/protocol/tcp/client/PoolingWithRealChannelTest.java index ad10f846..51c81dbf 100644 --- a/rxnetty-tcp/src/test/java/io/reactivex/netty/protocol/tcp/client/PoolingWithRealChannelTest.java +++ b/rxnetty-tcp/src/test/java/io/reactivex/netty/protocol/tcp/client/PoolingWithRealChannelTest.java @@ -19,10 +19,22 @@ import io.netty.buffer.ByteBuf; import io.netty.channel.embedded.EmbeddedChannel; import io.reactivex.netty.client.pool.PooledConnection; +import org.junit.Assert; import org.junit.Rule; +import org.junit.Test; +import rx.Observable; +import rx.functions.Action1; +import rx.functions.Func0; +import rx.functions.Func1; +import rx.observers.AssertableSubscriber; + +import java.util.ArrayList; +import java.util.List; import static org.hamcrest.MatcherAssert.*; import static org.hamcrest.Matchers.*; +import static rx.Observable.fromCallable; +import static rx.Observable.just; /** * This tests the code paths which are not invoked for {@link EmbeddedChannel} as it does not schedule any task @@ -49,4 +61,77 @@ public void testReuse() throws Exception { assertThat("Connection is not reused.", connection2, is(connection)); } + + @Test + /** + * + * Load test to prove concurrency issues mainly seen on heavy load. + * + */ + public void testLoad() { + clientRule.startServer(1000); + + MockTcpClientEventListener listener = new MockTcpClientEventListener(); + clientRule.getClient().subscribe(listener); + + + int number_of_iterations = 300; + int numberOfRequests = 10; + + for(int j = 0; j < number_of_iterations; j++) { + + List> results = new ArrayList<>(); + + //Just giving the client some time to recover + try { + Thread.sleep(100); + } catch (InterruptedException e) { + e.printStackTrace(); + } + + for (int i = 0; i < numberOfRequests; i++) { + results.add( + fromCallable(new Func0>() { + @Override + public PooledConnection call() { + return clientRule.connectWithCheck(); + } + }) + .flatMap(new Func1, Observable>() { + @Override + public Observable call(PooledConnection connection) { + return connection.writeStringAndFlushOnEach(just("Hello")) + .toCompletable() + .toObservable() + .concatWith(connection.getInput()) + .take(1) + .single() + .map(new Func1() { + @Override + public String call(ByteBuf byteBuf) { + try { + + byte[] bytes = new byte[byteBuf.readableBytes()]; + byteBuf.readBytes(bytes); + String result = new String(bytes); + return result; + } finally { + byteBuf.release(); + } + } + }).doOnError(new Action1() { + @Override + public void call(Throwable throwable) { + Assert.fail("Did not expect exception: " + throwable.getMessage()); + throwable.printStackTrace(); + } + }); + } + })); + } + AssertableSubscriber test = Observable.merge(results).test(); + test.awaitTerminalEvent(); + test.assertNoErrors(); + } + } } diff --git a/rxnetty-tcp/src/test/java/io/reactivex/netty/protocol/tcp/client/TcpClientRule.java b/rxnetty-tcp/src/test/java/io/reactivex/netty/protocol/tcp/client/TcpClientRule.java index 411c9b20..4f2a6ce7 100644 --- a/rxnetty-tcp/src/test/java/io/reactivex/netty/protocol/tcp/client/TcpClientRule.java +++ b/rxnetty-tcp/src/test/java/io/reactivex/netty/protocol/tcp/client/TcpClientRule.java @@ -27,13 +27,17 @@ import io.reactivex.netty.client.pool.SingleHostPoolingProviderFactory; import io.reactivex.netty.protocol.tcp.server.ConnectionHandler; import io.reactivex.netty.protocol.tcp.server.TcpServer; +import org.junit.Assert; import org.junit.rules.ExternalResource; import org.junit.runner.Description; import org.junit.runners.model.Statement; import rx.Observable; +import rx.Observer; +import rx.functions.Func0; import rx.observers.TestSubscriber; import java.net.InetSocketAddress; +import java.util.concurrent.atomic.AtomicBoolean; import static org.hamcrest.MatcherAssert.*; import static org.hamcrest.Matchers.*; @@ -83,6 +87,48 @@ public PooledConnection connect() { return (PooledConnection) cSub.getOnNextEvents().get(0); } + public PooledConnection connectWithCheck() { + + final AtomicBoolean gotOnNext = new AtomicBoolean(false); + + Observable> got_no_connection = client.createConnectionRequest() + .doOnEach(new Observer>() { + @Override + public void onCompleted() { + if(!gotOnNext.get()) { + //A PooledConnection could sometimes send onCompleted before the onNext event occurred. + Assert.fail("Should not get onCompletedBefore onNext"); + } + } + + @Override + public void onError(Throwable e) { + } + + @Override + public void onNext(Connection byteBufByteBufConnection) { + gotOnNext.set(true); + } + }) + .switchIfEmpty(Observable.defer(new Func0>>() { + @Override + public Observable> call() { + return Observable.empty(); + } + })); + + TestSubscriber> cSub = new TestSubscriber<>(); + got_no_connection.subscribe(cSub); + + cSub.awaitTerminalEvent(); + + cSub.assertNoErrors(); + + assertThat("No connection received.", cSub.getOnNextEvents(), hasSize(1)); + + return (PooledConnection) cSub.getOnNextEvents().get(0); + } + private void createClient(final int maxConnections) { InetSocketAddress serverAddr = new InetSocketAddress("127.0.0.1", server.getServerPort()); client = TcpClient.newClient(SingleHostPoolingProviderFactory.createBounded(maxConnections),