diff --git a/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/async/HttpIntegrationTests.java b/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/async/HttpIntegrationTests.java index d0d02dda3..cea069bb0 100644 --- a/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/async/HttpIntegrationTests.java +++ b/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/async/HttpIntegrationTests.java @@ -213,4 +213,24 @@ public AuthenticationH2Tls() throws Exception { } + @Nested + @DisplayName("HTTP message multiplexing (HTTP/2)") + class RequestMultiplexing extends TestHttpAsyncRequestMultiplexing { + + public RequestMultiplexing() { + super(URIScheme.HTTP); + } + + } + + @Nested + @DisplayName("HTTP message multiplexing (HTTP/2, TLS)") + class RequestMultiplexingTls extends TestHttpAsyncRequestMultiplexing { + + public RequestMultiplexingTls() { + super(URIScheme.HTTPS); + } + + } + } \ No newline at end of file diff --git a/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/async/TestHttpAsyncRequestMultiplexing.java b/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/async/TestHttpAsyncRequestMultiplexing.java new file mode 100644 index 000000000..e387e8e85 --- /dev/null +++ b/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/async/TestHttpAsyncRequestMultiplexing.java @@ -0,0 +1,98 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ +package org.apache.hc.client5.testing.async; + +import static org.hamcrest.MatcherAssert.assertThat; + +import java.util.LinkedList; +import java.util.Queue; +import java.util.Random; +import java.util.concurrent.Future; + +import org.apache.hc.client5.http.config.TlsConfig; +import org.apache.hc.client5.http.protocol.HttpClientContext; +import org.apache.hc.client5.testing.extension.async.ClientProtocolLevel; +import org.apache.hc.client5.testing.extension.async.ServerProtocolLevel; +import org.apache.hc.client5.testing.extension.async.TestAsyncClient; +import org.apache.hc.core5.http.ContentType; +import org.apache.hc.core5.http.HttpHost; +import org.apache.hc.core5.http.HttpResponse; +import org.apache.hc.core5.http.Message; +import org.apache.hc.core5.http.Method; +import org.apache.hc.core5.http.URIScheme; +import org.apache.hc.core5.http.nio.entity.AsyncEntityProducers; +import org.apache.hc.core5.http.nio.entity.BasicAsyncEntityConsumer; +import org.apache.hc.core5.http.nio.support.BasicRequestProducer; +import org.apache.hc.core5.http.nio.support.BasicResponseConsumer; +import org.apache.hc.core5.http2.HttpVersionPolicy; +import org.hamcrest.CoreMatchers; +import org.junit.jupiter.api.Test; + +abstract class TestHttpAsyncRequestMultiplexing extends AbstractIntegrationTestBase { + + public TestHttpAsyncRequestMultiplexing(final URIScheme uriScheme) { + super(uriScheme, ClientProtocolLevel.MINIMAL, ServerProtocolLevel.H2_ONLY); + } + + @Test + void testConcurrentPostRequests() throws Exception { + configureServer(bootstrap -> bootstrap.register("/echo/*", AsyncEchoHandler::new)); + configureClient(custimizer -> custimizer + .setDefaultTlsConfig(TlsConfig.custom() + .setVersionPolicy(HttpVersionPolicy.FORCE_HTTP_2) + .build()) + .useMessageMultiplexing() + ); + final HttpHost target = startServer(); + final TestAsyncClient client = startClient(); + final byte[] b1 = new byte[1024]; + final Random rnd = new Random(System.currentTimeMillis()); + rnd.nextBytes(b1); + + final int reqCount = 200; + + final Queue>> queue = new LinkedList<>(); + for (int i = 0; i < reqCount; i++) { + final Future> future = client.execute( + new BasicRequestProducer(Method.POST, target, "/echo/", + AsyncEntityProducers.create(b1, ContentType.APPLICATION_OCTET_STREAM)), + new BasicResponseConsumer<>(new BasicAsyncEntityConsumer()), HttpClientContext.create(), null); + queue.add(future); + } + + while (!queue.isEmpty()) { + final Future> future = queue.remove(); + final Message responseMessage = future.get(); + assertThat(responseMessage, CoreMatchers.notNullValue()); + final HttpResponse response = responseMessage.getHead(); + assertThat(response.getCode(), CoreMatchers.equalTo(200)); + final byte[] b2 = responseMessage.getBody(); + assertThat(b1, CoreMatchers.equalTo(b2)); + } + } + +} \ No newline at end of file diff --git a/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/compatibility/async/HttpAsyncClientCompatibilityTest.java b/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/compatibility/async/HttpAsyncClientCompatibilityTest.java index acfa492fd..e6019d995 100644 --- a/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/compatibility/async/HttpAsyncClientCompatibilityTest.java +++ b/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/compatibility/async/HttpAsyncClientCompatibilityTest.java @@ -26,6 +26,9 @@ */ package org.apache.hc.client5.testing.compatibility.async; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.Future; import org.apache.hc.client5.http.ContextBuilder; @@ -38,10 +41,13 @@ import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient; import org.apache.hc.client5.http.impl.auth.BasicCredentialsProvider; import org.apache.hc.client5.http.protocol.HttpClientContext; +import org.apache.hc.client5.testing.Result; import org.apache.hc.client5.testing.extension.async.HttpAsyncClientResource; +import org.apache.hc.core5.concurrent.FutureCallback; import org.apache.hc.core5.http.HttpHost; import org.apache.hc.core5.http.HttpStatus; import org.apache.hc.core5.http.HttpVersion; +import org.apache.hc.core5.http.RequestNotExecutedException; import org.apache.hc.core5.http2.HttpVersionPolicy; import org.apache.hc.core5.util.Timeout; import org.junit.jupiter.api.Assertions; @@ -51,6 +57,7 @@ public abstract class HttpAsyncClientCompatibilityTest { static final Timeout TIMEOUT = Timeout.ofSeconds(5); + static final Timeout LONG_TIMEOUT = Timeout.ofSeconds(30); private final HttpVersionPolicy versionPolicy; private final HttpHost target; @@ -119,6 +126,54 @@ void test_sequential_gets() throws Exception { } } + @Test + void test_concurrent_gets() throws Exception { + final CloseableHttpAsyncClient client = client(); + + final String[] requestUris = new String[] {"/111", "/222", "/333"}; + final int n = 200; + final Queue> queue = new ConcurrentLinkedQueue<>(); + final CountDownLatch latch = new CountDownLatch(requestUris.length * n); + + for (int i = 0; i < n; i++) { + for (final String requestUri: requestUris) { + final SimpleHttpRequest request = SimpleRequestBuilder.get() + .setHttpHost(target) + .setPath(requestUri) + .build(); + final HttpClientContext context = context(); + client.execute(request, context, new FutureCallback() { + + @Override + public void completed(final SimpleHttpResponse response) { + queue.add(new Result<>(request, response, null)); + latch.countDown(); + } + + @Override + public void failed(final Exception ex) { + queue.add(new Result<>(request, ex)); + latch.countDown(); + } + + @Override + public void cancelled() { + queue.add(new Result<>(request, new RequestNotExecutedException())); + latch.countDown(); + } + + }); + } + } + Assertions.assertTrue(latch.await(LONG_TIMEOUT.getDuration(), LONG_TIMEOUT.getTimeUnit())); + Assertions.assertEquals(requestUris.length * n, queue.size()); + for (final Result result : queue) { + if (result.isOK()) { + Assertions.assertEquals(HttpStatus.SC_OK, result.response.getCode()); + } + } + } + @Test void test_auth_failure_wrong_auth_scope() throws Exception { addCredentials( diff --git a/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/extension/async/H2OnlyMinimalTestClientBuilder.java b/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/extension/async/H2OnlyMinimalTestClientBuilder.java index 7cfae6424..9e7f5247d 100644 --- a/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/extension/async/H2OnlyMinimalTestClientBuilder.java +++ b/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/extension/async/H2OnlyMinimalTestClientBuilder.java @@ -68,6 +68,11 @@ public TestAsyncClientBuilder setH2Config(final H2Config h2Config) { return this; } + @Override + public TestAsyncClientBuilder useMessageMultiplexing() { + return this; + } + @Override public TestAsyncClient build() throws Exception { final CloseableHttpAsyncClient client = HttpAsyncClients.createHttp2Minimal( diff --git a/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/extension/async/H2OnlyTestClientBuilder.java b/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/extension/async/H2OnlyTestClientBuilder.java index 53bd966a6..b38e931ee 100644 --- a/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/extension/async/H2OnlyTestClientBuilder.java +++ b/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/extension/async/H2OnlyTestClientBuilder.java @@ -97,6 +97,11 @@ public TestAsyncClientBuilder setTlsStrategy(final TlsStrategy tlsStrategy) { return this; } + @Override + public TestAsyncClientBuilder useMessageMultiplexing() { + return this; + } + @Override public TestAsyncClientBuilder setH2Config(final H2Config h2Config) { this.h2Config = h2Config; diff --git a/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/extension/async/HttpAsyncClientResource.java b/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/extension/async/HttpAsyncClientResource.java index 3fff3d597..5212fd343 100644 --- a/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/extension/async/HttpAsyncClientResource.java +++ b/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/extension/async/HttpAsyncClientResource.java @@ -61,6 +61,7 @@ public HttpAsyncClientResource(final HttpVersionPolicy versionPolicy) throws IOE .setDefaultTlsConfig(TlsConfig.custom() .setVersionPolicy(versionPolicy) .build()) + .setMessageMultiplexing(true) .build()); } catch (final CertificateException | NoSuchAlgorithmException | KeyStoreException | KeyManagementException ex) { throw new IllegalStateException(ex); diff --git a/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/extension/async/MinimalTestClientBuilder.java b/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/extension/async/MinimalTestClientBuilder.java index a56ee0f40..5c8b1c0a9 100644 --- a/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/extension/async/MinimalTestClientBuilder.java +++ b/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/extension/async/MinimalTestClientBuilder.java @@ -77,6 +77,12 @@ public TestAsyncClientBuilder setDefaultTlsConfig(final TlsConfig tlsConfig) { return this; } + @Override + public TestAsyncClientBuilder useMessageMultiplexing() { + this.connectionManagerBuilder.setMessageMultiplexing(true); + return this; + } + @Override public TestAsyncClientBuilder setHttp1Config(final Http1Config http1Config) { this.http1Config = http1Config; diff --git a/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/extension/async/StandardTestClientBuilder.java b/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/extension/async/StandardTestClientBuilder.java index 24f29dd67..bce1873ea 100644 --- a/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/extension/async/StandardTestClientBuilder.java +++ b/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/extension/async/StandardTestClientBuilder.java @@ -111,6 +111,12 @@ public TestAsyncClientBuilder setDefaultTlsConfig(final TlsConfig tlsConfig) { return this; } + @Override + public TestAsyncClientBuilder useMessageMultiplexing() { + this.connectionManagerBuilder.setMessageMultiplexing(true); + return this; + } + @Override public TestAsyncClientBuilder setHttp1Config(final Http1Config http1Config) { this.clientBuilder.setHttp1Config(http1Config); diff --git a/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/extension/async/TestAsyncClientBuilder.java b/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/extension/async/TestAsyncClientBuilder.java index 1768d1040..0536c5468 100644 --- a/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/extension/async/TestAsyncClientBuilder.java +++ b/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/extension/async/TestAsyncClientBuilder.java @@ -73,6 +73,10 @@ default TestAsyncClientBuilder setDefaultTlsConfig(TlsConfig tlsConfig) { throw new UnsupportedOperationException("Operation not supported by " + getProtocolLevel()); } + default TestAsyncClientBuilder useMessageMultiplexing() { + throw new UnsupportedOperationException("Operation not supported by " + getProtocolLevel()); + } + default TestAsyncClientBuilder setHttp1Config(Http1Config http1Config) { throw new UnsupportedOperationException("Operation not supported by " + getProtocolLevel()); } diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/nio/H2SharingConnPool.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/nio/H2SharingConnPool.java new file mode 100644 index 000000000..2efc867fe --- /dev/null +++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/nio/H2SharingConnPool.java @@ -0,0 +1,322 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ +package org.apache.hc.client5.http.impl.nio; + +import java.io.IOException; +import java.util.Comparator; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import org.apache.hc.client5.http.impl.ConnPoolSupport; +import org.apache.hc.core5.annotation.Contract; +import org.apache.hc.core5.annotation.Experimental; +import org.apache.hc.core5.annotation.ThreadingBehavior; +import org.apache.hc.core5.concurrent.CallbackContribution; +import org.apache.hc.core5.concurrent.CompletedFuture; +import org.apache.hc.core5.concurrent.FutureCallback; +import org.apache.hc.core5.http.HttpConnection; +import org.apache.hc.core5.http.HttpVersion; +import org.apache.hc.core5.http.ProtocolVersion; +import org.apache.hc.core5.io.CloseMode; +import org.apache.hc.core5.pool.ManagedConnPool; +import org.apache.hc.core5.pool.PoolEntry; +import org.apache.hc.core5.pool.PoolStats; +import org.apache.hc.core5.util.Args; +import org.apache.hc.core5.util.Asserts; +import org.apache.hc.core5.util.TimeValue; +import org.apache.hc.core5.util.Timeout; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Experimental connections pool implementation that acts as a caching facade in front of + * a standard {@link ManagedConnPool} and shares already leased connections to multiplex + * message exchanges over active HTTP/2 connections. + * @param route + * @param connection object + * + * @since 5.5 + */ +@Contract(threading = ThreadingBehavior.SAFE) +@Experimental +public class H2SharingConnPool implements ManagedConnPool { + + private static final Logger LOG = LoggerFactory.getLogger(H2SharingConnPool.class); + + private final ManagedConnPool pool; + private final ConcurrentMap> perRouteCache; + private final AtomicBoolean closed; + + public H2SharingConnPool(final ManagedConnPool pool) { + this.pool = Args.notNull(pool, "Connection pool"); + this.perRouteCache = new ConcurrentHashMap<>(); + this.closed = new AtomicBoolean(); + } + + @Override + public void close(final CloseMode closeMode) { + if (closed.compareAndSet(false, true)) { + perRouteCache.clear(); + pool.close(closeMode); + } + } + + @Override + public void close() throws IOException { + if (closed.compareAndSet(false, true)) { + perRouteCache.clear(); + pool.close(); + } + } + + PerRoutePool getPerRoutePool(final T route) { + return perRouteCache.computeIfAbsent(route, r -> new PerRoutePool<>()); + } + + @Override + public Future> lease(final T route, + final Object state, + final Timeout requestTimeout, + final FutureCallback> callback) { + Asserts.check(!closed.get(), "Connection pool shut down"); + if (state == null) { + final PerRoutePool perRoutePool = perRouteCache.get(route); + if (perRoutePool != null) { + final PoolEntry entry = perRoutePool.lease(); + if (entry != null) { + if (LOG.isDebugEnabled()) { + LOG.debug("Sharing connection {} for message exchange multiplexing (lease count = {})", + ConnPoolSupport.getId(entry.getConnection()), perRoutePool.getCount(entry)); + } + final Future> future = new CompletedFuture<>(entry); + if (callback != null) { + callback.completed(entry); + } + return future; + } + } + } + LOG.debug("No shared connection available"); + return pool.lease(route, + state, + requestTimeout, + new CallbackContribution>(callback) { + + @Override + public void completed(final PoolEntry entry) { + if (state == null) { + final C connection = entry.getConnection(); + final ProtocolVersion ver = connection != null ? connection.getProtocolVersion() : null; + if (ver == HttpVersion.HTTP_2_0) { + final PerRoutePool perRoutePool = getPerRoutePool(route); + final long count = perRoutePool.track(entry); + if (LOG.isDebugEnabled()) { + LOG.debug("Connection {} can be shared for message exchange multiplexing (lease count = {})", + ConnPoolSupport.getId(entry.getConnection()), count); + } + } + } + if (callback != null) { + callback.completed(entry); + } + } + + }); + } + + @Override + public void release(final PoolEntry entry, final boolean reusable) { + if (entry == null) { + return; + } + if (closed.get()) { + pool.release(entry, reusable); + return; + } + final T route = entry.getRoute(); + final PerRoutePool perRoutePool = perRouteCache.get(route); + if (perRoutePool != null) { + final long count = perRoutePool.release(entry, reusable); + if (count > 0) { + if (LOG.isDebugEnabled()) { + LOG.debug("Connection {} is being shared for message exchange multiplexing (lease count = {})", + ConnPoolSupport.getId(entry.getConnection()), count); + } + return; + } + } + if (LOG.isDebugEnabled()) { + LOG.debug("Releasing connection {} back to the pool", ConnPoolSupport.getId(entry.getConnection())); + } + pool.release(entry, reusable); + } + + @Override + public void setMaxTotal(final int max) { + pool.setMaxTotal(max); + } + + @Override + public int getMaxTotal() { + return pool.getMaxTotal(); + } + + @Override + public void setDefaultMaxPerRoute(final int max) { + pool.setDefaultMaxPerRoute(max); + } + + @Override + public int getDefaultMaxPerRoute() { + return pool.getDefaultMaxPerRoute(); + } + + @Override + public void setMaxPerRoute(final T route, final int max) { + pool.setMaxPerRoute(route, max); + } + + @Override + public int getMaxPerRoute(final T route) { + return pool.getMaxPerRoute(route); + } + + @Override + public void closeIdle(final TimeValue idleTime) { + pool.closeIdle(idleTime); + } + + @Override + public void closeExpired() { + pool.closeExpired(); + } + + @Override + public Set getRoutes() { + return pool.getRoutes(); + } + + @Override + public PoolStats getTotalStats() { + return pool.getTotalStats(); + } + + @Override + public PoolStats getStats(final T route) { + return pool.getStats(route); + } + + @Override + public String toString() { + return pool.toString(); + } + + static class PerRoutePool { + + private final Map, AtomicLong> entryMap; + private final Lock lock; + + PerRoutePool() { + this.entryMap = new HashMap<>(); + this.lock = new ReentrantLock(); + } + + AtomicLong getCounter(final PoolEntry entry) { + return entryMap.computeIfAbsent(entry, e -> new AtomicLong()); + } + + long track(final PoolEntry entry) { + lock.lock(); + try { + final AtomicLong counter = getCounter(entry); + return counter.incrementAndGet(); + } finally { + lock.unlock(); + } + } + + PoolEntry lease() { + lock.lock(); + try { + final PoolEntry entry = entryMap.entrySet().stream() + .min(Comparator.comparingLong(e -> e.getValue().get())) + .map(Map.Entry::getKey) + .orElse(null); + if (entry == null) { + return null; + } + final AtomicLong counter = getCounter(entry); + counter.incrementAndGet(); + return entry; + } finally { + lock.unlock(); + } + } + + long release(final PoolEntry entry, final boolean reusable) { + lock.lock(); + try { + final C connection = entry.getConnection(); + if (!reusable || connection == null || !connection.isOpen()) { + entryMap.remove(entry); + return 0; + } else { + final AtomicLong counter = entryMap.compute(entry, (e, c) -> { + if (c == null) { + return null; + } + final long count = c.decrementAndGet(); + return count > 0 ? c : null; + }); + return counter != null ? counter.get() : 0L; + } + } finally { + lock.unlock(); + } + } + + long getCount(final PoolEntry entry) { + lock.lock(); + try { + final AtomicLong counter = entryMap.get(entry); + return counter == null ? 0L : counter.get(); + } finally { + lock.unlock(); + } + } + + } + +} diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/nio/PoolingAsyncClientConnectionManager.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/nio/PoolingAsyncClientConnectionManager.java index b9c13f7d0..cd5473f2f 100644 --- a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/nio/PoolingAsyncClientConnectionManager.java +++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/nio/PoolingAsyncClientConnectionManager.java @@ -160,7 +160,7 @@ public PoolingAsyncClientConnectionManager( final SchemePortResolver schemePortResolver, final DnsResolver dnsResolver) { this(new DefaultAsyncClientConnectionOperator(tlsStrategyLookup, schemePortResolver, dnsResolver), - poolConcurrencyPolicy, poolReusePolicy, timeToLive); + poolConcurrencyPolicy, poolReusePolicy, timeToLive, false); } @Internal @@ -168,11 +168,13 @@ public PoolingAsyncClientConnectionManager( final AsyncClientConnectionOperator connectionOperator, final PoolConcurrencyPolicy poolConcurrencyPolicy, final PoolReusePolicy poolReusePolicy, - final TimeValue timeToLive) { + final TimeValue timeToLive, + final boolean messageMultiplexing) { this.connectionOperator = Args.notNull(connectionOperator, "Connection operator"); + final ManagedConnPool managedConnPool; switch (poolConcurrencyPolicy != null ? poolConcurrencyPolicy : PoolConcurrencyPolicy.STRICT) { case STRICT: - this.pool = new StrictConnPool( + managedConnPool = new StrictConnPool( DEFAULT_MAX_CONNECTIONS_PER_ROUTE, DEFAULT_MAX_TOTAL_CONNECTIONS, timeToLive, @@ -187,7 +189,7 @@ public void closeExpired() { }; break; case LAX: - this.pool = new LaxConnPool( + managedConnPool = new LaxConnPool( DEFAULT_MAX_CONNECTIONS_PER_ROUTE, timeToLive, poolReusePolicy, @@ -203,6 +205,7 @@ public void closeExpired() { default: throw new IllegalArgumentException("Unexpected PoolConcurrencyPolicy value: " + poolConcurrencyPolicy); } + this.pool = messageMultiplexing ? new H2SharingConnPool<>(managedConnPool) : managedConnPool; this.closed = new AtomicBoolean(false); } diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/nio/PoolingAsyncClientConnectionManagerBuilder.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/nio/PoolingAsyncClientConnectionManagerBuilder.java index 36df24eb9..f6c93457d 100644 --- a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/nio/PoolingAsyncClientConnectionManagerBuilder.java +++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/nio/PoolingAsyncClientConnectionManagerBuilder.java @@ -35,6 +35,7 @@ import org.apache.hc.client5.http.nio.AsyncClientConnectionOperator; import org.apache.hc.client5.http.ssl.ConscryptClientTlsStrategy; import org.apache.hc.client5.http.ssl.DefaultClientTlsStrategy; +import org.apache.hc.core5.annotation.Experimental; import org.apache.hc.core5.annotation.Internal; import org.apache.hc.core5.function.Resolver; import org.apache.hc.core5.http.HttpHost; @@ -87,6 +88,7 @@ public class PoolingAsyncClientConnectionManagerBuilder { private Resolver connectionConfigResolver; private Resolver tlsConfigResolver; + private boolean messageMultiplexing; public static PoolingAsyncClientConnectionManagerBuilder create() { return new PoolingAsyncClientConnectionManagerBuilder(); @@ -254,6 +256,24 @@ public final PoolingAsyncClientConnectionManagerBuilder useSystemProperties() { return this; } + /** + * Use experimental connections pool implementation that acts as a caching facade + * in front of a standard connection pool and shares already leased connections + * to multiplex message exchanges over active HTTP/2 connections. + *

+ * Please note this flag has no effect on HTTP/1.1 and HTTP/1.0 connections. + *

+ * This feature is considered experimenal + * + * @since 5.5 + * @return this instance. + */ + @Experimental + public final PoolingAsyncClientConnectionManagerBuilder setMessageMultiplexing(final boolean messageMultiplexing) { + this.messageMultiplexing = messageMultiplexing; + return this; + } + @Internal protected AsyncClientConnectionOperator createConnectionOperator( final TlsStrategy tlsStrategy, @@ -290,7 +310,8 @@ public PoolingAsyncClientConnectionManager build() { createConnectionOperator(tlsStrategyCopy, schemePortResolver, dnsResolver), poolConcurrencyPolicy, poolReusePolicy, - null); + null, + messageMultiplexing); poolingmgr.setConnectionConfigResolver(connectionConfigResolver); poolingmgr.setTlsConfigResolver(tlsConfigResolver); if (maxConnTotal > 0) { diff --git a/httpclient5/src/test/java/org/apache/hc/client5/http/examples/AsyncClientH2FullDuplexExchange.java b/httpclient5/src/test/java/org/apache/hc/client5/http/examples/AsyncClientH2FullDuplexExchange.java index b6309de7b..dd4fd4dd1 100644 --- a/httpclient5/src/test/java/org/apache/hc/client5/http/examples/AsyncClientH2FullDuplexExchange.java +++ b/httpclient5/src/test/java/org/apache/hc/client5/http/examples/AsyncClientH2FullDuplexExchange.java @@ -40,6 +40,7 @@ import org.apache.hc.core5.http.EntityDetails; import org.apache.hc.core5.http.Header; import org.apache.hc.core5.http.HttpException; +import org.apache.hc.core5.http.HttpHost; import org.apache.hc.core5.http.HttpResponse; import org.apache.hc.core5.http.config.Http1Config; import org.apache.hc.core5.http.message.BasicHttpRequest; @@ -78,7 +79,12 @@ public static void main(final String[] args) throws Exception { client.start(); - final BasicHttpRequest request = BasicRequestBuilder.post("https://nghttp2.org/httpbin/post").build(); + final HttpHost target = new HttpHost("https", "nghttp2.org"); + + final BasicHttpRequest request = BasicRequestBuilder.post() + .setHttpHost(target) + .setPath("/httpbin/post") + .build(); final BasicRequestProducer requestProducer = new BasicRequestProducer(request, new BasicAsyncEntityProducer("stuff", ContentType.TEXT_PLAIN)); final BasicResponseConsumer responseConsumer = new BasicResponseConsumer<>( diff --git a/httpclient5/src/test/java/org/apache/hc/client5/http/examples/AsyncClientH2Multiplexing.java b/httpclient5/src/test/java/org/apache/hc/client5/http/examples/AsyncClientH2Multiplexing.java index 442dc669f..56a9e8018 100644 --- a/httpclient5/src/test/java/org/apache/hc/client5/http/examples/AsyncClientH2Multiplexing.java +++ b/httpclient5/src/test/java/org/apache/hc/client5/http/examples/AsyncClientH2Multiplexing.java @@ -28,7 +28,6 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; import org.apache.hc.client5.http.async.methods.SimpleHttpRequest; import org.apache.hc.client5.http.async.methods.SimpleHttpResponse; @@ -36,84 +35,124 @@ import org.apache.hc.client5.http.async.methods.SimpleRequestProducer; import org.apache.hc.client5.http.async.methods.SimpleResponseConsumer; import org.apache.hc.client5.http.config.TlsConfig; +import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient; import org.apache.hc.client5.http.impl.async.HttpAsyncClients; -import org.apache.hc.client5.http.impl.async.MinimalHttpAsyncClient; +import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManager; import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManagerBuilder; import org.apache.hc.core5.concurrent.FutureCallback; import org.apache.hc.core5.http.HttpHost; -import org.apache.hc.core5.http.config.Http1Config; import org.apache.hc.core5.http.message.StatusLine; -import org.apache.hc.core5.http.nio.AsyncClientEndpoint; import org.apache.hc.core5.http2.HttpVersionPolicy; -import org.apache.hc.core5.http2.config.H2Config; import org.apache.hc.core5.io.CloseMode; import org.apache.hc.core5.reactor.IOReactorConfig; +import org.apache.hc.core5.util.Timeout; /** - * This example demonstrates concurrent (multiplexed) execution of multiple - * HTTP/2 message exchanges. + * Example of asynchronous HTTP/1.1 request execution with message exchange multiplexing + * over HTTP/2 connections. */ public class AsyncClientH2Multiplexing { public static void main(final String[] args) throws Exception { - final MinimalHttpAsyncClient client = HttpAsyncClients.createMinimal( - H2Config.DEFAULT, - Http1Config.DEFAULT, - IOReactorConfig.DEFAULT, - PoolingAsyncClientConnectionManagerBuilder.create() - .setDefaultTlsConfig(TlsConfig.custom() - .setVersionPolicy(HttpVersionPolicy.FORCE_HTTP_2) - .build()) - .build()); + final IOReactorConfig ioReactorConfig = IOReactorConfig.custom() + .setSoTimeout(Timeout.ofSeconds(5)) + .build(); + + final PoolingAsyncClientConnectionManager connectionManager = PoolingAsyncClientConnectionManagerBuilder.create() + .setDefaultTlsConfig(TlsConfig.custom() + .setVersionPolicy(HttpVersionPolicy.FORCE_HTTP_2) + .build()) + .setMessageMultiplexing(true) + .build(); + + final CloseableHttpAsyncClient client = HttpAsyncClients.custom() + .setConnectionManager(connectionManager) + .setIOReactorConfig(ioReactorConfig) + .build(); client.start(); final HttpHost target = new HttpHost("https", "nghttp2.org"); - final Future leaseFuture = client.lease(target, null); - final AsyncClientEndpoint endpoint = leaseFuture.get(30, TimeUnit.SECONDS); - try { - final String[] requestUris = new String[] {"/httpbin/ip", "/httpbin/user-agent", "/httpbin/headers"}; - - final CountDownLatch latch = new CountDownLatch(requestUris.length); - for (final String requestUri: requestUris) { - final SimpleHttpRequest request = SimpleRequestBuilder.get() - .setHttpHost(target) - .setPath(requestUri) - .build(); - - System.out.println("Executing request " + request); - endpoint.execute( - SimpleRequestProducer.create(request), - SimpleResponseConsumer.create(), - new FutureCallback() { - - @Override - public void completed(final SimpleHttpResponse response) { - latch.countDown(); - System.out.println(request + "->" + new StatusLine(response)); - System.out.println(response.getBody()); - } - - @Override - public void failed(final Exception ex) { - latch.countDown(); - System.out.println(request + "->" + ex); - } - - @Override - public void cancelled() { - latch.countDown(); - System.out.println(request + " cancelled"); - } - - }); - } - latch.await(); - } finally { - endpoint.releaseAndReuse(); + + final SimpleHttpRequest warmup = SimpleRequestBuilder.get() + .setHttpHost(target) + .setPath("/httpbin") + .build(); + + // Make sure there is an open HTTP/2 connection in the pool + System.out.println("Executing warm-up request " + warmup); + final Future future = client.execute( + SimpleRequestProducer.create(warmup), + SimpleResponseConsumer.create(), + new FutureCallback() { + + @Override + public void completed(final SimpleHttpResponse response) { + System.out.println(warmup + "->" + new StatusLine(response)); + System.out.println(response.getBody()); + } + + @Override + public void failed(final Exception ex) { + System.out.println(warmup + "->" + ex); + } + + @Override + public void cancelled() { + System.out.println(warmup + " cancelled"); + } + + }); + future.get(); + + Thread.sleep(1000); + + System.out.println("Connection pool stats: " + connectionManager.getTotalStats()); + + // Execute multiple requests over the HTTP/2 connection from the pool + final String[] requestUris = new String[]{"/httpbin", "/httpbin/ip", "/httpbin/user-agent", "/httpbin/headers"}; + final CountDownLatch countDownLatch = new CountDownLatch(requestUris.length); + + for (final String requestUri : requestUris) { + final SimpleHttpRequest request = SimpleRequestBuilder.get() + .setHttpHost(target) + .setPath(requestUri) + .build(); + + System.out.println("Executing request " + request); + client.execute( + SimpleRequestProducer.create(request), + SimpleResponseConsumer.create(), + new FutureCallback() { + + @Override + public void completed(final SimpleHttpResponse response) { + countDownLatch.countDown(); + System.out.println(request + "->" + new StatusLine(response)); + System.out.println(response.getBody()); + } + + @Override + public void failed(final Exception ex) { + countDownLatch.countDown(); + System.out.println(request + "->" + ex); + } + + @Override + public void cancelled() { + countDownLatch.countDown(); + System.out.println(request + " cancelled"); + } + + }); } + countDownLatch.await(); + + // There still should be a single connection in the pool + System.out.println("Connection pool stats: " + connectionManager.getTotalStats()); + System.out.println("Shutting down"); client.close(CloseMode.GRACEFUL); } diff --git a/httpclient5/src/test/java/org/apache/hc/client5/http/examples/AsyncClientH2ServerPush.java b/httpclient5/src/test/java/org/apache/hc/client5/http/examples/AsyncClientH2ServerPush.java index 676aed477..0843af5b7 100644 --- a/httpclient5/src/test/java/org/apache/hc/client5/http/examples/AsyncClientH2ServerPush.java +++ b/httpclient5/src/test/java/org/apache/hc/client5/http/examples/AsyncClientH2ServerPush.java @@ -38,6 +38,7 @@ import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManagerBuilder; import org.apache.hc.core5.function.Supplier; import org.apache.hc.core5.http.ContentType; +import org.apache.hc.core5.http.HttpHost; import org.apache.hc.core5.http.HttpRequest; import org.apache.hc.core5.http.HttpResponse; import org.apache.hc.core5.http.impl.routing.RequestRouter; @@ -109,7 +110,12 @@ public void releaseResources() { }) .build(); - final BasicHttpRequest request = BasicRequestBuilder.get("https://nghttp2.org/httpbin/").build(); + final HttpHost target = new HttpHost("https", "nghttp2.org"); + + final BasicHttpRequest request = BasicRequestBuilder.get() + .setHttpHost(target) + .setPath("/httpbin/") + .build(); System.out.println("Executing request " + request); final Future future = client.execute( diff --git a/httpclient5/src/test/java/org/apache/hc/client5/http/impl/nio/H2SharingConnPoolTest.java b/httpclient5/src/test/java/org/apache/hc/client5/http/impl/nio/H2SharingConnPoolTest.java new file mode 100644 index 000000000..26fa2e1cb --- /dev/null +++ b/httpclient5/src/test/java/org/apache/hc/client5/http/impl/nio/H2SharingConnPoolTest.java @@ -0,0 +1,387 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ +package org.apache.hc.client5.http.impl.nio; + +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.hc.core5.concurrent.BasicFuture; +import org.apache.hc.core5.concurrent.FutureCallback; +import org.apache.hc.core5.http.HttpConnection; +import org.apache.hc.core5.http.HttpVersion; +import org.apache.hc.core5.io.CloseMode; +import org.apache.hc.core5.pool.ManagedConnPool; +import org.apache.hc.core5.pool.PoolEntry; +import org.apache.hc.core5.util.Timeout; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.MockitoAnnotations; + +public class H2SharingConnPoolTest { + + static final String DEFAULT_ROUTE = "DEFAULT_ROUTE"; + + @Mock + ManagedConnPool connPool; + @Mock + FutureCallback> callback; + @Mock + HttpConnection connection; + H2SharingConnPool h2SharingPool; + + @BeforeEach + void setup() { + MockitoAnnotations.openMocks(this); + h2SharingPool = new H2SharingConnPool<>(connPool); + } + + @Test + void testLeaseFutureReturned() throws Exception { + Mockito.when(connPool.lease( + Mockito.eq(DEFAULT_ROUTE), + Mockito.any(), + Mockito.any(), + Mockito.any())).thenReturn(new BasicFuture<>(null)); + + final Future> result = h2SharingPool.lease(DEFAULT_ROUTE, null, Timeout.ONE_MILLISECOND, callback); + Assertions.assertNotNull(result); + Assertions.assertFalse(result.isDone()); + + Mockito.verify(connPool).lease( + Mockito.eq(DEFAULT_ROUTE), + Mockito.eq(null), + Mockito.eq(Timeout.ONE_MILLISECOND), + Mockito.any()); + Mockito.verify(callback, Mockito.never()).completed( + Mockito.any()); + } + + @Test + void testLeaseExistingConnectionReturned() throws Exception { + final PoolEntry poolEntry = new PoolEntry<>(DEFAULT_ROUTE); + final H2SharingConnPool.PerRoutePool routePool = h2SharingPool.getPerRoutePool(DEFAULT_ROUTE); + routePool.track(poolEntry); + + final Future> future = h2SharingPool.lease(DEFAULT_ROUTE, null, Timeout.ONE_MILLISECOND, callback); + Assertions.assertNotNull(future); + Assertions.assertSame(poolEntry, future.get()); + + Mockito.verify(connPool, Mockito.never()).lease( + Mockito.eq(DEFAULT_ROUTE), + Mockito.any(), + Mockito.any(), + Mockito.any()); + Mockito.verify(callback).completed( + Mockito.same(poolEntry)); + } + + @Test + void testLeaseWithStateCacheBypassed() throws Exception { + final PoolEntry poolEntry = new PoolEntry<>(DEFAULT_ROUTE); + final H2SharingConnPool.PerRoutePool routePool = h2SharingPool.getPerRoutePool(DEFAULT_ROUTE); + routePool.track(poolEntry); + + Mockito.when(connPool.lease( + Mockito.eq(DEFAULT_ROUTE), + Mockito.any(), + Mockito.any(), + Mockito.any())).thenReturn(new BasicFuture<>(null)); + + final Future> result = h2SharingPool.lease(DEFAULT_ROUTE, "stuff", Timeout.ONE_MILLISECOND, callback); + Assertions.assertNotNull(result); + Assertions.assertFalse(result.isDone()); + + Mockito.verify(connPool).lease( + Mockito.eq(DEFAULT_ROUTE), + Mockito.eq("stuff"), + Mockito.eq(Timeout.ONE_MILLISECOND), + Mockito.any()); + Mockito.verify(callback, Mockito.never()).completed( + Mockito.any()); + } + + @Test + void testLeaseNewConnectionReturnedAndCached() throws Exception { + final AtomicReference>> futureRef = new AtomicReference<>(); + Mockito.when(connPool.lease( + Mockito.eq(DEFAULT_ROUTE), + Mockito.any(), + Mockito.any(), + Mockito.any())).thenAnswer(invocationOnMock -> { + final BasicFuture> future = new BasicFuture<>(invocationOnMock.getArgument(3)); + futureRef.set(future); + return future; + }); + + final Future> result = h2SharingPool.lease(DEFAULT_ROUTE, null, Timeout.ONE_MILLISECOND, callback); + final BasicFuture> future = futureRef.get(); + Assertions.assertNotNull(future); + + final PoolEntry poolEntry = new PoolEntry<>(DEFAULT_ROUTE); + poolEntry.assignConnection(connection); + Mockito.when(connection.getProtocolVersion()).thenReturn(HttpVersion.HTTP_2); + future.completed(poolEntry); + + Assertions.assertTrue(result.isDone()); + + Mockito.verify(connPool).lease( + Mockito.eq(DEFAULT_ROUTE), + Mockito.eq(null), + Mockito.eq(Timeout.ONE_MILLISECOND), + Mockito.any()); + Mockito.verify(callback).completed( + Mockito.any()); + + final H2SharingConnPool.PerRoutePool routePool = h2SharingPool.getPerRoutePool(DEFAULT_ROUTE); + Assertions.assertEquals(1, routePool.getCount(poolEntry)); + } + + @Test + void testLeaseNewConnectionReturnedAndNotCached() throws Exception { + final AtomicReference>> futureRef = new AtomicReference<>(); + Mockito.when(connPool.lease( + Mockito.eq(DEFAULT_ROUTE), + Mockito.any(), + Mockito.any(), + Mockito.any())).thenAnswer(invocationOnMock -> { + final BasicFuture> future = new BasicFuture<>(invocationOnMock.getArgument(3)); + futureRef.set(future); + return future; + }); + + final Future> result = h2SharingPool.lease(DEFAULT_ROUTE, null, Timeout.ONE_MILLISECOND, callback); + final BasicFuture> future = futureRef.get(); + Assertions.assertNotNull(future); + + final PoolEntry poolEntry = new PoolEntry<>(DEFAULT_ROUTE); + poolEntry.assignConnection(connection); + Mockito.when(connection.getProtocolVersion()).thenReturn(HttpVersion.HTTP_1_1); + future.completed(poolEntry); + + Assertions.assertTrue(result.isDone()); + + Mockito.verify(connPool).lease( + Mockito.eq(DEFAULT_ROUTE), + Mockito.eq(null), + Mockito.eq(Timeout.ONE_MILLISECOND), + Mockito.any()); + Mockito.verify(callback).completed( + Mockito.any()); + + final H2SharingConnPool.PerRoutePool routePool = h2SharingPool.getPerRoutePool(DEFAULT_ROUTE); + Assertions.assertEquals(0, routePool.getCount(poolEntry)); + } + + @Test + void testLeaseNoConnection() throws Exception { + final AtomicReference>> futureRef = new AtomicReference<>(); + Mockito.when(connPool.lease( + Mockito.eq(DEFAULT_ROUTE), + Mockito.any(), + Mockito.any(), + Mockito.any())).thenAnswer(invocationOnMock -> { + final BasicFuture> future = new BasicFuture<>(invocationOnMock.getArgument(3)); + futureRef.set(future); + return future; + }); + + final Future> result = h2SharingPool.lease(DEFAULT_ROUTE, null, Timeout.ONE_MILLISECOND, callback); + final BasicFuture> future = futureRef.get(); + Assertions.assertNotNull(future); + + final PoolEntry poolEntry = new PoolEntry<>(DEFAULT_ROUTE); + poolEntry.discardConnection(CloseMode.IMMEDIATE); + future.completed(poolEntry); + + Assertions.assertTrue(result.isDone()); + + Mockito.verify(connPool).lease( + Mockito.eq(DEFAULT_ROUTE), + Mockito.eq(null), + Mockito.eq(Timeout.ONE_MILLISECOND), + Mockito.any()); + Mockito.verify(callback).completed( + Mockito.any()); + + final H2SharingConnPool.PerRoutePool routePool = h2SharingPool.getPerRoutePool(DEFAULT_ROUTE); + Assertions.assertEquals(0, routePool.getCount(poolEntry)); + } + + @Test + void testLeaseWithStateNewConnectionReturnedAndNotCached() throws Exception { + final AtomicReference>> futureRef = new AtomicReference<>(); + Mockito.when(connPool.lease( + Mockito.eq(DEFAULT_ROUTE), + Mockito.any(), + Mockito.any(), + Mockito.any())).thenAnswer(invocationOnMock -> { + final BasicFuture> future = new BasicFuture<>(invocationOnMock.getArgument(3)); + futureRef.set(future); + return future; + }); + + final Future> result = h2SharingPool.lease(DEFAULT_ROUTE, "stuff", Timeout.ONE_MILLISECOND, callback); + final BasicFuture> future = futureRef.get(); + Assertions.assertNotNull(future); + + final PoolEntry poolEntry = new PoolEntry<>(DEFAULT_ROUTE); + poolEntry.assignConnection(connection); + Mockito.when(connection.getProtocolVersion()).thenReturn(HttpVersion.HTTP_2); + future.completed(poolEntry); + + Assertions.assertTrue(result.isDone()); + + Mockito.verify(connPool).lease( + Mockito.eq(DEFAULT_ROUTE), + Mockito.eq("stuff"), + Mockito.eq(Timeout.ONE_MILLISECOND), + Mockito.any()); + Mockito.verify(callback).completed( + Mockito.any()); + + final H2SharingConnPool.PerRoutePool routePool = h2SharingPool.getPerRoutePool(DEFAULT_ROUTE); + Assertions.assertEquals(0, routePool.getCount(poolEntry)); + } + + @Test + void testReleaseReusableNoCacheReturnedToPool() throws Exception { + final PoolEntry poolEntry = new PoolEntry<>(DEFAULT_ROUTE); + poolEntry.assignConnection(connection); + Mockito.when(connection.isOpen()).thenReturn(true); + + h2SharingPool.release(poolEntry, true); + + Mockito.verify(connPool).release( + Mockito.same(poolEntry), + Mockito.eq(true)); + } + + @Test + void testReleaseReusableNotInCacheReturnedToPool() throws Exception { + final PoolEntry poolEntry = new PoolEntry<>(DEFAULT_ROUTE); + poolEntry.assignConnection(connection); + Mockito.when(connection.isOpen()).thenReturn(true); + final H2SharingConnPool.PerRoutePool routePool = h2SharingPool.getPerRoutePool(DEFAULT_ROUTE); + routePool.track(poolEntry); + + h2SharingPool.release(poolEntry, true); + + Mockito.verify(connPool).release( + Mockito.same(poolEntry), + Mockito.eq(true)); + } + + @Test + void testReleaseReusableInCacheNotReturnedToPool() throws Exception { + final PoolEntry poolEntry = new PoolEntry<>(DEFAULT_ROUTE); + poolEntry.assignConnection(connection); + Mockito.when(connection.isOpen()).thenReturn(true); + final H2SharingConnPool.PerRoutePool routePool = h2SharingPool.getPerRoutePool(DEFAULT_ROUTE); + routePool.track(poolEntry); + routePool.track(poolEntry); + + h2SharingPool.release(poolEntry, true); + + Mockito.verify(connPool, Mockito.never()).release( + Mockito.same(poolEntry), + Mockito.anyBoolean()); + } + + @Test + void testReleaseNonReusableInCacheReturnedToPool() throws Exception { + final PoolEntry poolEntry = new PoolEntry<>(DEFAULT_ROUTE); + poolEntry.assignConnection(connection); + Mockito.when(connection.isOpen()).thenReturn(true); + final H2SharingConnPool.PerRoutePool routePool = h2SharingPool.getPerRoutePool(DEFAULT_ROUTE); + routePool.track(poolEntry); + routePool.track(poolEntry); + + h2SharingPool.release(poolEntry, false); + + Mockito.verify(connPool).release( + Mockito.same(poolEntry), + Mockito.eq(false)); + } + + @Test + void testReleaseReusableAndClosedInCacheReturnedToPool() throws Exception { + final PoolEntry poolEntry = new PoolEntry<>(DEFAULT_ROUTE); + poolEntry.assignConnection(connection); + Mockito.when(connection.isOpen()).thenReturn(false); + final H2SharingConnPool.PerRoutePool routePool = h2SharingPool.getPerRoutePool(DEFAULT_ROUTE); + routePool.track(poolEntry); + routePool.track(poolEntry); + + h2SharingPool.release(poolEntry, true); + + Mockito.verify(connPool).release( + Mockito.same(poolEntry), + Mockito.eq(true)); + } + + @Test + void testClose() throws Exception { + h2SharingPool.close(); + + Mockito.verify(connPool).close(); + } + + @Test + void testCloseMode() throws Exception { + h2SharingPool.close(CloseMode.IMMEDIATE); + + Mockito.verify(connPool).close(CloseMode.IMMEDIATE); + } + + @Test + void testLeasePoolClosed() throws Exception { + h2SharingPool.close(); + + Assertions.assertThrows(IllegalStateException.class, () -> h2SharingPool.lease(DEFAULT_ROUTE, null, Timeout.ONE_MILLISECOND, callback)); + } + + @Test + void testReleasePoolClosed() throws Exception { + final PoolEntry poolEntry = new PoolEntry<>(DEFAULT_ROUTE); + poolEntry.assignConnection(connection); + Mockito.when(connection.isOpen()).thenReturn(false); + final H2SharingConnPool.PerRoutePool routePool = h2SharingPool.getPerRoutePool(DEFAULT_ROUTE); + routePool.track(poolEntry); + + h2SharingPool.close(); + + h2SharingPool.release(poolEntry, true); + + Mockito.verify(connPool).release( + Mockito.same(poolEntry), + Mockito.eq(true)); + } + +} diff --git a/httpclient5/src/test/java/org/apache/hc/client5/http/impl/nio/H2SharingPerRoutePoolTest.java b/httpclient5/src/test/java/org/apache/hc/client5/http/impl/nio/H2SharingPerRoutePoolTest.java new file mode 100644 index 000000000..dc1a573fa --- /dev/null +++ b/httpclient5/src/test/java/org/apache/hc/client5/http/impl/nio/H2SharingPerRoutePoolTest.java @@ -0,0 +1,130 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ +package org.apache.hc.client5.http.impl.nio; + +import org.apache.hc.core5.http.HttpConnection; +import org.apache.hc.core5.io.CloseMode; +import org.apache.hc.core5.pool.PoolEntry; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +public class H2SharingPerRoutePoolTest { + + static PoolEntry createMockEntry() { + final PoolEntry poolEntry = new PoolEntry<>("some route"); + final HttpConnection conn = Mockito.mock(HttpConnection.class); + Mockito.when(conn.isOpen()).thenReturn(true); + poolEntry.assignConnection(conn); + return poolEntry; + } + + H2SharingConnPool.PerRoutePool pool; + PoolEntry poolEntry1; + PoolEntry poolEntry2; + + @BeforeEach + void setup() { + pool = new H2SharingConnPool.PerRoutePool<>(); + poolEntry1 = createMockEntry(); + poolEntry2 = createMockEntry(); + } + + @Test + void testKeep() { + Assertions.assertEquals(1, pool.track(poolEntry1)); + Assertions.assertEquals(2, pool.track(poolEntry1)); + Assertions.assertEquals(1, pool.track(poolEntry2)); + Assertions.assertEquals(3, pool.track(poolEntry1)); + } + + @Test + void testLeaseLeastUsed() { + pool.track(poolEntry1); + pool.track(poolEntry1); + pool.track(poolEntry2); + Assertions.assertSame(poolEntry2, pool.lease()); + Assertions.assertEquals(2, pool.getCount(poolEntry2)); + + final PoolEntry poolEntry = pool.lease(); + Assertions.assertEquals(3, pool.getCount(poolEntry)); + } + + @Test + void testLeaseEmptyPool() { + Assertions.assertNull(pool.lease()); + } + + @Test + void testReleaseReusable() { + pool.track(poolEntry1); + pool.track(poolEntry1); + pool.track(poolEntry1); + + Assertions.assertEquals(2, pool.release(poolEntry1, true)); + Assertions.assertEquals(1, pool.release(poolEntry1, true)); + Assertions.assertEquals(0, pool.release(poolEntry1, true)); + Assertions.assertEquals(0, pool.release(poolEntry1, true)); + } + + @Test + void testReleaseNonReusable() { + pool.track(poolEntry1); + pool.track(poolEntry1); + pool.track(poolEntry1); + + Assertions.assertEquals(0, pool.release(poolEntry1, false)); + } + + @Test + void testReleaseNonPresent() { + Assertions.assertEquals(0, pool.release(poolEntry1, true)); + Assertions.assertEquals(0, pool.release(poolEntry2, true)); + } + + @Test + void testReleaseConnectionClosed() { + pool.track(poolEntry1); + pool.track(poolEntry1); + pool.track(poolEntry1); + + Mockito.when(poolEntry1.getConnection().isOpen()).thenReturn(false); + Assertions.assertEquals(0, pool.release(poolEntry1, true)); + } + + @Test + void testReleaseConnectionMissing() { + pool.track(poolEntry1); + pool.track(poolEntry1); + pool.track(poolEntry1); + + poolEntry1.discardConnection(CloseMode.IMMEDIATE); + Assertions.assertEquals(0, pool.release(poolEntry1, true)); + } + +}