diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3ClientSettings.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3ClientSettings.java
index 2392c66329e06..4fda0ee95a3ec 100644
--- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3ClientSettings.java
+++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3ClientSettings.java
@@ -177,7 +177,7 @@ final class S3ClientSettings {
     static final Setting.AffixSetting<TimeValue> REQUEST_TIMEOUT_SETTING = Setting.affixKeySetting(
         PREFIX,
         "request_timeout",
-        key -> Setting.timeSetting(key, TimeValue.timeValueMinutes(2), Property.NodeScope)
+        key -> Setting.timeSetting(key, TimeValue.timeValueMinutes(5), Property.NodeScope)
     );
 
     /** The connection timeout for connecting to s3. */
@@ -198,14 +198,14 @@ final class S3ClientSettings {
     static final Setting.AffixSetting<Integer> MAX_CONNECTIONS_SETTING = Setting.affixKeySetting(
         PREFIX,
         "max_connections",
-        key -> Setting.intSetting(key, 100, Property.NodeScope)
+        key -> Setting.intSetting(key, 500, Property.NodeScope)
     );
 
     /** Connection acquisition timeout for new connections to S3. */
     static final Setting.AffixSetting<TimeValue> CONNECTION_ACQUISITION_TIMEOUT = Setting.affixKeySetting(
         PREFIX,
         "connection_acquisition_timeout",
-        key -> Setting.timeSetting(key, TimeValue.timeValueMinutes(2), Property.NodeScope)
+        key -> Setting.timeSetting(key, TimeValue.timeValueMinutes(15), Property.NodeScope)
     );
 
     /** The maximum pending connections to S3. */
diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3RepositoryPlugin.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3RepositoryPlugin.java
index 9ed232464d080..dd420baa970d9 100644
--- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3RepositoryPlugin.java
+++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3RepositoryPlugin.java
@@ -99,23 +99,32 @@ public S3RepositoryPlugin(final Settings settings, final Path configPath) {
     @Override
     public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settings) {
         List<ExecutorBuilder<?>> executorBuilders = new ArrayList<>();
-        int halfProcMaxAt5 = halfAllocatedProcessorsMaxFive(allocatedProcessors(settings));
+        int halfProc = halfNumberOfProcessors(allocatedProcessors(settings));
         executorBuilders.add(
             new FixedExecutorBuilder(settings, URGENT_FUTURE_COMPLETION, urgentPoolCount(settings), 10_000, URGENT_FUTURE_COMPLETION)
         );
-        executorBuilders.add(new ScalingExecutorBuilder(URGENT_STREAM_READER, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5)));
+        executorBuilders.add(new ScalingExecutorBuilder(URGENT_STREAM_READER, 1, halfProc, TimeValue.timeValueMinutes(5)));
         executorBuilders.add(
-            new FixedExecutorBuilder(settings, PRIORITY_FUTURE_COMPLETION, priorityPoolCount(settings), 10_000, PRIORITY_FUTURE_COMPLETION)
+            new ScalingExecutorBuilder(PRIORITY_FUTURE_COMPLETION, 1, allocatedProcessors(settings), TimeValue.timeValueMinutes(5))
         );
-        executorBuilders.add(new ScalingExecutorBuilder(PRIORITY_STREAM_READER, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5)));
+        executorBuilders.add(new ScalingExecutorBuilder(PRIORITY_STREAM_READER, 1, halfProc, TimeValue.timeValueMinutes(5)));
 
-        executorBuilders.add(new FixedExecutorBuilder(settings, FUTURE_COMPLETION, normalPoolCount(settings), 10_000, FUTURE_COMPLETION));
-        executorBuilders.add(new ScalingExecutorBuilder(STREAM_READER, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5)));
+        executorBuilders.add(
+            new ScalingExecutorBuilder(FUTURE_COMPLETION, 1, allocatedProcessors(settings), TimeValue.timeValueMinutes(5))
+        );
+        executorBuilders.add(
+            new ScalingExecutorBuilder(
+                STREAM_READER,
+                allocatedProcessors(settings),
+                4 * allocatedProcessors(settings),
+                TimeValue.timeValueMinutes(5)
+            )
+        );
         return executorBuilders;
     }
 
-    static int halfAllocatedProcessorsMaxFive(final int allocatedProcessors) {
-        return boundedBy((allocatedProcessors + 1) / 2, 1, 5);
+    static int halfNumberOfProcessors(int numberOfProcessors) {
+        return (numberOfProcessors + 1) / 2;
     }
 
     S3RepositoryPlugin(final Settings settings, final Path configPath, final S3Service service, final S3AsyncService s3AsyncService) {
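
Not part of the patch: a small standalone sketch contrasting the old, capped stream_reader sizing with the new sizing above. The plugin's private boundedBy helper is replaced here with an equivalent Math.min/Math.max expression so the snippet compiles on its own; with 32 allocated processors the old code capped these pools at 5 threads, while the new STREAM_READER builder allows up to 4 * allocatedProcessors.

// Standalone sketch, not plugin code: compares old vs. new pool sizing.
public class StreamReaderPoolSizing {

    // Old behaviour: half of the allocated processors, clamped to [1, 5]
    // (Math.min/Math.max stand in for the plugin's boundedBy helper).
    static int halfAllocatedProcessorsMaxFive(final int allocatedProcessors) {
        return Math.min(5, Math.max(1, (allocatedProcessors + 1) / 2));
    }

    // New behaviour: half of the allocated processors, with no upper bound.
    static int halfNumberOfProcessors(int numberOfProcessors) {
        return (numberOfProcessors + 1) / 2;
    }

    public static void main(String[] args) {
        for (int procs : new int[] { 2, 8, 16, 32 }) {
            System.out.printf(
                "procs=%d old max=%d new urgent/priority stream_reader max=%d new stream_reader max=%d%n",
                procs,
                halfAllocatedProcessorsMaxFive(procs),
                halfNumberOfProcessors(procs),
                4 * procs
            );
        }
    }
}
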
diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/StatsMetricPublisher.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/StatsMetricPublisher.java
index 0c63bfdb1ff97..8d2772d42ebca 100644
--- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/StatsMetricPublisher.java
+++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/StatsMetricPublisher.java
@@ -12,6 +12,8 @@
 import software.amazon.awssdk.metrics.MetricPublisher;
 import software.amazon.awssdk.metrics.MetricRecord;
 
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 import org.opensearch.common.blobstore.BlobStore;
 
 import java.time.Duration;
@@ -21,6 +23,7 @@
 
 public class StatsMetricPublisher {
 
+    private static final Logger LOGGER = LogManager.getLogger(StatsMetricPublisher.class);
     private final Stats stats = new Stats();
 
     private final Map<BlobStore.Metric, Stats> extendedStats = new HashMap<>() {
@@ -35,6 +38,7 @@ public class StatsMetricPublisher {
     public MetricPublisher listObjectsMetricPublisher = new MetricPublisher() {
         @Override
         public void publish(MetricCollection metricCollection) {
+            LOGGER.debug(() -> "List objects request metrics: " + metricCollection);
             for (MetricRecord<?> metricRecord : metricCollection) {
                 switch (metricRecord.metric().name()) {
                     case "ApiCallDuration":
@@ -64,6 +68,7 @@ public void close() {}
     public MetricPublisher deleteObjectsMetricPublisher = new MetricPublisher() {
         @Override
         public void publish(MetricCollection metricCollection) {
+            LOGGER.debug(() -> "Delete objects request metrics: " + metricCollection);
             for (MetricRecord<?> metricRecord : metricCollection) {
                 switch (metricRecord.metric().name()) {
                     case "ApiCallDuration":
@@ -93,6 +98,7 @@ public void close() {}
     public MetricPublisher getObjectMetricPublisher = new MetricPublisher() {
         @Override
         public void publish(MetricCollection metricCollection) {
+            LOGGER.debug(() -> "Get object request metrics: " + metricCollection);
             for (MetricRecord<?> metricRecord : metricCollection) {
                 switch (metricRecord.metric().name()) {
                     case "ApiCallDuration":
@@ -122,6 +128,7 @@ public void close() {}
     public MetricPublisher putObjectMetricPublisher = new MetricPublisher() {
         @Override
         public void publish(MetricCollection metricCollection) {
+            LOGGER.debug(() -> "Put object request metrics: " + metricCollection);
             for (MetricRecord<?> metricRecord : metricCollection) {
                 switch (metricRecord.metric().name()) {
                     case "ApiCallDuration":
@@ -151,6 +158,7 @@ public void close() {}
     public MetricPublisher multipartUploadMetricCollector = new MetricPublisher() {
         @Override
         public void publish(MetricCollection metricCollection) {
+            LOGGER.debug(() -> "Multi-part request metrics: " + metricCollection);
             for (MetricRecord<?> metricRecord : metricCollection) {
                 switch (metricRecord.metric().name()) {
                     case "ApiCallDuration":
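
For context, not from the patch: a minimal AWS SDK v2 MetricPublisher showing the same lazy-logging pattern the debug lines above rely on. LoggingMetricPublisher is a hypothetical class name; the Supplier overload of Logger.debug defers building the message string until debug logging is actually enabled, so nothing is concatenated on the request path otherwise.

import software.amazon.awssdk.metrics.MetricCollection;
import software.amazon.awssdk.metrics.MetricPublisher;
import software.amazon.awssdk.metrics.MetricRecord;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

// Sketch only: logs every metric record it receives, lazily.
public class LoggingMetricPublisher implements MetricPublisher {

    private static final Logger LOGGER = LogManager.getLogger(LoggingMetricPublisher.class);

    @Override
    public void publish(MetricCollection metricCollection) {
        // The Supplier is evaluated only when debug logging is enabled.
        LOGGER.debug(() -> "Request metrics: " + metricCollection);
        for (MetricRecord<?> metricRecord : metricCollection) {
            LOGGER.debug(() -> metricRecord.metric().name() + " = " + metricRecord.value());
        }
    }

    @Override
    public void close() {}
}
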
diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/AsyncPartsHandler.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/AsyncPartsHandler.java
index 933ee6dc29513..2bead6b588696 100644
--- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/AsyncPartsHandler.java
+++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/AsyncPartsHandler.java
@@ -25,6 +25,7 @@
 import org.opensearch.common.io.InputStreamContainer;
 import org.opensearch.core.common.unit.ByteSizeUnit;
 import org.opensearch.repositories.s3.SocketAccess;
+import org.opensearch.repositories.s3.StatsMetricPublisher;
 import org.opensearch.repositories.s3.io.CheckedContainer;
 
 import java.io.BufferedInputStream;
@@ -55,6 +56,7 @@ public class AsyncPartsHandler {
      * @param completedParts Reference of completed parts
      * @param inputStreamContainers Checksum containers
      * @return list of completable futures
+     * @param statsMetricPublisher sdk metric publisher
      * @throws IOException thrown in case of an IO error
      */
     public static List<CompletableFuture<CompletedPart>> uploadParts(
@@ -66,7 +68,8 @@ public static List<CompletableFuture<CompletedPart>> uploadParts(
         StreamContext streamContext,
         String uploadId,
         AtomicReferenceArray<CompletedPart> completedParts,
-        AtomicReferenceArray<CheckedContainer> inputStreamContainers
+        AtomicReferenceArray<CheckedContainer> inputStreamContainers,
+        StatsMetricPublisher statsMetricPublisher
     ) throws IOException {
         List<CompletableFuture<CompletedPart>> futures = new ArrayList<>();
         for (int partIdx = 0; partIdx < streamContext.getNumberOfParts(); partIdx++) {
@@ -77,6 +80,7 @@ public static List<CompletableFuture<CompletedPart>> uploadParts(
                 .partNumber(partIdx + 1)
                 .key(uploadRequest.getKey())
                 .uploadId(uploadId)
+                .overrideConfiguration(o -> o.addMetricPublisher(statsMetricPublisher.multipartUploadMetricCollector))
                 .contentLength(inputStreamContainer.getContentLength());
             if (uploadRequest.doRemoteDataIntegrityCheck()) {
                 uploadPartRequestBuilder.checksumAlgorithm(ChecksumAlgorithm.CRC32);
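
A hedged sketch of the per-request wiring introduced above: AWS SDK v2 request builders accept an override configuration, and any MetricPublisher added there receives the metrics for that single request only. The class name and the bucket, key and upload id values below are placeholders, not values from this change.

import software.amazon.awssdk.metrics.MetricPublisher;
import software.amazon.awssdk.services.s3.model.UploadPartRequest;

// Sketch only: attaches a metric publisher to one UploadPartRequest.
public final class PerRequestMetricsExample {

    static UploadPartRequest withMetrics(MetricPublisher publisher) {
        return UploadPartRequest.builder()
            .bucket("my-bucket")            // placeholder
            .key("my-object")               // placeholder
            .uploadId("example-upload-id")  // placeholder
            .partNumber(1)
            .contentLength(5L * 1024 * 1024)
            .overrideConfiguration(o -> o.addMetricPublisher(publisher))
            .build();
    }
}
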
diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/AsyncTransferManager.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/AsyncTransferManager.java
index 4f1ab9764702e..46fbdd3d0487b 100644
--- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/AsyncTransferManager.java
+++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/AsyncTransferManager.java
@@ -146,7 +146,14 @@ private void uploadInParts(
                 handleException(returnFuture, () -> "Failed to initiate multipart upload", throwable);
             } else {
                 log.debug(() -> "Initiated new multipart upload, uploadId: " + createMultipartUploadResponse.uploadId());
-                doUploadInParts(s3AsyncClient, uploadRequest, streamContext, returnFuture, createMultipartUploadResponse.uploadId());
+                doUploadInParts(
+                    s3AsyncClient,
+                    uploadRequest,
+                    streamContext,
+                    returnFuture,
+                    createMultipartUploadResponse.uploadId(),
+                    statsMetricPublisher
+                );
             }
         });
     }
@@ -156,7 +163,8 @@ private void doUploadInParts(
         UploadRequest uploadRequest,
         StreamContext streamContext,
         CompletableFuture<Void> returnFuture,
-        String uploadId
+        String uploadId,
+        StatsMetricPublisher statsMetricPublisher
     ) {
 
         // The list of completed parts must be sorted
@@ -174,7 +182,8 @@ private void doUploadInParts(
                 streamContext,
                 uploadId,
                 completedParts,
-                inputStreamContainers
+                inputStreamContainers,
+                statsMetricPublisher
             );
         } catch (Exception ex) {
             try {
@@ -198,7 +207,7 @@ private void doUploadInParts(
             }
             return null;
         })
-            .thenCompose(ignore -> completeMultipartUpload(s3AsyncClient, uploadRequest, uploadId, completedParts))
+            .thenCompose(ignore -> completeMultipartUpload(s3AsyncClient, uploadRequest, uploadId, completedParts, statsMetricPublisher))
            .handle(handleExceptionOrResponse(s3AsyncClient, uploadRequest, returnFuture, uploadId))
            .exceptionally(throwable -> {
                handleException(returnFuture, () -> "Unexpected exception occurred", throwable);
@@ -245,7 +254,8 @@ private CompletableFuture<CompleteMultipartUploadResponse> completeMultipartUplo
         S3AsyncClient s3AsyncClient,
         UploadRequest uploadRequest,
         String uploadId,
-        AtomicReferenceArray<CompletedPart> completedParts
+        AtomicReferenceArray<CompletedPart> completedParts,
+        StatsMetricPublisher statsMetricPublisher
     ) {
 
         log.debug(() -> new ParameterizedMessage("Sending completeMultipartUploadRequest, uploadId: {}", uploadId));
@@ -254,6 +264,7 @@ private CompletableFuture<CompleteMultipartUploadResponse> completeMultipartUplo
             .bucket(uploadRequest.getBucket())
             .key(uploadRequest.getKey())
             .uploadId(uploadId)
+            .overrideConfiguration(o -> o.addMetricPublisher(statsMetricPublisher.multipartUploadMetricCollector))
             .multipartUpload(CompletedMultipartUpload.builder().parts(parts).build())
             .build();
 
diff --git a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3ClientSettingsTests.java b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3ClientSettingsTests.java
index 61c9c998b1dec..f27c8387b6e45 100644
--- a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3ClientSettingsTests.java
+++ b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3ClientSettingsTests.java
@@ -70,10 +70,10 @@ public void testThereIsADefaultClientByDefault() {
         assertThat(defaultSettings.protocol, is(Protocol.HTTPS));
         assertThat(defaultSettings.proxySettings, is(ProxySettings.NO_PROXY_SETTINGS));
         assertThat(defaultSettings.readTimeoutMillis, is(50 * 1000));
-        assertThat(defaultSettings.requestTimeoutMillis, is(120 * 1000));
+        assertThat(defaultSettings.requestTimeoutMillis, is(5 * 60 * 1000));
         assertThat(defaultSettings.connectionTimeoutMillis, is(10 * 1000));
         assertThat(defaultSettings.connectionTTLMillis, is(5 * 1000));
-        assertThat(defaultSettings.maxConnections, is(100));
+        assertThat(defaultSettings.maxConnections, is(500));
         assertThat(defaultSettings.maxRetries, is(3));
         assertThat(defaultSettings.throttleRetries, is(true));
     }
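
The new values are only defaults, not hard limits. A sketch, assuming the standard s3.client.<name>.* affix keys that back the S3ClientSettings entries touched above (shown here for the "default" client), of how an operator could keep the previous values:

import org.opensearch.common.settings.Settings;

// Sketch only: node settings restoring the former defaults for the "default" S3 client.
public final class S3ClientOverridesExample {

    static Settings previousDefaults() {
        return Settings.builder()
            .put("s3.client.default.request_timeout", "2m")                 // old default was 2 minutes
            .put("s3.client.default.max_connections", 100)                  // old default was 100
            .put("s3.client.default.connection_acquisition_timeout", "2m")  // old default was 2 minutes
            .build();
    }
}
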