diff --git a/plugin/trino-bigquery/pom.xml b/plugin/trino-bigquery/pom.xml
index 27304f80aaf0..c54d29cdefd1 100644
--- a/plugin/trino-bigquery/pom.xml
+++ b/plugin/trino-bigquery/pom.xml
@@ -112,6 +112,12 @@
+
+ com.google.api.grpc
+ proto-google-common-protos
+ 2.21.0
+
+
com.google.auth
google-auth-library-credentials
@@ -313,6 +319,12 @@
httpcore
+
+ org.json
+ json
+ 20230618
+
+
org.threeten
threetenbp
diff --git a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryConnectorModule.java b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryConnectorModule.java
index 733fb33717f9..4826989f4638 100644
--- a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryConnectorModule.java
+++ b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryConnectorModule.java
@@ -68,6 +68,7 @@ protected void setup(Binder binder)
{
// BigQuery related
binder.bind(BigQueryReadClientFactory.class).in(Scopes.SINGLETON);
+ binder.bind(BigQueryWriteClientFactory.class).in(Scopes.SINGLETON);
binder.bind(BigQueryClientFactory.class).in(Scopes.SINGLETON);
// Connector implementation
diff --git a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryErrorCode.java b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryErrorCode.java
index 60fa13b0274b..d42de472b776 100644
--- a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryErrorCode.java
+++ b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryErrorCode.java
@@ -31,6 +31,7 @@ public enum BigQueryErrorCode
BIGQUERY_UNSUPPORTED_OPERATION(5, USER_ERROR),
BIGQUERY_INVALID_STATEMENT(6, USER_ERROR),
BIGQUERY_PROXY_SSL_INITIALIZATION_FAILED(7, EXTERNAL),
+ BIGQUERY_BAD_WRITE(8, EXTERNAL),
/**/;
private final ErrorCode errorCode;
diff --git a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryGrpcOptionsConfigurer.java b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryGrpcOptionsConfigurer.java
index f1c7c667ccac..7fe40acafc9d 100644
--- a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryGrpcOptionsConfigurer.java
+++ b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryGrpcOptionsConfigurer.java
@@ -15,6 +15,7 @@
import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider;
import com.google.cloud.bigquery.storage.v1.BigQueryReadSettings;
+import com.google.cloud.bigquery.storage.v1.BigQueryWriteSettings;
import io.trino.spi.connector.ConnectorSession;
interface BigQueryGrpcOptionsConfigurer
@@ -27,5 +28,12 @@ default BigQueryReadSettings.Builder configure(BigQueryReadSettings.Builder buil
return builder.setTransportChannelProvider(configure(channelBuilder, session).build());
}
+ @Override
+ default BigQueryWriteSettings.Builder configure(BigQueryWriteSettings.Builder builder, ConnectorSession session)
+ {
+ InstantiatingGrpcChannelProvider.Builder channelBuilder = ((InstantiatingGrpcChannelProvider) builder.getTransportChannelProvider()).toBuilder();
+ return builder.setTransportChannelProvider(configure(channelBuilder, session).build());
+ }
+
InstantiatingGrpcChannelProvider.Builder configure(InstantiatingGrpcChannelProvider.Builder channelBuilder, ConnectorSession session);
}
diff --git a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryOptionsConfigurer.java b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryOptionsConfigurer.java
index 14f4bdcd8277..1543a2c9f535 100644
--- a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryOptionsConfigurer.java
+++ b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryOptionsConfigurer.java
@@ -15,6 +15,7 @@
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.storage.v1.BigQueryReadSettings;
+import com.google.cloud.bigquery.storage.v1.BigQueryWriteSettings;
import io.trino.spi.connector.ConnectorSession;
interface BigQueryOptionsConfigurer
@@ -22,4 +23,6 @@ interface BigQueryOptionsConfigurer
BigQueryOptions.Builder configure(BigQueryOptions.Builder builder, ConnectorSession session);
BigQueryReadSettings.Builder configure(BigQueryReadSettings.Builder builder, ConnectorSession session);
+
+ BigQueryWriteSettings.Builder configure(BigQueryWriteSettings.Builder builder, ConnectorSession session);
}
diff --git a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryPageSink.java b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryPageSink.java
index e67d13702d0f..48e3b7b8eb94 100644
--- a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryPageSink.java
+++ b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryPageSink.java
@@ -13,40 +13,49 @@
*/
package io.trino.plugin.bigquery;
-import com.google.cloud.bigquery.InsertAllRequest;
-import com.google.cloud.bigquery.TableId;
+import com.google.api.core.ApiFuture;
+import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
+import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient;
+import com.google.cloud.bigquery.storage.v1.CreateWriteStreamRequest;
+import com.google.cloud.bigquery.storage.v1.JsonStreamWriter;
+import com.google.cloud.bigquery.storage.v1.TableName;
+import com.google.cloud.bigquery.storage.v1.WriteStream;
import com.google.common.collect.ImmutableList;
import io.airlift.slice.Slice;
import io.airlift.slice.Slices;
import io.trino.spi.Page;
+import io.trino.spi.TrinoException;
import io.trino.spi.connector.ConnectorPageSink;
import io.trino.spi.connector.ConnectorPageSinkId;
import io.trino.spi.type.Type;
+import org.json.JSONArray;
+import org.json.JSONObject;
import java.util.Collection;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
+import static com.google.cloud.bigquery.storage.v1.WriteStream.Type.COMMITTED;
import static com.google.common.base.Preconditions.checkArgument;
+import static io.trino.plugin.bigquery.BigQueryErrorCode.BIGQUERY_BAD_WRITE;
import static io.trino.plugin.bigquery.BigQueryTypeUtils.readNativeValue;
+import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.CompletableFuture.completedFuture;
public class BigQueryPageSink
implements ConnectorPageSink
{
- private final BigQueryClient client;
- private final TableId tableId;
+ private final BigQueryWriteClient client;
+ private final TableName tableName;
private final List columnNames;
private final List columnTypes;
private final ConnectorPageSinkId pageSinkId;
private final Optional pageSinkIdColumnName;
public BigQueryPageSink(
- BigQueryClient client,
+ BigQueryWriteClient client,
RemoteTableName remoteTableName,
List columnNames,
List columnTypes,
@@ -64,28 +73,46 @@ public BigQueryPageSink(
this.pageSinkIdColumnName = requireNonNull(pageSinkIdColumnName, "pageSinkIdColumnName is null");
checkArgument(temporaryTableName.isPresent() == pageSinkIdColumnName.isPresent(),
"temporaryTableName.isPresent is not equal to pageSinkIdColumn.isPresent");
- this.tableId = temporaryTableName
- .map(tableName -> TableId.of(remoteTableName.getProjectId(), remoteTableName.getDatasetName(), tableName))
- .orElseGet(remoteTableName::toTableId);
+ this.tableName = temporaryTableName
+ .map(tableName -> TableName.of(remoteTableName.getProjectId(), remoteTableName.getDatasetName(), tableName))
+ .orElseGet(remoteTableName::toTableName);
}
@Override
public CompletableFuture> appendPage(Page page)
{
- InsertAllRequest.Builder batch = InsertAllRequest.newBuilder(tableId);
+ JSONArray batch = new JSONArray();
for (int position = 0; position < page.getPositionCount(); position++) {
- Map row = new HashMap<>();
+ JSONObject row = new JSONObject();
pageSinkIdColumnName.ifPresent(column -> row.put(column, pageSinkId.getId()));
for (int channel = 0; channel < page.getChannelCount(); channel++) {
row.put(columnNames.get(channel), readNativeValue(columnTypes.get(channel), page.getBlock(channel), position));
}
- batch.addRow(row);
+ batch.put(row);
}
- client.insert(batch.build());
+ insertWithCommitted(batch);
return NOT_BLOCKED;
}
+ private void insertWithCommitted(JSONArray batch)
+ {
+ WriteStream stream = WriteStream.newBuilder().setType(COMMITTED).build();
+ CreateWriteStreamRequest createWriteStreamRequest = CreateWriteStreamRequest.newBuilder().setParent(tableName.toString()).setWriteStream(stream).build();
+ WriteStream writeStream = client.createWriteStream(createWriteStreamRequest);
+
+ try (JsonStreamWriter writer = JsonStreamWriter.newBuilder(writeStream.getName(), writeStream.getTableSchema(), client).build()) {
+ ApiFuture future = writer.append(batch);
+ AppendRowsResponse response = future.get();
+ if (response.hasError()) {
+ throw new TrinoException(BIGQUERY_BAD_WRITE, format("Response has error: %s", response.getError().getMessage()));
+ }
+ }
+ catch (Exception e) {
+ throw new TrinoException(BIGQUERY_BAD_WRITE, "Failed to insert rows", e);
+ }
+ }
+
@Override
public CompletableFuture> finish()
{
diff --git a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryPageSinkProvider.java b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryPageSinkProvider.java
index a3423fa78670..dc8de166298c 100644
--- a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryPageSinkProvider.java
+++ b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryPageSinkProvider.java
@@ -29,10 +29,10 @@
public class BigQueryPageSinkProvider
implements ConnectorPageSinkProvider
{
- private final BigQueryClientFactory clientFactory;
+ private final BigQueryWriteClientFactory clientFactory;
@Inject
- public BigQueryPageSinkProvider(BigQueryClientFactory clientFactory)
+ public BigQueryPageSinkProvider(BigQueryWriteClientFactory clientFactory)
{
this.clientFactory = requireNonNull(clientFactory, "clientFactory is null");
}
@@ -42,7 +42,7 @@ public ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHa
{
BigQueryOutputTableHandle handle = (BigQueryOutputTableHandle) outputTableHandle;
return new BigQueryPageSink(
- clientFactory.createBigQueryClient(session),
+ clientFactory.create(session),
handle.getRemoteTableName(),
handle.getColumnNames(),
handle.getColumnTypes(),
@@ -56,7 +56,7 @@ public ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHa
{
BigQueryInsertTableHandle handle = (BigQueryInsertTableHandle) insertTableHandle;
return new BigQueryPageSink(
- clientFactory.createBigQueryClient(session),
+ clientFactory.create(session),
handle.getRemoteTableName(),
handle.getColumnNames(),
handle.getColumnTypes(),
diff --git a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryWriteClientFactory.java b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryWriteClientFactory.java
new file mode 100644
index 000000000000..39afc63c068d
--- /dev/null
+++ b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryWriteClientFactory.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.bigquery;
+
+import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient;
+import com.google.cloud.bigquery.storage.v1.BigQueryWriteSettings;
+import com.google.common.collect.ImmutableSet;
+import com.google.inject.Inject;
+import io.trino.spi.connector.ConnectorSession;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Set;
+
+import static java.util.Objects.requireNonNull;
+
+public class BigQueryWriteClientFactory
+{
+ private final Set configurers;
+
+ @Inject
+ public BigQueryWriteClientFactory(Set configurers)
+ {
+ this.configurers = ImmutableSet.copyOf(requireNonNull(configurers, "configurers is null"));
+ }
+
+ public BigQueryWriteClient create(ConnectorSession session)
+ {
+ BigQueryWriteSettings.Builder builder = BigQueryWriteSettings.newBuilder();
+
+ for (BigQueryOptionsConfigurer configurer : configurers) {
+ builder = configurer.configure(builder, session);
+ }
+ try {
+ return BigQueryWriteClient.create(builder.build());
+ }
+ catch (IOException e) {
+ throw new UncheckedIOException("Error creating BigQueryWriteClient", e);
+ }
+ }
+}
diff --git a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/CredentialsOptionsConfigurer.java b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/CredentialsOptionsConfigurer.java
index f44eab1a8ff1..ee771c77b8b2 100644
--- a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/CredentialsOptionsConfigurer.java
+++ b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/CredentialsOptionsConfigurer.java
@@ -18,6 +18,7 @@
import com.google.auth.oauth2.ServiceAccountCredentials;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.storage.v1.BigQueryReadSettings;
+import com.google.cloud.bigquery.storage.v1.BigQueryWriteSettings;
import com.google.common.annotations.VisibleForTesting;
import com.google.inject.Inject;
import io.trino.spi.connector.ConnectorSession;
@@ -58,6 +59,14 @@ public BigQueryReadSettings.Builder configure(BigQueryReadSettings.Builder build
return builder;
}
+ @Override
+ public BigQueryWriteSettings.Builder configure(BigQueryWriteSettings.Builder builder, ConnectorSession session)
+ {
+ Optional credentials = credentialsSupplier.getCredentials(session);
+ credentials.ifPresent(value -> builder.setCredentialsProvider(FixedCredentialsProvider.create(value)));
+ return builder;
+ }
+
// Note that at this point the config has been validated, which means that option 2 or option 3 will always be valid
@VisibleForTesting
static String calculateBillingProjectId(Optional configParentProjectId, Optional credentials)
diff --git a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/RemoteTableName.java b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/RemoteTableName.java
index 33bdf307d6ce..1fe82e53fdcc 100644
--- a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/RemoteTableName.java
+++ b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/RemoteTableName.java
@@ -16,6 +16,7 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.cloud.bigquery.TableId;
+import com.google.cloud.bigquery.storage.v1.TableName;
import java.util.Objects;
@@ -46,6 +47,11 @@ public TableId toTableId()
return TableId.of(projectId, datasetName, tableName);
}
+ public TableName toTableName()
+ {
+ return TableName.of(projectId, datasetName, tableName);
+ }
+
@JsonProperty
public String getProjectId()
{
diff --git a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/RetryOptionsConfigurer.java b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/RetryOptionsConfigurer.java
index 56f25cf87349..94735b606c9b 100644
--- a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/RetryOptionsConfigurer.java
+++ b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/RetryOptionsConfigurer.java
@@ -16,6 +16,7 @@
import com.google.api.gax.retrying.RetrySettings;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.storage.v1.BigQueryReadSettings;
+import com.google.cloud.bigquery.storage.v1.BigQueryWriteSettings;
import com.google.inject.Inject;
import io.airlift.units.Duration;
import io.trino.spi.connector.ConnectorSession;
@@ -62,6 +63,20 @@ public BigQueryReadSettings.Builder configure(BigQueryReadSettings.Builder build
}
}
+ @Override
+ public BigQueryWriteSettings.Builder configure(BigQueryWriteSettings.Builder builder, ConnectorSession session)
+ {
+ try {
+ return builder.applyToAllUnaryMethods(methodBuilder -> {
+ methodBuilder.setRetrySettings(retrySettings());
+ return null;
+ });
+ }
+ catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
private RetrySettings retrySettings()
{
long maxDelay = retryDelay.toMillis() * (long) pow(retryMultiplier, retries);