diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 99091cda7e11..b905c857b1d6 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -458,6 +458,7 @@ jobs:
!:trino-faulttolerant-tests,
!:trino-filesystem,
!:trino-filesystem-azure,
+ !:trino-filesystem-gcs,
!:trino-filesystem-manager,
!:trino-filesystem-s3,
!:trino-google-sheets,
@@ -574,6 +575,7 @@ jobs:
- { modules: lib/trino-filesystem-s3, profile: cloud-tests }
- { modules: lib/trino-filesystem-azure, profile: cloud-tests }
- { modules: lib/trino-hdfs, profile: cloud-tests }
+ - { modules: lib/trino-filesystem-gcs, profile: cloud-tests }
- { modules: plugin/trino-accumulo }
- { modules: plugin/trino-bigquery }
- { modules: plugin/trino-bigquery, profile: cloud-tests-arrow }
@@ -659,6 +661,7 @@ jobs:
&& ! (contains(matrix.modules, 'trino-filesystem-s3') && contains(matrix.profile, 'cloud-tests'))
&& ! (contains(matrix.modules, 'trino-filesystem-azure') && contains(matrix.profile, 'cloud-tests'))
&& ! (contains(matrix.modules, 'trino-hdfs') && contains(matrix.profile, 'cloud-tests'))
+ && ! (contains(matrix.modules, 'trino-filesystem-gcs') && contains(matrix.profile, 'cloud-tests'))
run: $MAVEN test ${MAVEN_TEST} -pl ${{ matrix.modules }} ${{ matrix.profile != '' && format('-P {0}', matrix.profile) || '' }}
# Additional tests for selected modules
- name: HDFS file system cache isolated JVM tests
@@ -714,6 +717,14 @@ jobs:
(env.CI_SKIP_SECRETS_PRESENCE_CHECKS != '' || env.ABFS_BLOB_ACCOUNT != '' || env.ABFS_BLOB_ACCESS_KEY != '' || env.ABFS_FLAT_ACCOUNT != '' || env.ABFS_FLAT_ACCESS_KEY != '' || env.ABFS_ACCOUNT != '' || env.ABFS_ACCESS_KEY != '')
run: |
$MAVEN test ${MAVEN_TEST} -pl ${{ matrix.modules }} ${{ format('-P {0}', matrix.profile) }}
+ - name: GCS FileSystem Cloud Tests
+ env:
+ GCP_CREDENTIALS_KEY: ${{ secrets.GCP_CREDENTIALS_KEY }}
+ if: >-
+ contains(matrix.modules, 'trino-filesystem-gcs') && contains(matrix.profile, 'cloud-tests') &&
+ (env.CI_SKIP_SECRETS_PRESENCE_CHECKS != '' || env.GCP_CREDENTIALS_KEY != '')
+ run: |
+ $MAVEN test ${MAVEN_TEST} -pl ${{ matrix.modules }} ${{ format('-P {0}', matrix.profile) }}
- name: Cloud Delta Lake Tests
# Cloud tests are separate because they are time intensive, requiring cross-cloud network communication
env:
diff --git a/lib/trino-filesystem-gcs/pom.xml b/lib/trino-filesystem-gcs/pom.xml
new file mode 100644
index 000000000000..cbf748c58d4b
--- /dev/null
+++ b/lib/trino-filesystem-gcs/pom.xml
@@ -0,0 +1,203 @@
+
+
+ 4.0.0
+
+
+ io.trino
+ trino-root
+ 433-SNAPSHOT
+ ../../pom.xml
+
+
+ trino-filesystem-gcs
+ trino-filesystem-gcs
+ Trino Filesystem - GCS
+
+
+ ${project.parent.basedir}
+
+
+
+
+ com.google.api
+ gax
+
+
+ javax.annotation
+ javax.annotation-api
+
+
+
+
+
+ com.google.auth
+ google-auth-library-credentials
+
+
+
+ com.google.auth
+ google-auth-library-oauth2-http
+
+
+
+ com.google.cloud
+ google-cloud-core
+
+
+
+ com.google.cloud
+ google-cloud-storage
+
+
+ com.google.guava
+ listenablefuture
+
+
+ javax.annotation
+ javax.annotation-api
+
+
+
+
+
+ com.google.guava
+ guava
+
+
+
+ com.google.http-client
+ google-http-client
+
+
+ commons-logging
+ commons-logging
+
+
+ org.apache.httpcomponents
+ httpclient
+
+
+ org.apache.httpcomponents
+ httpcore
+
+
+
+
+
+ com.google.inject
+ guice
+
+
+
+ io.airlift
+ concurrent
+
+
+
+ io.airlift
+ configuration
+
+
+
+ io.airlift
+ units
+
+
+
+ io.trino
+ trino-filesystem
+
+
+
+ io.trino
+ trino-memory-context
+
+
+
+ jakarta.annotation
+ jakarta.annotation-api
+
+
+
+ jakarta.validation
+ jakarta.validation-api
+
+
+
+ io.trino
+ trino-spi
+ provided
+
+
+
+ org.jetbrains
+ annotations
+ provided
+
+
+
+ io.airlift
+ testing
+ test
+
+
+
+ io.trino
+ trino-filesystem
+ ${project.version}
+ tests
+ test
+
+
+
+ org.assertj
+ assertj-core
+ test
+
+
+
+ org.junit.jupiter
+ junit-jupiter-api
+ test
+
+
+
+
+
+ default
+
+ true
+
+
+
+
+ org.apache.maven.plugins
+ maven-surefire-plugin
+
+
+ **/TestDefaultGcsFileSystem.java
+
+
+
+
+
+
+
+
+ cloud-tests
+
+
+
+ org.apache.maven.plugins
+ maven-surefire-plugin
+
+
+ **/TestDefaultGcsFileSystem.java
+
+
+
+
+
+
+
+
diff --git a/lib/trino-filesystem-gcs/src/main/java/io/trino/filesystem/gcs/DefaultGcsStorageFactory.java b/lib/trino-filesystem-gcs/src/main/java/io/trino/filesystem/gcs/DefaultGcsStorageFactory.java
new file mode 100644
index 000000000000..8224dafaa6a3
--- /dev/null
+++ b/lib/trino-filesystem-gcs/src/main/java/io/trino/filesystem/gcs/DefaultGcsStorageFactory.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.filesystem.gcs;
+
+import com.google.auth.oauth2.GoogleCredentials;
+import com.google.cloud.storage.Storage;
+import com.google.cloud.storage.StorageOptions;
+import com.google.common.base.VerifyException;
+import com.google.common.collect.ImmutableList;
+import com.google.inject.Inject;
+import io.trino.spi.security.ConnectorIdentity;
+
+import java.io.ByteArrayInputStream;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.UncheckedIOException;
+import java.util.List;
+import java.util.Optional;
+
+import static com.google.common.base.Strings.nullToEmpty;
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+public class DefaultGcsStorageFactory
+ implements GcsStorageFactory
+{
+ public static final String GCS_OAUTH_KEY = "gcs.oauth";
+ public static final List DEFAULT_SCOPES = ImmutableList.of("https://www.googleapis.com/auth/cloud-platform");
+ private final String projectId;
+ private final boolean useGcsAccessToken;
+ private final Optional jsonGoogleCredential;
+
+ @Inject
+ public DefaultGcsStorageFactory(GcsFileSystemConfig config)
+ throws IOException
+ {
+ config.validate();
+ projectId = config.getProjectId();
+ useGcsAccessToken = config.isUseGcsAccessToken();
+ String jsonKey = config.getJsonKey();
+ String jsonKeyFilePath = config.getJsonKeyFilePath();
+ if (jsonKey != null) {
+ try (InputStream inputStream = new ByteArrayInputStream(jsonKey.getBytes(UTF_8))) {
+ jsonGoogleCredential = Optional.of(GoogleCredentials.fromStream(inputStream).createScoped(DEFAULT_SCOPES));
+ }
+ }
+ else if (jsonKeyFilePath != null) {
+ try (FileInputStream inputStream = new FileInputStream(jsonKeyFilePath)) {
+ jsonGoogleCredential = Optional.of(GoogleCredentials.fromStream(inputStream).createScoped(DEFAULT_SCOPES));
+ }
+ }
+ else {
+ jsonGoogleCredential = Optional.empty();
+ }
+ }
+
+ @Override
+ public Storage create(ConnectorIdentity identity)
+ {
+ try {
+ GoogleCredentials credentials;
+ if (useGcsAccessToken) {
+ String accessToken = nullToEmpty(identity.getExtraCredentials().get(GCS_OAUTH_KEY));
+ try (ByteArrayInputStream inputStream = new ByteArrayInputStream(accessToken.getBytes(UTF_8))) {
+ credentials = GoogleCredentials.fromStream(inputStream).createScoped(DEFAULT_SCOPES);
+ }
+ }
+ else {
+ credentials = jsonGoogleCredential.orElseThrow(() -> new VerifyException("GCS credentials not configured"));
+ }
+ StorageOptions.Builder storageOptionsBuilder = StorageOptions.newBuilder();
+ if (projectId != null) {
+ storageOptionsBuilder.setProjectId(projectId);
+ }
+ return storageOptionsBuilder.setCredentials(credentials).build().getService();
+ }
+ catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+}
diff --git a/lib/trino-filesystem-gcs/src/main/java/io/trino/filesystem/gcs/GcsFileIterator.java b/lib/trino-filesystem-gcs/src/main/java/io/trino/filesystem/gcs/GcsFileIterator.java
new file mode 100644
index 000000000000..5b3f4e42fa4d
--- /dev/null
+++ b/lib/trino-filesystem-gcs/src/main/java/io/trino/filesystem/gcs/GcsFileIterator.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.filesystem.gcs;
+
+import com.google.api.gax.paging.Page;
+import com.google.cloud.storage.Blob;
+import com.google.common.collect.Iterators;
+import io.trino.filesystem.FileEntry;
+import io.trino.filesystem.FileIterator;
+import io.trino.filesystem.Location;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.util.Iterator;
+import java.util.Optional;
+
+import static io.trino.filesystem.gcs.GcsUtils.handleGcsException;
+import static java.util.Objects.requireNonNull;
+
+public class GcsFileIterator
+ implements FileIterator
+{
+ private final GcsLocation location;
+ private Iterator blobIterator;
+
+ public GcsFileIterator(GcsLocation location, Page page)
+ {
+ this.location = requireNonNull(location, "location is null");
+ // Page::iterateAll handles paging internally
+ this.blobIterator = Iterators.filter(page.iterateAll().iterator(), blob -> !blob.isDirectory());
+ }
+
+ @Override
+ public boolean hasNext()
+ throws IOException
+ {
+ try {
+ return blobIterator.hasNext();
+ }
+ catch (RuntimeException e) {
+ throw handleGcsException(e, "iterate files", location);
+ }
+ }
+
+ @Override
+ public FileEntry next()
+ throws IOException
+ {
+ try {
+ Blob blob = blobIterator.next();
+ long length = requireNonNull(blob.getSize(), "blob size is null");
+ return new FileEntry(Location.of(location.getBase() + blob.getName()), length, Instant.from(blob.getUpdateTimeOffsetDateTime()), Optional.empty());
+ }
+ catch (RuntimeException e) {
+ throw handleGcsException(e, "iterate files", location);
+ }
+ }
+}
diff --git a/lib/trino-filesystem-gcs/src/main/java/io/trino/filesystem/gcs/GcsFileSystem.java b/lib/trino-filesystem-gcs/src/main/java/io/trino/filesystem/gcs/GcsFileSystem.java
new file mode 100644
index 000000000000..fcb5c14d3754
--- /dev/null
+++ b/lib/trino-filesystem-gcs/src/main/java/io/trino/filesystem/gcs/GcsFileSystem.java
@@ -0,0 +1,278 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.filesystem.gcs;
+
+import com.google.api.gax.paging.Page;
+import com.google.cloud.storage.Blob;
+import com.google.cloud.storage.Storage;
+import com.google.cloud.storage.Storage.BlobListOption;
+import com.google.cloud.storage.StorageBatch;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterators;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import io.trino.filesystem.FileIterator;
+import io.trino.filesystem.Location;
+import io.trino.filesystem.TrinoFileSystem;
+import io.trino.filesystem.TrinoInputFile;
+import io.trino.filesystem.TrinoOutputFile;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Optional;
+import java.util.OptionalLong;
+import java.util.Set;
+
+import static com.google.api.client.util.Preconditions.checkState;
+import static com.google.cloud.storage.Storage.BlobListOption.currentDirectory;
+import static com.google.cloud.storage.Storage.BlobListOption.matchGlob;
+import static com.google.cloud.storage.Storage.BlobListOption.pageSize;
+import static com.google.common.collect.Iterables.partition;
+import static io.airlift.concurrent.MoreFutures.getFutureValue;
+import static io.trino.filesystem.gcs.GcsUtils.getBlobOrThrow;
+import static io.trino.filesystem.gcs.GcsUtils.handleGcsException;
+import static java.util.Objects.requireNonNull;
+
+public class GcsFileSystem
+ implements TrinoFileSystem
+{
+ private final ListeningExecutorService executorService;
+ private final Storage storage;
+ private final int readBlockSizeBytes;
+ private final long writeBlockSizeBytes;
+ private final int pageSize;
+ private final int batchSize;
+
+ public GcsFileSystem(ListeningExecutorService executorService, Storage storage, int readBlockSizeBytes, long writeBlockSizeBytes, int pageSize, int batchSize)
+ {
+ this.executorService = requireNonNull(executorService, "executorService is null");
+ this.storage = requireNonNull(storage, "storage is null");
+ this.readBlockSizeBytes = readBlockSizeBytes;
+ this.writeBlockSizeBytes = writeBlockSizeBytes;
+ this.pageSize = pageSize;
+ this.batchSize = batchSize;
+ }
+
+ @Override
+ public TrinoInputFile newInputFile(Location location)
+ {
+ GcsLocation gcsLocation = new GcsLocation(location);
+ checkIsValidFile(gcsLocation);
+ return new GcsInputFile(gcsLocation, storage, readBlockSizeBytes, OptionalLong.empty());
+ }
+
+ @Override
+ public TrinoInputFile newInputFile(Location location, long length)
+ {
+ GcsLocation gcsLocation = new GcsLocation(location);
+ checkIsValidFile(gcsLocation);
+ return new GcsInputFile(gcsLocation, storage, readBlockSizeBytes, OptionalLong.of(length));
+ }
+
+ @Override
+ public TrinoOutputFile newOutputFile(Location location)
+ {
+ GcsLocation gcsLocation = new GcsLocation(location);
+ checkIsValidFile(gcsLocation);
+ return new GcsOutputFile(gcsLocation, storage, writeBlockSizeBytes);
+ }
+
+ @Override
+ public void deleteFile(Location location)
+ throws IOException
+ {
+ GcsLocation gcsLocation = new GcsLocation(location);
+ checkIsValidFile(gcsLocation);
+ Blob blob = getBlobOrThrow(storage, gcsLocation);
+ blob.delete();
+ }
+
+ @Override
+ public void deleteFiles(Collection locations)
+ throws IOException
+ {
+ List> batchFutures = new ArrayList<>();
+ try {
+ for (List locationBatch : partition(locations, batchSize)) {
+ StorageBatch batch = storage.batch();
+ for (Location location : locationBatch) {
+ batch.delete(getBlobOrThrow(storage, new GcsLocation(location)).getBlobId());
+ }
+ batchFutures.add(executorService.submit(batch::submit));
+ }
+ getFutureValue(Futures.allAsList(batchFutures));
+ }
+ catch (RuntimeException e) {
+ throw handleGcsException(e, "delete files", locations);
+ }
+ }
+
+ @Override
+ public void deleteDirectory(Location location)
+ throws IOException
+ {
+ GcsLocation gcsLocation = new GcsLocation(normalizeToDirectory(location));
+ try {
+ List> batchFutures = new ArrayList<>();
+
+ for (List blobBatch : partition(getPage(gcsLocation).iterateAll(), batchSize)) {
+ StorageBatch batch = storage.batch();
+ for (Blob blob : blobBatch) {
+ batch.delete(blob.getBlobId());
+ }
+ batchFutures.add(executorService.submit(batch::submit));
+ }
+ getFutureValue(Futures.allAsList(batchFutures));
+ }
+ catch (RuntimeException e) {
+ throw handleGcsException(e, "delete directory", gcsLocation);
+ }
+ }
+
+ @Override
+ public void renameFile(Location source, Location target)
+ throws IOException
+ {
+ throw new IOException("GCS does not support renames");
+ }
+
+ @Override
+ public FileIterator listFiles(Location location)
+ throws IOException
+ {
+ GcsLocation gcsLocation = new GcsLocation(normalizeToDirectory(location));
+ try {
+ return new GcsFileIterator(gcsLocation, getPage(gcsLocation));
+ }
+ catch (RuntimeException e) {
+ throw handleGcsException(e, "list files", gcsLocation);
+ }
+ }
+
+ private static Location normalizeToDirectory(Location location)
+ {
+ String path = location.path();
+ if (!path.isEmpty() && !path.endsWith("/")) {
+ return location.appendSuffix("/");
+ }
+ return location;
+ }
+
+ private static void checkIsValidFile(GcsLocation gcsLocation)
+ {
+ checkState(!gcsLocation.path().isEmpty(), "Location path is empty: %s", gcsLocation);
+ checkState(!gcsLocation.path().endsWith("/"), "Location path ends with a slash: %s", gcsLocation);
+ }
+
+ private Page getPage(GcsLocation location, BlobListOption... blobListOptions)
+ {
+ List optionsBuilder = new ArrayList<>();
+
+ if (!location.path().isEmpty()) {
+ optionsBuilder.add(BlobListOption.prefix(location.path()));
+ }
+ Arrays.stream(blobListOptions).forEach(optionsBuilder::add);
+ optionsBuilder.add(pageSize(this.pageSize));
+ return storage.list(location.bucket(), optionsBuilder.toArray(BlobListOption[]::new));
+ }
+
+ @Override
+ public Optional directoryExists(Location location)
+ throws IOException
+ {
+ // Notes:
+ // GCS is not hierarchical, there is no way to determine the difference
+ // between an empty blob and a directory: for this case Optional.empty() will be returned per the super
+ // method spec.
+ //
+ // Note on blob.isDirectory: The isDirectory() method returns false unless invoked via storage.list()
+ // with currentDirectory() enabled for empty blobs intended to be used as directories.
+ // The only time blob.isDirectory() is true is when an object was created that introduced the path:
+ //
+ // Example 1: createBlob("bucket", "dir") creates an empty blob intended to be used as a "directory"
+ // you can then create a file "bucket", "dir/file")
+ // Invoking blob.isDirectory() on "dir" returns false even after the "dir/file" object is created.
+ //
+ // Example 2: createBlob("bucket", "dir2/file") when "dir2" does not exist will return true for isDirectory()
+ // when invoked on the "dir2/" path. Also note that the blob name has a trailing slash.
+ // This behavior is only enabled with BlobListOption.currentDirectory() and isDirectory() is only true when the blob
+ // is returned from a storage.list operation.
+
+ GcsLocation gcsLocation = new GcsLocation(location);
+ if (gcsLocation.path().isEmpty()) {
+ return Optional.of(bucketExists(gcsLocation.bucket()));
+ }
+ if (listFiles(location).hasNext()) {
+ return Optional.of(true);
+ }
+ return Optional.empty();
+ }
+
+ private boolean bucketExists(String bucket)
+ {
+ return storage.get(bucket) != null;
+ }
+
+ @Override
+ public void createDirectory(Location location)
+ throws IOException
+ {
+ validateGcsLocation(location);
+ }
+
+ @Override
+ public void renameDirectory(Location source, Location target)
+ throws IOException
+ {
+ throw new IOException("GCS does not support directory renames");
+ }
+
+ @Override
+ public Set listDirectories(Location location)
+ throws IOException
+ {
+ GcsLocation gcsLocation = new GcsLocation(normalizeToDirectory(location));
+ try {
+ Page page = getPage(gcsLocation, currentDirectory(), matchGlob(gcsLocation.path() + "*/"));
+ Iterator blobIterator = Iterators.filter(page.iterateAll().iterator(), blob -> blob.isDirectory());
+ ImmutableSet.Builder locationBuilder = ImmutableSet.builder();
+ while (blobIterator.hasNext()) {
+ locationBuilder.add(Location.of(gcsLocation.getBase() + blobIterator.next().getName()));
+ }
+ return locationBuilder.build();
+ }
+ catch (RuntimeException e) {
+ throw handleGcsException(e, "list directories", gcsLocation);
+ }
+ }
+
+ @Override
+ public Optional createTemporaryDirectory(Location targetPath, String temporaryPrefix, String relativePrefix)
+ {
+ validateGcsLocation(targetPath);
+ // GCS does not have directories
+ return Optional.empty();
+ }
+
+ @SuppressWarnings("ResultOfObjectAllocationIgnored")
+ private static void validateGcsLocation(Location location)
+ {
+ new GcsLocation(location);
+ }
+}
diff --git a/lib/trino-filesystem-gcs/src/main/java/io/trino/filesystem/gcs/GcsFileSystemConfig.java b/lib/trino-filesystem-gcs/src/main/java/io/trino/filesystem/gcs/GcsFileSystemConfig.java
new file mode 100644
index 000000000000..915d1dcf1e66
--- /dev/null
+++ b/lib/trino-filesystem-gcs/src/main/java/io/trino/filesystem/gcs/GcsFileSystemConfig.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.filesystem.gcs;
+
+import io.airlift.configuration.Config;
+import io.airlift.configuration.ConfigDescription;
+import io.airlift.configuration.ConfigSecuritySensitive;
+import io.airlift.configuration.validation.FileExists;
+import io.airlift.units.DataSize;
+import jakarta.annotation.Nullable;
+import jakarta.validation.constraints.Min;
+import jakarta.validation.constraints.NotNull;
+
+import static com.google.common.base.Preconditions.checkState;
+import static io.airlift.units.DataSize.Unit.MEGABYTE;
+
+public class GcsFileSystemConfig
+{
+ private DataSize readBlockSize = DataSize.of(2, MEGABYTE);
+ private DataSize writeBlockSize = DataSize.of(16, MEGABYTE);
+ private int pageSize = 100;
+ private int batchSize = 100;
+
+ private String projectId;
+
+ private boolean useGcsAccessToken;
+ private String jsonKey;
+ private String jsonKeyFilePath;
+
+ @NotNull
+ public DataSize getReadBlockSize()
+ {
+ return readBlockSize;
+ }
+
+ @Config("gcs.read-block-size")
+ @ConfigDescription("Minimum size that will be read in one RPC. The default size is 2MiB, see com.google.cloud.BaseStorageReadChannel.")
+ public GcsFileSystemConfig setReadBlockSize(DataSize readBlockSize)
+ {
+ this.readBlockSize = readBlockSize;
+ return this;
+ }
+
+ @NotNull
+ public DataSize getWriteBlockSize()
+ {
+ return writeBlockSize;
+ }
+
+ @Config("gcs.write-block-size")
+ @ConfigDescription("Minimum size that will be written in one RPC. The default size is 16MiB, see com.google.cloud.BaseStorageWriteChannel.")
+ public GcsFileSystemConfig setWriteBlockSize(DataSize writeBlockSize)
+ {
+ this.writeBlockSize = writeBlockSize;
+ return this;
+ }
+
+ @Min(1)
+ public int getPageSize()
+ {
+ return pageSize;
+ }
+
+ @Config("gcs.page-size")
+ @ConfigDescription("The maximum number of blobs to return per page.")
+ public GcsFileSystemConfig setPageSize(int pageSize)
+ {
+ this.pageSize = pageSize;
+ return this;
+ }
+
+ @Min(1)
+ public int getBatchSize()
+ {
+ return batchSize;
+ }
+
+ @Config("gcs.batch-size")
+ @ConfigDescription("Number of blobs to delete per batch. Recommended batch size is 100: https://cloud.google.com/storage/docs/batch")
+ public GcsFileSystemConfig setBatchSize(int batchSize)
+ {
+ this.batchSize = batchSize;
+ return this;
+ }
+
+ @Nullable
+ public String getProjectId()
+ {
+ return projectId;
+ }
+
+ @Config("gcs.projectId")
+ public GcsFileSystemConfig setProjectId(String projectId)
+ {
+ this.projectId = projectId;
+ return this;
+ }
+
+ public boolean isUseGcsAccessToken()
+ {
+ return useGcsAccessToken;
+ }
+
+ @Config("gcs.use-access-token")
+ public GcsFileSystemConfig setUseGcsAccessToken(boolean useGcsAccessToken)
+ {
+ this.useGcsAccessToken = useGcsAccessToken;
+ return this;
+ }
+
+ @Nullable
+ public String getJsonKey()
+ {
+ return jsonKey;
+ }
+
+ @Config("gcs.json-key")
+ @ConfigSecuritySensitive
+ public GcsFileSystemConfig setJsonKey(String jsonKey)
+ {
+ this.jsonKey = jsonKey;
+ return this;
+ }
+
+ @Nullable
+ @FileExists
+ public String getJsonKeyFilePath()
+ {
+ return jsonKeyFilePath;
+ }
+
+ @Config("gcs.json-key-file-path")
+ @ConfigDescription("JSON key file used to access Google Cloud Storage")
+ public GcsFileSystemConfig setJsonKeyFilePath(String jsonKeyFilePath)
+ {
+ this.jsonKeyFilePath = jsonKeyFilePath;
+ return this;
+ }
+
+ public void validate()
+ {
+ // This cannot be normal validation, as it would make it impossible to write TestHiveGcsConfig.testExplicitPropertyMappings
+
+ if (useGcsAccessToken) {
+ checkState(jsonKey == null, "Cannot specify 'hive.gcs.json-key' when 'hive.gcs.use-access-token' is set");
+ checkState(jsonKeyFilePath == null, "Cannot specify 'hive.gcs.json-key-file-path' when 'hive.gcs.use-access-token' is set");
+ }
+ checkState(jsonKey == null || jsonKeyFilePath == null, "'hive.gcs.json-key' and 'hive.gcs.json-key-file-path' cannot be both set");
+ }
+}
diff --git a/lib/trino-filesystem-gcs/src/main/java/io/trino/filesystem/gcs/GcsFileSystemFactory.java b/lib/trino-filesystem-gcs/src/main/java/io/trino/filesystem/gcs/GcsFileSystemFactory.java
new file mode 100644
index 000000000000..4901dc5f50fc
--- /dev/null
+++ b/lib/trino-filesystem-gcs/src/main/java/io/trino/filesystem/gcs/GcsFileSystemFactory.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.filesystem.gcs;
+
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.inject.Inject;
+import io.trino.filesystem.TrinoFileSystem;
+import io.trino.filesystem.TrinoFileSystemFactory;
+import io.trino.spi.security.ConnectorIdentity;
+import jakarta.annotation.PreDestroy;
+
+import static com.google.common.util.concurrent.MoreExecutors.listeningDecorator;
+import static io.airlift.concurrent.Threads.daemonThreadsNamed;
+import static java.lang.Math.toIntExact;
+import static java.util.Objects.requireNonNull;
+import static java.util.concurrent.Executors.newCachedThreadPool;
+
+public class GcsFileSystemFactory
+ implements TrinoFileSystemFactory
+{
+ private final int readBlockSizeBytes;
+ private final long writeBlockSizeBytes;
+ private final int pageSize;
+ private final int batchSize;
+ private final ListeningExecutorService executorService;
+ private final GcsStorageFactory storageFactory;
+
+ @Inject
+ public GcsFileSystemFactory(GcsFileSystemConfig config, GcsStorageFactory storageFactory)
+ {
+ this.readBlockSizeBytes = toIntExact(config.getReadBlockSize().toBytes());
+ this.writeBlockSizeBytes = config.getWriteBlockSize().toBytes();
+ this.pageSize = config.getPageSize();
+ this.batchSize = config.getBatchSize();
+ this.storageFactory = requireNonNull(storageFactory, "storageFactory is null");
+ this.executorService = listeningDecorator(newCachedThreadPool(daemonThreadsNamed("trino-filesystem-gcs-%S")));
+ }
+
+ @PreDestroy
+ public void stop()
+ {
+ executorService.shutdownNow();
+ }
+
+ @Override
+ public TrinoFileSystem create(ConnectorIdentity identity)
+ {
+ return new GcsFileSystem(executorService, storageFactory.create(identity), readBlockSizeBytes, writeBlockSizeBytes, pageSize, batchSize);
+ }
+}
diff --git a/lib/trino-filesystem-gcs/src/main/java/io/trino/filesystem/gcs/GcsFileSystemModule.java b/lib/trino-filesystem-gcs/src/main/java/io/trino/filesystem/gcs/GcsFileSystemModule.java
new file mode 100644
index 000000000000..7d4479d3557f
--- /dev/null
+++ b/lib/trino-filesystem-gcs/src/main/java/io/trino/filesystem/gcs/GcsFileSystemModule.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.filesystem.gcs;
+
+import com.google.inject.Binder;
+import com.google.inject.Module;
+import com.google.inject.Scopes;
+
+import static io.airlift.configuration.ConfigBinder.configBinder;
+
+public class GcsFileSystemModule
+ implements Module
+{
+ @Override
+ public void configure(Binder binder)
+ {
+ configBinder(binder).bindConfig(GcsFileSystemConfig.class);
+ binder.bind(GcsStorageFactory.class).to(DefaultGcsStorageFactory.class).in(Scopes.SINGLETON);
+ binder.bind(GcsFileSystemFactory.class).in(Scopes.SINGLETON);
+ }
+}
diff --git a/lib/trino-filesystem-gcs/src/main/java/io/trino/filesystem/gcs/GcsInput.java b/lib/trino-filesystem-gcs/src/main/java/io/trino/filesystem/gcs/GcsInput.java
new file mode 100644
index 000000000000..89600751777b
--- /dev/null
+++ b/lib/trino-filesystem-gcs/src/main/java/io/trino/filesystem/gcs/GcsInput.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.filesystem.gcs;
+
+import com.google.cloud.ReadChannel;
+import com.google.cloud.storage.Blob;
+import com.google.cloud.storage.Storage;
+import io.trino.filesystem.TrinoInput;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.OptionalLong;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static io.trino.filesystem.gcs.GcsUtils.getBlobOrThrow;
+import static io.trino.filesystem.gcs.GcsUtils.getReadChannel;
+import static io.trino.filesystem.gcs.GcsUtils.handleGcsException;
+import static java.util.Objects.checkFromIndexSize;
+import static java.util.Objects.requireNonNull;
+
+final class GcsInput
+ implements TrinoInput
+{
+ private final GcsLocation location;
+ private final Storage storage;
+ private final int readBlockSize;
+ private OptionalLong length;
+ private boolean closed;
+
+ public GcsInput(GcsLocation location, Storage storage, int readBlockSize, OptionalLong length)
+ {
+ this.location = requireNonNull(location, "location is null");
+ this.storage = requireNonNull(storage, "storage is null");
+ checkArgument(readBlockSize >= 0, "readBlockSize is negative");
+ this.readBlockSize = readBlockSize;
+ this.length = requireNonNull(length, "length is null");
+ }
+
+ @Override
+ public void readFully(long position, byte[] buffer, int bufferOffset, int bufferLength)
+ throws IOException
+ {
+ ensureOpen();
+ if (position < 0) {
+ throw new IOException("Negative seek offset");
+ }
+ checkFromIndexSize(bufferOffset, bufferLength, buffer.length);
+ if (bufferLength == 0) {
+ return;
+ }
+
+ try (ReadChannel readChannel = getReadChannel(getBlobOrThrow(storage, location), location, position, readBlockSize, length)) {
+ int readSize = readNBytes(readChannel, buffer, bufferOffset, bufferLength);
+ if (readSize != bufferLength) {
+ throw new EOFException("End of file reached before reading fully: " + location);
+ }
+ }
+ catch (RuntimeException e) {
+ throw handleGcsException(e, "reading file", location);
+ }
+ }
+
+ @Override
+ public int readTail(byte[] buffer, int bufferOffset, int bufferLength)
+ throws IOException
+ {
+ ensureOpen();
+ checkFromIndexSize(bufferOffset, bufferLength, buffer.length);
+ Blob blob = getBlobOrThrow(storage, location);
+ long offset = Math.max(0, length.orElse(blob.getSize()) - bufferLength);
+ try (ReadChannel readChannel = getReadChannel(blob, location, offset, readBlockSize, length)) {
+ return readNBytes(readChannel, buffer, bufferOffset, bufferLength);
+ }
+ catch (RuntimeException e) {
+ throw handleGcsException(e, "read file", location);
+ }
+ }
+
+ private void ensureOpen()
+ throws IOException
+ {
+ if (closed) {
+ throw new IOException("Input stream closed: " + location);
+ }
+ }
+
+ @Override
+ public void close()
+ {
+ closed = true;
+ }
+
+ @Override
+ public String toString()
+ {
+ return location.toString();
+ }
+
+ private int readNBytes(ReadChannel readChannel, byte[] buffer, int bufferOffset, int bufferLength)
+ throws IOException
+ {
+ ByteBuffer wrappedBuffer = ByteBuffer.wrap(buffer, bufferOffset, bufferLength);
+ int readSize = 0;
+ while (readSize < bufferLength) {
+ int bytesRead = readChannel.read(wrappedBuffer);
+ if (bytesRead == -1) {
+ break;
+ }
+ readSize += bytesRead;
+ }
+ return readSize;
+ }
+}
diff --git a/lib/trino-filesystem-gcs/src/main/java/io/trino/filesystem/gcs/GcsInputFile.java b/lib/trino-filesystem-gcs/src/main/java/io/trino/filesystem/gcs/GcsInputFile.java
new file mode 100644
index 000000000000..2c6879bf3314
--- /dev/null
+++ b/lib/trino-filesystem-gcs/src/main/java/io/trino/filesystem/gcs/GcsInputFile.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.filesystem.gcs;
+
+import com.google.cloud.storage.Blob;
+import com.google.cloud.storage.Storage;
+import io.trino.filesystem.Location;
+import io.trino.filesystem.TrinoInput;
+import io.trino.filesystem.TrinoInputFile;
+import io.trino.filesystem.TrinoInputStream;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.util.Optional;
+import java.util.OptionalLong;
+
+import static io.trino.filesystem.gcs.GcsUtils.getBlob;
+import static io.trino.filesystem.gcs.GcsUtils.getBlobOrThrow;
+import static io.trino.filesystem.gcs.GcsUtils.handleGcsException;
+import static java.util.Objects.requireNonNull;
+
+public class GcsInputFile
+ implements TrinoInputFile
+{
+ private final GcsLocation location;
+ private final Storage storage;
+ private final int readBlockSize;
+ private OptionalLong length;
+ private OptionalLong predeclaredLength;
+ private Optional lastModified = Optional.empty();
+
+ public GcsInputFile(GcsLocation location, Storage storage, int readBockSize, OptionalLong predeclaredLength)
+ {
+ this.location = requireNonNull(location, "location is null");
+ this.storage = requireNonNull(storage, "storage is null");
+ this.readBlockSize = readBockSize;
+ this.predeclaredLength = requireNonNull(predeclaredLength, "length is null");
+ this.length = OptionalLong.empty();
+ }
+
+ @Override
+ public TrinoInput newInput()
+ throws IOException
+ {
+ // Note: Only pass predeclared length, to keep the contract of TrinoFileSystem.newInputFile
+ return new GcsInput(location, storage, readBlockSize, predeclaredLength);
+ }
+
+ @Override
+ public TrinoInputStream newStream()
+ throws IOException
+ {
+ Blob blob = getBlobOrThrow(storage, location);
+ return new GcsInputStream(location, blob, readBlockSize, predeclaredLength);
+ }
+
+ @Override
+ public long length()
+ throws IOException
+ {
+ if (predeclaredLength.isPresent()) {
+ return predeclaredLength.getAsLong();
+ }
+ if (length.isEmpty()) {
+ loadProperties();
+ }
+ return length.orElseThrow();
+ }
+
+ @Override
+ public Instant lastModified()
+ throws IOException
+ {
+ if (lastModified.isEmpty()) {
+ loadProperties();
+ }
+ return lastModified.orElseThrow();
+ }
+
+ @Override
+ public boolean exists()
+ throws IOException
+ {
+ Optional blob = getBlob(storage, location);
+ return blob.isPresent() && blob.get().exists();
+ }
+
+ @Override
+ public Location location()
+ {
+ return location.location();
+ }
+
+ private void loadProperties()
+ throws IOException
+ {
+ Blob blob = getBlobOrThrow(storage, location);
+ try {
+ length = OptionalLong.of(blob.getSize());
+ lastModified = Optional.of(Instant.from(blob.getUpdateTimeOffsetDateTime()));
+ }
+ catch (RuntimeException e) {
+ throw handleGcsException(e, "fetching properties for file", location);
+ }
+ }
+}
diff --git a/lib/trino-filesystem-gcs/src/main/java/io/trino/filesystem/gcs/GcsInputStream.java b/lib/trino-filesystem-gcs/src/main/java/io/trino/filesystem/gcs/GcsInputStream.java
new file mode 100644
index 000000000000..4f6f39fae3d7
--- /dev/null
+++ b/lib/trino-filesystem-gcs/src/main/java/io/trino/filesystem/gcs/GcsInputStream.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.filesystem.gcs;
+
+import com.google.cloud.ReadChannel;
+import com.google.cloud.storage.Blob;
+import com.google.common.primitives.Ints;
+import io.trino.filesystem.TrinoInputStream;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.OptionalLong;
+
+import static com.google.common.primitives.Longs.constrainToRange;
+import static io.trino.filesystem.gcs.GcsUtils.getReadChannel;
+import static io.trino.filesystem.gcs.GcsUtils.handleGcsException;
+import static java.util.Objects.checkFromIndexSize;
+import static java.util.Objects.requireNonNull;
+
+public class GcsInputStream
+ extends TrinoInputStream
+{
+ private final GcsLocation location;
+ private final Blob blob;
+ private final int readBlockSizeBytes;
+ private final long fileSize;
+ private final OptionalLong predeclaredLength;
+ private ReadChannel readChannel;
+ // Used for read(). Similar to sun.nio.ch.ChannelInputStream
+ private ByteBuffer readBuffer = ByteBuffer.allocate(1);
+ private long currentPosition;
+ private long nextPosition;
+ private boolean closed;
+
+ public GcsInputStream(GcsLocation location, Blob blob, int readBlockSizeBytes, OptionalLong predeclaredLength)
+ throws IOException
+ {
+ this.location = requireNonNull(location, "location is null");
+ this.blob = requireNonNull(blob, "blob is null");
+ this.readBlockSizeBytes = readBlockSizeBytes;
+ this.predeclaredLength = requireNonNull(predeclaredLength, "predeclaredLength is null");
+ this.fileSize = predeclaredLength.orElse(blob.getSize());
+ openStream();
+ }
+
+ @Override
+ public int available()
+ throws IOException
+ {
+ ensureOpen();
+ repositionStream();
+ return Ints.saturatedCast(fileSize - currentPosition);
+ }
+
+ @Override
+ public long getPosition()
+ throws IOException
+ {
+ return nextPosition;
+ }
+
+ @Override
+ public void seek(long newPosition)
+ throws IOException
+ {
+ ensureOpen();
+ if (newPosition < 0) {
+ throw new IOException("Negative seek offset");
+ }
+ if (newPosition > fileSize) {
+ throw new IOException("Cannot seek to %s. File size is %s: %s".formatted(newPosition, fileSize, location));
+ }
+ nextPosition = newPosition;
+ }
+
+ @Override
+ public int read()
+ throws IOException
+ {
+ ensureOpen();
+ repositionStream();
+ try {
+ // Similar to sun.nio.ch.ChannelInputStream::read but uses a byte buffer and is not synchronized
+ readBuffer.position(0);
+ int bytesRead = readChannel.read(readBuffer);
+
+ if (bytesRead == 1) {
+ currentPosition++;
+ nextPosition++;
+ return readBuffer.get(0) & 0xff;
+ }
+ return -1;
+ }
+ catch (IOException e) {
+ throw new IOException("Error reading file: " + location, e);
+ }
+ }
+
+ @Override
+ public int read(byte[] buffer, int offset, int length)
+ throws IOException
+ {
+ checkFromIndexSize(offset, length, buffer.length);
+ ensureOpen();
+ repositionStream();
+ ByteBuffer wrappedBuffer = ByteBuffer.wrap(buffer, offset, length);
+ try {
+ int readSize = readChannel.read(wrappedBuffer);
+ if (readSize > 0) {
+ currentPosition += readSize;
+ nextPosition += readSize;
+ }
+ return readSize;
+ }
+ catch (IOException e) {
+ throw new IOException("Error reading file: " + location, e);
+ }
+ }
+
+ @Override
+ public long skip(long n)
+ throws IOException
+ {
+ ensureOpen();
+ long skipSize = constrainToRange(n, 0, fileSize - nextPosition);
+ nextPosition += skipSize;
+ return skipSize;
+ }
+
+ @Override
+ public void skipNBytes(long n)
+ throws IOException
+ {
+ ensureOpen();
+ if (n <= 0) {
+ return;
+ }
+
+ long position = nextPosition + n;
+ if ((position < 0) || (position > fileSize)) {
+ throw new EOFException("Unable to skip %s bytes (position=%s, fileSize=%s): %s".formatted(n, nextPosition, fileSize, location));
+ }
+ nextPosition = position;
+ }
+
+ @Override
+ public void close()
+ throws IOException
+ {
+ if (!closed) {
+ closed = true;
+ try {
+ readChannel.close();
+ }
+ catch (RuntimeException e) {
+ throw handleGcsException(e, "closing file", location);
+ }
+ }
+ }
+
+ private void ensureOpen()
+ throws IOException
+ {
+ if (closed) {
+ throw new IOException("Output stream closed: " + location);
+ }
+ }
+
+ private void openStream()
+ throws IOException
+ {
+ try {
+ this.readChannel = getReadChannel(blob, location, 0L, readBlockSizeBytes, predeclaredLength);
+ }
+ catch (RuntimeException e) {
+ throw handleGcsException(e, "read file", location);
+ }
+ }
+
+ private void repositionStream()
+ throws IOException
+ {
+ if (nextPosition == currentPosition) {
+ return;
+ }
+ readChannel.seek(nextPosition);
+ currentPosition = nextPosition;
+ }
+}
diff --git a/lib/trino-filesystem-gcs/src/main/java/io/trino/filesystem/gcs/GcsLocation.java b/lib/trino-filesystem-gcs/src/main/java/io/trino/filesystem/gcs/GcsLocation.java
new file mode 100644
index 000000000000..fc802cdee67e
--- /dev/null
+++ b/lib/trino-filesystem-gcs/src/main/java/io/trino/filesystem/gcs/GcsLocation.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.filesystem.gcs;
+
+import io.trino.filesystem.Location;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static java.util.Objects.requireNonNull;
+
+record GcsLocation(Location location)
+{
+ GcsLocation
+ {
+ // Note: Underscores are allowed in bucket names, see https://cloud.google.com/storage/docs/buckets#naming.
+ // Generation numbers are not supported, i.e. gs://bucket/path#generation.
+ // For more details on generation numbers see https://cloud.google.com/storage/docs/object-versioning.
+
+ requireNonNull(location, "location");
+ checkArgument(location.scheme().isPresent(), "No scheme for GCS location: %s", location);
+ checkArgument(location.scheme().get().equals("gs"), "Wrong scheme for S3 location: %s", location);
+ checkArgument(location.host().isPresent(), "No bucket for GCS location: %s", location);
+ checkArgument(location.userInfo().isEmpty(), "GCS location contains user info: %s", location);
+ checkArgument(location.port().isEmpty(), "GCS location contains port: %s", location);
+ checkArgument(!location.path().contains("#"), "GCS generation numbers are not supported: %s", location);
+ checkArgument(!location.path().contains("?"), "Invalid character '?': %s", location);
+ }
+
+ public String scheme()
+ {
+ return location.scheme().orElseThrow();
+ }
+
+ public String bucket()
+ {
+ return location.host().orElseThrow();
+ }
+
+ public String path()
+ {
+ return location.path();
+ }
+
+ public String getBase()
+ {
+ return "%s://%s/".formatted(scheme(), bucket());
+ }
+
+ @Override
+ public String toString()
+ {
+ return location.toString();
+ }
+}
diff --git a/lib/trino-filesystem-gcs/src/main/java/io/trino/filesystem/gcs/GcsOutputFile.java b/lib/trino-filesystem-gcs/src/main/java/io/trino/filesystem/gcs/GcsOutputFile.java
new file mode 100644
index 000000000000..12344c9a0f98
--- /dev/null
+++ b/lib/trino-filesystem-gcs/src/main/java/io/trino/filesystem/gcs/GcsOutputFile.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.filesystem.gcs;
+
+import com.google.cloud.storage.Blob;
+import com.google.cloud.storage.BlobId;
+import com.google.cloud.storage.BlobInfo;
+import com.google.cloud.storage.Storage;
+import com.google.cloud.storage.Storage.BlobTargetOption;
+import io.trino.filesystem.Location;
+import io.trino.filesystem.TrinoOutputFile;
+import io.trino.memory.context.AggregatedMemoryContext;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.file.FileAlreadyExistsException;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static io.trino.filesystem.gcs.GcsUtils.getBlob;
+import static io.trino.filesystem.gcs.GcsUtils.handleGcsException;
+import static java.util.Objects.requireNonNull;
+
+public class GcsOutputFile
+ implements TrinoOutputFile
+{
+ private static final Storage.BlobTargetOption[] DOES_NOT_EXIST_TARGET_OPTION = {Storage.BlobTargetOption.doesNotExist()};
+ private static final Storage.BlobTargetOption[] EMPTY_TARGET_OPTIONS = {};
+
+ private final GcsLocation location;
+ private final Storage storage;
+ private final long writeBlockSizeBytes;
+
+ public GcsOutputFile(GcsLocation location, Storage storage, long writeBlockSizeBytes)
+ {
+ this.location = requireNonNull(location, "location is null");
+ this.storage = requireNonNull(storage, "storage is null");
+ checkArgument(writeBlockSizeBytes >= 0, "writeBlockSizeBytes is negative");
+ this.writeBlockSizeBytes = writeBlockSizeBytes;
+ }
+
+ @Override
+ public OutputStream create(AggregatedMemoryContext memoryContext)
+ throws IOException
+ {
+ return createOutputStream(memoryContext, false);
+ }
+
+ @Override
+ public OutputStream createOrOverwrite(AggregatedMemoryContext memoryContext)
+ throws IOException
+ {
+ return createOutputStream(memoryContext, true);
+ }
+
+ @Override
+ public OutputStream createExclusive(AggregatedMemoryContext memoryContext)
+ throws IOException
+ {
+ return create(memoryContext);
+ }
+
+ private OutputStream createOutputStream(AggregatedMemoryContext memoryContext, boolean overwrite)
+ throws IOException
+ {
+ try {
+ BlobTargetOption[] blobTargetOptions = EMPTY_TARGET_OPTIONS;
+ if (!overwrite) {
+ if (!getBlob(storage, location).isEmpty()) {
+ throw new FileAlreadyExistsException("File %s already exists".formatted(location));
+ }
+ blobTargetOptions = DOES_NOT_EXIST_TARGET_OPTION;
+ }
+ Blob blob = storage.create(
+ BlobInfo.newBuilder(BlobId.of(location.bucket(), location.path())).build(),
+ blobTargetOptions);
+
+ return new GcsOutputStream(location, blob, memoryContext, writeBlockSizeBytes);
+ }
+ catch (FileAlreadyExistsException e) {
+ throw e;
+ }
+ catch (RuntimeException e) {
+ throw handleGcsException(e, "writing file", location);
+ }
+ }
+
+ @Override
+ public Location location()
+ {
+ return location.location();
+ }
+}
diff --git a/lib/trino-filesystem-gcs/src/main/java/io/trino/filesystem/gcs/GcsOutputStream.java b/lib/trino-filesystem-gcs/src/main/java/io/trino/filesystem/gcs/GcsOutputStream.java
new file mode 100644
index 000000000000..9b081dc66912
--- /dev/null
+++ b/lib/trino-filesystem-gcs/src/main/java/io/trino/filesystem/gcs/GcsOutputStream.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.filesystem.gcs;
+
+import com.google.cloud.WriteChannel;
+import com.google.cloud.storage.Blob;
+import com.google.common.primitives.Ints;
+import io.trino.memory.context.AggregatedMemoryContext;
+import io.trino.memory.context.LocalMemoryContext;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static java.lang.Math.min;
+import static java.util.Objects.requireNonNull;
+
+public class GcsOutputStream
+ extends OutputStream
+{
+ private static final int BUFFER_SIZE = 8192;
+
+ private final GcsLocation location;
+ private final long writeBlockSizeBytes;
+ private final LocalMemoryContext memoryContext;
+ private final WriteChannel writeChannel;
+ private final ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);
+ private long writtenBytes;
+ private boolean closed;
+
+ public GcsOutputStream(GcsLocation location, Blob blob, AggregatedMemoryContext memoryContext, long writeBlockSizeBytes)
+ {
+ this.location = requireNonNull(location, "location is null");
+ checkArgument(writeBlockSizeBytes >= 0, "writeBlockSizeBytes is negative");
+ this.writeBlockSizeBytes = writeBlockSizeBytes;
+ this.memoryContext = memoryContext.newLocalMemoryContext(GcsOutputStream.class.getSimpleName());
+ this.writeChannel = blob.writer();
+ this.writeChannel.setChunkSize(Ints.saturatedCast(writeBlockSizeBytes));
+ }
+
+ @Override
+ public void write(int b)
+ throws IOException
+ {
+ ensureOpen();
+ if (!buffer.hasRemaining()) {
+ flush();
+ }
+ buffer.put((byte) b);
+ recordBytesWritten(1);
+ }
+
+ @Override
+ public void write(byte[] buffer, int offset, int length)
+ throws IOException
+ {
+ ensureOpen();
+ if (length > BUFFER_SIZE) {
+ writeDirect(ByteBuffer.wrap(buffer, offset, length));
+ }
+ else {
+ if (length > this.buffer.remaining()) {
+ flush();
+ }
+ this.buffer.put(buffer, offset, length);
+ recordBytesWritten(length);
+ }
+ }
+
+ private void writeDirect(ByteBuffer buffer)
+ throws IOException
+ {
+ // Flush write buffer in case calls to write(int) are interleaved with calls to this function
+ flush();
+ int bytesWritten = 0;
+ try {
+ bytesWritten = writeChannel.write(buffer);
+ if (bytesWritten != buffer.remaining()) {
+ throw new IOException("Unexpected bytes written length: %s should be %s".formatted(bytesWritten, buffer.remaining()));
+ }
+ }
+ catch (IOException e) {
+ throw new IOException("Error writing file: " + location, e);
+ }
+ recordBytesWritten(bytesWritten);
+ }
+
+ private void ensureOpen()
+ throws IOException
+ {
+ if (closed) {
+ throw new IOException("Output stream closed: " + location);
+ }
+ }
+
+ @Override
+ public void flush()
+ throws IOException
+ {
+ ensureOpen();
+ if (buffer.position() > 0) {
+ buffer.flip();
+ while (buffer.hasRemaining()) {
+ try {
+ // WriteChannel is buffered internally: see com.google.cloud.BaseWriteChannel
+ writeChannel.write(buffer);
+ }
+ catch (IOException e) {
+ throw new IOException("Error writing file: " + location, e);
+ }
+ }
+ buffer.clear();
+ }
+ }
+
+ @Override
+ public void close()
+ throws IOException
+ {
+ if (!closed) {
+ flush();
+ closed = true;
+ try {
+ writeChannel.close();
+ }
+ catch (IOException e) {
+ throw new IOException("Error closing file: " + location, e);
+ }
+ finally {
+ memoryContext.close();
+ }
+ }
+ }
+
+ private void recordBytesWritten(int size)
+ {
+ if (writtenBytes < writeBlockSizeBytes) {
+ // assume that there is only one pending block buffer, and that it grows as written bytes grow
+ memoryContext.setBytes(BUFFER_SIZE + min(writtenBytes + size, writeBlockSizeBytes));
+ }
+ writtenBytes += size;
+ }
+}
diff --git a/lib/trino-filesystem-gcs/src/main/java/io/trino/filesystem/gcs/GcsStorageFactory.java b/lib/trino-filesystem-gcs/src/main/java/io/trino/filesystem/gcs/GcsStorageFactory.java
new file mode 100644
index 000000000000..20db0b57b68a
--- /dev/null
+++ b/lib/trino-filesystem-gcs/src/main/java/io/trino/filesystem/gcs/GcsStorageFactory.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.filesystem.gcs;
+
+import com.google.cloud.storage.Storage;
+import io.trino.spi.security.ConnectorIdentity;
+
+public interface GcsStorageFactory
+{
+ Storage create(ConnectorIdentity identity);
+}
diff --git a/lib/trino-filesystem-gcs/src/main/java/io/trino/filesystem/gcs/GcsUtils.java b/lib/trino-filesystem-gcs/src/main/java/io/trino/filesystem/gcs/GcsUtils.java
new file mode 100644
index 000000000000..bc1c90f32c7b
--- /dev/null
+++ b/lib/trino-filesystem-gcs/src/main/java/io/trino/filesystem/gcs/GcsUtils.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.filesystem.gcs;
+
+import com.google.cloud.ReadChannel;
+import com.google.cloud.storage.Blob;
+import com.google.cloud.storage.BlobId;
+import com.google.cloud.storage.Storage;
+import io.trino.filesystem.Location;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Optional;
+import java.util.OptionalLong;
+
+import static com.google.api.client.util.Preconditions.checkArgument;
+import static com.google.cloud.storage.Blob.BlobSourceOption.shouldReturnRawInputStream;
+import static java.util.Objects.requireNonNull;
+
+public class GcsUtils
+{
+ private GcsUtils() {}
+
+ public static IOException handleGcsException(RuntimeException exception, String action, GcsLocation location)
+ throws IOException
+ {
+ throw new IOException("Error %s: %s".formatted(action, location), exception);
+ }
+
+ public static IOException handleGcsException(RuntimeException exception, String action, Collection locations)
+ throws IOException
+ {
+ throw new IOException("Error %s: %s".formatted(action, locations), exception);
+ }
+
+ public static ReadChannel getReadChannel(Blob blob, GcsLocation location, long position, int readBlockSize, OptionalLong limit)
+ throws IOException
+ {
+ long fileSize = requireNonNull(blob.getSize(), "blob size is null");
+ if (position >= fileSize) {
+ throw new IOException("Cannot read at %s. File size is %s: %s".formatted(position, fileSize, location));
+ }
+ // Enable shouldReturnRawInputStream: currently set by default but just to ensure the behavior is predictable
+ ReadChannel readChannel = blob.reader(shouldReturnRawInputStream(true));
+
+ readChannel.setChunkSize(readBlockSize);
+ readChannel.seek(position);
+ if (limit.isPresent()) {
+ return readChannel.limit(limit.getAsLong());
+ }
+ return readChannel;
+ }
+
+ public static Optional getBlob(Storage storage, GcsLocation location, Storage.BlobGetOption... blobGetOptions)
+ {
+ checkArgument(!location.path().isEmpty(), "Path for location %s is empty", location);
+ return Optional.ofNullable(storage.get(BlobId.of(location.bucket(), location.path()), blobGetOptions));
+ }
+
+ public static Blob getBlobOrThrow(Storage storage, GcsLocation location, Storage.BlobGetOption... blobGetOptions)
+ throws IOException
+ {
+ return getBlob(storage, location, blobGetOptions).orElseThrow(() -> new FileNotFoundException("File %s not found".formatted(location)));
+ }
+}
diff --git a/lib/trino-filesystem-gcs/src/test/java/io/trino/filesystem/gcs/AbstractTestGcsFileSystem.java b/lib/trino-filesystem-gcs/src/test/java/io/trino/filesystem/gcs/AbstractTestGcsFileSystem.java
new file mode 100644
index 000000000000..805dea0176fc
--- /dev/null
+++ b/lib/trino-filesystem-gcs/src/test/java/io/trino/filesystem/gcs/AbstractTestGcsFileSystem.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.filesystem.gcs;
+
+import com.google.cloud.storage.BucketInfo;
+import com.google.cloud.storage.Storage;
+import com.google.cloud.storage.testing.RemoteStorageHelper;
+import io.trino.filesystem.AbstractTestTrinoFileSystem;
+import io.trino.filesystem.Location;
+import io.trino.filesystem.TrinoFileSystem;
+import io.trino.spi.security.ConnectorIdentity;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.TestInstance;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Base64;
+import java.util.concurrent.TimeUnit;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.Objects.requireNonNull;
+import static org.assertj.core.api.Assertions.assertThat;
+
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+public abstract class AbstractTestGcsFileSystem
+ extends AbstractTestTrinoFileSystem
+{
+ private TrinoFileSystem fileSystem;
+ private Location rootLocation;
+ private Storage storage;
+ private GcsFileSystemFactory gcsFileSystemFactory;
+
+ protected static String getRequiredEnvironmentVariable(String name)
+ {
+ return requireNonNull(System.getenv(name), "Environment variable not set: " + name);
+ }
+
+ protected void initialize(String gcpCredentialKey)
+ throws IOException
+ {
+ // Note: the account needs the following permissions:
+ // create/get/delete bucket
+ // create/get/list/delete blob
+ // For gcp testing this corresponds to the Cluster Storage Admin and Cluster Storage Object Admin roles
+ byte[] jsonKeyBytes = Base64.getDecoder().decode(gcpCredentialKey);
+ GcsFileSystemConfig config = new GcsFileSystemConfig().setJsonKey(new String(jsonKeyBytes, UTF_8));
+ GcsStorageFactory storageFactory = new TestingGcsStorageFactory(config);
+ this.gcsFileSystemFactory = new GcsFileSystemFactory(config, storageFactory);
+ this.storage = storageFactory.create(ConnectorIdentity.ofUser("test"));
+ String bucket = RemoteStorageHelper.generateBucketName();
+ storage.create(BucketInfo.of(bucket));
+ this.rootLocation = Location.of("gs://%s/".formatted(bucket));
+ this.fileSystem = gcsFileSystemFactory.create(ConnectorIdentity.ofUser("test"));
+ cleanupFiles();
+ }
+
+ @AfterAll
+ void tearDown()
+ throws Exception
+ {
+ try {
+ RemoteStorageHelper.forceDelete(storage, rootLocation.host().get(), 5, TimeUnit.SECONDS);
+ gcsFileSystemFactory.stop();
+ }
+ finally {
+ fileSystem = null;
+ storage = null;
+ rootLocation = null;
+ gcsFileSystemFactory = null;
+ }
+ }
+
+ @AfterEach
+ void afterEach()
+ throws IOException
+ {
+ cleanupFiles();
+ }
+
+ private void cleanupFiles()
+ throws IOException
+ {
+ fileSystem.deleteDirectory(getRootLocation());
+ }
+
+ @Override
+ protected boolean isHierarchical()
+ {
+ return false;
+ }
+
+ @Override
+ protected TrinoFileSystem getFileSystem()
+ {
+ return fileSystem;
+ }
+
+ @Override
+ protected Location getRootLocation()
+ {
+ return rootLocation;
+ }
+
+ @Override
+ protected void verifyFileSystemIsEmpty()
+ {
+ String bucket = new GcsLocation(rootLocation).bucket();
+ assertThat(storage.list(bucket).iterateAll()).isEmpty();
+ }
+
+ @Override
+ protected final boolean supportsRenameFile()
+ {
+ return false;
+ }
+
+ private static class TestingGcsStorageFactory
+ implements GcsStorageFactory
+ {
+ private final Storage storage;
+
+ public TestingGcsStorageFactory(GcsFileSystemConfig config)
+ {
+ requireNonNull(config, "config is null");
+ InputStream inputStream = new ByteArrayInputStream(config.getJsonKey().getBytes(UTF_8));
+ // Note: the default project id from the credentials file will be used. See StorageOptions.setProjectId()
+ RemoteStorageHelper helper = RemoteStorageHelper.create(null, inputStream);
+ this.storage = helper.getOptions().getService();
+ }
+
+ @Override
+ public Storage create(ConnectorIdentity identity)
+ {
+ return storage;
+ }
+ }
+}
diff --git a/lib/trino-filesystem-gcs/src/test/java/io/trino/filesystem/gcs/TestGcsFileSystemConfig.java b/lib/trino-filesystem-gcs/src/test/java/io/trino/filesystem/gcs/TestGcsFileSystemConfig.java
new file mode 100644
index 000000000000..0a8dad86df75
--- /dev/null
+++ b/lib/trino-filesystem-gcs/src/test/java/io/trino/filesystem/gcs/TestGcsFileSystemConfig.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.filesystem.gcs;
+
+import com.google.common.collect.ImmutableMap;
+import io.airlift.units.DataSize;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Map;
+
+import static io.airlift.configuration.testing.ConfigAssertions.assertFullMapping;
+import static io.airlift.configuration.testing.ConfigAssertions.assertRecordedDefaults;
+import static io.airlift.configuration.testing.ConfigAssertions.recordDefaults;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+public class TestGcsFileSystemConfig
+{
+ @Test
+ void testDefaults()
+ {
+ assertRecordedDefaults(recordDefaults(GcsFileSystemConfig.class)
+ .setReadBlockSize(DataSize.of(2, DataSize.Unit.MEGABYTE))
+ .setWriteBlockSize(DataSize.of(16, DataSize.Unit.MEGABYTE))
+ .setPageSize(100)
+ .setBatchSize(100)
+ .setProjectId(null)
+ .setUseGcsAccessToken(false)
+ .setJsonKey(null)
+ .setJsonKeyFilePath(null));
+ }
+
+ @Test
+ void testExplicitPropertyMappings()
+ throws IOException
+ {
+ Path jsonKeyFile = Files.createTempFile(null, null);
+
+ Map properties = ImmutableMap.builder()
+ .put("gcs.read-block-size", "51MB")
+ .put("gcs.write-block-size", "52MB")
+ .put("gcs.page-size", "10")
+ .put("gcs.batch-size", "11")
+ .put("gcs.projectId", "project")
+ .put("gcs.use-access-token", "true")
+ .put("gcs.json-key", "{}")
+ .put("gcs.json-key-file-path", jsonKeyFile.toString())
+ .buildOrThrow();
+
+ GcsFileSystemConfig expected = new GcsFileSystemConfig()
+ .setReadBlockSize(DataSize.of(51, DataSize.Unit.MEGABYTE))
+ .setWriteBlockSize(DataSize.of(52, DataSize.Unit.MEGABYTE))
+ .setPageSize(10)
+ .setBatchSize(11)
+ .setProjectId("project")
+ .setUseGcsAccessToken(true)
+ .setJsonKey("{}")
+ .setJsonKeyFilePath(jsonKeyFile.toString());
+ assertFullMapping(properties, expected);
+ }
+
+ @Test
+ public void testValidation()
+ {
+ assertThatThrownBy(
+ new GcsFileSystemConfig()
+ .setUseGcsAccessToken(true)
+ .setJsonKey("{}}")::validate)
+ .isInstanceOf(IllegalStateException.class)
+ .hasMessage("Cannot specify 'hive.gcs.json-key' when 'hive.gcs.use-access-token' is set");
+
+ assertThatThrownBy(
+ new GcsFileSystemConfig()
+ .setUseGcsAccessToken(true)
+ .setJsonKeyFilePath("/dev/null")::validate)
+ .isInstanceOf(IllegalStateException.class)
+ .hasMessage("Cannot specify 'hive.gcs.json-key-file-path' when 'hive.gcs.use-access-token' is set");
+
+ assertThatThrownBy(
+ new GcsFileSystemConfig()
+ .setJsonKey("{}")
+ .setJsonKeyFilePath("/dev/null")::validate)
+ .isInstanceOf(IllegalStateException.class)
+ .hasMessage("'hive.gcs.json-key' and 'hive.gcs.json-key-file-path' cannot be both set");
+ }
+}
diff --git a/lib/trino-filesystem-gcs/src/test/java/io/trino/filesystem/gcs/TestGcsFileSystemGcs.java b/lib/trino-filesystem-gcs/src/test/java/io/trino/filesystem/gcs/TestGcsFileSystemGcs.java
new file mode 100644
index 000000000000..b9a056e487fc
--- /dev/null
+++ b/lib/trino-filesystem-gcs/src/test/java/io/trino/filesystem/gcs/TestGcsFileSystemGcs.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.filesystem.gcs;
+
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.TestInstance;
+
+import java.io.IOException;
+
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+public class TestGcsFileSystemGcs
+ extends AbstractTestGcsFileSystem
+{
+ @BeforeAll
+ void setup()
+ throws IOException
+ {
+ initialize(getRequiredEnvironmentVariable("GCP_CREDENTIALS_KEY"));
+ }
+}
diff --git a/lib/trino-filesystem-manager/pom.xml b/lib/trino-filesystem-manager/pom.xml
index f81a732992f9..1798b5909033 100644
--- a/lib/trino-filesystem-manager/pom.xml
+++ b/lib/trino-filesystem-manager/pom.xml
@@ -47,6 +47,11 @@
trino-filesystem-azure
+
+ io.trino
+ trino-filesystem-gcs
+
+
io.trino
trino-filesystem-s3
diff --git a/lib/trino-filesystem-manager/src/main/java/io/trino/filesystem/manager/FileSystemConfig.java b/lib/trino-filesystem-manager/src/main/java/io/trino/filesystem/manager/FileSystemConfig.java
index 72e5bdfcee0f..13390b5d420b 100644
--- a/lib/trino-filesystem-manager/src/main/java/io/trino/filesystem/manager/FileSystemConfig.java
+++ b/lib/trino-filesystem-manager/src/main/java/io/trino/filesystem/manager/FileSystemConfig.java
@@ -20,6 +20,7 @@ public class FileSystemConfig
private boolean hadoopEnabled = true;
private boolean nativeAzureEnabled;
private boolean nativeS3Enabled;
+ private boolean nativeGcsEnabled;
public boolean isHadoopEnabled()
{
@@ -56,4 +57,16 @@ public FileSystemConfig setNativeS3Enabled(boolean nativeS3Enabled)
this.nativeS3Enabled = nativeS3Enabled;
return this;
}
+
+ public boolean isNativeGcsEnabled()
+ {
+ return nativeGcsEnabled;
+ }
+
+ @Config("fs.native-gcs.enabled")
+ public FileSystemConfig setNativeGcsEnabled(boolean nativeGcsEnabled)
+ {
+ this.nativeGcsEnabled = nativeGcsEnabled;
+ return this;
+ }
}
diff --git a/lib/trino-filesystem-manager/src/main/java/io/trino/filesystem/manager/FileSystemModule.java b/lib/trino-filesystem-manager/src/main/java/io/trino/filesystem/manager/FileSystemModule.java
index b366955b5287..535067fb74f3 100644
--- a/lib/trino-filesystem-manager/src/main/java/io/trino/filesystem/manager/FileSystemModule.java
+++ b/lib/trino-filesystem-manager/src/main/java/io/trino/filesystem/manager/FileSystemModule.java
@@ -22,6 +22,8 @@
import io.trino.filesystem.TrinoFileSystemFactory;
import io.trino.filesystem.azure.AzureFileSystemFactory;
import io.trino.filesystem.azure.AzureFileSystemModule;
+import io.trino.filesystem.gcs.GcsFileSystemFactory;
+import io.trino.filesystem.gcs.GcsFileSystemModule;
import io.trino.filesystem.hdfs.HdfsFileSystemFactory;
import io.trino.filesystem.hdfs.HdfsFileSystemModule;
import io.trino.filesystem.s3.S3FileSystemFactory;
@@ -82,6 +84,14 @@ else if (config.isHadoopEnabled()) {
else if (config.isHadoopEnabled()) {
install(new HiveS3Module());
}
+
+ if (config.isNativeGcsEnabled()) {
+ install(new GcsFileSystemModule());
+ factories.addBinding("gs").to(GcsFileSystemFactory.class);
+ }
+ else {
+ install(new HiveGcsModule());
+ }
}
@Provides
diff --git a/lib/trino-filesystem-manager/src/test/java/io/trino/filesystem/manager/TestFileSystemConfig.java b/lib/trino-filesystem-manager/src/test/java/io/trino/filesystem/manager/TestFileSystemConfig.java
index a9b9bad5dad0..4e765b70ccee 100644
--- a/lib/trino-filesystem-manager/src/test/java/io/trino/filesystem/manager/TestFileSystemConfig.java
+++ b/lib/trino-filesystem-manager/src/test/java/io/trino/filesystem/manager/TestFileSystemConfig.java
@@ -30,7 +30,8 @@ public void testDefaults()
assertRecordedDefaults(recordDefaults(FileSystemConfig.class)
.setHadoopEnabled(true)
.setNativeAzureEnabled(false)
- .setNativeS3Enabled(false));
+ .setNativeS3Enabled(false)
+ .setNativeGcsEnabled(false));
}
@Test
@@ -40,12 +41,14 @@ public void testExplicitPropertyMappings()
.put("fs.hadoop.enabled", "false")
.put("fs.native-azure.enabled", "true")
.put("fs.native-s3.enabled", "true")
+ .put("fs.native-gcs.enabled", "true")
.buildOrThrow();
FileSystemConfig expected = new FileSystemConfig()
.setHadoopEnabled(false)
.setNativeAzureEnabled(true)
- .setNativeS3Enabled(true);
+ .setNativeS3Enabled(true)
+ .setNativeGcsEnabled(true);
assertFullMapping(properties, expected);
}
diff --git a/pom.xml b/pom.xml
index fc8857e0b1de..2de19d7d8578 100644
--- a/pom.xml
+++ b/pom.xml
@@ -43,6 +43,7 @@
lib/trino-cache
lib/trino-filesystem
lib/trino-filesystem-azure
+ lib/trino-filesystem-gcs
lib/trino-filesystem-manager
lib/trino-filesystem-s3
lib/trino-geospatial-toolkit
@@ -992,6 +993,12 @@
${project.version}
+
+ io.trino
+ trino-filesystem-gcs
+ ${project.version}
+
+
io.trino
trino-filesystem-manager
@@ -2293,6 +2300,102 @@
shaded.parquet.it.unimi.dsi.fastutil
+
+
+
+ com.google.cloud.bigdataoss
+ gcs-connector
+
+
+ io.opencensus
+ opencensus-proto
+
+
+
+ opencensus/proto/agent/common/v1/common.proto
+ opencensus/proto/agent/metrics/v1/metrics_service.proto
+ opencensus/proto/agent/trace/v1/trace_service.proto
+ opencensus/proto/metrics/v1/metrics.proto
+ opencensus/proto/resource/v1/resource.proto
+ opencensus/proto/stats/v1/stats.proto
+ opencensus/proto/trace/v1/trace.proto
+ opencensus/proto/trace/v1/trace_config.proto
+
+
+
+
+
+ io.grpc
+ grpc-services
+
+
+ org.alluxio
+ alluxio-shaded-client
+
+
+
+ grpc/binlog/v1/binarylog.proto
+ grpc/health/v1/health.proto
+ grpc/reflection/v1alpha/reflection.proto
+ grpc/channelz/v1/channelz.proto
+
+
+
+
+
+ com.google.android
+ annotations
+
+
+ org.alluxio
+ alluxio-shaded-client
+
+
+
+ android.annotation.SuppressLint
+ android.annotation.TargetApi
+
+
+
+
+
+ com.google.re2j
+ re2j
+
+
+ io.trino
+ re2j
+
+
+
+ com.google.re2j
+
+
+
+
+
+ org.alluxio
+ alluxio-shaded-client
+
+
+ com.google.protobuf
+ protobuf-java
+
+
+
+ google/protobuf/any.proto
+ google/protobuf/api.proto
+ google/protobuf/descriptor.proto
+ google/protobuf/duration.proto
+ google/protobuf/empty.proto
+ google/protobuf/field_mask.proto
+ google/protobuf/source_context.proto
+ google/protobuf/struct.proto
+ google/protobuf/timestamp.proto
+ google/protobuf/type.proto
+ google/protobuf/wrappers.proto
+
+