Skip to content

Commit

Permalink
support to read encoded content via java-storage library (#994) (#996)
Browse files Browse the repository at this point in the history
  • Loading branch information
singhravidutt authored May 4, 2023
1 parent c3e7ce0 commit 69eb118
Show file tree
Hide file tree
Showing 4 changed files with 147 additions and 149 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Strings.nullToEmpty;
import static java.lang.Math.max;
import static java.lang.Math.min;
import static java.lang.Math.toIntExact;
Expand All @@ -44,22 +45,25 @@
import java.nio.channels.SeekableByteChannel;
import java.util.ArrayList;
import java.util.List;
import javax.annotation.Nullable;

/** Provides seekable read access to GCS via java-storage library. */
@VisibleForTesting
class GoogleCloudStorageClientReadChannel implements SeekableByteChannel {

private static final GoogleLogger logger = GoogleLogger.forEnclosingClass();

private static final String GZIP_ENCODING = "gzip";

private final StorageResourceId resourceId;
private final GoogleCloudStorageReadOptions readOptions;
private final GoogleCloudStorageOptions storageOptions;
private final Storage storage;
// The size of this object generation, in bytes.
private final long objectSize;
private long objectSize;
private final ErrorTypeExtractor errorExtractor;
private ContentReadChannel contentReadChannel;

private boolean gzipEncoded = false;
private boolean open = true;

// Current position in this channel, it could be different from contentChannelCurrentPosition if
Expand All @@ -81,8 +85,17 @@ public GoogleCloudStorageClientReadChannel(
itemInfo.getBucketName(), itemInfo.getObjectName(), itemInfo.getContentGeneration());
this.readOptions = readOptions;
this.storageOptions = storageOptions;
this.objectSize = itemInfo.getSize();
this.contentReadChannel = new ContentReadChannel(readOptions, resourceId);
initMetadata(itemInfo.getContentEncoding(), itemInfo.getSize());
}

/**
 * Records whether the object is gzip encoded and derives the object size used by this channel.
 *
 * @param encoding content encoding taken from the object metadata; may be {@code null}
 * @param sizeFromMetadata object size reported by the metadata, in bytes
 * @throws IOException if the object is gzip encoded while gzip support is disabled in the read
 *     options
 */
protected void initMetadata(@Nullable String encoding, long sizeFromMetadata) throws IOException {
  String contentEncoding = nullToEmpty(encoding);
  gzipEncoded = contentEncoding.contains(GZIP_ENCODING);
  if (!gzipEncoded) {
    objectSize = sizeFromMetadata;
    return;
  }
  if (!readOptions.getSupportGzipEncoding()) {
    throw new IOException(
        "Cannot read GZIP encoded files - content encoding support is disabled.");
  }
  // For gzip-encoded objects the metadata size is not used; the size stays at Long.MAX_VALUE
  // until it is updated elsewhere.
  objectSize = Long.MAX_VALUE;
}

@Override
Expand Down Expand Up @@ -219,6 +232,19 @@ public int readContent(ByteBuffer dst) throws IOException {
try {
if (byteChannel == null) {
byteChannel = openByteChannel(dst.remaining());
// We adjust the start index of the content channel in the following cases:
// 1. The requested range is within the footer boundaries --> request the whole footer.
// 2. The requested content is gzip encoded --> always request from the start of the file.
// Case (1) is handled by reading and caching the extra bytes; in all other cases we need to
// skip the unrequested bytes before reading from the current position.
if (currentPosition > contentChannelCurrentPosition) {
skipInPlace();
}
// Make sure that currentPosition is aligned with contentChannelCurrentPosition before the
// actual read starts, to avoid read discrepancies.
checkState(
contentChannelCurrentPosition == currentPosition,
"position of read offset isn't in alignment with channel's read offset");
}
int bytesRead = byteChannel.read(dst);

Expand All @@ -228,6 +254,13 @@ public int readContent(ByteBuffer dst) throws IOException {
}

if (bytesRead < 0) {
// Because we don't know decompressed object size for gzip-encoded objects,
// assume that this is an object end.
if (gzipEncoded) {
objectSize = currentPosition;
contentChannelEnd = currentPosition;
}

if (currentPosition != contentChannelEnd && currentPosition != objectSize) {
throw new IOException(
String.format(
Expand Down Expand Up @@ -275,7 +308,7 @@ private int partiallyReadBytes(int remainingBeforeRead, ByteBuffer dst) {
}

private boolean shouldDetectRandomAccess() {
return !randomAccess && readOptions.getFadvise() == Fadvise.AUTO;
return !gzipEncoded && !randomAccess && readOptions.getFadvise() == Fadvise.AUTO;
}

private void setRandomAccess() {
Expand All @@ -294,13 +327,7 @@ private ReadableByteChannel openByteChannel(long bytesToRead) throws IOException
return serveFooterContent();
}

contentChannelCurrentPosition = getRangeRequestStart();
contentChannelEnd = getRangeRequestEnd(contentChannelCurrentPosition, bytesToRead);
checkState(
contentChannelEnd >= contentChannelCurrentPosition,
String.format(
"Start position should be <= endPosition startPosition:%d, endPosition: %d",
contentChannelCurrentPosition, contentChannelEnd));
setChannelBoundaries(bytesToRead);

ReadableByteChannel readableByteChannel =
getStorageReadChannel(contentChannelCurrentPosition, contentChannelEnd);
Expand All @@ -314,12 +341,19 @@ private ReadableByteChannel openByteChannel(long bytesToRead) throws IOException
}
return serveFooterContent();
}
checkState(
contentChannelCurrentPosition == currentPosition,
"position of read offset isn't in alignment with channel's read offset");
return readableByteChannel;
}

/**
 * Computes the boundaries of the next content request and stores them in {@code
 * contentChannelCurrentPosition} (start) and {@code contentChannelEnd} (end).
 *
 * @param bytesToRead number of bytes the caller wants to read; forwarded to {@code
 *     getRangeRequestEnd}
 * @throws IllegalStateException if the computed end position precedes the start position
 */
private void setChannelBoundaries(long bytesToRead) {
  contentChannelCurrentPosition = getRangeRequestStart();
  contentChannelEnd = getRangeRequestEnd(contentChannelCurrentPosition, bytesToRead);
  // Pass a message template (Preconditions supports only %s placeholders) instead of a
  // pre-formatted string, so the message is built only when the check actually fails.
  checkState(
      contentChannelEnd >= contentChannelCurrentPosition,
      "Start position should be <= endPosition startPosition:%s, endPosition: %s",
      contentChannelCurrentPosition,
      contentChannelEnd);
}

private void cacheFooter(ReadableByteChannel readableByteChannel) throws IOException {
int footerSize = toIntExact(objectSize - contentChannelCurrentPosition);
footerContent = new byte[footerSize];
Expand Down Expand Up @@ -363,13 +397,36 @@ private ReadableByteChannel serveFooterContent() {
}

/**
 * Returns the offset at which the next range request should start.
 *
 * <p>Gzip-encoded content is always requested from offset 0. For a footer read with a
 * non-sequential fadvise the start is moved back so that the footer is prefetched; otherwise the
 * request starts at the current position.
 */
private long getRangeRequestStart() {
  if (gzipEncoded) {
    return 0;
  }
  boolean prefetchFooter = readOptions.getFadvise() != Fadvise.SEQUENTIAL && isFooterRead();
  if (!prefetchFooter) {
    return currentPosition;
  }
  // Start at the footer start, clamped to the beginning of the object.
  long footerStart = objectSize - readOptions.getMinRangeRequestSize();
  return max(0, footerStart);
}

/**
 * Returns the offset at which the next range request should end.
 *
 * @param startPosition offset at which the request starts
 * @param bytesToRead number of bytes the caller wants to read
 */
private long getRangeRequestEnd(long startPosition, long bytesToRead) {
  // Gzip-encoded files do not support range reads, so they are always read till the end.
  if (gzipEncoded) {
    return objectSize;
  }

  // With random access there is no point in opening a channel for the whole object, because
  // it would not be reused by subsequent reads; request only a bounded range instead.
  long rangeEnd =
      randomAccess
          ? startPosition + max(bytesToRead, readOptions.getMinRangeRequestSize())
          : objectSize;
  if (footerContent == null) {
    return rangeEnd;
  }
  // The footer is cached: open the channel only up to the footer start, the remaining content
  // will be served from the cached footer itself.
  return min(rangeEnd, objectSize - footerContent.length);
}

public void closeContentChannel() {
if (byteChannel != null) {
logger.atFiner().log("Closing internal contentChannel for '%s'", resourceId);
Expand All @@ -395,7 +452,8 @@ private boolean isInRangeSeek() {
long seekDistance = currentPosition - contentChannelCurrentPosition;
if (byteChannel != null
&& seekDistance > 0
&& seekDistance <= readOptions.getInplaceSeekLimit()
// for gzip encoded content always seek in place
&& (gzipEncoded || seekDistance <= readOptions.getInplaceSeekLimit())
&& currentPosition < contentChannelEnd) {
return true;
}
Expand Down Expand Up @@ -499,6 +557,9 @@ private ReadableByteChannel getStorageReadChannel(long seek, long limit) throws

private BlobSourceOption[] generateReadOptions(BlobId blobId) {
List<BlobSourceOption> blobReadOptions = new ArrayList<>();
// To get decoded content
blobReadOptions.add(BlobSourceOption.shouldReturnRawInputStream(false));

if (blobId.getGeneration() != null) {
blobReadOptions.add(BlobSourceOption.generationMatch(blobId.getGeneration()));
}
Expand All @@ -512,21 +573,6 @@ private BlobSourceOption[] generateReadOptions(BlobId blobId) {
/**
 * Returns whether the distance from the current position to the object end is at most the
 * minimum range request size.
 */
private boolean isFooterRead() {
  long remainingBytes = objectSize - currentPosition;
  return remainingBytes <= readOptions.getMinRangeRequestSize();
}

/**
 * Returns the offset at which the next range request should end.
 *
 * @param startPosition offset at which the request starts
 * @param bytesToRead number of bytes the caller wants to read
 */
private long getRangeRequestEnd(long startPosition, long bytesToRead) {
// By default the request extends to the end of the object.
long endPosition = objectSize;
if (randomAccess) {
// Opening a channel for the whole object doesn't make sense, as it will not be utilized
// for further reads anyway.
endPosition = startPosition + max(bytesToRead, readOptions.getMinRangeRequestSize());
}
if (footerContent != null) {
// If the footer is cached, open the channel only up to the footer start.
// The remaining content will be served from the cached footer itself.
endPosition = min(endPosition, objectSize - footerContent.length);
}
return endPosition;
}
}

@VisibleForTesting
Expand All @@ -542,16 +588,6 @@ private static void validate(GoogleCloudStorageItemInfo itemInfo) throws IOExcep
if (!itemInfo.exists()) {
throw new FileNotFoundException(String.format("Item not found: %s", resourceId));
}
// The non-gRPC read channel has special support for gzip.
// TODO: enable support for gzip if required.
String contentEncoding = itemInfo.getContentEncoding();
if (contentEncoding != null && contentEncoding.contains("gzip")) {

throw new IOException(
String.format(
"Cannot read GZIP-encoded file (%s) (not supported via gRPC API): %s",
contentEncoding, resourceId));
}
}

private IOException convertError(Exception error) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,10 @@ public void read_whenPositionIsEqualToSize() throws IOException {
/* contentGeneration= */ 1,
/* metaGeneration= */ 2L,
/* verificationAttributes= */ null),
GoogleCloudStorageReadOptions.DEFAULT);
GoogleCloudStorageReadOptions.DEFAULT
.toBuilder()
.setSupportGzipEncoding(false)
.build());

ByteBuffer readBuffer = ByteBuffer.wrap(new byte[1]);
assertThat(readChannel.position()).isEqualTo(readChannel.size());
Expand All @@ -336,7 +339,10 @@ public void gzipEncodedObject_throwWhileChannelCreation() {
/* contentGeneration= */ 1,
/* metaGeneration= */ 2L,
/* verificationAttributes= */ null),
GoogleCloudStorageReadOptions.DEFAULT));
GoogleCloudStorageReadOptions.DEFAULT
.toBuilder()
.setSupportGzipEncoding(false)
.build()));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,28 +16,18 @@

package com.google.cloud.hadoop.gcsio;

import static com.google.cloud.hadoop.gcsio.TrackingHttpRequestInitializer.listRequestWithTrailingDelimiter;
import static com.google.cloud.hadoop.gcsio.integration.GoogleCloudStorageTestHelper.getStandardOptionBuilder;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.truth.Truth.assertThat;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.junit.Assert.assertThrows;

import com.google.api.client.auth.oauth2.Credential;
import com.google.cloud.hadoop.gcsio.GoogleCloudStorageFileSystemOptions.ClientType;
import com.google.cloud.hadoop.gcsio.integration.GoogleCloudStorageTestHelper;
import com.google.cloud.hadoop.util.CredentialAdapter;
import com.google.cloud.hadoop.util.RetryHttpInitializer;
import com.google.common.collect.ImmutableMap;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.channels.Channels;
import java.util.List;
import java.util.zip.GZIPOutputStream;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

Expand Down Expand Up @@ -86,86 +76,8 @@ protected GoogleCloudStorage createGoogleCloudStorage(GoogleCloudStorageOptions
.build();
}

/**
 * Writes a gzip-compressed object with custom metadata, lists its parent directory through a
 * newly created GoogleCloudStorage instance, and verifies both the listed object names and the
 * requests recorded by the request tracker.
 */
@Override
@Test
public void listObjectInfo_allMetadataFieldsCorrect() throws Exception {
GoogleCloudStorage gcs = createGoogleCloudStorage(gcsOptions);

// Use a per-test directory, named after the test method, inside the shared bucket.
String testDirName = name.getMethodName() + "/";
StorageResourceId objectId =
new StorageResourceId(gcsfsIHelper.sharedBucketName1, testDirName + "object");

// Create gzipped file so Content-Encoding will be not null
CreateObjectOptions createOptions =
GZIP_CREATE_OPTIONS
.toBuilder()
.setMetadata(ImmutableMap.of("test-key", "val".getBytes(UTF_8)))
.build();

// The object is written through the helper's gcs instance, not the instance under test.
try (OutputStream os =
new GZIPOutputStream(
Channels.newOutputStream(gcsfsIHelper.gcs.create(objectId, createOptions)))) {
os.write((objectId + "-content").getBytes(UTF_8));
}

List<GoogleCloudStorageItemInfo> listedObjects =
gcs.listObjectInfo(objectId.getBucketName(), testDirName);

// Exactly the created object is listed, and exactly one list request was recorded.
assertThat(getObjectNames(listedObjects)).containsExactly(objectId.getObjectName());
assertThat(gcsRequestsTracker.getAllRequestStrings())
.containsExactly(
listRequestWithTrailingDelimiter(
objectId.getBucketName(), testDirName, /* pageToken= */ null));
}

@Test
public void create_gzipEncodedFile() throws Exception {
String testBucket = gcsfsIHelper.sharedBucketName1;
StorageResourceId testFile = new StorageResourceId(testBucket, getTestResource());

GoogleCloudStorage gcs = createGoogleCloudStorage(gcsOptions);

try (OutputStream os =
new GZIPOutputStream(Channels.newOutputStream(gcs.create(testFile, GZIP_CREATE_OPTIONS)))) {
os.write("content".getBytes(UTF_8));
}

assertThat(gcs.getItemInfo(testFile).getContentEncoding()).isEqualTo("gzip");
@Before
public void setup() {
isTracingSupported = false;
}

/**
 * Writes a gzip-compressed object and verifies that opening it with default read options throws
 * an IOException whose message starts with the expected "Cannot read GZIP-encoded file" prefix.
 */
@Test
public void open_gzipEncoded_fails() throws Exception {
String testBucket = gcsfsIHelper.sharedBucketName1;
StorageResourceId testFile = new StorageResourceId(testBucket, getTestResource());

// The object is written through the helper's gcs instance using GZIP_CREATE_OPTIONS.
try (OutputStream os =
new GZIPOutputStream(
Channels.newOutputStream(gcsfsIHelper.gcs.create(testFile, GZIP_CREATE_OPTIONS)))) {
os.write("content".getBytes(UTF_8));
}

GoogleCloudStorage gcs = createGoogleCloudStorage(gcsOptions);

// The failure is expected from open() itself, before any read is attempted.
GoogleCloudStorageReadOptions readOptions = GoogleCloudStorageReadOptions.builder().build();
IOException e = assertThrows(IOException.class, () -> gcs.open(testFile, readOptions));
assertThat(e)
.hasMessageThat()
.startsWith("Cannot read GZIP-encoded file (gzip) (not supported via gRPC API):");
}

@Ignore("Gzip content read is is not supported via Java-storage yet.")
@Test
public void open_gzipEncoded_fails_ifContentEncodingSupportDisabled() {}

@Ignore("Gzip content read is is not supported via Java-storage yet.")
@Test
public void open_itemInfo_gzipEncoded_fails_ifContentEncodingSupportDisabled() {}

@Ignore("Gzip content read is is not supported via Java-storage yet.")
@Test
public void open_gzipEncoded_succeeds_ifContentEncodingSupportEnabled() {}

@Ignore("Gzip content read is is not supported via Java-storage yet.")
@Test
public void open_itemInfo_gzipEncoded_succeeds_ifContentEncodingSupportEnabled() {}
}
Loading

0 comments on commit 69eb118

Please sign in to comment.