diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2Pool.java b/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2Pool.java
index 5261ba7226..ccf34fb809 100644
--- a/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2Pool.java
+++ b/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2Pool.java
@@ -17,6 +17,7 @@
import java.time.Clock;
import java.time.Duration;
+import java.util.Iterator;
import java.util.Objects;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -88,9 +89,6 @@
* Configurations that are not applicable
*
* - {@link PoolConfig#destroyHandler()} - the destroy handler cannot be used as the destruction is more complex.
- * - {@link PoolConfig#evictInBackgroundInterval()} and {@link PoolConfig#evictInBackgroundScheduler()} -
- * there are no idle resources in the pool. Once the connection does not have active streams, it
- * is returned to the parent pool.
* - {@link PoolConfig#evictionPredicate()} - the eviction predicate cannot be used as more complex
* checks have to be done. Also the pool uses filtering for the connections (a connection might not be able
* to be used but is required to stay in the pool).
@@ -155,6 +153,8 @@ final class Http2Pool implements InstrumentedPool, InstrumentedPool.
long lastInteractionTimestamp;
+ Disposable evictionTask;
+
Http2Pool(PoolConfig poolConfig, long maxIdleTime, long maxLifeTime) {
if (poolConfig.allocationStrategy().getPermits(0) != 0) {
throw new IllegalArgumentException("No support for configuring minimum number of connections");
@@ -168,6 +168,8 @@ final class Http2Pool implements InstrumentedPool, InstrumentedPool.
this.poolConfig = poolConfig;
recordInteractionTimestamp();
+
+ scheduleEviction();
}
@Override
@@ -203,6 +205,8 @@ public Mono disposeLater() {
@SuppressWarnings("unchecked")
ConcurrentLinkedDeque q = PENDING.getAndSet(this, TERMINATED);
if (q != TERMINATED) {
+ evictionTask.dispose();
+
Borrower p;
while ((p = pollPending(q, true)) != null) {
p.fail(new PoolShutdownException());
@@ -431,6 +435,67 @@ else if (sig.isOnError()) {
}
}
+ @SuppressWarnings("FutureReturnValueIgnored")
+ void evictInBackground() {
+ @SuppressWarnings("unchecked")
+ ConcurrentLinkedQueue resources = CONNECTIONS.get(this);
+ if (resources == null) {
+ //no need to schedule the task again, pool has been disposed
+ return;
+ }
+
+ if (WIP.getAndIncrement(this) == 0) {
+ if (pendingSize == 0) {
+ Iterator slots = resources.iterator();
+ while (slots.hasNext()) {
+ Slot slot = slots.next();
+ if (slot.concurrency() == 0) {
+ if (!slot.connection.channel().isActive()) {
+ if (log.isDebugEnabled()) {
+ log.debug(format(slot.connection.channel(), "Channel is closed, remove from pool"));
+ }
+ recordInteractionTimestamp();
+ slots.remove();
+ IDLE_SIZE.decrementAndGet(this);
+ slot.invalidate();
+ continue;
+ }
+
+ if (maxLifeReached(slot)) {
+ if (log.isDebugEnabled()) {
+ log.debug(format(slot.connection.channel(), "Max life time is reached, remove from pool"));
+ }
+ //"FutureReturnValueIgnored" this is deliberate
+ slot.connection.channel().close();
+ recordInteractionTimestamp();
+ slots.remove();
+ IDLE_SIZE.decrementAndGet(this);
+ slot.invalidate();
+ continue;
+ }
+ }
+ if (maxIdleReached(slot)) {
+ if (log.isDebugEnabled()) {
+ log.debug(format(slot.connection.channel(), "Idle time is reached, remove from pool"));
+ }
+ //"FutureReturnValueIgnored" this is deliberate
+ slot.connection.channel().close();
+ recordInteractionTimestamp();
+ slots.remove();
+ IDLE_SIZE.decrementAndGet(this);
+ slot.invalidate();
+ }
+ }
+ }
+ //at the end if there are racing drain calls, go into the drainLoop
+ if (WIP.decrementAndGet(this) > 0) {
+ drainLoop();
+ }
+ }
+ //schedule the next iteration
+ scheduleEviction();
+ }
+
@Nullable
@SuppressWarnings("FutureReturnValueIgnored")
Slot findConnection(ConcurrentLinkedQueue resources) {
@@ -465,7 +530,7 @@ Slot findConnection(ConcurrentLinkedQueue resources) {
}
// check whether the connection's idle time has been reached
- if (maxIdleTime != -1 && slot.idleTime() >= maxIdleTime) {
+ if (maxIdleReached(slot)) {
if (log.isDebugEnabled()) {
log.debug(format(slot.connection.channel(), "Idle time is reached, remove from pool"));
}
@@ -510,6 +575,10 @@ Slot findConnection(ConcurrentLinkedQueue resources) {
return null;
}
+ boolean maxIdleReached(Slot slot) {
+ return maxIdleTime != -1 && slot.idleTime() >= maxIdleTime;
+ }
+
boolean maxLifeReached(Slot slot) {
return maxLifeTime != -1 && slot.lifeTime() >= maxLifeTime;
}
@@ -608,6 +677,17 @@ void removeSlot(Slot slot) {
}
}
+ void scheduleEviction() {
+ if (!poolConfig.evictInBackgroundInterval().isZero()) {
+ long nanosEvictionInterval = poolConfig.evictInBackgroundInterval().toNanos();
+ this.evictionTask = poolConfig.evictInBackgroundScheduler()
+ .schedule(this::evictInBackground, nanosEvictionInterval, TimeUnit.NANOSECONDS);
+ }
+ else {
+ this.evictionTask = Disposables.disposed();
+ }
+ }
+
static final Function> DEFAULT_DESTROY_HANDLER =
connection -> {
if (!connection.channel().isActive()) {
diff --git a/reactor-netty-http/src/test/java/reactor/netty/http/client/Http2PoolTest.java b/reactor-netty-http/src/test/java/reactor/netty/http/client/Http2PoolTest.java
index bc1ec1a561..ec317c8828 100644
--- a/reactor-netty-http/src/test/java/reactor/netty/http/client/Http2PoolTest.java
+++ b/reactor-netty-http/src/test/java/reactor/netty/http/client/Http2PoolTest.java
@@ -335,6 +335,213 @@ void evictClosedConnectionMaxConnectionsReached() throws Exception {
}
}
+ @Test
+ void evictInBackgroundClosedConnection() throws Exception {
+ PoolBuilder> poolBuilder =
+ PoolBuilder.from(Mono.fromSupplier(() -> {
+ Channel channel = new EmbeddedChannel(
+ new TestChannelId(),
+ Http2FrameCodecBuilder.forClient().build());
+ return Connection.from(channel);
+ }))
+ .idleResourceReuseLruOrder()
+ .maxPendingAcquireUnbounded()
+ .sizeBetween(0, 1)
+ .evictInBackground(Duration.ofSeconds(5));
+ Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, null, -1, -1));
+
+ Connection connection = null;
+ try {
+ PooledRef acquired1 = http2Pool.acquire().block();
+
+ assertThat(acquired1).isNotNull();
+ assertThat(http2Pool.activeStreams()).isEqualTo(1);
+ assertThat(http2Pool.connections.size()).isEqualTo(1);
+
+ connection = acquired1.poolable();
+ ChannelId id1 = connection.channel().id();
+ CountDownLatch latch = new CountDownLatch(1);
+ ((EmbeddedChannel) connection.channel()).finishAndReleaseAll();
+ connection.onDispose(latch::countDown);
+ connection.dispose();
+
+ assertThat(latch.await(1, TimeUnit.SECONDS)).as("latch await").isTrue();
+
+ assertThat(http2Pool.activeStreams()).isEqualTo(1);
+ assertThat(http2Pool.connections.size()).isEqualTo(1);
+
+ acquired1.invalidate().block();
+
+ http2Pool.evictInBackground();
+
+ assertThat(http2Pool.activeStreams()).isEqualTo(0);
+ assertThat(http2Pool.connections.size()).isEqualTo(0);
+
+ PooledRef acquired2 = http2Pool.acquire().block();
+
+ assertThat(acquired2).isNotNull();
+ assertThat(http2Pool.activeStreams()).isEqualTo(1);
+ assertThat(http2Pool.connections.size()).isEqualTo(1);
+
+ connection = acquired2.poolable();
+ ChannelId id2 = connection.channel().id();
+
+ assertThat(id1).isNotEqualTo(id2);
+
+ acquired2.invalidate().block();
+
+ http2Pool.evictInBackground();
+
+ assertThat(http2Pool.activeStreams()).isEqualTo(0);
+ assertThat(http2Pool.connections.size()).isEqualTo(1);
+ }
+ finally {
+ if (connection != null) {
+ ((EmbeddedChannel) connection.channel()).finishAndReleaseAll();
+ connection.dispose();
+ }
+ }
+ }
+
+ @Test
+ void evictInBackgroundMaxIdleTime() throws Exception {
+ PoolBuilder> poolBuilder =
+ PoolBuilder.from(Mono.fromSupplier(() -> {
+ Channel channel = new EmbeddedChannel(
+ new TestChannelId(),
+ Http2FrameCodecBuilder.forClient().build());
+ return Connection.from(channel);
+ }))
+ .idleResourceReuseLruOrder()
+ .maxPendingAcquireUnbounded()
+ .sizeBetween(0, 1)
+ .evictInBackground(Duration.ofSeconds(5));
+ Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, null, 10, -1));
+
+ Connection connection1 = null;
+ Connection connection2 = null;
+ try {
+ PooledRef acquired1 = http2Pool.acquire().block();
+
+ assertThat(acquired1).isNotNull();
+ assertThat(http2Pool.activeStreams()).isEqualTo(1);
+ assertThat(http2Pool.connections.size()).isEqualTo(1);
+
+ connection1 = acquired1.poolable();
+ ChannelId id1 = connection1.channel().id();
+
+ acquired1.invalidate().block();
+
+ Thread.sleep(15);
+
+ http2Pool.evictInBackground();
+
+ assertThat(http2Pool.activeStreams()).isEqualTo(0);
+ assertThat(http2Pool.connections.size()).isEqualTo(0);
+
+ PooledRef acquired2 = http2Pool.acquire().block();
+
+ assertThat(acquired2).isNotNull();
+ assertThat(http2Pool.activeStreams()).isEqualTo(1);
+ assertThat(http2Pool.connections.size()).isEqualTo(1);
+
+ connection2 = acquired2.poolable();
+ ChannelId id2 = connection2.channel().id();
+
+ assertThat(id1).isNotEqualTo(id2);
+
+ acquired2.invalidate().block();
+
+ Thread.sleep(15);
+
+ http2Pool.evictInBackground();
+
+ assertThat(http2Pool.activeStreams()).isEqualTo(0);
+ assertThat(http2Pool.connections.size()).isEqualTo(0);
+ }
+ finally {
+ if (connection1 != null) {
+ ((EmbeddedChannel) connection1.channel()).finishAndReleaseAll();
+ connection1.dispose();
+ }
+ if (connection2 != null) {
+ ((EmbeddedChannel) connection2.channel()).finishAndReleaseAll();
+ connection2.dispose();
+ }
+ }
+ }
+
+ @Test
+ void evictInBackgroundMaxLifeTime() throws Exception {
+ PoolBuilder> poolBuilder =
+ PoolBuilder.from(Mono.fromSupplier(() -> {
+ Channel channel = new EmbeddedChannel(
+ new TestChannelId(),
+ Http2FrameCodecBuilder.forClient().build());
+ return Connection.from(channel);
+ }))
+ .idleResourceReuseLruOrder()
+ .maxPendingAcquireUnbounded()
+ .sizeBetween(0, 1)
+ .evictInBackground(Duration.ofSeconds(5));
+ Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, null, -1, 10));
+
+ Connection connection1 = null;
+ Connection connection2 = null;
+ try {
+ PooledRef acquired1 = http2Pool.acquire().block();
+
+ assertThat(acquired1).isNotNull();
+ assertThat(http2Pool.activeStreams()).isEqualTo(1);
+ assertThat(http2Pool.connections.size()).isEqualTo(1);
+
+ connection1 = acquired1.poolable();
+ ChannelId id1 = connection1.channel().id();
+
+ Thread.sleep(10);
+
+ assertThat(http2Pool.activeStreams()).isEqualTo(1);
+ assertThat(http2Pool.connections.size()).isEqualTo(1);
+
+ acquired1.invalidate().block();
+
+ http2Pool.evictInBackground();
+
+ assertThat(http2Pool.activeStreams()).isEqualTo(0);
+ assertThat(http2Pool.connections.size()).isEqualTo(0);
+
+ PooledRef acquired2 = http2Pool.acquire().block();
+
+ assertThat(acquired2).isNotNull();
+ assertThat(http2Pool.activeStreams()).isEqualTo(1);
+ assertThat(http2Pool.connections.size()).isEqualTo(1);
+
+ connection2 = acquired2.poolable();
+ ChannelId id2 = connection2.channel().id();
+
+ assertThat(id1).isNotEqualTo(id2);
+
+ acquired2.invalidate().block();
+
+ Thread.sleep(10);
+
+ http2Pool.evictInBackground();
+
+ assertThat(http2Pool.activeStreams()).isEqualTo(0);
+ assertThat(http2Pool.connections.size()).isEqualTo(0);
+ }
+ finally {
+ if (connection1 != null) {
+ ((EmbeddedChannel) connection1.channel()).finishAndReleaseAll();
+ connection1.dispose();
+ }
+ if (connection2 != null) {
+ ((EmbeddedChannel) connection2.channel()).finishAndReleaseAll();
+ connection2.dispose();
+ }
+ }
+ }
+
@Test
void maxIdleTime() throws Exception {
PoolBuilder> poolBuilder =