Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add API for configuring minimum connections for HTTP/2 connection pool #2155

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020-2021 VMware, Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2020-2022 VMware, Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -21,6 +21,8 @@

import java.util.Objects;

import static io.netty.handler.codec.http2.Http2CodecUtil.NUM_STANDARD_SETTINGS;

/**
* A configuration builder to fine tune the {@link Http2Settings}.
*
Expand Down Expand Up @@ -78,6 +80,15 @@ public interface Builder {
*/
Builder maxHeaderListSize(long maxHeaderListSize);

/**
* Sets the {@code SETTINGS_MIN_CONNECTIONS} value.
*
* @param minConnections the {@code SETTINGS_MAX_HEADER_LIST_SIZE} value
* @return {@code this}
* @since 1.0.19
*/
Builder minConnections(int minConnections);

/**
* Sets the {@code SETTINGS_ENABLE_PUSH} value.
*
Expand Down Expand Up @@ -147,6 +158,18 @@ public Long maxHeaderListSize() {
return maxHeaderListSize;
}

/**
* Returns the configured {@code SETTINGS_MIN_CONNECTIONS} value or
* the default {@code 0}.
*
* @return the configured {@code SETTINGS_MIN_CONNECTIONS} value or
* the default {@code 0}.
* @since 1.0.19
*/
public Integer minConnections() {
return minConnections;
}

/**
* Returns the configured {@code SETTINGS_ENABLE_PUSH} value or null.
*
Expand All @@ -171,19 +194,24 @@ public boolean equals(Object o) {
Objects.equals(maxConcurrentStreams, that.maxConcurrentStreams) &&
Objects.equals(maxFrameSize, that.maxFrameSize) &&
maxHeaderListSize.equals(that.maxHeaderListSize) &&
minConnections.equals(that.minConnections) &&
Objects.equals(pushEnabled, that.pushEnabled);
}

@Override
public int hashCode() {
return Objects.hash(headerTableSize, initialWindowSize, maxConcurrentStreams, maxFrameSize, maxHeaderListSize, pushEnabled);
return Objects.hash(headerTableSize, initialWindowSize, maxConcurrentStreams, maxFrameSize, maxHeaderListSize,
minConnections, pushEnabled);
}

static final char SETTINGS_MIN_CONNECTIONS = NUM_STANDARD_SETTINGS + 1;

final Long headerTableSize;
final Integer initialWindowSize;
final Long maxConcurrentStreams;
final Integer maxFrameSize;
final Long maxHeaderListSize;
final Integer minConnections;
final Boolean pushEnabled;

Http2SettingsSpec(Build build) {
Expand All @@ -193,11 +221,19 @@ public int hashCode() {
maxConcurrentStreams = settings.maxConcurrentStreams();
maxFrameSize = settings.maxFrameSize();
maxHeaderListSize = settings.maxHeaderListSize();
minConnections = settings.getIntValue(SETTINGS_MIN_CONNECTIONS);
pushEnabled = settings.pushEnabled();
}

static final class Build implements Builder {
final Http2Settings http2Settings = Http2Settings.defaultSettings();
static final Long DEFAULT_MIN_CONNECTIONS = 0L;

final Http2Settings http2Settings;

Build() {
http2Settings = Http2Settings.defaultSettings();
http2Settings.put(SETTINGS_MIN_CONNECTIONS, DEFAULT_MIN_CONNECTIONS);
}

@Override
public Http2SettingsSpec build() {
Expand Down Expand Up @@ -234,6 +270,15 @@ public Builder maxHeaderListSize(long maxHeaderListSize) {
return this;
}

@Override
public Builder minConnections(int minConnections) {
if (minConnections < 0) {
throw new IllegalArgumentException("Setting MIN_CONNECTIONS is invalid: " + minConnections);
}
http2Settings.put(SETTINGS_MIN_CONNECTIONS, Long.valueOf(minConnections));
return this;
}

/*
@Override
public Builder pushEnabled(boolean pushEnabled) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -466,7 +466,8 @@ static final class PooledConnectionAllocator {
this.remoteAddress = remoteAddress;
this.resolver = resolver;
this.pool = poolFactory.newPool(connectChannel(), null, DEFAULT_DESTROY_HANDLER, DEFAULT_EVICTION_PREDICATE,
poolConFig -> new Http2Pool(poolConFig, poolFactory.maxLifeTime()));
poolConFig -> new Http2Pool(poolConFig, poolFactory.maxLifeTime(),
this.config.http2Settings != null ? this.config.http2Settings.minConnections() : 0));
}

Publisher<Connection> connectChannel() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,10 @@
* <li>{@link PoolMetrics#idleSize()} always returns {@code 0}.</li>
* </ul>
* <p>
* If minimum connections is specified, the cached connections with active streams will be kept at that minimum
* (can be the best effort). However, if the cached connections have reached max concurrent streams,
* then new connections will be allocated up to the maximum connections limit.
Copy link

@crankydillo crankydillo Apr 22, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, so the code now will do this, right:
For any new HTTP/2 request, if the number of socket connections is below the minimum number of connections, a new socket connection will get created and the HTTP/2 stream will get created on that new socket connection, right?

I don't really get that from the javadoc, but that may just be me.

What does the best effort part mean? I think I'm missing that part in the code and want to make sure I'm not misunderstanding something?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, so the code now will do this, right: For any new HTTP/2 request, if the number of socket connections is below the minimum number of connections, a new socket connection will get created and the HTTP/2 stream will get created on that new socket connection, right?

yes

I don't really get that from the javadoc, but that may just be me.

What does the best effort part mean? I think I'm missing that part in the code and want to make sure I'm not misunderstanding something?

As we do not have a strong synchronisation between when we return the connection and we acquire the connection we may not be able to stay exactly on the min number that is specified.

* <p>
* Configurations that are not applicable
* <ul>
* <li>{@link PoolConfig#destroyHandler()} - the destroy handler cannot be used as the destruction is more complex.</li>
Expand All @@ -94,7 +98,6 @@
* <li>{@link PoolConfig#reuseIdleResourcesInLruOrder()} - FIFO is used when checking the connections.</li>
* <li>FIFO is used when obtaining the pending borrowers</li>
* <li>Warm up functionality is not supported</li>
* <li>Setting minimum connections configuration is not supported</li>
* </ul>
* <p>This class is based on
* https://github.com/reactor/reactor-pool/blob/v0.2.7/src/main/java/reactor/pool/SimpleDequePool.java
Expand Down Expand Up @@ -141,18 +144,24 @@ final class Http2Pool implements InstrumentedPool<Connection>, InstrumentedPool.

final Clock clock;
final long maxLifeTime;
final int minConnections;
final PoolConfig<Connection> poolConfig;

long lastInteractionTimestamp;

Http2Pool(PoolConfig<Connection> poolConfig, long maxLifeTime) {
Http2Pool(PoolConfig<Connection> poolConfig, long maxLifeTime, int minConnections) {
if (poolConfig.allocationStrategy().getPermits(0) != 0) {
throw new IllegalArgumentException("No support for configuring minimum number of connections");
throw new IllegalArgumentException(
"No support for configuring minimum number of connections via AllocationStrategy");
}
if (minConnections > poolConfig.allocationStrategy().permitMaximum()) {
throw new IllegalArgumentException("Minimum number of connections must be less than or equal to maximum");
}
this.clock = poolConfig.clock();
this.connections = new ConcurrentLinkedQueue<>();
this.lastInteractionTimestamp = clock.millis();
this.maxLifeTime = maxLifeTime;
this.minConnections = minConnections;
this.pending = new ConcurrentLinkedDeque<>();
this.poolConfig = poolConfig;

Expand Down Expand Up @@ -310,8 +319,11 @@ void drainLoop() {
int borrowersCount = pendingSize;

if (borrowersCount != 0) {
int resourcesCount = resources.size();
// find a connection that can be used for opening a new stream
Slot slot = findConnection(resources);
// when cached connections are below minimum connections, then allocate a new connection
boolean belowMinConnections = minConnections > 0 && resourcesCount < minConnections;
Slot slot = belowMinConnections ? null : findConnection(resources, resourcesCount);
if (slot != null) {
Borrower borrower = pollPending(borrowers, true);
if (borrower == null) {
Expand All @@ -338,54 +350,59 @@ void drainLoop() {
}
}
else {
int permits = poolConfig.allocationStrategy().getPermits(1);
if (permits <= 0) {
if (maxPending >= 0) {
borrowersCount = pendingSize;
int toCull = borrowersCount - maxPending;
for (int i = 0; i < toCull; i++) {
Borrower extraneous = pollPending(borrowers, true);
if (extraneous != null) {
pendingAcquireLimitReached(extraneous, maxPending);
}
}
}
if (belowMinConnections && poolConfig.allocationStrategy().permitGranted() >= minConnections) {
// connections allocations were triggered
}
else {
Borrower borrower = pollPending(borrowers, true);
if (borrower == null) {
continue;
int permits = poolConfig.allocationStrategy().getPermits(1);
if (permits <= 0) {
if (maxPending >= 0) {
borrowersCount = pendingSize;
int toCull = borrowersCount - maxPending;
for (int i = 0; i < toCull; i++) {
Borrower extraneous = pollPending(borrowers, true);
if (extraneous != null) {
pendingAcquireLimitReached(extraneous, maxPending);
}
}
}
}
if (isDisposed()) {
borrower.fail(new PoolShutdownException());
return;
else {
Borrower borrower = pollPending(borrowers, true);
if (borrower == null) {
continue;
}
if (isDisposed()) {
borrower.fail(new PoolShutdownException());
return;
}
borrower.stopPendingCountdown();
Mono<Connection> allocator = poolConfig.allocator();
Mono<Connection> primary =
allocator.doOnEach(sig -> {
if (sig.isOnNext()) {
Connection newInstance = sig.get();
assert newInstance != null;
Slot newSlot = new Slot(this, newInstance);
if (log.isDebugEnabled()) {
log.debug(format(newInstance.channel(), "Channel activated"));
}
ACQUIRED.incrementAndGet(this);
newSlot.incrementConcurrencyAndGet();
newSlot.deactivate();
borrower.deliver(new Http2PooledRef(newSlot));
}
else if (sig.isOnError()) {
Throwable error = sig.getThrowable();
assert error != null;
poolConfig.allocationStrategy().returnPermits(1);
borrower.fail(error);
}
})
.contextWrite(borrower.currentContext());

primary.subscribe(alreadyPropagated -> {}, alreadyPropagatedOrLogged -> drain(), this::drain);
}
borrower.stopPendingCountdown();
Mono<Connection> allocator = poolConfig.allocator();
Mono<Connection> primary =
allocator.doOnEach(sig -> {
if (sig.isOnNext()) {
Connection newInstance = sig.get();
assert newInstance != null;
Slot newSlot = new Slot(this, newInstance);
if (log.isDebugEnabled()) {
log.debug(format(newInstance.channel(), "Channel activated"));
}
ACQUIRED.incrementAndGet(this);
newSlot.incrementConcurrencyAndGet();
newSlot.deactivate();
borrower.deliver(new Http2PooledRef(newSlot));
}
else if (sig.isOnError()) {
Throwable error = sig.getThrowable();
assert error != null;
poolConfig.allocationStrategy().returnPermits(1);
borrower.fail(error);
}
})
.contextWrite(borrower.currentContext());

primary.subscribe(alreadyPropagated -> {}, alreadyPropagatedOrLogged -> drain(), this::drain);
}
}
}
Expand All @@ -398,8 +415,7 @@ else if (sig.isOnError()) {
}

@Nullable
Slot findConnection(ConcurrentLinkedQueue<Slot> resources) {
int resourcesCount = resources.size();
Slot findConnection(ConcurrentLinkedQueue<Slot> resources, int resourcesCount) {
while (resourcesCount > 0) {
// There are connections in the queue

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020-2021 VMware, Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2020-2022 VMware, Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -21,6 +21,7 @@

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
import static reactor.netty.http.Http2SettingsSpec.Build.DEFAULT_MIN_CONNECTIONS;

class Http2SettingsSpecTests {

Expand All @@ -40,6 +41,7 @@ void headerTableSize() {
assertThat(spec.maxConcurrentStreams()).isNull();
assertThat(spec.maxFrameSize()).isNull();
assertThat(spec.maxHeaderListSize()).isEqualTo(Http2CodecUtil.DEFAULT_HEADER_LIST_SIZE);
assertThat(spec.minConnections()).isEqualTo(DEFAULT_MIN_CONNECTIONS.intValue());
assertThat(spec.pushEnabled()).isNull();
}

Expand All @@ -59,6 +61,7 @@ void initialWindowSize() {
assertThat(spec.maxConcurrentStreams()).isNull();
assertThat(spec.maxFrameSize()).isNull();
assertThat(spec.maxHeaderListSize()).isEqualTo(Http2CodecUtil.DEFAULT_HEADER_LIST_SIZE);
assertThat(spec.minConnections()).isEqualTo(DEFAULT_MIN_CONNECTIONS.intValue());
assertThat(spec.pushEnabled()).isNull();
}

Expand All @@ -78,6 +81,7 @@ void maxConcurrentStreams() {
assertThat(spec.maxConcurrentStreams()).isEqualTo(123);
assertThat(spec.maxFrameSize()).isNull();
assertThat(spec.maxHeaderListSize()).isEqualTo(Http2CodecUtil.DEFAULT_HEADER_LIST_SIZE);
assertThat(spec.minConnections()).isEqualTo(DEFAULT_MIN_CONNECTIONS.intValue());
assertThat(spec.pushEnabled()).isNull();
}

Expand All @@ -97,6 +101,7 @@ void maxFrameSize() {
assertThat(spec.maxConcurrentStreams()).isNull();
assertThat(spec.maxFrameSize()).isEqualTo(16384);
assertThat(spec.maxHeaderListSize()).isEqualTo(Http2CodecUtil.DEFAULT_HEADER_LIST_SIZE);
assertThat(spec.minConnections()).isEqualTo(DEFAULT_MIN_CONNECTIONS.intValue());
assertThat(spec.pushEnabled()).isNull();
}

Expand All @@ -116,6 +121,7 @@ void maxHeaderListSize() {
assertThat(spec.maxConcurrentStreams()).isNull();
assertThat(spec.maxFrameSize()).isNull();
assertThat(spec.maxHeaderListSize()).isEqualTo(123);
assertThat(spec.minConnections()).isEqualTo(DEFAULT_MIN_CONNECTIONS.intValue());
assertThat(spec.pushEnabled()).isNull();
}

Expand All @@ -126,6 +132,26 @@ void maxHeaderListSizeBadValues() {
.withMessage("Setting MAX_HEADER_LIST_SIZE is invalid: -1");
}

@Test
void minConnections() {
builder.minConnections(4);
Http2SettingsSpec spec = builder.build();
assertThat(spec.headerTableSize()).isNull();
assertThat(spec.initialWindowSize()).isNull();
assertThat(spec.maxConcurrentStreams()).isNull();
assertThat(spec.maxFrameSize()).isNull();
assertThat(spec.maxHeaderListSize()).isEqualTo(Http2CodecUtil.DEFAULT_HEADER_LIST_SIZE);
assertThat(spec.minConnections()).isEqualTo(4);
assertThat(spec.pushEnabled()).isNull();
}

@Test
void minConnectionsBadValues() {
assertThatExceptionOfType(IllegalArgumentException.class)
.isThrownBy(() -> builder.minConnections(-1))
.withMessage("Setting MIN_CONNECTIONS is invalid: -1");
}

/*
@Test
public void pushEnabled() {
Expand Down
Loading