From f6439205429648568752d2f2759ea78c06a1d1ea Mon Sep 17 00:00:00 2001 From: adam-stasiak-polidea Date: Wed, 11 Mar 2020 19:53:09 +0100 Subject: [PATCH] Support high-level upload retries with upload cache (#264) Co-authored-by: Igor Dvorzhak --- gcs/CHANGES.md | 5 + gcs/CONFIGURATION.md | 7 + gcs/conf/gcs-core-default.xml | 5 + .../GoogleHadoopFileSystemConfiguration.java | 5 + ...ogleHadoopFileSystemConfigurationTest.java | 1 + .../hadoop/gcsio/GoogleCloudStorageTest.java | 265 ++++++++++++++++++ .../util/AbstractGoogleAsyncWriteChannel.java | 68 ++++- .../hadoop/util/AsyncWriteChannelOptions.java | 8 + 8 files changed, 353 insertions(+), 11 deletions(-) diff --git a/gcs/CHANGES.md b/gcs/CHANGES.md index c1db9b0586..39e3b971b8 100644 --- a/gcs/CHANGES.md +++ b/gcs/CHANGES.md @@ -1,5 +1,10 @@ ### 2.1.1 - 2020-XX-XX +1. Add upload cache to support high-level retries of failed uploads. Cache size + configured via property and disabled by default (zero or negative value): + + fs.gs.outputstream.upload.cache.size (default: 0) + ### 2.1.0 - 2020-03-09 1. Update all dependencies to latest versions. diff --git a/gcs/CONFIGURATION.md b/gcs/CONFIGURATION.md index bd51a63233..ae25096216 100644 --- a/gcs/CONFIGURATION.md +++ b/gcs/CONFIGURATION.md @@ -266,6 +266,13 @@ is `false`. The number of bytes in one GCS upload request. +* `fs.gs.outputstream.upload.cache.size` (default: `0`) + + The upload cache size in bytes used for high-level upload retries. To + disable this feature set this property to zero or negative value. Retry will + be performed if total size of written/uploaded data to the object is less + than or equal to the cache size. + * `fs.gs.outputstream.direct.upload.enable` (default: `false`) Enables Cloud Storage direct uploads. 
diff --git a/gcs/conf/gcs-core-default.xml b/gcs/conf/gcs-core-default.xml index 04288194d1..2c7bcf3cbd 100644 --- a/gcs/conf/gcs-core-default.xml +++ b/gcs/conf/gcs-core-default.xml @@ -216,6 +216,11 @@ 67108864 The number of bytes in one GCS upload request. + + fs.gs.outputstream.upload.cache.size + 0 + The number of bytes in upload cache that is used for high-level retries. + fs.gs.marker.file.pattern diff --git a/gcs/src/main/java/com/google/cloud/hadoop/fs/gcs/GoogleHadoopFileSystemConfiguration.java b/gcs/src/main/java/com/google/cloud/hadoop/fs/gcs/GoogleHadoopFileSystemConfiguration.java index 849c433863..1fcd9ff9e8 100644 --- a/gcs/src/main/java/com/google/cloud/hadoop/fs/gcs/GoogleHadoopFileSystemConfiguration.java +++ b/gcs/src/main/java/com/google/cloud/hadoop/fs/gcs/GoogleHadoopFileSystemConfiguration.java @@ -284,6 +284,10 @@ public class GoogleHadoopFileSystemConfiguration { new HadoopConfigurationProperty<>( "fs.gs.outputstream.upload.chunk.size", 64 * 1024 * 1024, "fs.gs.io.buffersize.write"); + /** Configuration for setting GCS upload cache size. */ + public static final HadoopConfigurationProperty GCS_OUTPUT_STREAM_UPLOAD_CACHE_SIZE = + new HadoopConfigurationProperty<>("fs.gs.outputstream.upload.cache.size", 0); + /** Configuration key for enabling GCS direct upload. 
*/ public static final HadoopConfigurationProperty GCS_OUTPUT_STREAM_DIRECT_UPLOAD_ENABLE = new HadoopConfigurationProperty<>("fs.gs.outputstream.direct.upload.enable", false); @@ -450,6 +454,7 @@ private static AsyncWriteChannelOptions getWriteChannelOptions(Configuration con .setBufferSize(GCS_OUTPUT_STREAM_BUFFER_SIZE.get(config, config::getInt)) .setPipeBufferSize(GCS_OUTPUT_STREAM_PIPE_BUFFER_SIZE.get(config, config::getInt)) .setUploadChunkSize(GCS_OUTPUT_STREAM_UPLOAD_CHUNK_SIZE.get(config, config::getInt)) + .setUploadCacheSize(GCS_OUTPUT_STREAM_UPLOAD_CACHE_SIZE.get(config, config::getInt)) .setDirectUploadEnabled( GCS_OUTPUT_STREAM_DIRECT_UPLOAD_ENABLE.get(config, config::getBoolean)) .build(); diff --git a/gcs/src/test/java/com/google/cloud/hadoop/fs/gcs/GoogleHadoopFileSystemConfigurationTest.java b/gcs/src/test/java/com/google/cloud/hadoop/fs/gcs/GoogleHadoopFileSystemConfigurationTest.java index 8978738f1a..a4b997f0c8 100644 --- a/gcs/src/test/java/com/google/cloud/hadoop/fs/gcs/GoogleHadoopFileSystemConfigurationTest.java +++ b/gcs/src/test/java/com/google/cloud/hadoop/fs/gcs/GoogleHadoopFileSystemConfigurationTest.java @@ -78,6 +78,7 @@ public class GoogleHadoopFileSystemConfigurationTest { put("fs.gs.outputstream.buffer.size", 8_388_608); put("fs.gs.outputstream.pipe.buffer.size", 1_048_576); put("fs.gs.outputstream.upload.chunk.size", 67_108_864); + put("fs.gs.outputstream.upload.cache.size", 0); put("fs.gs.io.buffersize.write", 67_108_864); put("fs.gs.outputstream.direct.upload.enable", false); put("fs.gs.outputstream.type", OutputStreamType.BASIC); diff --git a/gcsio/src/test/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorageTest.java b/gcsio/src/test/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorageTest.java index 38ed3e48b1..cbf74e844c 100644 --- a/gcsio/src/test/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorageTest.java +++ b/gcsio/src/test/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorageTest.java @@ -51,6 +51,8 @@ import 
static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; +import com.google.api.client.googleapis.json.GoogleJsonResponseException; +import com.google.api.client.googleapis.media.MediaHttpUploader; import com.google.api.client.http.HttpRequest; import com.google.api.client.http.HttpStatusCodes; import com.google.api.client.http.HttpTransport; @@ -65,10 +67,12 @@ import com.google.api.services.storage.model.Objects; import com.google.api.services.storage.model.StorageObject; import com.google.cloud.hadoop.util.ApiErrorExtractor; +import com.google.cloud.hadoop.util.AsyncWriteChannelOptions; import com.google.cloud.hadoop.util.testing.MockHttpTransportHelper.ErrorResponses; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; +import com.google.common.primitives.Bytes; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.EOFException; @@ -272,6 +276,267 @@ public void testCreateObjectApiIOException() throws IOException { .inOrder(); } + @Test + public void reupload_success_singleWrite_singleUploadChunk() throws Exception { + byte[] testData = new byte[MediaHttpUploader.MINIMUM_CHUNK_SIZE]; + new Random().nextBytes(testData); + int uploadChunkSize = testData.length * 2; + int uploadCacheSize = testData.length * 2; + + MockHttpTransport transport = + mockTransport( + emptyResponse(HttpStatusCodes.STATUS_CODE_NOT_FOUND), + resumableUploadResponse(BUCKET_NAME, OBJECT_NAME), + jsonErrorResponse(ErrorResponses.GONE), + resumableUploadResponse(BUCKET_NAME, OBJECT_NAME), + jsonDataResponse( + newStorageObject(BUCKET_NAME, OBJECT_NAME) + .setSize(BigInteger.valueOf(testData.length)))); + + AsyncWriteChannelOptions writeOptions = + AsyncWriteChannelOptions.builder() + .setUploadChunkSize(uploadChunkSize) + .setUploadCacheSize(uploadCacheSize) + .build(); + + GoogleCloudStorage gcs = + 
mockedGcs(GCS_OPTIONS.toBuilder().setWriteChannelOptions(writeOptions).build(), transport); + + try (WritableByteChannel writeChannel = + gcs.create(new StorageResourceId(BUCKET_NAME, OBJECT_NAME))) { + writeChannel.write(ByteBuffer.wrap(testData)); + } + + assertThat(trackingHttpRequestInitializer.getAllRequestStrings()) + .containsExactly( + getRequestString(BUCKET_NAME, OBJECT_NAME), + resumableUploadRequestString( + BUCKET_NAME, OBJECT_NAME, /* generationId= */ 0, /* replaceGenerationId= */ false), + resumableUploadChunkRequestString(BUCKET_NAME, OBJECT_NAME, /* uploadId= */ 1), + resumableUploadRequestString( + BUCKET_NAME, OBJECT_NAME, /* generationId= */ 0, /* replaceGenerationId= */ false), + resumableUploadChunkRequestString(BUCKET_NAME, OBJECT_NAME, /* uploadId= */ 2)) + .inOrder(); + + HttpRequest writeRequest = trackingHttpRequestInitializer.getAllRequests().get(4); + assertThat(writeRequest.getContent().getLength()).isEqualTo(testData.length); + try (ByteArrayOutputStream writtenData = new ByteArrayOutputStream(testData.length)) { + writeRequest.getContent().writeTo(writtenData); + assertThat(writtenData.toByteArray()).isEqualTo(testData); + } + } + + @Test + public void reupload_success_singleWrite_multipleUploadChunks() throws Exception { + byte[] testData = new byte[2 * MediaHttpUploader.MINIMUM_CHUNK_SIZE]; + new Random().nextBytes(testData); + int uploadChunkSize = testData.length / 2; + int uploadCacheSize = testData.length * 2; + + MockHttpTransport transport = + mockTransport( + emptyResponse(HttpStatusCodes.STATUS_CODE_NOT_FOUND), + resumableUploadResponse(BUCKET_NAME, OBJECT_NAME), + // "308 Resume Incomplete" - successfully uploaded 1st chunk + emptyResponse(308).addHeader("Range", "bytes=0-" + (uploadChunkSize - 1)), + jsonErrorResponse(ErrorResponses.GONE), + resumableUploadResponse(BUCKET_NAME, OBJECT_NAME), + // "308 Resume Incomplete" - successfully uploaded 1st chunk + emptyResponse(308).addHeader("Range", "bytes=0-" + (uploadChunkSize 
- 1)), + jsonDataResponse( + newStorageObject(BUCKET_NAME, OBJECT_NAME) + .setSize(BigInteger.valueOf(testData.length)))); + + AsyncWriteChannelOptions writeOptions = + AsyncWriteChannelOptions.builder() + .setUploadChunkSize(uploadChunkSize) + .setUploadCacheSize(uploadCacheSize) + .build(); + + GoogleCloudStorage gcs = + mockedGcs(GCS_OPTIONS.toBuilder().setWriteChannelOptions(writeOptions).build(), transport); + + try (WritableByteChannel writeChannel = + gcs.create(new StorageResourceId(BUCKET_NAME, OBJECT_NAME))) { + writeChannel.write(ByteBuffer.wrap(testData)); + } + + assertThat(trackingHttpRequestInitializer.getAllRequestStrings()) + .containsExactly( + getRequestString(BUCKET_NAME, OBJECT_NAME), + resumableUploadRequestString( + BUCKET_NAME, OBJECT_NAME, /* generationId= */ 0, /* replaceGenerationId= */ false), + resumableUploadChunkRequestString(BUCKET_NAME, OBJECT_NAME, /* uploadId= */ 1), + resumableUploadChunkRequestString(BUCKET_NAME, OBJECT_NAME, /* uploadId= */ 2), + resumableUploadRequestString( + BUCKET_NAME, OBJECT_NAME, /* generationId= */ 0, /* replaceGenerationId= */ false), + resumableUploadChunkRequestString(BUCKET_NAME, OBJECT_NAME, /* uploadId= */ 3), + resumableUploadChunkRequestString(BUCKET_NAME, OBJECT_NAME, /* uploadId= */ 4)) + .inOrder(); + + HttpRequest writeRequestChunk1 = trackingHttpRequestInitializer.getAllRequests().get(5); + assertThat(writeRequestChunk1.getContent().getLength()).isEqualTo(testData.length / 2); + HttpRequest writeRequestChunk2 = trackingHttpRequestInitializer.getAllRequests().get(6); + assertThat(writeRequestChunk2.getContent().getLength()).isEqualTo(testData.length / 2); + try (ByteArrayOutputStream writtenData = new ByteArrayOutputStream(testData.length)) { + writeRequestChunk1.getContent().writeTo(writtenData); + writeRequestChunk2.getContent().writeTo(writtenData); + assertThat(writtenData.toByteArray()).isEqualTo(testData); + } + } + + @Test + public void 
reupload_success_multipleWrites_singleUploadChunk() throws Exception { + byte[] testData = new byte[MediaHttpUploader.MINIMUM_CHUNK_SIZE]; + new Random().nextBytes(testData); + int uploadChunkSize = testData.length * 2; + int uploadCacheSize = testData.length * 2; + + MockHttpTransport transport = + mockTransport( + emptyResponse(HttpStatusCodes.STATUS_CODE_NOT_FOUND), + resumableUploadResponse(BUCKET_NAME, OBJECT_NAME), + jsonErrorResponse(ErrorResponses.GONE), + resumableUploadResponse(BUCKET_NAME, OBJECT_NAME), + jsonDataResponse( + newStorageObject(BUCKET_NAME, OBJECT_NAME) + .setSize(BigInteger.valueOf(testData.length)))); + + AsyncWriteChannelOptions writeOptions = + AsyncWriteChannelOptions.builder() + .setUploadChunkSize(uploadChunkSize) + .setUploadCacheSize(uploadCacheSize) + .build(); + + GoogleCloudStorage gcs = + mockedGcs(GCS_OPTIONS.toBuilder().setWriteChannelOptions(writeOptions).build(), transport); + + try (WritableByteChannel writeChannel = + gcs.create(new StorageResourceId(BUCKET_NAME, OBJECT_NAME))) { + writeChannel.write(ByteBuffer.wrap(testData)); + writeChannel.write(ByteBuffer.wrap(testData)); + } + + assertThat(trackingHttpRequestInitializer.getAllRequestStrings()) + .containsExactly( + getRequestString(BUCKET_NAME, OBJECT_NAME), + resumableUploadRequestString( + BUCKET_NAME, OBJECT_NAME, /* generationId= */ 0, /* replaceGenerationId= */ false), + resumableUploadChunkRequestString(BUCKET_NAME, OBJECT_NAME, /* uploadId= */ 1), + resumableUploadRequestString( + BUCKET_NAME, OBJECT_NAME, /* generationId= */ 0, /* replaceGenerationId= */ false), + resumableUploadChunkRequestString(BUCKET_NAME, OBJECT_NAME, /* uploadId= */ 2)) + .inOrder(); + + HttpRequest writeRequest = trackingHttpRequestInitializer.getAllRequests().get(4); + assertThat(writeRequest.getContent().getLength()).isEqualTo(2 * testData.length); + try (ByteArrayOutputStream writtenData = new ByteArrayOutputStream(testData.length)) { + 
writeRequest.getContent().writeTo(writtenData); + assertThat(writtenData.toByteArray()).isEqualTo(Bytes.concat(testData, testData)); + } + } + + @Test + public void reupload_success_multipleWrites_multipleUploadChunks() throws Exception { + byte[] testData = new byte[2 * MediaHttpUploader.MINIMUM_CHUNK_SIZE]; + new Random().nextBytes(testData); + int uploadChunkSize = testData.length / 2; + int uploadCacheSize = testData.length * 2; + + MockHttpTransport transport = + mockTransport( + emptyResponse(HttpStatusCodes.STATUS_CODE_NOT_FOUND), + resumableUploadResponse(BUCKET_NAME, OBJECT_NAME), + // "308 Resume Incomplete" - successfully uploaded 1st chunk + emptyResponse(308).addHeader("Range", "bytes=0-" + (uploadChunkSize - 1)), + jsonErrorResponse(ErrorResponses.GONE), + resumableUploadResponse(BUCKET_NAME, OBJECT_NAME), + // "308 Resume Incomplete" - successfully uploaded 3 chunks + emptyResponse(308).addHeader("Range", "bytes=0-" + (uploadChunkSize - 1)), + emptyResponse(308).addHeader("Range", "bytes=0-" + (2 * uploadChunkSize - 1)), + emptyResponse(308).addHeader("Range", "bytes=0-" + (3 * uploadChunkSize - 1)), + jsonDataResponse( + newStorageObject(BUCKET_NAME, OBJECT_NAME) + .setSize(BigInteger.valueOf(2 * testData.length)))); + + AsyncWriteChannelOptions writeOptions = + AsyncWriteChannelOptions.builder() + .setUploadChunkSize(uploadChunkSize) + .setUploadCacheSize(uploadCacheSize) + .build(); + + GoogleCloudStorage gcs = + mockedGcs(GCS_OPTIONS.toBuilder().setWriteChannelOptions(writeOptions).build(), transport); + + try (WritableByteChannel writeChannel = + gcs.create(new StorageResourceId(BUCKET_NAME, OBJECT_NAME))) { + writeChannel.write(ByteBuffer.wrap(testData)); + writeChannel.write(ByteBuffer.wrap(testData)); + } + + assertThat(trackingHttpRequestInitializer.getAllRequestStrings()) + .containsExactly( + getRequestString(BUCKET_NAME, OBJECT_NAME), + resumableUploadRequestString( + BUCKET_NAME, OBJECT_NAME, /* generationId= */ 0, /* 
replaceGenerationId= */ false), + resumableUploadChunkRequestString(BUCKET_NAME, OBJECT_NAME, /* uploadId= */ 1), + resumableUploadChunkRequestString(BUCKET_NAME, OBJECT_NAME, /* uploadId= */ 2), + resumableUploadRequestString( + BUCKET_NAME, OBJECT_NAME, /* generationId= */ 0, /* replaceGenerationId= */ false), + resumableUploadChunkRequestString(BUCKET_NAME, OBJECT_NAME, /* uploadId= */ 3), + resumableUploadChunkRequestString(BUCKET_NAME, OBJECT_NAME, /* uploadId= */ 4), + resumableUploadChunkRequestString(BUCKET_NAME, OBJECT_NAME, /* uploadId= */ 5), + resumableUploadChunkRequestString(BUCKET_NAME, OBJECT_NAME, /* uploadId= */ 6)) + .inOrder(); + + HttpRequest writeRequestChunk1 = trackingHttpRequestInitializer.getAllRequests().get(5); + assertThat(writeRequestChunk1.getContent().getLength()).isEqualTo(testData.length / 2); + HttpRequest writeRequestChunk2 = trackingHttpRequestInitializer.getAllRequests().get(6); + assertThat(writeRequestChunk2.getContent().getLength()).isEqualTo(testData.length / 2); + HttpRequest writeRequestChunk3 = trackingHttpRequestInitializer.getAllRequests().get(7); + assertThat(writeRequestChunk3.getContent().getLength()).isEqualTo(testData.length / 2); + HttpRequest writeRequestChunk4 = trackingHttpRequestInitializer.getAllRequests().get(8); + assertThat(writeRequestChunk4.getContent().getLength()).isEqualTo(testData.length / 2); + try (ByteArrayOutputStream writtenData = new ByteArrayOutputStream(testData.length)) { + writeRequestChunk1.getContent().writeTo(writtenData); + writeRequestChunk2.getContent().writeTo(writtenData); + writeRequestChunk3.getContent().writeTo(writtenData); + writeRequestChunk4.getContent().writeTo(writtenData); + assertThat(writtenData.toByteArray()).isEqualTo(Bytes.concat(testData, testData)); + } + } + + @Test + public void reupload_failure_cacheTooSmall_singleWrite_singleChunk() throws Exception { + byte[] testData = new byte[MediaHttpUploader.MINIMUM_CHUNK_SIZE]; + new Random().nextBytes(testData); + int 
uploadChunkSize = testData.length; + int uploadCacheSize = testData.length / 2; + + MockHttpTransport transport = + mockTransport( + emptyResponse(HttpStatusCodes.STATUS_CODE_NOT_FOUND), + resumableUploadResponse(BUCKET_NAME, OBJECT_NAME), + jsonErrorResponse(ErrorResponses.GONE)); + + AsyncWriteChannelOptions writeOptions = + AsyncWriteChannelOptions.builder() + .setUploadChunkSize(uploadChunkSize) + .setUploadCacheSize(uploadCacheSize) + .build(); + + GoogleCloudStorage gcs = + mockedGcs(GCS_OPTIONS.toBuilder().setWriteChannelOptions(writeOptions).build(), transport); + + WritableByteChannel writeChannel = gcs.create(new StorageResourceId(BUCKET_NAME, OBJECT_NAME)); + writeChannel.write(ByteBuffer.wrap(testData)); + + IOException writeException = assertThrows(IOException.class, writeChannel::close); + + assertThat(writeException).hasCauseThat().isInstanceOf(GoogleJsonResponseException.class); + assertThat(writeException).hasCauseThat().hasMessageThat().startsWith("410"); + } + /** Test successful operation of GoogleCloudStorage.createEmptyObject(1). 
*/ @Test public void testCreateEmptyObject() throws IOException { diff --git a/util/src/main/java/com/google/cloud/hadoop/util/AbstractGoogleAsyncWriteChannel.java b/util/src/main/java/com/google/cloud/hadoop/util/AbstractGoogleAsyncWriteChannel.java index 7e7feb03c0..bcc5ab1f84 100644 --- a/util/src/main/java/com/google/cloud/hadoop/util/AbstractGoogleAsyncWriteChannel.java +++ b/util/src/main/java/com/google/cloud/hadoop/util/AbstractGoogleAsyncWriteChannel.java @@ -61,7 +61,7 @@ public abstract class AbstractGoogleAsyncWriteChannel 0) { + this.uploadCache = ByteBuffer.allocate(options.getUploadCacheSize()); + } setUploadChunkSize(options.getUploadChunkSize()); setDirectUploadEnabled(options.isDirectUploadEnabled()); setContentType("application/octet-stream"); @@ -175,7 +180,7 @@ public boolean isDirectUploadEnabled() { */ @Override public synchronized int write(ByteBuffer buffer) throws IOException { - checkState(isInitialized, "initialize() must be invoked before use."); + checkState(initialized, "initialize() must be invoked before use."); if (!isOpen()) { throw new ClosedChannelException(); } @@ -185,6 +190,14 @@ public synchronized int write(ByteBuffer buffer) throws IOException { waitForCompletionAndThrowIfUploadFailed(); } + if (uploadCache != null && uploadCache.remaining() >= buffer.remaining()) { + int position = buffer.position(); + uploadCache.put(buffer); + buffer.position(position); + } else { + uploadCache = null; + } + return pipeSinkChannel.write(buffer); } @@ -208,16 +221,49 @@ public boolean isOpen() { */ @Override public void close() throws IOException { - checkState(isInitialized, "initialize() must be invoked before use."); - if (isOpen()) { - try { - pipeSinkChannel.close(); - handleResponse(waitForCompletionAndThrowIfUploadFailed()); - } finally { - pipeSinkChannel = null; - uploadOperation = null; + checkState(initialized, "initialize() must be invoked before use."); + if (!isOpen()) { + return; + } + try { + pipeSinkChannel.close(); + 
handleResponse(waitForCompletionAndThrowIfUploadFailed()); + } catch (IOException e) { + if (uploadCache == null) { + throw e; } + logger.atWarning().withCause(e).log("Reuploading using cached data"); + reuploadFromCache(); + } finally { + closeInternal(); + } + } + + private void reuploadFromCache() throws IOException { + closeInternal(); + initialized = false; + + initialize(); + + // Set cache to null so it will not be re-cached during retry. + ByteBuffer reuploadData = uploadCache; + uploadCache = null; + + reuploadData.flip(); + + try { + write(reuploadData); + } finally { + close(); + } + } + + private void closeInternal() { + pipeSinkChannel = null; + if (uploadOperation != null && !uploadOperation.isDone()) { + uploadOperation.cancel(/* mayInterruptIfRunning= */ true); } + uploadOperation = null; } /** @@ -248,7 +294,7 @@ public void initialize() throws IOException { // to each other, we need to start the upload operation on a separate thread. uploadOperation = threadPool.submit(new UploadOperation(request, pipeSource)); - isInitialized = true; + initialized = true; } class UploadOperation implements Callable { diff --git a/util/src/main/java/com/google/cloud/hadoop/util/AsyncWriteChannelOptions.java b/util/src/main/java/com/google/cloud/hadoop/util/AsyncWriteChannelOptions.java index 37146a407e..f42d057213 100644 --- a/util/src/main/java/com/google/cloud/hadoop/util/AsyncWriteChannelOptions.java +++ b/util/src/main/java/com/google/cloud/hadoop/util/AsyncWriteChannelOptions.java @@ -29,6 +29,9 @@ public abstract class AsyncWriteChannelOptions { /** Default upload chunk size. */ public static final int UPLOAD_CHUNK_SIZE_DEFAULT = 64 * 1024 * 1024; + /** Default upload cache size. */ + public static final int UPLOAD_CACHE_SIZE_DEFAULT = 0; + /** Default of whether to use direct upload. 
*/ public static final boolean DIRECT_UPLOAD_ENABLED_DEFAULT = false; @@ -47,6 +50,7 @@ public static Builder builder() { .setBufferSize(BUFFER_SIZE_DEFAULT) .setPipeBufferSize(PIPE_BUFFER_SIZE_DEFAULT) .setUploadChunkSize(UPLOAD_CHUNK_SIZE_DEFAULT) + .setUploadCacheSize(UPLOAD_CACHE_SIZE_DEFAULT) .setDirectUploadEnabled(DIRECT_UPLOAD_ENABLED_DEFAULT); } @@ -56,6 +60,8 @@ public static Builder builder() { public abstract int getUploadChunkSize(); + public abstract int getUploadCacheSize(); + public abstract boolean isDirectUploadEnabled(); /** Mutable builder for the GoogleCloudStorageWriteChannelOptions class. */ @@ -68,6 +74,8 @@ public abstract static class Builder { public abstract Builder setUploadChunkSize(int uploadChunkSize); + public abstract Builder setUploadCacheSize(int uploadCacheSize); + public abstract Builder setDirectUploadEnabled(boolean directUploadEnabled); public abstract AsyncWriteChannelOptions build();