diff --git a/rxnetty/src/main/java/io/reactivex/netty/client/ConnectionPoolImpl.java b/rxnetty/src/main/java/io/reactivex/netty/client/ConnectionPoolImpl.java index f8b26030..3e4be0fb 100644 --- a/rxnetty/src/main/java/io/reactivex/netty/client/ConnectionPoolImpl.java +++ b/rxnetty/src/main/java/io/reactivex/netty/client/ConnectionPoolImpl.java @@ -20,12 +20,6 @@ import io.reactivex.netty.metrics.Clock; import io.reactivex.netty.metrics.MetricEventsListener; import io.reactivex.netty.metrics.MetricEventsSubject; -import java.util.concurrent.Executors; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import rx.Observable; @@ -40,6 +34,10 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; /** * @author Nitesh Kant @@ -249,13 +247,6 @@ public void shutdown() { Observable.just(1L).subscribe(createShutdownAction()); } - private void performShutdownIfRequested() { - - if (isShutdownRequested.get()) { - performShutdownIfPossible(); - } - } - private Action1 createShutdownAction() { return new Action1() { @Override @@ -361,15 +352,6 @@ public void call(Throwable throwable) { ); } - private void poolAlreadyClosed(PooledConnection connection, long startTime, Subscriber> subscriber) { - - connection.closeUnderlyingChannel(); - IllegalStateException exception = new IllegalStateException("Pool already shut down"); - metricEventsSubject.onEvent(ClientMetricsEvent.POOL_ACQUIRE_FAILED, - Clock.onEndMillis(startTime), exception); - subscriber.onError(exception); - } - @Override public Subscription subscribe(MetricEventsListener> listener) { return metricEventsSubject.subscribe(listener); @@ -402,25 +384,4 @@ public void run() { } } } - - private class ShutdownTask implements Runnable { - - private final ScheduledExecutorService executorService; - - public ShutdownTask(ScheduledExecutorService executorService) { - - this.executorService = executorService; - } - - @Override - public void run() { - if (isShutdownPerformed.get()) { - executorService.shutdown(); - } - boolean shutdownIsDone = performShutdownIfPossible(); - if (shutdownIsDone) { - executorService.shutdown(); - } - } - } } diff --git a/rxnetty/src/test/java/io/reactivex/netty/client/ConnectionPoolTest.java b/rxnetty/src/test/java/io/reactivex/netty/client/ConnectionPoolTest.java index f8eeaf9a..957d695d 100644 --- a/rxnetty/src/test/java/io/reactivex/netty/client/ConnectionPoolTest.java +++ b/rxnetty/src/test/java/io/reactivex/netty/client/ConnectionPoolTest.java @@ -251,7 +251,7 @@ public void testShutdown() throws Exception { assertAllConnectionsReturned(); } - @Test(expected = IllegalStateException.class) + @Test(expected = IllegalStateException.class, timeout = 20000) public void shouldThrowExceptionIfAcquireIsSubscribedToAfterShutdownOfPool() { serverConnHandler.closeNewConnectionsOnReceive(false); strategy.incrementMaxConnections(2); @@ -262,7 +262,7 @@ public void shouldThrowExceptionIfAcquireIsSubscribedToAfterShutdownOfPool() { } - @Test + @Test(timeout = 20000) public void testShutdownWithNonReturnedConnections() throws Exception{ serverConnHandler.closeNewConnectionsOnReceive(false); strategy.incrementMaxConnections(2);