Skip to content

Commit

Permalink
Support high-level upload retries with upload cache (#264)
Browse files Browse the repository at this point in the history
Co-authored-by: Igor Dvorzhak <[email protected]>
  • Loading branch information
adam-stasiak-polidea and medb authored Mar 11, 2020
1 parent a863b8c commit f643920
Show file tree
Hide file tree
Showing 8 changed files with 353 additions and 11 deletions.
5 changes: 5 additions & 0 deletions gcs/CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
### 2.1.1 - 2020-XX-XX

1. Add upload cache to support high-level retries of failed uploads. Cache size
configured via property and disabled by default (zero or negative value):

fs.gs.outputstream.upload.cache.size (default: 0)

### 2.1.0 - 2020-03-09

1. Update all dependencies to latest versions.
Expand Down
7 changes: 7 additions & 0 deletions gcs/CONFIGURATION.md
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,13 @@ is `false`.
The number of bytes in one GCS upload request.
* `fs.gs.outputstream.upload.cache.size` (default: `0`)
The upload cache size in bytes used for high-level upload retries. To
disable this feature, set this property to a zero or negative value. Retry will
be performed if total size of written/uploaded data to the object is less
than or equal to the cache size.
* `fs.gs.outputstream.direct.upload.enable` (default: `false`)
Enables Cloud Storage direct uploads.
Expand Down
5 changes: 5 additions & 0 deletions gcs/conf/gcs-core-default.xml
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,11 @@
<value>67108864</value>
<description>The number of bytes in one GCS upload request.</description>
</property>
<property>
<name>fs.gs.outputstream.upload.cache.size</name>
<value>0</value>
<description>The number of bytes in the upload cache that is used for high-level retries.</description>
</property>
<property>
<name>fs.gs.marker.file.pattern</name>
<value></value>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,10 @@ public class GoogleHadoopFileSystemConfiguration {
new HadoopConfigurationProperty<>(
"fs.gs.outputstream.upload.chunk.size", 64 * 1024 * 1024, "fs.gs.io.buffersize.write");

/**
 * Configuration for setting the GCS upload cache size in bytes, used for high-level retries of
 * failed uploads. A zero or negative value disables the cache (and therefore high-level retries).
 */
public static final HadoopConfigurationProperty<Integer> GCS_OUTPUT_STREAM_UPLOAD_CACHE_SIZE =
    new HadoopConfigurationProperty<>("fs.gs.outputstream.upload.cache.size", 0);

/** Configuration key for enabling GCS direct upload. */
public static final HadoopConfigurationProperty<Boolean> GCS_OUTPUT_STREAM_DIRECT_UPLOAD_ENABLE =
new HadoopConfigurationProperty<>("fs.gs.outputstream.direct.upload.enable", false);
Expand Down Expand Up @@ -450,6 +454,7 @@ private static AsyncWriteChannelOptions getWriteChannelOptions(Configuration con
.setBufferSize(GCS_OUTPUT_STREAM_BUFFER_SIZE.get(config, config::getInt))
.setPipeBufferSize(GCS_OUTPUT_STREAM_PIPE_BUFFER_SIZE.get(config, config::getInt))
.setUploadChunkSize(GCS_OUTPUT_STREAM_UPLOAD_CHUNK_SIZE.get(config, config::getInt))
.setUploadCacheSize(GCS_OUTPUT_STREAM_UPLOAD_CACHE_SIZE.get(config, config::getInt))
.setDirectUploadEnabled(
GCS_OUTPUT_STREAM_DIRECT_UPLOAD_ENABLE.get(config, config::getBoolean))
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ public class GoogleHadoopFileSystemConfigurationTest {
put("fs.gs.outputstream.buffer.size", 8_388_608);
put("fs.gs.outputstream.pipe.buffer.size", 1_048_576);
put("fs.gs.outputstream.upload.chunk.size", 67_108_864);
put("fs.gs.outputstream.upload.cache.size", 0);
put("fs.gs.io.buffersize.write", 67_108_864);
put("fs.gs.outputstream.direct.upload.enable", false);
put("fs.gs.outputstream.type", OutputStreamType.BASIC);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;

import com.google.api.client.googleapis.json.GoogleJsonResponseException;
import com.google.api.client.googleapis.media.MediaHttpUploader;
import com.google.api.client.http.HttpRequest;
import com.google.api.client.http.HttpStatusCodes;
import com.google.api.client.http.HttpTransport;
Expand All @@ -65,10 +67,12 @@
import com.google.api.services.storage.model.Objects;
import com.google.api.services.storage.model.StorageObject;
import com.google.cloud.hadoop.util.ApiErrorExtractor;
import com.google.cloud.hadoop.util.AsyncWriteChannelOptions;
import com.google.cloud.hadoop.util.testing.MockHttpTransportHelper.ErrorResponses;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.primitives.Bytes;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
Expand Down Expand Up @@ -272,6 +276,267 @@ public void testCreateObjectApiIOException() throws IOException {
.inOrder();
}

// Verifies high-level upload retry: when the resumable upload fails with "410 Gone", the
// channel opens a new resumable session and re-sends the payload from the upload cache.
// Here the whole payload is written in one write() call and fits in a single upload chunk.
@Test
public void reupload_success_singleWrite_singleUploadChunk() throws Exception {
  byte[] testData = new byte[MediaHttpUploader.MINIMUM_CHUNK_SIZE];
  new Random().nextBytes(testData);
  // Chunk and cache are both twice the payload, so the upload is one chunk and fully cacheable.
  int uploadChunkSize = testData.length * 2;
  int uploadCacheSize = testData.length * 2;

  // Scripted responses, consumed in order:
  MockHttpTransport transport =
      mockTransport(
          emptyResponse(HttpStatusCodes.STATUS_CODE_NOT_FOUND), // object does not exist yet
          resumableUploadResponse(BUCKET_NAME, OBJECT_NAME), // 1st resumable session starts
          jsonErrorResponse(ErrorResponses.GONE), // chunk upload fails -> triggers retry
          resumableUploadResponse(BUCKET_NAME, OBJECT_NAME), // 2nd resumable session starts
          jsonDataResponse(
              newStorageObject(BUCKET_NAME, OBJECT_NAME)
                  .setSize(BigInteger.valueOf(testData.length))));

  AsyncWriteChannelOptions writeOptions =
      AsyncWriteChannelOptions.builder()
          .setUploadChunkSize(uploadChunkSize)
          .setUploadCacheSize(uploadCacheSize)
          .build();

  GoogleCloudStorage gcs =
      mockedGcs(GCS_OPTIONS.toBuilder().setWriteChannelOptions(writeOptions).build(), transport);

  try (WritableByteChannel writeChannel =
      gcs.create(new StorageResourceId(BUCKET_NAME, OBJECT_NAME))) {
    writeChannel.write(ByteBuffer.wrap(testData));
  }

  // Expect two full upload attempts: metadata GET, then session-start + chunk, twice.
  assertThat(trackingHttpRequestInitializer.getAllRequestStrings())
      .containsExactly(
          getRequestString(BUCKET_NAME, OBJECT_NAME),
          resumableUploadRequestString(
              BUCKET_NAME, OBJECT_NAME, /* generationId= */ 0, /* replaceGenerationId= */ false),
          resumableUploadChunkRequestString(BUCKET_NAME, OBJECT_NAME, /* uploadId= */ 1),
          resumableUploadRequestString(
              BUCKET_NAME, OBJECT_NAME, /* generationId= */ 0, /* replaceGenerationId= */ false),
          resumableUploadChunkRequestString(BUCKET_NAME, OBJECT_NAME, /* uploadId= */ 2))
      .inOrder();

  // The retried chunk request (index 4) must carry the complete original payload.
  HttpRequest writeRequest = trackingHttpRequestInitializer.getAllRequests().get(4);
  assertThat(writeRequest.getContent().getLength()).isEqualTo(testData.length);
  try (ByteArrayOutputStream writtenData = new ByteArrayOutputStream(testData.length)) {
    writeRequest.getContent().writeTo(writtenData);
    assertThat(writtenData.toByteArray()).isEqualTo(testData);
  }
}

// Verifies high-level upload retry when the payload spans two upload chunks: after the 2nd
// chunk fails with "410 Gone", the whole upload (both chunks) is replayed from the cache.
@Test
public void reupload_success_singleWrite_multipleUploadChunks() throws Exception {
  byte[] testData = new byte[2 * MediaHttpUploader.MINIMUM_CHUNK_SIZE];
  new Random().nextBytes(testData);
  // Chunk is half the payload (-> 2 chunks per attempt); cache still holds the full payload.
  int uploadChunkSize = testData.length / 2;
  int uploadCacheSize = testData.length * 2;

  // Scripted responses, consumed in order:
  MockHttpTransport transport =
      mockTransport(
          emptyResponse(HttpStatusCodes.STATUS_CODE_NOT_FOUND),
          resumableUploadResponse(BUCKET_NAME, OBJECT_NAME),
          // "308 Resume Incomplete" - successfully uploaded 1st chunk
          emptyResponse(308).addHeader("Range", "bytes=0-" + (uploadChunkSize - 1)),
          jsonErrorResponse(ErrorResponses.GONE), // 2nd chunk fails -> triggers retry
          resumableUploadResponse(BUCKET_NAME, OBJECT_NAME),
          // "308 Resume Incomplete" - successfully uploaded 1st chunk
          emptyResponse(308).addHeader("Range", "bytes=0-" + (uploadChunkSize - 1)),
          jsonDataResponse(
              newStorageObject(BUCKET_NAME, OBJECT_NAME)
                  .setSize(BigInteger.valueOf(testData.length))));

  AsyncWriteChannelOptions writeOptions =
      AsyncWriteChannelOptions.builder()
          .setUploadChunkSize(uploadChunkSize)
          .setUploadCacheSize(uploadCacheSize)
          .build();

  GoogleCloudStorage gcs =
      mockedGcs(GCS_OPTIONS.toBuilder().setWriteChannelOptions(writeOptions).build(), transport);

  try (WritableByteChannel writeChannel =
      gcs.create(new StorageResourceId(BUCKET_NAME, OBJECT_NAME))) {
    writeChannel.write(ByteBuffer.wrap(testData));
  }

  // Two attempts, each a session-start plus two chunk uploads.
  assertThat(trackingHttpRequestInitializer.getAllRequestStrings())
      .containsExactly(
          getRequestString(BUCKET_NAME, OBJECT_NAME),
          resumableUploadRequestString(
              BUCKET_NAME, OBJECT_NAME, /* generationId= */ 0, /* replaceGenerationId= */ false),
          resumableUploadChunkRequestString(BUCKET_NAME, OBJECT_NAME, /* uploadId= */ 1),
          resumableUploadChunkRequestString(BUCKET_NAME, OBJECT_NAME, /* uploadId= */ 2),
          resumableUploadRequestString(
              BUCKET_NAME, OBJECT_NAME, /* generationId= */ 0, /* replaceGenerationId= */ false),
          resumableUploadChunkRequestString(BUCKET_NAME, OBJECT_NAME, /* uploadId= */ 3),
          resumableUploadChunkRequestString(BUCKET_NAME, OBJECT_NAME, /* uploadId= */ 4))
      .inOrder();

  // The retried attempt's two chunk requests (indexes 5 and 6) together carry the full payload.
  HttpRequest writeRequestChunk1 = trackingHttpRequestInitializer.getAllRequests().get(5);
  assertThat(writeRequestChunk1.getContent().getLength()).isEqualTo(testData.length / 2);
  HttpRequest writeRequestChunk2 = trackingHttpRequestInitializer.getAllRequests().get(6);
  assertThat(writeRequestChunk2.getContent().getLength()).isEqualTo(testData.length / 2);
  try (ByteArrayOutputStream writtenData = new ByteArrayOutputStream(testData.length)) {
    writeRequestChunk1.getContent().writeTo(writtenData);
    writeRequestChunk2.getContent().writeTo(writtenData);
    assertThat(writtenData.toByteArray()).isEqualTo(testData);
  }
}

// Verifies high-level upload retry when the payload is produced by two write() calls but still
// fits in one upload chunk: the retry replays both writes (2 * testData) from the upload cache.
@Test
public void reupload_success_multipleWrites_singleUploadChunk() throws Exception {
  byte[] testData = new byte[MediaHttpUploader.MINIMUM_CHUNK_SIZE];
  new Random().nextBytes(testData);
  // Chunk and cache are twice the payload of one write, i.e. exactly the total written size.
  int uploadChunkSize = testData.length * 2;
  int uploadCacheSize = testData.length * 2;

  // Scripted responses, consumed in order:
  MockHttpTransport transport =
      mockTransport(
          emptyResponse(HttpStatusCodes.STATUS_CODE_NOT_FOUND), // object does not exist yet
          resumableUploadResponse(BUCKET_NAME, OBJECT_NAME), // 1st resumable session starts
          jsonErrorResponse(ErrorResponses.GONE), // chunk upload fails -> triggers retry
          resumableUploadResponse(BUCKET_NAME, OBJECT_NAME), // 2nd resumable session starts
          jsonDataResponse(
              newStorageObject(BUCKET_NAME, OBJECT_NAME)
                  .setSize(BigInteger.valueOf(testData.length))));

  AsyncWriteChannelOptions writeOptions =
      AsyncWriteChannelOptions.builder()
          .setUploadChunkSize(uploadChunkSize)
          .setUploadCacheSize(uploadCacheSize)
          .build();

  GoogleCloudStorage gcs =
      mockedGcs(GCS_OPTIONS.toBuilder().setWriteChannelOptions(writeOptions).build(), transport);

  try (WritableByteChannel writeChannel =
      gcs.create(new StorageResourceId(BUCKET_NAME, OBJECT_NAME))) {
    writeChannel.write(ByteBuffer.wrap(testData));
    writeChannel.write(ByteBuffer.wrap(testData));
  }

  // Expect two full upload attempts: metadata GET, then session-start + chunk, twice.
  assertThat(trackingHttpRequestInitializer.getAllRequestStrings())
      .containsExactly(
          getRequestString(BUCKET_NAME, OBJECT_NAME),
          resumableUploadRequestString(
              BUCKET_NAME, OBJECT_NAME, /* generationId= */ 0, /* replaceGenerationId= */ false),
          resumableUploadChunkRequestString(BUCKET_NAME, OBJECT_NAME, /* uploadId= */ 1),
          resumableUploadRequestString(
              BUCKET_NAME, OBJECT_NAME, /* generationId= */ 0, /* replaceGenerationId= */ false),
          resumableUploadChunkRequestString(BUCKET_NAME, OBJECT_NAME, /* uploadId= */ 2))
      .inOrder();

  // The retried chunk request (index 4) must carry both writes' data, concatenated.
  HttpRequest writeRequest = trackingHttpRequestInitializer.getAllRequests().get(4);
  assertThat(writeRequest.getContent().getLength()).isEqualTo(2 * testData.length);
  // Size the buffer for the full 2 * testData.length payload to avoid an internal resize.
  try (ByteArrayOutputStream writtenData = new ByteArrayOutputStream(2 * testData.length)) {
    writeRequest.getContent().writeTo(writtenData);
    assertThat(writtenData.toByteArray()).isEqualTo(Bytes.concat(testData, testData));
  }
}

// Verifies high-level upload retry for the largest scenario: two write() calls spanning four
// upload chunks; after the 2nd chunk fails with "410 Gone", all four chunks are replayed from
// the upload cache.
@Test
public void reupload_success_multipleWrites_multipleUploadChunks() throws Exception {
  byte[] testData = new byte[2 * MediaHttpUploader.MINIMUM_CHUNK_SIZE];
  new Random().nextBytes(testData);
  // Chunk is half of one write (-> 4 chunks total); cache holds the full 2-write payload.
  int uploadChunkSize = testData.length / 2;
  int uploadCacheSize = testData.length * 2;

  // Scripted responses, consumed in order:
  MockHttpTransport transport =
      mockTransport(
          emptyResponse(HttpStatusCodes.STATUS_CODE_NOT_FOUND),
          resumableUploadResponse(BUCKET_NAME, OBJECT_NAME),
          // "308 Resume Incomplete" - successfully uploaded 1st chunk
          emptyResponse(308).addHeader("Range", "bytes=0-" + (uploadChunkSize - 1)),
          jsonErrorResponse(ErrorResponses.GONE), // 2nd chunk fails -> triggers retry
          resumableUploadResponse(BUCKET_NAME, OBJECT_NAME),
          // "308 Resume Incomplete" - successfully uploaded 3 chunks
          emptyResponse(308).addHeader("Range", "bytes=0-" + (uploadChunkSize - 1)),
          emptyResponse(308).addHeader("Range", "bytes=0-" + (2 * uploadChunkSize - 1)),
          emptyResponse(308).addHeader("Range", "bytes=0-" + (3 * uploadChunkSize - 1)),
          jsonDataResponse(
              newStorageObject(BUCKET_NAME, OBJECT_NAME)
                  .setSize(BigInteger.valueOf(2 * testData.length))));

  AsyncWriteChannelOptions writeOptions =
      AsyncWriteChannelOptions.builder()
          .setUploadChunkSize(uploadChunkSize)
          .setUploadCacheSize(uploadCacheSize)
          .build();

  GoogleCloudStorage gcs =
      mockedGcs(GCS_OPTIONS.toBuilder().setWriteChannelOptions(writeOptions).build(), transport);

  try (WritableByteChannel writeChannel =
      gcs.create(new StorageResourceId(BUCKET_NAME, OBJECT_NAME))) {
    writeChannel.write(ByteBuffer.wrap(testData));
    writeChannel.write(ByteBuffer.wrap(testData));
  }

  // 1st attempt: session-start + 2 chunk uploads; retry: session-start + all 4 chunks.
  assertThat(trackingHttpRequestInitializer.getAllRequestStrings())
      .containsExactly(
          getRequestString(BUCKET_NAME, OBJECT_NAME),
          resumableUploadRequestString(
              BUCKET_NAME, OBJECT_NAME, /* generationId= */ 0, /* replaceGenerationId= */ false),
          resumableUploadChunkRequestString(BUCKET_NAME, OBJECT_NAME, /* uploadId= */ 1),
          resumableUploadChunkRequestString(BUCKET_NAME, OBJECT_NAME, /* uploadId= */ 2),
          resumableUploadRequestString(
              BUCKET_NAME, OBJECT_NAME, /* generationId= */ 0, /* replaceGenerationId= */ false),
          resumableUploadChunkRequestString(BUCKET_NAME, OBJECT_NAME, /* uploadId= */ 3),
          resumableUploadChunkRequestString(BUCKET_NAME, OBJECT_NAME, /* uploadId= */ 4),
          resumableUploadChunkRequestString(BUCKET_NAME, OBJECT_NAME, /* uploadId= */ 5),
          resumableUploadChunkRequestString(BUCKET_NAME, OBJECT_NAME, /* uploadId= */ 6))
      .inOrder();

  // The retried attempt's four chunk requests (indexes 5-8) together carry the full payload.
  HttpRequest writeRequestChunk1 = trackingHttpRequestInitializer.getAllRequests().get(5);
  assertThat(writeRequestChunk1.getContent().getLength()).isEqualTo(testData.length / 2);
  HttpRequest writeRequestChunk2 = trackingHttpRequestInitializer.getAllRequests().get(6);
  assertThat(writeRequestChunk2.getContent().getLength()).isEqualTo(testData.length / 2);
  HttpRequest writeRequestChunk3 = trackingHttpRequestInitializer.getAllRequests().get(7);
  assertThat(writeRequestChunk3.getContent().getLength()).isEqualTo(testData.length / 2);
  HttpRequest writeRequestChunk4 = trackingHttpRequestInitializer.getAllRequests().get(8);
  assertThat(writeRequestChunk4.getContent().getLength()).isEqualTo(testData.length / 2);
  // Size the buffer for the full 2 * testData.length payload to avoid an internal resize.
  try (ByteArrayOutputStream writtenData = new ByteArrayOutputStream(2 * testData.length)) {
    writeRequestChunk1.getContent().writeTo(writtenData);
    writeRequestChunk2.getContent().writeTo(writtenData);
    writeRequestChunk3.getContent().writeTo(writtenData);
    writeRequestChunk4.getContent().writeTo(writtenData);
    assertThat(writtenData.toByteArray()).isEqualTo(Bytes.concat(testData, testData));
  }
}

// Verifies that when the upload cache is smaller than the written data, a failed upload is NOT
// retried: close() propagates the original "410 Gone" failure to the caller.
@Test
public void reupload_failure_cacheTooSmall_singleWrite_singleChunk() throws Exception {
  byte[] testData = new byte[MediaHttpUploader.MINIMUM_CHUNK_SIZE];
  new Random().nextBytes(testData);
  int uploadChunkSize = testData.length;
  // Cache holds only half the payload, so the retry precondition is not met.
  int uploadCacheSize = testData.length / 2;

  // No responses after the GONE error: a retry attempt would exhaust the mock and fail loudly.
  MockHttpTransport transport =
      mockTransport(
          emptyResponse(HttpStatusCodes.STATUS_CODE_NOT_FOUND),
          resumableUploadResponse(BUCKET_NAME, OBJECT_NAME),
          jsonErrorResponse(ErrorResponses.GONE));

  AsyncWriteChannelOptions writeOptions =
      AsyncWriteChannelOptions.builder()
          .setUploadChunkSize(uploadChunkSize)
          .setUploadCacheSize(uploadCacheSize)
          .build();

  GoogleCloudStorage gcs =
      mockedGcs(GCS_OPTIONS.toBuilder().setWriteChannelOptions(writeOptions).build(), transport);

  // Deliberately not try-with-resources: close() itself is the call expected to throw.
  WritableByteChannel writeChannel = gcs.create(new StorageResourceId(BUCKET_NAME, OBJECT_NAME));
  writeChannel.write(ByteBuffer.wrap(testData));

  IOException writeException = assertThrows(IOException.class, writeChannel::close);

  // The cause chain must surface the original 410 response, not a retry artifact.
  assertThat(writeException).hasCauseThat().isInstanceOf(GoogleJsonResponseException.class);
  assertThat(writeException).hasCauseThat().hasMessageThat().startsWith("410");
}

/** Test successful operation of GoogleCloudStorage.createEmptyObject(1). */
@Test
public void testCreateEmptyObject() throws IOException {
Expand Down
Loading

0 comments on commit f643920

Please sign in to comment.