From a048f9909d0b7bbdacffd1555bf04adc3df742e9 Mon Sep 17 00:00:00 2001 From: Pierre De Rop Date: Sat, 25 Nov 2023 01:07:44 +0100 Subject: [PATCH] Experiment work stealing pools --- .../src/main/java/reactor/netty/Metrics.java | 4 + .../resources/PooledConnectionProvider.java | 50 +++++++++- .../http/client/Http2AllocationStrategy.java | 55 ++++++++++- .../http/client/Http2ConnectionProvider.java | 66 +++++++++++-- .../client/Http2ConnectionProviderMeters.java | 22 ++++- .../reactor/netty/http/client/Http2Pool.java | 99 ++++++++++++++++--- ...Http2ConnectionProviderMeterRegistrar.java | 30 +++++- .../netty/http/client/HttpClientTest.java | 55 +++++++++-- 8 files changed, 343 insertions(+), 38 deletions(-) diff --git a/reactor-netty-core/src/main/java/reactor/netty/Metrics.java b/reactor-netty-core/src/main/java/reactor/netty/Metrics.java index 5cf3345bf8..d33970df1d 100644 --- a/reactor-netty-core/src/main/java/reactor/netty/Metrics.java +++ b/reactor-netty-core/src/main/java/reactor/netty/Metrics.java @@ -204,6 +204,10 @@ public class Metrics { */ public static final String PENDING_STREAMS = ".pending.streams"; + /** + * The number of HTTP/2 stream acquisitions steal count. + */ + public static final String STEAL_STREAMS = ".steal.streams"; // ByteBufAllocator Metrics /** diff --git a/reactor-netty-core/src/main/java/reactor/netty/resources/PooledConnectionProvider.java b/reactor-netty-core/src/main/java/reactor/netty/resources/PooledConnectionProvider.java index b97af6b532..44aa18d979 100644 --- a/reactor-netty-core/src/main/java/reactor/netty/resources/PooledConnectionProvider.java +++ b/reactor-netty-core/src/main/java/reactor/netty/resources/PooledConnectionProvider.java @@ -527,6 +527,20 @@ public InstrumentedPool newPool( return newPoolInternal(allocator, destroyHandler, defaultEvictionPredicate).build(poolFactory); } + public InstrumentedPool newPool( + PoolBuilder> poolBuilder, + int maxConnections, + @Nullable AllocationStrategy allocationStrategy, + Function> destroyHandler, + BiPredicate defaultEvictionPredicate, + Function, InstrumentedPool> poolFactory) { + if (disposeTimeout != null) { + return newPoolInternal(poolBuilder, maxConnections, allocationStrategy, destroyHandler, defaultEvictionPredicate, null) + .build(poolFactory.andThen(InstrumentedPoolDecorators::gracefulShutdown)); + } + return newPoolInternal(poolBuilder, maxConnections, allocationStrategy, destroyHandler, defaultEvictionPredicate, null).build(poolFactory); + } + public InstrumentedPool newPool( Publisher allocator, Function> destroyHandler, @@ -540,6 +554,21 @@ public InstrumentedPool newPool( return newPoolInternal(allocator, destroyHandler, defaultEvictionPredicate, poolMetricsRecorder).build(poolFactory); } + public InstrumentedPool newPool( + PoolBuilder> poolBuilder, + int maxConnections, + @Nullable AllocationStrategy allocationStrategy, + Function> destroyHandler, + BiPredicate defaultEvictionPredicate, + PoolMetricsRecorder poolMetricsRecorder, + Function, InstrumentedPool> poolFactory) { + if (disposeTimeout != null) { + return newPoolInternal(poolBuilder, maxConnections, allocationStrategy, destroyHandler, defaultEvictionPredicate, poolMetricsRecorder) + .build(poolFactory.andThen(InstrumentedPoolDecorators::gracefulShutdown)); + } + return newPoolInternal(poolBuilder, maxConnections, allocationStrategy, destroyHandler, defaultEvictionPredicate, poolMetricsRecorder).build(poolFactory); + } + PoolBuilder> newPoolInternal( Publisher allocator, Function> destroyHandler, @@ -552,11 +581,22 @@ PoolBuilder> newPoolInternal( Function> destroyHandler, BiPredicate defaultEvictionPredicate, @Nullable PoolMetricsRecorder poolMetricsRecorder) { - PoolBuilder> poolBuilder = - PoolBuilder.from(allocator) - .destroyHandler(destroyHandler) - .maxPendingAcquire(pendingAcquireMaxCount) - .evictInBackground(evictionInterval); + return newPoolInternal(PoolBuilder.from(allocator), -1, null, destroyHandler, defaultEvictionPredicate, poolMetricsRecorder); + } + + PoolBuilder> newPoolInternal( + PoolBuilder> poolBuilder, + int maxConnections, + @Nullable AllocationStrategy allocationStrategy, + Function> destroyHandler, + BiPredicate defaultEvictionPredicate, + @Nullable PoolMetricsRecorder poolMetricsRecorder) { + maxConnections = (maxConnections == -1) ? this.maxConnections : maxConnections; + allocationStrategy = (allocationStrategy == null) ? this.allocationStrategy : allocationStrategy; + poolBuilder = poolBuilder + .destroyHandler(destroyHandler) + .maxPendingAcquire(pendingAcquireMaxCount) + .evictInBackground(evictionInterval); if (this.evictionPredicate != null) { poolBuilder = poolBuilder.evictionPredicate( diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2AllocationStrategy.java b/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2AllocationStrategy.java index c6d3ba29d7..790913109a 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2AllocationStrategy.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2AllocationStrategy.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022-2023 VMware, Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2022-2024 VMware, Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -66,6 +66,23 @@ public interface Builder { * @return {@code this} */ Builder minConnections(int minConnections); + + /** + * Enables or disables work stealing mode for managing HTTP2 Connection Pools. + *

+ * By default, a single Connection Pool is used by multiple Netty event loop threads. + * When work stealing is enabled, each Netty event loop will maintain its own + * HTTP2 Connection Pool, and HTTP2 streams allocation will be distributed over all available + * pools using a work stealing strategy. This approach maximizes throughput and + * resource utilization in a multithreaded environment. + * + * @param progressive true if the HTTP2 Connection pools should be enabled gradually (when the nth pool becomes + * is starting to get some pendingg acquisitions request, then enable one more + * pool until all available pools are enabled). + * + * @return {@code this} + */ + Builder enableWorkStealing(boolean progressive); } /** @@ -77,6 +94,18 @@ public static Http2AllocationStrategy.Builder builder() { return new Http2AllocationStrategy.Build(); } + /** + * Creates a builder for {@link Http2AllocationStrategy} and initialize it + * with an existing strategy. This method can be used to create a mutated version + * of an existing strategy. + * + * @return a new {@link Http2AllocationStrategy.Builder} initialized with an existing http2 + * allocation strategy. + */ + public static Http2AllocationStrategy.Builder builder(Http2AllocationStrategy existing) { + return new Http2AllocationStrategy.Build(existing); + } + @Override public Http2AllocationStrategy copy() { return new Http2AllocationStrategy(this); @@ -141,9 +170,14 @@ public void returnPermits(int returned) { } } + public boolean enableWorkStealing() { + return enableWorkStealing; + } + final long maxConcurrentStreams; final int maxConnections; final int minConnections; + final boolean enableWorkStealing; volatile int permits; static final AtomicIntegerFieldUpdater PERMITS = AtomicIntegerFieldUpdater.newUpdater(Http2AllocationStrategy.class, "permits"); @@ -152,6 +186,7 @@ public void returnPermits(int returned) { this.maxConcurrentStreams = build.maxConcurrentStreams; this.maxConnections = build.maxConnections; this.minConnections = build.minConnections; + this.enableWorkStealing = build.enableWorkStealing; PERMITS.lazySet(this, this.maxConnections); } @@ -159,6 +194,7 @@ public void returnPermits(int returned) { this.maxConcurrentStreams = copy.maxConcurrentStreams; this.maxConnections = copy.maxConnections; this.minConnections = copy.minConnections; + this.enableWorkStealing = copy.enableWorkStealing; PERMITS.lazySet(this, this.maxConnections); } @@ -170,6 +206,17 @@ static final class Build implements Builder { long maxConcurrentStreams = DEFAULT_MAX_CONCURRENT_STREAMS; int maxConnections = DEFAULT_MAX_CONNECTIONS; int minConnections = DEFAULT_MIN_CONNECTIONS; + boolean enableWorkStealing = Boolean.getBoolean("reactor.netty.pool.h2.enableworkstealing"); + + Build() { + } + + Build(Http2AllocationStrategy existing) { + this.maxConcurrentStreams = existing.maxConcurrentStreams; + this.minConnections = existing.minConnections; + this.maxConnections = existing.maxConnections; + this.enableWorkStealing = existing.enableWorkStealing; + } @Override public Http2AllocationStrategy build() { @@ -206,5 +253,11 @@ public Builder minConnections(int minConnections) { this.minConnections = minConnections; return this; } + + @Override + public Builder enableWorkStealing(boolean progressive) { + this.enableWorkStealing = true; + return this; + } } } diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2ConnectionProvider.java b/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2ConnectionProvider.java index 071a37d0b7..057562b843 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2ConnectionProvider.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2ConnectionProvider.java @@ -39,6 +39,9 @@ import reactor.netty.ConnectionObserver; import reactor.netty.channel.ChannelMetricsRecorder; import reactor.netty.channel.ChannelOperations; +import reactor.netty.internal.shaded.reactor.pool.PoolBuilder; +import reactor.netty.internal.shaded.reactor.pool.PoolConfig; +import reactor.netty.internal.shaded.reactor.pool.decorators.InstrumentedPoolDecorators; import reactor.netty.resources.ConnectionProvider; import reactor.netty.resources.PooledConnectionProvider; import reactor.netty.transport.ClientTransportConfig; @@ -51,13 +54,20 @@ import reactor.util.annotation.Nullable; import reactor.util.concurrent.Queues; import reactor.util.context.Context; +import reactor.util.function.Tuples; import java.io.IOException; import java.net.SocketAddress; import java.time.Duration; +import java.util.Iterator; +import java.util.List; import java.util.Queue; +import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiPredicate; import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; import static reactor.netty.ReactorNetty.format; import static reactor.netty.ReactorNetty.getChannelContext; @@ -565,12 +575,56 @@ static final class PooledConnectionAllocator { this.config = (HttpClientConfig) config; this.remoteAddress = remoteAddress; this.resolver = resolver; - this.pool = id == null ? - poolFactory.newPool(connectChannel(), null, DEFAULT_DESTROY_HANDLER, DEFAULT_EVICTION_PREDICATE, - poolConfig -> new Http2Pool(poolConfig, poolFactory.allocationStrategy())) : - poolFactory.newPool(connectChannel(), DEFAULT_DESTROY_HANDLER, DEFAULT_EVICTION_PREDICATE, - new MicrometerPoolMetricsRecorder(id, name, remoteAddress), - poolConfig -> new Http2Pool(poolConfig, poolFactory.allocationStrategy())); + + Http2AllocationStrategy http2Strategy = poolFactory.allocationStrategy() instanceof Http2AllocationStrategy ? + (Http2AllocationStrategy) poolFactory.allocationStrategy() : null; + + if (http2Strategy == null || !http2Strategy.enableWorkStealing) { + this.pool = id == null ? + poolFactory.newPool(connectChannel(), null, DEFAULT_DESTROY_HANDLER, DEFAULT_EVICTION_PREDICATE, + poolConfig -> new Http2Pool(poolConfig, poolFactory.allocationStrategy())) : + poolFactory.newPool(connectChannel(), DEFAULT_DESTROY_HANDLER, DEFAULT_EVICTION_PREDICATE, + new MicrometerPoolMetricsRecorder(id, name, remoteAddress), + poolConfig -> new Http2Pool(poolConfig, poolFactory.allocationStrategy())); + } + else { + // Create one connection allocator (it will be shared by all Http2Pool instances) + Publisher allocator = connectChannel(); + + List execs = StreamSupport.stream(config.loopResources().onClient(true).spliterator(), false) + .limit(http2Strategy.maxConnections) + .collect(Collectors.toList()); + Iterator execsIter = execs.iterator(); + + MicrometerPoolMetricsRecorder micrometerRecorder = id == null ? null : new MicrometerPoolMetricsRecorder(id, name, remoteAddress); + AtomicInteger subPoolIndex = new AtomicInteger(); + + this.pool = InstrumentedPoolDecorators.concurrentPools(execs.size(), allocator, + (PoolBuilder> poolBuilder) -> { + int index = subPoolIndex.getAndIncrement(); + int minDiv = http2Strategy.minConnections / execs.size(); + int minMod = http2Strategy.minConnections % execs.size(); + int maxDiv = http2Strategy.maxConnections / execs.size(); + int maxMod = http2Strategy.maxConnections % execs.size(); + + int minConn = index < minMod ? minDiv + 1 : minDiv; + int maxConn = index < maxMod ? maxDiv + 1 : maxDiv; + + Http2AllocationStrategy adaptedH2Strategy = Http2AllocationStrategy.builder(http2Strategy) + .minConnections(minConn) + .maxConnections(maxConn) + .build(); + + InstrumentedPool pool = + id == null ? + poolFactory.newPool(poolBuilder, maxConn, adaptedH2Strategy, DEFAULT_DESTROY_HANDLER, DEFAULT_EVICTION_PREDICATE, + poolConfig -> new Http2Pool(poolConfig, adaptedH2Strategy)) : + poolFactory.newPool(poolBuilder, maxConn, adaptedH2Strategy, DEFAULT_DESTROY_HANDLER, DEFAULT_EVICTION_PREDICATE, + micrometerRecorder, + poolConfig -> new Http2Pool(poolConfig, adaptedH2Strategy)); + return Tuples.of(pool, execsIter.next()); + }); + } } Publisher connectChannel() { diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2ConnectionProviderMeters.java b/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2ConnectionProviderMeters.java index 72e3bd986c..970b5f6a77 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2ConnectionProviderMeters.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2ConnectionProviderMeters.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022-2023 VMware, Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2022-2024 VMware, Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -67,6 +67,26 @@ public Meter.Type getType() { } }, + /** + * The number of HTTP/2 stream acquisition steal count. + */ + STEAL_STREAMS { + @Override + public String getName() { + return "reactor.netty.connection.provider.steal.streams"; + } + + @Override + public KeyName[] getKeyNames() { + return Http2ConnectionProviderMetersTags.values(); + } + + @Override + public Meter.Type getType() { + return Meter.Type.COUNTER; + } + }, + /** * The number of the idle connections in the connection pool. */ diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2Pool.java b/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2Pool.java index 7a149a9468..dc30127a1c 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2Pool.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2Pool.java @@ -25,6 +25,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicLongFieldUpdater; +import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import java.util.function.Function; @@ -120,6 +121,10 @@ class Http2Pool implements InstrumentedPool, InstrumentedPool.PoolMe static final AtomicReferenceFieldUpdater CONNECTIONS = AtomicReferenceFieldUpdater.newUpdater(Http2Pool.class, ConcurrentLinkedQueue.class, "connections"); + volatile int connectionsCount; + static final AtomicIntegerFieldUpdater CONNECTIONS_COUNT = + AtomicIntegerFieldUpdater.newUpdater(Http2Pool.class, "connectionsCount"); + volatile int idleSize; private static final AtomicIntegerFieldUpdater IDLE_SIZE = AtomicIntegerFieldUpdater.newUpdater(Http2Pool.class, "idleSize"); @@ -157,6 +162,7 @@ class Http2Pool implements InstrumentedPool, InstrumentedPool.PoolMe final Long maxConcurrentStreams; final int minConnections; final PoolConfig poolConfig; + final boolean workStealingEnabled; long lastInteractionTimestamp; @@ -166,12 +172,14 @@ class Http2Pool implements InstrumentedPool, InstrumentedPool.PoolMe this.clock = poolConfig.clock(); this.connections = new ConcurrentLinkedQueue<>(); this.lastInteractionTimestamp = clock.millis(); - this.maxConcurrentStreams = allocationStrategy instanceof Http2AllocationStrategy ? + Http2AllocationStrategy http2Strategy = allocationStrategy instanceof Http2AllocationStrategy ? (Http2AllocationStrategy) allocationStrategy : null; + + this.maxConcurrentStreams = http2Strategy != null ? ((Http2AllocationStrategy) allocationStrategy).maxConcurrentStreams() : -1; this.minConnections = allocationStrategy == null ? 0 : allocationStrategy.permitMinimum(); this.pending = new ConcurrentLinkedDeque<>(); this.poolConfig = poolConfig; - + this.workStealingEnabled = http2Strategy != null && http2Strategy.enableWorkStealing; recordInteractionTimestamp(); scheduleEviction(); } @@ -186,6 +194,41 @@ public Mono> acquire(Duration timeout) { return new BorrowerMono(this, timeout); } + @Override + public boolean hasAvailableResources() { + long totalMaxConcurrentStreams = this.totalMaxConcurrentStreams; + long estimateStreamsCount = totalMaxConcurrentStreams - acquired; + int permits = poolConfig.allocationStrategy().estimatePermitCount(); + if ((estimateStreamsCount + permits) - pendingSize <= 0) { + // no more idle streams + if (connectionsCount < poolConfig.allocationStrategy().permitMaximum()) { + // but we know we can allocate more connections, which will most likely be able to allocate many streams + return true; + } + // we can't acquire streams anymore (all streams are used and all connections are established) + return false; + + } + return true; + } + + @Override + public boolean transferBorrowersFrom(InstrumentedPool pool) { + Http2Pool other = (Http2Pool) pool; + + if (!other.isDisposed()) { + ConcurrentLinkedDeque q = other.pending; + Borrower b = other.pollPending(q, false); + if (b != null && !b.get()) { + b.setPool(this); + doAcquire(b); + return true; + } + } + + return false; + } + @Override public int acquiredSize() { return allocatedSize() - idleSize(); @@ -348,10 +391,12 @@ void doAcquire(Borrower borrower) { drain(); } - void drain() { + boolean drain() { if (WIP.getAndIncrement(this) == 0) { drainLoop(); + return true; } + return false; } void drainLoop() { @@ -390,10 +435,19 @@ void drainLoop() { log.debug(format(slot.connection.channel(), "Channel activated")); } ACQUIRED.incrementAndGet(this); - slot.connection.channel().eventLoop().execute(() -> { + if (!workStealingEnabled) { + slot.connection.channel().eventLoop().execute(() -> { + borrower.deliver(new Http2PooledRef(slot)); // will insert the connection slot into CONNECTIONS + drain(); + }); + } + else { + // WHen using the reactor work-stealing pool, we are already executing from one of the pools' executor, + // so, we can safely deliver the borrower concurrently, all the borrowers are distributed across + // all sub pools, so we won't be in a situation where the current thread will run the drainloop + // for ever under heavy requests load, so no need to reschedule. borrower.deliver(new Http2PooledRef(slot)); - drain(); - }); + } } else { int resourcesCount = idleSize; @@ -748,7 +802,7 @@ static final class Borrower extends AtomicBoolean implements Scannable, Subscrip final Duration acquireTimeout; final CoreSubscriber actual; - final Http2Pool pool; + final AtomicReference pool; long pendingAcquireStart; @@ -757,7 +811,7 @@ static final class Borrower extends AtomicBoolean implements Scannable, Subscrip Borrower(CoreSubscriber actual, Http2Pool pool, Duration acquireTimeout) { this.acquireTimeout = acquireTimeout; this.actual = actual; - this.pool = pool; + this.pool = new AtomicReference<>(pool); this.timeoutTask = TIMEOUT_DISPOSED; } @@ -765,7 +819,7 @@ static final class Borrower extends AtomicBoolean implements Scannable, Subscrip public void cancel() { stopPendingCountdown(true); // this is not failure, the subscription was canceled if (compareAndSet(false, true)) { - pool.cancelAcquire(this); + pool().cancelAcquire(this); } } @@ -776,7 +830,7 @@ Context currentContext() { @Override public void request(long n) { if (Operators.validate(n)) { - pool.doAcquire(this); + pool().doAcquire(this); } } @@ -785,7 +839,7 @@ public void run() { if (compareAndSet(false, true)) { // this is failure, a timeout was observed stopPendingCountdown(false); - pool.cancelAcquire(Http2Pool.Borrower.this); + pool().cancelAcquire(Http2Pool.Borrower.this); actual.onError(new PoolAcquireTimeoutException(acquireTimeout)); } } @@ -813,7 +867,10 @@ public String toString() { } void deliver(Http2PooledRef poolSlot) { - assert poolSlot.slot.connection.channel().eventLoop().inEventLoop(); + if (!pool().workStealingEnabled) { + // TODO can we do this check even when workstealing is enabled ? + assert poolSlot.slot.connection.channel().eventLoop().inEventLoop(); + } poolSlot.slot.incrementConcurrencyAndGet(); poolSlot.slot.deactivate(); if (get()) { @@ -835,6 +892,7 @@ void fail(Throwable error) { void stopPendingCountdown(boolean success) { if (pendingAcquireStart > 0) { + Http2Pool pool = pool(); if (success) { pool.poolConfig.metricsRecorder().recordPendingSuccessAndLatency(pool.clock.millis() - pendingAcquireStart); } @@ -846,6 +904,14 @@ void stopPendingCountdown(boolean success) { } timeoutTask.dispose(); } + + Http2Pool pool() { + return pool.get(); + } + + public void setPool(Http2Pool replace) { + pool.set(replace); + } } static final class BorrowerMono extends Mono> { @@ -896,7 +962,11 @@ public Mono invalidate() { return Mono.defer(() -> { if (compareAndSet(false, true)) { ACQUIRED.decrementAndGet(slot.pool); - return slot.pool.destroyPoolable(this).doFinally(st -> slot.pool.drain()); + return slot.pool.destroyPoolable(this).doFinally(st -> { + if (slot.pool.drain() && slot.pool.hasAvailableResources()) { + slot.pool.config().resourceManager().resourceAvailable(); + } + }); } else { return Mono.empty(); @@ -967,6 +1037,8 @@ static class Slot extends AtomicBoolean implements PooledRefMetadata { } initMaxConcurrentStreams(); TOTAL_MAX_CONCURRENT_STREAMS.addAndGet(this.pool, this.maxConcurrentStreams); + CONNECTIONS_COUNT.incrementAndGet(this.pool); + pool.config().resourceManager().resourceAvailable(); } void initMaxConcurrentStreams() { @@ -1066,6 +1138,7 @@ void invalidate() { } pool.poolConfig.allocationStrategy().returnPermits(1); TOTAL_MAX_CONCURRENT_STREAMS.addAndGet(this.pool, -maxConcurrentStreams); + CONNECTIONS_COUNT.decrementAndGet(this.pool); } } diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/client/MicrometerHttp2ConnectionProviderMeterRegistrar.java b/reactor-netty-http/src/main/java/reactor/netty/http/client/MicrometerHttp2ConnectionProviderMeterRegistrar.java index 75bf6cd939..b81a075074 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/client/MicrometerHttp2ConnectionProviderMeterRegistrar.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/client/MicrometerHttp2ConnectionProviderMeterRegistrar.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021-2023 VMware, Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2021-2024 VMware, Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,8 +19,10 @@ import io.micrometer.core.instrument.Meter; import io.micrometer.core.instrument.Tags; import reactor.netty.internal.shaded.reactor.pool.InstrumentedPool; +import reactor.netty.internal.shaded.reactor.pool.PoolScheduler; import java.net.SocketAddress; +import java.util.List; import static reactor.netty.Metrics.REGISTRY; import static reactor.netty.http.client.Http2ConnectionProviderMeters.ACTIVE_CONNECTIONS; @@ -31,6 +33,7 @@ import static reactor.netty.http.client.Http2ConnectionProviderMeters.IDLE_CONNECTIONS; import static reactor.netty.http.client.Http2ConnectionProviderMeters.PENDING_STREAMS; import static reactor.netty.Metrics.formatSocketAddress; +import static reactor.netty.http.client.Http2ConnectionProviderMeters.STEAL_STREAMS; final class MicrometerHttp2ConnectionProviderMeterRegistrar { @@ -48,9 +51,20 @@ void registerMetrics(String poolName, String id, SocketAddress remoteAddress, In .tags(tags) .register(REGISTRY); - Gauge.builder(ACTIVE_STREAMS.getName(), metrics, poolMetrics -> ((Http2Pool) poolMetrics).activeStreams()) - .tags(tags) - .register(REGISTRY); + if (metrics instanceof Http2Pool) { + Gauge.builder(ACTIVE_STREAMS.getName(), metrics, poolMetrics -> ((Http2Pool) poolMetrics).activeStreams()) + .tags(tags) + .register(REGISTRY); + } + else if (metrics instanceof PoolScheduler) { + Gauge.builder(ACTIVE_STREAMS.getName(), metrics, poolMetrics -> getActiveStreams(((PoolScheduler) metrics).getPools())) + .tags(tags) + .register(REGISTRY); + + Gauge.builder(STEAL_STREAMS.getName(), metrics, poolMetrics -> ((PoolScheduler) metrics).stealCount()) + .tags(tags) + .register(REGISTRY); + } Gauge.builder(IDLE_CONNECTIONS.getName(), metrics, InstrumentedPool.PoolMetrics::idleSize) .tags(tags) @@ -70,4 +84,10 @@ void deRegisterMetrics(String poolName, String id, SocketAddress remoteAddress) REGISTRY.remove(new Meter.Id(IDLE_CONNECTIONS.getName(), tags, null, null, Meter.Type.GAUGE)); REGISTRY.remove(new Meter.Id(PENDING_STREAMS.getName(), tags, null, null, Meter.Type.GAUGE)); } -} \ No newline at end of file + + int getActiveStreams(List> pools) { + return pools.stream() + .mapToInt(pool -> ((Http2Pool) pool).activeStreams()) + .sum(); + } +} diff --git a/reactor-netty-http/src/test/java/reactor/netty/http/client/HttpClientTest.java b/reactor-netty-http/src/test/java/reactor/netty/http/client/HttpClientTest.java index b08280fb28..1ccd2a2dea 100644 --- a/reactor-netty-http/src/test/java/reactor/netty/http/client/HttpClientTest.java +++ b/reactor-netty-http/src/test/java/reactor/netty/http/client/HttpClientTest.java @@ -95,6 +95,7 @@ import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.RepeatedTest; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.MethodSource; @@ -3427,18 +3428,58 @@ void testIssue3285(String serverResponse, @Nullable Class e static void testIssue3285SendRequest(HttpClient client, @Nullable Class exception) { Mono response = client.get() - .uri("/") - .responseSingle((res, bytes) -> bytes.asString()); + .uri("/") + .responseSingle((res, bytes) -> bytes.asString()); if (exception != null) { response.as(StepVerifier::create) - .expectError(exception) - .verify(Duration.ofSeconds(5)); + .expectError(exception) + .verify(Duration.ofSeconds(5)); } else { response.as(StepVerifier::create) - .expectNext("test") - .expectComplete() - .verify(Duration.ofSeconds(5)); + .expectNext("test") + .expectComplete() + .verify(Duration.ofSeconds(5)); + } + } + + @RepeatedTest(10) + void testHttp2ClientWithWorkStealing() { + disposableServer = + HttpServer.create() + .protocol(HttpProtocol.H2C) + .port(0) + .handle((req, res) -> + res.sendString(Mono.just("Welcome"))) + .bindNow(); + + ConnectionProvider provider = ConnectionProvider + .builder("http") + .allocationStrategy(Http2AllocationStrategy.builder() + .maxConcurrentStreams(100) + .minConnections(1) + .maxConnections(Runtime.getRuntime().availableProcessors()) + .enableWorkStealing(true) + .build()) + .build(); + + try { + HttpClient client = HttpClient.create(provider) + .protocol(HttpProtocol.H2C) + .port(disposableServer.port()) + .wiretap(true); + + StepVerifier.create(client + .headers(hdr -> hdr.set("Content-Type", "text/plain")) + .get() + .uri("/payload-size") + .response((r, buf) -> buf.aggregate().asString().zipWith(Mono.just(r)))) + .expectNextMatches(tuple -> "Welcome".equals(tuple.getT1()) && tuple.getT2().status().equals(HttpResponseStatus.OK)) + .expectComplete() + .verify(Duration.ofSeconds(30)); + } + finally { + provider.disposeLater().block(); } } }