diff --git a/README.md b/README.md index 45153d2..b3b108c 100644 --- a/README.md +++ b/README.md @@ -72,6 +72,7 @@ Mono resultMono = Mono.usingWhen(pooledConnectionFactory.create(), | `registerJmx` | Whether to register the pool to JMX. | `validationDepth` | Validation depth used to validate an R2DBC connection. Defaults to `LOCAL`. | `validationQuery` | Query that will be executed just before a connection is given to you from the pool to validate that the connection to the database is still alive. +| `warmupParallelism` | The concurrency level used when the allocator is subscribed to during the warmup phase. Default to `1`. All other properties are driver-specific. diff --git a/src/main/java/io/r2dbc/pool/ConnectionPool.java b/src/main/java/io/r2dbc/pool/ConnectionPool.java index 5c57f9c..7339f72 100644 --- a/src/main/java/io/r2dbc/pool/ConnectionPool.java +++ b/src/main/java/io/r2dbc/pool/ConnectionPool.java @@ -276,9 +276,14 @@ private InstrumentedPool createConnectionPool(ConnectionPoolConfigur .idleResourceReuseMruOrder(); // MRU to support eviction of idle if (maxSize == -1 || initialSize > 0) { - builder.sizeBetween(Math.max(configuration.getMinIdle(), initialSize), maxSize == -1 ? Integer.MAX_VALUE : maxSize); + builder.sizeBetween( + Math.max(configuration.getMinIdle(), initialSize), + maxSize == -1 ? Integer.MAX_VALUE : maxSize, + configuration.getWarmupParallelism()); } else { - builder.sizeBetween(Math.max(configuration.getMinIdle(), initialSize), maxSize); + builder.sizeBetween(Math.max(configuration.getMinIdle(), initialSize), + maxSize, + configuration.getWarmupParallelism()); } Duration backgroundEvictionInterval = configuration.getBackgroundEvictionInterval(); diff --git a/src/main/java/io/r2dbc/pool/ConnectionPoolConfiguration.java b/src/main/java/io/r2dbc/pool/ConnectionPoolConfiguration.java index a8cb6e7..861e3f4 100644 --- a/src/main/java/io/r2dbc/pool/ConnectionPoolConfiguration.java +++ b/src/main/java/io/r2dbc/pool/ConnectionPoolConfiguration.java @@ -52,6 +52,11 @@ public final class ConnectionPoolConfiguration { */ public static final Duration NO_TIMEOUT = Duration.ofMillis(-1); + /** + * Constant indicating the default parallelism used during connection pool warmup. + */ + public static final int DEFAULT_WARMUP_PARALLELISM = 1; + @Nullable private final Scheduler allocatorSubscribeOn; @@ -99,12 +104,14 @@ public final class ConnectionPoolConfiguration { @Nullable private final String validationQuery; + private final int warmupParallelism; + private ConnectionPoolConfiguration(@Nullable Scheduler allocatorSubscribeOn, int acquireRetry, @Nullable Duration backgroundEvictionInterval, ConnectionFactory connectionFactory, Clock clock, Consumer>> customizer, int initialSize, int maxSize, int minIdle, Duration maxAcquireTime, Duration maxCreateConnectionTime, Duration maxIdleTime, Duration maxLifeTime, Duration maxValidationTime, PoolMetricsRecorder metricsRecorder, @Nullable String name, @Nullable Function> postAllocate, @Nullable Function> preRelease, boolean registerJmx, ValidationDepth validationDepth, - @Nullable String validationQuery) { + @Nullable String validationQuery, int warmupParallelism) { this.allocatorSubscribeOn = allocatorSubscribeOn; this.acquireRetry = acquireRetry; this.connectionFactory = Assert.requireNonNull(connectionFactory, "ConnectionFactory must not be null"); @@ -126,6 +133,7 @@ private ConnectionPoolConfiguration(@Nullable Scheduler allocatorSubscribeOn, in this.validationDepth = validationDepth; this.validationQuery = validationQuery; this.backgroundEvictionInterval = backgroundEvictionInterval; + this.warmupParallelism = warmupParallelism; } /** @@ -237,6 +245,10 @@ String getValidationQuery() { return this.validationQuery; } + int getWarmupParallelism() { + return this.warmupParallelism; + } + /** * A builder for {@link ConnectionPoolConfiguration} instances. *

@@ -293,6 +305,8 @@ public static final class Builder { private ValidationDepth validationDepth = ValidationDepth.LOCAL; + private Integer warmupParallelism = DEFAULT_WARMUP_PARALLELISM; + private Builder() { } @@ -583,6 +597,23 @@ public Builder validationQuery(String validationQuery) { return this; } + /** + * Configure the concurrency level used when the allocator is subscribed to during the warmup phase. + * + * @param warmupParallelism Specifies the concurrency level used when the allocator is subscribed to during the warmup phase, if any. + * During warmup, resources that can be pre-allocated will be created eagerly, but at most {@code warmupParallelism} resources are + * subscribed to at the same time. + * @return this {@link Builder} + * @throws IllegalArgumentException if {@code warmupParallelism} is negative + */ + public Builder warmupParallelism(int warmupParallelism) { + if (warmupParallelism < 0) { + throw new IllegalArgumentException("warmupParallelism must not be negative"); + } + this.warmupParallelism = warmupParallelism; + return this; + } + /** * Returns a configured {@link ConnectionPoolConfiguration}. * @@ -596,7 +627,7 @@ public ConnectionPoolConfiguration build() { this.clock, this.customizer, this.initialSize, this.maxSize, this.minIdle, this.maxAcquireTime, this.maxCreateConnectionTime, this.maxIdleTime, this.maxLifeTime, this.maxValidationTime, this.metricsRecorder, this.name, this.postAllocate, this.preRelease, this.registerJmx, - this.validationDepth, this.validationQuery + this.validationDepth, this.validationQuery, this.warmupParallelism ); } diff --git a/src/main/java/io/r2dbc/pool/PoolingConnectionFactoryProvider.java b/src/main/java/io/r2dbc/pool/PoolingConnectionFactoryProvider.java index ec87579..539709d 100644 --- a/src/main/java/io/r2dbc/pool/PoolingConnectionFactoryProvider.java +++ b/src/main/java/io/r2dbc/pool/PoolingConnectionFactoryProvider.java @@ -149,6 +149,11 @@ public class PoolingConnectionFactoryProvider implements ConnectionFactoryProvid */ public static final Option VALIDATION_DEPTH = Option.valueOf("validationDepth"); + /** + * WarmupParallelism {@link Option}. + */ + public static final Option WARMUP_PARALLELISM = Option.valueOf("warmupParallelism"); + private static final String COLON = ":"; /** @@ -210,6 +215,7 @@ static ConnectionPoolConfiguration buildConfiguration(ConnectionFactoryOptions c mapper.from(REGISTER_JMX).as(OptionMapper::toBoolean).to(builder::registerJmx); mapper.fromExact(VALIDATION_QUERY).to(builder::validationQuery); mapper.from(VALIDATION_DEPTH).as(validationDepth -> OptionMapper.toEnum(validationDepth, ValidationDepth.class)).to(builder::validationDepth); + mapper.from(WARMUP_PARALLELISM).as(OptionMapper::toInteger).to(builder::warmupParallelism); return builder.build(); } diff --git a/src/test/java/io/r2dbc/pool/ConnectionPoolConfigurationUnitTests.java b/src/test/java/io/r2dbc/pool/ConnectionPoolConfigurationUnitTests.java index 026337f..f41cf83 100644 --- a/src/test/java/io/r2dbc/pool/ConnectionPoolConfigurationUnitTests.java +++ b/src/test/java/io/r2dbc/pool/ConnectionPoolConfigurationUnitTests.java @@ -49,6 +49,7 @@ void configuration() { .maxSize(20) .name("bar") .registerJmx(true) + .warmupParallelism(99) .build(); assertThat(configuration) @@ -62,7 +63,8 @@ void configuration() { .hasFieldOrPropertyWithValue("initialSize", 2) .hasFieldOrPropertyWithValue("maxSize", 20) .hasFieldOrPropertyWithValue("name", "bar") - .hasFieldOrPropertyWithValue("registerJmx", true); + .hasFieldOrPropertyWithValue("registerJmx", true) + .hasFieldOrPropertyWithValue("warmupParallelism", 99); } @Test @@ -82,7 +84,8 @@ void configurationDefaults() { .hasFieldOrPropertyWithValue("maxValidationTime", Duration.ofMillis(-1)) .hasFieldOrPropertyWithValue("initialSize", 10) .hasFieldOrPropertyWithValue("maxSize", 10) - .hasFieldOrPropertyWithValue("registerJmx", false); + .hasFieldOrPropertyWithValue("registerJmx", false) + .hasFieldOrPropertyWithValue("warmupParallelism", 1); } @Test