Skip to content

Commit

Permalink
Stream read pool and default s3 timeouts tuning (#10912)
Browse files Browse the repository at this point in the history
Signed-off-by: vikasvb90 <[email protected]>
  • Loading branch information
vikasvb90 authored Nov 26, 2023
1 parent 5bb6cae commit 74b2d7d
Show file tree
Hide file tree
Showing 6 changed files with 51 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ final class S3ClientSettings {
static final Setting.AffixSetting<TimeValue> REQUEST_TIMEOUT_SETTING = Setting.affixKeySetting(
PREFIX,
"request_timeout",
key -> Setting.timeSetting(key, TimeValue.timeValueMinutes(2), Property.NodeScope)
key -> Setting.timeSetting(key, TimeValue.timeValueMinutes(5), Property.NodeScope)
);

/** The connection timeout for connecting to s3. */
Expand All @@ -198,14 +198,14 @@ final class S3ClientSettings {
static final Setting.AffixSetting<Integer> MAX_CONNECTIONS_SETTING = Setting.affixKeySetting(
PREFIX,
"max_connections",
key -> Setting.intSetting(key, 100, Property.NodeScope)
key -> Setting.intSetting(key, 500, Property.NodeScope)
);

/** Connection acquisition timeout for new connections to S3. */
static final Setting.AffixSetting<TimeValue> CONNECTION_ACQUISITION_TIMEOUT = Setting.affixKeySetting(
PREFIX,
"connection_acquisition_timeout",
key -> Setting.timeSetting(key, TimeValue.timeValueMinutes(2), Property.NodeScope)
key -> Setting.timeSetting(key, TimeValue.timeValueMinutes(15), Property.NodeScope)
);

/** The maximum pending connections to S3. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,23 +99,32 @@ public S3RepositoryPlugin(final Settings settings, final Path configPath) {
@Override
public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settings) {
List<ExecutorBuilder<?>> executorBuilders = new ArrayList<>();
int halfProcMaxAt5 = halfAllocatedProcessorsMaxFive(allocatedProcessors(settings));
int halfProc = halfNumberOfProcessors(allocatedProcessors(settings));
executorBuilders.add(
new FixedExecutorBuilder(settings, URGENT_FUTURE_COMPLETION, urgentPoolCount(settings), 10_000, URGENT_FUTURE_COMPLETION)
);
executorBuilders.add(new ScalingExecutorBuilder(URGENT_STREAM_READER, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5)));
executorBuilders.add(new ScalingExecutorBuilder(URGENT_STREAM_READER, 1, halfProc, TimeValue.timeValueMinutes(5)));
executorBuilders.add(
new FixedExecutorBuilder(settings, PRIORITY_FUTURE_COMPLETION, priorityPoolCount(settings), 10_000, PRIORITY_FUTURE_COMPLETION)
new ScalingExecutorBuilder(PRIORITY_FUTURE_COMPLETION, 1, allocatedProcessors(settings), TimeValue.timeValueMinutes(5))
);
executorBuilders.add(new ScalingExecutorBuilder(PRIORITY_STREAM_READER, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5)));
executorBuilders.add(new ScalingExecutorBuilder(PRIORITY_STREAM_READER, 1, halfProc, TimeValue.timeValueMinutes(5)));

executorBuilders.add(new FixedExecutorBuilder(settings, FUTURE_COMPLETION, normalPoolCount(settings), 10_000, FUTURE_COMPLETION));
executorBuilders.add(new ScalingExecutorBuilder(STREAM_READER, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5)));
executorBuilders.add(
new ScalingExecutorBuilder(FUTURE_COMPLETION, 1, allocatedProcessors(settings), TimeValue.timeValueMinutes(5))
);
executorBuilders.add(
new ScalingExecutorBuilder(
STREAM_READER,
allocatedProcessors(settings),
4 * allocatedProcessors(settings),
TimeValue.timeValueMinutes(5)
)
);
return executorBuilders;
}

static int halfAllocatedProcessorsMaxFive(final int allocatedProcessors) {
return boundedBy((allocatedProcessors + 1) / 2, 1, 5);
static int halfNumberOfProcessors(int numberOfProcessors) {
return (numberOfProcessors + 1) / 2;
}

S3RepositoryPlugin(final Settings settings, final Path configPath, final S3Service service, final S3AsyncService s3AsyncService) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
import software.amazon.awssdk.metrics.MetricPublisher;
import software.amazon.awssdk.metrics.MetricRecord;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.common.blobstore.BlobStore;

import java.time.Duration;
Expand All @@ -21,6 +23,7 @@

public class StatsMetricPublisher {

private static final Logger LOGGER = LogManager.getLogger(StatsMetricPublisher.class);
private final Stats stats = new Stats();

private final Map<BlobStore.Metric, Stats> extendedStats = new HashMap<>() {
Expand All @@ -35,6 +38,7 @@ public class StatsMetricPublisher {
public MetricPublisher listObjectsMetricPublisher = new MetricPublisher() {
@Override
public void publish(MetricCollection metricCollection) {
LOGGER.debug(() -> "List objects request metrics: " + metricCollection);
for (MetricRecord<?> metricRecord : metricCollection) {
switch (metricRecord.metric().name()) {
case "ApiCallDuration":
Expand Down Expand Up @@ -64,6 +68,7 @@ public void close() {}
public MetricPublisher deleteObjectsMetricPublisher = new MetricPublisher() {
@Override
public void publish(MetricCollection metricCollection) {
LOGGER.debug(() -> "Delete objects request metrics: " + metricCollection);
for (MetricRecord<?> metricRecord : metricCollection) {
switch (metricRecord.metric().name()) {
case "ApiCallDuration":
Expand Down Expand Up @@ -93,6 +98,7 @@ public void close() {}
public MetricPublisher getObjectMetricPublisher = new MetricPublisher() {
@Override
public void publish(MetricCollection metricCollection) {
LOGGER.debug(() -> "Get object request metrics: " + metricCollection);
for (MetricRecord<?> metricRecord : metricCollection) {
switch (metricRecord.metric().name()) {
case "ApiCallDuration":
Expand Down Expand Up @@ -122,6 +128,7 @@ public void close() {}
public MetricPublisher putObjectMetricPublisher = new MetricPublisher() {
@Override
public void publish(MetricCollection metricCollection) {
LOGGER.debug(() -> "Put object request metrics: " + metricCollection);
for (MetricRecord<?> metricRecord : metricCollection) {
switch (metricRecord.metric().name()) {
case "ApiCallDuration":
Expand Down Expand Up @@ -151,6 +158,7 @@ public void close() {}
public MetricPublisher multipartUploadMetricCollector = new MetricPublisher() {
@Override
public void publish(MetricCollection metricCollection) {
LOGGER.debug(() -> "Multi-part request metrics: " + metricCollection);
for (MetricRecord<?> metricRecord : metricCollection) {
switch (metricRecord.metric().name()) {
case "ApiCallDuration":
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.opensearch.common.io.InputStreamContainer;
import org.opensearch.core.common.unit.ByteSizeUnit;
import org.opensearch.repositories.s3.SocketAccess;
import org.opensearch.repositories.s3.StatsMetricPublisher;
import org.opensearch.repositories.s3.io.CheckedContainer;

import java.io.BufferedInputStream;
Expand Down Expand Up @@ -55,6 +56,7 @@ public class AsyncPartsHandler {
* @param completedParts Reference of completed parts
* @param inputStreamContainers Checksum containers
* @return list of completable futures
* @param statsMetricPublisher sdk metric publisher
* @throws IOException thrown in case of an IO error
*/
public static List<CompletableFuture<CompletedPart>> uploadParts(
Expand All @@ -66,7 +68,8 @@ public static List<CompletableFuture<CompletedPart>> uploadParts(
StreamContext streamContext,
String uploadId,
AtomicReferenceArray<CompletedPart> completedParts,
AtomicReferenceArray<CheckedContainer> inputStreamContainers
AtomicReferenceArray<CheckedContainer> inputStreamContainers,
StatsMetricPublisher statsMetricPublisher
) throws IOException {
List<CompletableFuture<CompletedPart>> futures = new ArrayList<>();
for (int partIdx = 0; partIdx < streamContext.getNumberOfParts(); partIdx++) {
Expand All @@ -77,6 +80,7 @@ public static List<CompletableFuture<CompletedPart>> uploadParts(
.partNumber(partIdx + 1)
.key(uploadRequest.getKey())
.uploadId(uploadId)
.overrideConfiguration(o -> o.addMetricPublisher(statsMetricPublisher.multipartUploadMetricCollector))
.contentLength(inputStreamContainer.getContentLength());
if (uploadRequest.doRemoteDataIntegrityCheck()) {
uploadPartRequestBuilder.checksumAlgorithm(ChecksumAlgorithm.CRC32);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,14 @@ private void uploadInParts(
handleException(returnFuture, () -> "Failed to initiate multipart upload", throwable);
} else {
log.debug(() -> "Initiated new multipart upload, uploadId: " + createMultipartUploadResponse.uploadId());
doUploadInParts(s3AsyncClient, uploadRequest, streamContext, returnFuture, createMultipartUploadResponse.uploadId());
doUploadInParts(
s3AsyncClient,
uploadRequest,
streamContext,
returnFuture,
createMultipartUploadResponse.uploadId(),
statsMetricPublisher
);
}
});
}
Expand All @@ -156,7 +163,8 @@ private void doUploadInParts(
UploadRequest uploadRequest,
StreamContext streamContext,
CompletableFuture<Void> returnFuture,
String uploadId
String uploadId,
StatsMetricPublisher statsMetricPublisher
) {

// The list of completed parts must be sorted
Expand All @@ -174,7 +182,8 @@ private void doUploadInParts(
streamContext,
uploadId,
completedParts,
inputStreamContainers
inputStreamContainers,
statsMetricPublisher
);
} catch (Exception ex) {
try {
Expand All @@ -198,7 +207,7 @@ private void doUploadInParts(
}
return null;
})
.thenCompose(ignore -> completeMultipartUpload(s3AsyncClient, uploadRequest, uploadId, completedParts))
.thenCompose(ignore -> completeMultipartUpload(s3AsyncClient, uploadRequest, uploadId, completedParts, statsMetricPublisher))
.handle(handleExceptionOrResponse(s3AsyncClient, uploadRequest, returnFuture, uploadId))
.exceptionally(throwable -> {
handleException(returnFuture, () -> "Unexpected exception occurred", throwable);
Expand Down Expand Up @@ -245,7 +254,8 @@ private CompletableFuture<CompleteMultipartUploadResponse> completeMultipartUplo
S3AsyncClient s3AsyncClient,
UploadRequest uploadRequest,
String uploadId,
AtomicReferenceArray<CompletedPart> completedParts
AtomicReferenceArray<CompletedPart> completedParts,
StatsMetricPublisher statsMetricPublisher
) {

log.debug(() -> new ParameterizedMessage("Sending completeMultipartUploadRequest, uploadId: {}", uploadId));
Expand All @@ -254,6 +264,7 @@ private CompletableFuture<CompleteMultipartUploadResponse> completeMultipartUplo
.bucket(uploadRequest.getBucket())
.key(uploadRequest.getKey())
.uploadId(uploadId)
.overrideConfiguration(o -> o.addMetricPublisher(statsMetricPublisher.multipartUploadMetricCollector))
.multipartUpload(CompletedMultipartUpload.builder().parts(parts).build())
.build();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,10 @@ public void testThereIsADefaultClientByDefault() {
assertThat(defaultSettings.protocol, is(Protocol.HTTPS));
assertThat(defaultSettings.proxySettings, is(ProxySettings.NO_PROXY_SETTINGS));
assertThat(defaultSettings.readTimeoutMillis, is(50 * 1000));
assertThat(defaultSettings.requestTimeoutMillis, is(120 * 1000));
assertThat(defaultSettings.requestTimeoutMillis, is(5 * 60 * 1000));
assertThat(defaultSettings.connectionTimeoutMillis, is(10 * 1000));
assertThat(defaultSettings.connectionTTLMillis, is(5 * 1000));
assertThat(defaultSettings.maxConnections, is(100));
assertThat(defaultSettings.maxConnections, is(500));
assertThat(defaultSettings.maxRetries, is(3));
assertThat(defaultSettings.throttleRetries, is(true));
}
Expand Down

0 comments on commit 74b2d7d

Please sign in to comment.