diff --git a/reactor-netty-core/src/main/java/reactor/netty/Metrics.java b/reactor-netty-core/src/main/java/reactor/netty/Metrics.java
index 1b13934c65..1b35994db1 100644
--- a/reactor-netty-core/src/main/java/reactor/netty/Metrics.java
+++ b/reactor-netty-core/src/main/java/reactor/netty/Metrics.java
@@ -204,6 +204,10 @@ public class Metrics {
*/
public static final String PENDING_STREAMS = ".pending.streams";
+ /**
+ * The number of HTTP/2 stream acquisitions steal count.
+ */
+ public static final String STEAL_STREAMS = ".steal.streams";
// ByteBufAllocator Metrics
/**
diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2AllocationStrategy.java b/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2AllocationStrategy.java
index c6d3ba29d7..434ab2e9fd 100644
--- a/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2AllocationStrategy.java
+++ b/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2AllocationStrategy.java
@@ -66,6 +66,19 @@ public interface Builder {
* @return {@code this}
*/
Builder minConnections(int minConnections);
+
+ /**
+ * Enables or disables work stealing mode for managing HTTP2 Connection Pools.
+ *
+ * By default, a single Connection Pool is used by multiple Netty event loop threads.
+ * When work stealing is enabled, each Netty event loop will maintain its own
+ * HTTP2 Connection Pool, and HTTP2 streams allocation will be distributed over all available
+ * pools using a work stealing strategy. This approach maximizes throughput and
+ * resource utilization in a multithreaded environment.
+ *
+ * @return {@code this}
+ */
+ Builder enableWorkStealing();
}
/**
@@ -77,6 +90,18 @@ public static Http2AllocationStrategy.Builder builder() {
return new Http2AllocationStrategy.Build();
}
+ /**
+ * Creates a builder for {@link Http2AllocationStrategy} and initialize it
+ * with an existing strategy. This method can be used to create a mutated version
+ * of an existing strategy.
+ *
+ * @return a new {@link Http2AllocationStrategy.Builder} initialized with an existing http2
+ * allocation strategy.
+ */
+ public static Http2AllocationStrategy.Builder builder(Http2AllocationStrategy existing) {
+ return new Http2AllocationStrategy.Build(existing);
+ }
+
@Override
public Http2AllocationStrategy copy() {
return new Http2AllocationStrategy(this);
@@ -141,9 +166,14 @@ public void returnPermits(int returned) {
}
}
+ public boolean enableWorkStealing() {
+ return enableWorkStealing;
+ }
+
final long maxConcurrentStreams;
final int maxConnections;
final int minConnections;
+ final boolean enableWorkStealing;
volatile int permits;
static final AtomicIntegerFieldUpdater PERMITS = AtomicIntegerFieldUpdater.newUpdater(Http2AllocationStrategy.class, "permits");
@@ -152,6 +182,7 @@ public void returnPermits(int returned) {
this.maxConcurrentStreams = build.maxConcurrentStreams;
this.maxConnections = build.maxConnections;
this.minConnections = build.minConnections;
+ this.enableWorkStealing = build.enableWorkStealing;
PERMITS.lazySet(this, this.maxConnections);
}
@@ -159,6 +190,7 @@ public void returnPermits(int returned) {
this.maxConcurrentStreams = copy.maxConcurrentStreams;
this.maxConnections = copy.maxConnections;
this.minConnections = copy.minConnections;
+ this.enableWorkStealing = copy.enableWorkStealing;
PERMITS.lazySet(this, this.maxConnections);
}
@@ -170,6 +202,17 @@ static final class Build implements Builder {
long maxConcurrentStreams = DEFAULT_MAX_CONCURRENT_STREAMS;
int maxConnections = DEFAULT_MAX_CONNECTIONS;
int minConnections = DEFAULT_MIN_CONNECTIONS;
+ boolean enableWorkStealing = Boolean.getBoolean("reactor.netty.pool.h2.enableworkstealing");
+
+ Build() {
+ }
+
+ Build(Http2AllocationStrategy existing) {
+ this.maxConcurrentStreams = existing.maxConcurrentStreams;
+ this.minConnections = existing.minConnections;
+ this.maxConnections = existing.maxConnections;
+ this.enableWorkStealing = existing.enableWorkStealing;
+ }
@Override
public Http2AllocationStrategy build() {
@@ -206,5 +249,11 @@ public Builder minConnections(int minConnections) {
this.minConnections = minConnections;
return this;
}
+
+ @Override
+ public Builder enableWorkStealing() {
+ this.enableWorkStealing = true;
+ return this;
+ }
}
}
diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2ConnectionProvider.java b/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2ConnectionProvider.java
index d3b12578de..17a0760fc6 100644
--- a/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2ConnectionProvider.java
+++ b/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2ConnectionProvider.java
@@ -39,6 +39,7 @@
import reactor.netty.ConnectionObserver;
import reactor.netty.channel.ChannelMetricsRecorder;
import reactor.netty.channel.ChannelOperations;
+import reactor.netty.internal.shaded.reactor.pool.decorators.InstrumentedPoolDecorators;
import reactor.netty.resources.ConnectionProvider;
import reactor.netty.resources.PooledConnectionProvider;
import reactor.netty.transport.TransportConfig;
@@ -50,13 +51,19 @@
import reactor.util.annotation.Nullable;
import reactor.util.concurrent.Queues;
import reactor.util.context.Context;
+import reactor.util.function.Tuples;
import java.io.IOException;
import java.net.SocketAddress;
import java.time.Duration;
+import java.util.Iterator;
+import java.util.List;
import java.util.Queue;
+import java.util.concurrent.Executor;
import java.util.function.BiPredicate;
import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
import static reactor.netty.ReactorNetty.format;
import static reactor.netty.ReactorNetty.getChannelContext;
@@ -536,12 +543,46 @@ static final class PooledConnectionAllocator {
this.config = (HttpClientConfig) config;
this.remoteAddress = remoteAddress;
this.resolver = resolver;
- this.pool = id == null ?
- poolFactory.newPool(connectChannel(), null, DEFAULT_DESTROY_HANDLER, DEFAULT_EVICTION_PREDICATE,
- poolConfig -> new Http2Pool(poolConfig, poolFactory.allocationStrategy())) :
- poolFactory.newPool(connectChannel(), DEFAULT_DESTROY_HANDLER, DEFAULT_EVICTION_PREDICATE,
- new MicrometerPoolMetricsRecorder(id, name, remoteAddress),
- poolConfig -> new Http2Pool(poolConfig, poolFactory.allocationStrategy()));
+
+ Http2AllocationStrategy http2Strategy = poolFactory.allocationStrategy() instanceof Http2AllocationStrategy ?
+ (Http2AllocationStrategy) poolFactory.allocationStrategy() : null;
+
+ if (http2Strategy == null || !http2Strategy.enableWorkStealing) {
+ this.pool = id == null ?
+ poolFactory.newPool(connectChannel(), null, DEFAULT_DESTROY_HANDLER, DEFAULT_EVICTION_PREDICATE,
+ poolConfig -> new Http2Pool(poolConfig, poolFactory.allocationStrategy())) :
+ poolFactory.newPool(connectChannel(), DEFAULT_DESTROY_HANDLER, DEFAULT_EVICTION_PREDICATE,
+ new MicrometerPoolMetricsRecorder(id, name, remoteAddress),
+ poolConfig -> new Http2Pool(poolConfig, poolFactory.allocationStrategy()));
+ }
+ else {
+ // Create one connection allocator (it will be shared by all Http2Pool instances)
+ Publisher connPublisher = connectChannel();
+
+ List execs = StreamSupport.stream(config.loopResources().onClient(true).spliterator(), false)
+ .limit(http2Strategy.maxConnections)
+ .collect(Collectors.toList());
+ Iterator execsIter = execs.iterator();
+
+ MicrometerPoolMetricsRecorder micrometerRecorder = id == null ? null : new MicrometerPoolMetricsRecorder(id, name, remoteAddress);
+ this.pool = InstrumentedPoolDecorators.concurrentPools(execs.size(),
+ http2Strategy.minConnections, http2Strategy.maxConnections, (minConn, maxConn) -> {
+ Http2AllocationStrategy adaptedH2Strategy = Http2AllocationStrategy.builder(http2Strategy)
+ .minConnections(minConn)
+ .maxConnections(maxConn)
+ .build();
+
+ InstrumentedPool pool =
+ id == null ?
+ poolFactory.newPool(connPublisher, null, DEFAULT_DESTROY_HANDLER, DEFAULT_EVICTION_PREDICATE,
+ poolConfig -> new Http2Pool(poolConfig, adaptedH2Strategy)) :
+ poolFactory.newPool(connPublisher, DEFAULT_DESTROY_HANDLER, DEFAULT_EVICTION_PREDICATE,
+ micrometerRecorder,
+ poolConfig -> new Http2Pool(poolConfig, adaptedH2Strategy));
+
+ return Tuples.of(pool, execsIter.next());
+ });
+ }
}
Publisher connectChannel() {
diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2ConnectionProviderMeters.java b/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2ConnectionProviderMeters.java
index 72e3bd986c..67a991346b 100644
--- a/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2ConnectionProviderMeters.java
+++ b/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2ConnectionProviderMeters.java
@@ -67,6 +67,26 @@ public Meter.Type getType() {
}
},
+ /**
+ * The number of HTTP/2 stream acquisition steal count.
+ */
+ STEAL_STREAMS {
+ @Override
+ public String getName() {
+ return "reactor.netty.connection.provider.steal.streams";
+ }
+
+ @Override
+ public KeyName[] getKeyNames() {
+ return Http2ConnectionProviderMetersTags.values();
+ }
+
+ @Override
+ public Meter.Type getType() {
+ return Meter.Type.COUNTER;
+ }
+ },
+
/**
* The number of the idle connections in the connection pool.
*/
diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2Pool.java b/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2Pool.java
index 0e30f2dc31..937c08132c 100644
--- a/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2Pool.java
+++ b/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2Pool.java
@@ -25,6 +25,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Function;
@@ -44,6 +45,7 @@
import reactor.netty.Connection;
import reactor.netty.FutureMono;
import reactor.netty.NettyPipeline;
+import reactor.netty.internal.shaded.reactor.pool.AllocationStrategy;
import reactor.netty.internal.shaded.reactor.pool.InstrumentedPool;
import reactor.netty.internal.shaded.reactor.pool.PoolAcquirePendingLimitException;
import reactor.netty.internal.shaded.reactor.pool.PoolAcquireTimeoutException;
@@ -156,6 +158,8 @@ final class Http2Pool implements InstrumentedPool, InstrumentedPool.
final Long maxConcurrentStreams;
final int minConnections;
final PoolConfig poolConfig;
+ final AllocationStrategy allocationStrategy;
+ final boolean workStealingEnabled;
long lastInteractionTimestamp;
@@ -165,11 +169,17 @@ final class Http2Pool implements InstrumentedPool, InstrumentedPool.
this.clock = poolConfig.clock();
this.connections = new ConcurrentLinkedQueue<>();
this.lastInteractionTimestamp = clock.millis();
- this.maxConcurrentStreams = allocationStrategy instanceof Http2AllocationStrategy ?
+ Http2AllocationStrategy http2Strategy = allocationStrategy instanceof Http2AllocationStrategy ? (Http2AllocationStrategy) allocationStrategy : null;
+
+ this.maxConcurrentStreams = http2Strategy != null ?
((Http2AllocationStrategy) allocationStrategy).maxConcurrentStreams() : -1;
this.minConnections = allocationStrategy == null ? 0 : allocationStrategy.permitMinimum();
this.pending = new ConcurrentLinkedDeque<>();
this.poolConfig = poolConfig;
+ this.workStealingEnabled = http2Strategy != null && http2Strategy.enableWorkStealing;
+
+ // Perform allocations using either the allocationStrategy parameter, or default to the poolConfig allocation strategy.
+ this.allocationStrategy = allocationStrategy == null ? poolConfig.allocationStrategy() : new DelegatingAllocationStrategy(allocationStrategy);
recordInteractionTimestamp();
scheduleEviction();
@@ -185,6 +195,24 @@ public Mono> acquire(Duration timeout) {
return new BorrowerMono(this, timeout);
}
+ @Override
+ public boolean steal(InstrumentedPool pool) {
+ Http2Pool other = (Http2Pool) pool;
+
+ if (!other.isDisposed()) {
+ ConcurrentLinkedDeque q = other.pending;
+ Borrower b = other.pollPending(q, false);
+ if (b != null && !b.get()) {
+ // TODO check race conditions when timer expires or subscription is cancelled concurrently
+ b.setPool(this);
+ doAcquire(b);
+ return true;
+ }
+ }
+
+ return false;
+ }
+
@Override
public int acquiredSize() {
return allocatedSize() - idleSize();
@@ -192,7 +220,7 @@ public int acquiredSize() {
@Override
public int allocatedSize() {
- return poolConfig.allocationStrategy().permitGranted();
+ return allocationStrategy.permitGranted();
}
@Override
@@ -366,7 +394,7 @@ void drainLoop() {
// find a connection that can be used for opening a new stream
// when cached connections are below minimum connections, then allocate a new connection
boolean belowMinConnections = minConnections > 0 &&
- poolConfig.allocationStrategy().permitGranted() < minConnections;
+ allocationStrategy.permitGranted() < minConnections;
Slot slot = belowMinConnections ? null : findConnection(resources);
if (slot != null) {
Borrower borrower = pollPending(borrowers, true);
@@ -383,10 +411,19 @@ void drainLoop() {
log.debug(format(slot.connection.channel(), "Channel activated"));
}
ACQUIRED.incrementAndGet(this);
- slot.connection.channel().eventLoop().execute(() -> {
+ if (!workStealingEnabled) {
+ slot.connection.channel().eventLoop().execute(() -> {
+ borrower.deliver(new Http2PooledRef(slot)); // will insert the connection slot into CONNECTIONS
+ drain();
+ });
+ }
+ else {
+ // WHen using the reactor work-stealing pool, we are already executing from one of the pools' executor,
+ // so, we can safely deliver the borrower concurrently, all the borrowers are distributed across
+ // all sub pools, so we won't be in a situation where the current thread will run the drainloop
+ // for ever under heavy requests load, so no need to reschedule.
borrower.deliver(new Http2PooledRef(slot));
- drain();
- });
+ }
}
else {
int resourcesCount = idleSize;
@@ -396,7 +433,7 @@ void drainLoop() {
// connections allocations were triggered
}
else {
- int permits = poolConfig.allocationStrategy().getPermits(1);
+ int permits = allocationStrategy.getPermits(1);
if (permits <= 0) {
if (maxPending >= 0) {
borrowersCount = pendingSize;
@@ -412,7 +449,7 @@ void drainLoop() {
else {
if (permits > 1) {
// warmup is not supported
- poolConfig.allocationStrategy().returnPermits(permits - 1);
+ allocationStrategy.returnPermits(permits - 1);
}
Borrower borrower = pollPending(borrowers, true);
if (borrower == null) {
@@ -439,7 +476,7 @@ void drainLoop() {
else if (sig.isOnError()) {
Throwable error = sig.getThrowable();
assert error != null;
- poolConfig.allocationStrategy().returnPermits(1);
+ allocationStrategy.returnPermits(1);
borrower.fail(error);
}
})
@@ -633,7 +670,7 @@ void pendingOffer(Borrower borrower) {
if (WIP.getAndIncrement(this) == 0) {
ConcurrentLinkedQueue ir = connections;
- if (maxPending >= 0 && postOffer > maxPending && ir.isEmpty() && poolConfig.allocationStrategy().estimatePermitCount() == 0) {
+ if (maxPending >= 0 && postOffer > maxPending && ir.isEmpty() && allocationStrategy.estimatePermitCount() == 0) {
Borrower toCull = pollPending(pendingQueue, false);
if (toCull != null) {
pendingAcquireLimitReached(toCull, maxPending);
@@ -729,7 +766,7 @@ static final class Borrower extends AtomicBoolean implements Scannable, Subscrip
final Duration acquireTimeout;
final CoreSubscriber super Http2PooledRef> actual;
- final Http2Pool pool;
+ final AtomicReference pool;
long pendingAcquireStart;
@@ -738,7 +775,7 @@ static final class Borrower extends AtomicBoolean implements Scannable, Subscrip
Borrower(CoreSubscriber super Http2PooledRef> actual, Http2Pool pool, Duration acquireTimeout) {
this.acquireTimeout = acquireTimeout;
this.actual = actual;
- this.pool = pool;
+ this.pool = new AtomicReference<>(pool);
this.timeoutTask = TIMEOUT_DISPOSED;
}
@@ -746,7 +783,7 @@ static final class Borrower extends AtomicBoolean implements Scannable, Subscrip
public void cancel() {
stopPendingCountdown(true); // this is not failure, the subscription was canceled
if (compareAndSet(false, true)) {
- pool.cancelAcquire(this);
+ pool().cancelAcquire(this);
}
}
@@ -757,8 +794,9 @@ Context currentContext() {
@Override
public void request(long n) {
if (Operators.validate(n)) {
+ Http2Pool pool = pool();
long estimateStreamsCount = pool.totalMaxConcurrentStreams - pool.acquired;
- int permits = pool.poolConfig.allocationStrategy().estimatePermitCount();
+ int permits = pool.allocationStrategy.estimatePermitCount();
int pending = pool.pendingSize;
if (!acquireTimeout.isZero() && permits + estimateStreamsCount <= pending) {
pendingAcquireStart = pool.clock.millis();
@@ -773,7 +811,7 @@ public void run() {
if (compareAndSet(false, true)) {
// this is failure, a timeout was observed
stopPendingCountdown(false);
- pool.cancelAcquire(Http2Pool.Borrower.this);
+ pool().cancelAcquire(Http2Pool.Borrower.this);
actual.onError(new PoolAcquireTimeoutException(acquireTimeout));
}
}
@@ -801,7 +839,10 @@ public String toString() {
}
void deliver(Http2PooledRef poolSlot) {
- assert poolSlot.slot.connection.channel().eventLoop().inEventLoop();
+ if (!pool().workStealingEnabled) {
+ // TODO can we do this check even when workstealing is enabled ?
+ assert poolSlot.slot.connection.channel().eventLoop().inEventLoop();
+ }
poolSlot.slot.incrementConcurrencyAndGet();
poolSlot.slot.deactivate();
if (get()) {
@@ -823,6 +864,7 @@ void fail(Throwable error) {
void stopPendingCountdown(boolean success) {
if (!timeoutTask.isDisposed()) {
+ Http2Pool pool = pool();
if (success) {
pool.poolConfig.metricsRecorder().recordPendingSuccessAndLatency(pool.clock.millis() - pendingAcquireStart);
}
@@ -832,6 +874,14 @@ void stopPendingCountdown(boolean success) {
}
timeoutTask.dispose();
}
+
+ Http2Pool pool() {
+ return pool.get();
+ }
+
+ public void setPool(Http2Pool replace) {
+ pool.set(replace);
+ }
}
static final class BorrowerMono extends Mono> {
@@ -1043,7 +1093,7 @@ void invalidate() {
if (log.isDebugEnabled()) {
log.debug(format(connection.channel(), "Channel removed from pool"));
}
- pool.poolConfig.allocationStrategy().returnPermits(1);
+ pool.allocationStrategy.returnPermits(1);
TOTAL_MAX_CONCURRENT_STREAMS.addAndGet(this.pool, -maxConcurrentStreams);
}
}
@@ -1078,4 +1128,43 @@ public long allocationTimestamp() {
}
}
+
+ static final class DelegatingAllocationStrategy implements AllocationStrategy {
+
+ final ConnectionProvider.AllocationStrategy> delegate;
+
+ DelegatingAllocationStrategy(ConnectionProvider.AllocationStrategy> delegate) {
+ this.delegate = delegate;
+ }
+
+ @Override
+ public int estimatePermitCount() {
+ return delegate.estimatePermitCount();
+ }
+
+ @Override
+ public int getPermits(int desired) {
+ return delegate.getPermits(desired);
+ }
+
+ @Override
+ public int permitGranted() {
+ return delegate.permitGranted();
+ }
+
+ @Override
+ public int permitMinimum() {
+ return delegate.permitMinimum();
+ }
+
+ @Override
+ public int permitMaximum() {
+ return delegate.permitMaximum();
+ }
+
+ @Override
+ public void returnPermits(int returned) {
+ delegate.returnPermits(returned);
+ }
+ }
}
diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/client/MicrometerHttp2ConnectionProviderMeterRegistrar.java b/reactor-netty-http/src/main/java/reactor/netty/http/client/MicrometerHttp2ConnectionProviderMeterRegistrar.java
index 75bf6cd939..3b7fd4528c 100644
--- a/reactor-netty-http/src/main/java/reactor/netty/http/client/MicrometerHttp2ConnectionProviderMeterRegistrar.java
+++ b/reactor-netty-http/src/main/java/reactor/netty/http/client/MicrometerHttp2ConnectionProviderMeterRegistrar.java
@@ -19,8 +19,10 @@
import io.micrometer.core.instrument.Meter;
import io.micrometer.core.instrument.Tags;
import reactor.netty.internal.shaded.reactor.pool.InstrumentedPool;
+import reactor.netty.internal.shaded.reactor.pool.decorators.WorkStealingPool;
import java.net.SocketAddress;
+import java.util.List;
import static reactor.netty.Metrics.REGISTRY;
import static reactor.netty.http.client.Http2ConnectionProviderMeters.ACTIVE_CONNECTIONS;
@@ -31,6 +33,7 @@
import static reactor.netty.http.client.Http2ConnectionProviderMeters.IDLE_CONNECTIONS;
import static reactor.netty.http.client.Http2ConnectionProviderMeters.PENDING_STREAMS;
import static reactor.netty.Metrics.formatSocketAddress;
+import static reactor.netty.http.client.Http2ConnectionProviderMeters.STEAL_STREAMS;
final class MicrometerHttp2ConnectionProviderMeterRegistrar {
@@ -48,9 +51,20 @@ void registerMetrics(String poolName, String id, SocketAddress remoteAddress, In
.tags(tags)
.register(REGISTRY);
- Gauge.builder(ACTIVE_STREAMS.getName(), metrics, poolMetrics -> ((Http2Pool) poolMetrics).activeStreams())
- .tags(tags)
- .register(REGISTRY);
+ if (metrics instanceof Http2Pool) {
+ Gauge.builder(ACTIVE_STREAMS.getName(), metrics, poolMetrics -> ((Http2Pool) poolMetrics).activeStreams())
+ .tags(tags)
+ .register(REGISTRY);
+ }
+ else if (metrics instanceof WorkStealingPool>) {
+ Gauge.builder(ACTIVE_STREAMS.getName(), metrics, poolMetrics -> getActiveStreams(((WorkStealingPool>) metrics).getPools()))
+ .tags(tags)
+ .register(REGISTRY);
+
+ Gauge.builder(STEAL_STREAMS.getName(), metrics, InstrumentedPool.PoolMetrics::stealCount)
+ .tags(tags)
+ .register(REGISTRY);
+ }
Gauge.builder(IDLE_CONNECTIONS.getName(), metrics, InstrumentedPool.PoolMetrics::idleSize)
.tags(tags)
@@ -70,4 +84,10 @@ void deRegisterMetrics(String poolName, String id, SocketAddress remoteAddress)
REGISTRY.remove(new Meter.Id(IDLE_CONNECTIONS.getName(), tags, null, null, Meter.Type.GAUGE));
REGISTRY.remove(new Meter.Id(PENDING_STREAMS.getName(), tags, null, null, Meter.Type.GAUGE));
}
-}
\ No newline at end of file
+
+ int getActiveStreams(List extends InstrumentedPool>> pools) {
+ return pools.stream()
+ .mapToInt(pool -> ((Http2Pool) pool).activeStreams())
+ .sum();
+ }
+}
diff --git a/reactor-netty-http/src/test/java/reactor/netty/http/client/HttpClientTest.java b/reactor-netty-http/src/test/java/reactor/netty/http/client/HttpClientTest.java
index ac6a946cdf..eb5bed5cb9 100644
--- a/reactor-netty-http/src/test/java/reactor/netty/http/client/HttpClientTest.java
+++ b/reactor-netty-http/src/test/java/reactor/netty/http/client/HttpClientTest.java
@@ -3333,4 +3333,43 @@ private void doTestIssue1943(HttpProtocol protocol) {
.block(Duration.ofSeconds(5));
}
}
+
+ @Test
+ void testHttp2ClientWithWorkStealing() {
+ disposableServer =
+ HttpServer.create()
+ .protocol(HttpProtocol.H2C)
+ .port(0)
+ .handle((req, res) ->
+ res.sendString(Mono.just("Welcome")))
+ .bindNow();
+
+ ConnectionProvider provider = ConnectionProvider
+ .builder("http")
+ .allocationStrategy(Http2AllocationStrategy.builder()
+ .minConnections(1)
+ .maxConnections(10)
+ .enableWorkStealing()
+ .build())
+ .build();
+
+ try {
+ HttpClient client = HttpClient.create(provider)
+ .protocol(HttpProtocol.H2C)
+ .port(disposableServer.port())
+ .wiretap(true);
+
+ StepVerifier.create(client
+ .headers(hdr -> hdr.set("Content-Type", "text/plain"))
+ .get()
+ .uri("/payload-size")
+ .response((r, buf) -> buf.aggregate().asString().zipWith(Mono.just(r))))
+ .expectNextMatches(tuple -> "Welcome".equals(tuple.getT1()) && tuple.getT2().status().equals(HttpResponseStatus.OK))
+ .expectComplete()
+ .verify(Duration.ofSeconds(30));
+ }
+ finally {
+ provider.disposeLater().block();
+ }
+ }
}