diff --git a/docs/src/main/sphinx/connector/redshift.md b/docs/src/main/sphinx/connector/redshift.md
index c3f06531eb31..e615d7631e82 100644
--- a/docs/src/main/sphinx/connector/redshift.md
+++ b/docs/src/main/sphinx/connector/redshift.md
@@ -78,13 +78,13 @@
 One limitation is that the Redshift cluster and the S3 unload location must be
 in the same AWS region.
 
 The following table describes configuration properties for using the
-`UNLOAD` command in the Redshift connector. `redshift.unload-location` and
-`redshift.unload-iam-role` must be set to use `UNLOAD`.
+`UNLOAD` command in the Redshift connector. `redshift.unload-location` must be
+set to use `UNLOAD`.
 
-| Property name              | Required | Description                                                                                      |
-|----------------------------|----------|--------------------------------------------------------------------------------------------------|
-| `redshift.unload-location` | optional | A writeable location in Amazon S3, to be used for temporarily unloading Redshift query results.   |
-| `redshift.unload-iam-role` | optional | Fully specified ARN of the IAM role attached to the Redshift cluster. The provided role is used in the `UNLOAD` command and must have access to the Redshift cluster and write access to the S3 bucket. |
+| Property name              | Required | Description                                                                                      |
+|----------------------------|----------|--------------------------------------------------------------------------------------------------|
+| `redshift.unload-location` | optional | A writeable location in Amazon S3, to be used for temporarily unloading Redshift query results.   |
+| `redshift.unload-iam-role` | optional | Fully specified ARN of the IAM role attached to the Redshift cluster. The provided role is used in the `UNLOAD` command and must have access to the Redshift cluster and write access to the S3 bucket. Defaults to `DEFAULT`, which uses the IAM role attached to the Redshift cluster. |
 
 Additionally, define the appropriate [S3 configuration](/object-storage/file-system-s3)
 properties, except `fs.native-s3.enabled`, which are required to read the
 Parquet files from the S3 bucket.
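Editor's note: with this change applied, a minimal catalog file only needs the unload location to enable the `UNLOAD` flow. The following sketch illustrates that; the connection values, bucket name, and role ARN are hypothetical placeholders, and only the two `redshift.unload-*` properties come from this patch — the rest are the connector's standard connection settings:

```text
connector.name=redshift
connection-url=jdbc:redshift://example-cluster.abc123.us-east-1.redshift.amazonaws.com:5439/dev
connection-user=exampleuser
connection-password=examplepassword
redshift.unload-location=s3://example-bucket/unload
# Optional as of this change; when omitted, the UNLOAD command runs with
# IAM_ROLE 'DEFAULT', that is, the IAM role attached to the Redshift cluster.
#redshift.unload-iam-role=arn:aws:iam::123456789012:role/example-unload-role
```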
diff --git a/plugin/trino-redshift/src/main/java/io/trino/plugin/redshift/RedshiftConfig.java b/plugin/trino-redshift/src/main/java/io/trino/plugin/redshift/RedshiftConfig.java
index 7467de7e44c6..b373cc670c5a 100644
--- a/plugin/trino-redshift/src/main/java/io/trino/plugin/redshift/RedshiftConfig.java
+++ b/plugin/trino-redshift/src/main/java/io/trino/plugin/redshift/RedshiftConfig.java
@@ -16,14 +16,11 @@
 import io.airlift.configuration.Config;
 import io.airlift.configuration.ConfigDescription;
 import io.airlift.configuration.DefunctConfig;
-import jakarta.annotation.PostConstruct;
 import jakarta.validation.constraints.Min;
 import jakarta.validation.constraints.Pattern;
 
 import java.util.Optional;
 
-import static com.google.common.base.Preconditions.checkState;
-
 @DefunctConfig({
         "redshift.disable-automatic-fetch-size",
         "redshift.use-legacy-type-mapping",
@@ -73,11 +70,4 @@ public RedshiftConfig setUnloadIamRole(String unloadIamRole)
         this.unloadIamRole = unloadIamRole;
         return this;
     }
-
-    @PostConstruct
-    public void validate()
-    {
-        checkState(getUnloadIamRole().isPresent() == getUnloadLocation().isPresent(),
-                "Either 'redshift.unload-iam-role' and 'redshift.unload-location' must be set or neither of them must not be set");
-    }
 }
diff --git a/plugin/trino-redshift/src/main/java/io/trino/plugin/redshift/RedshiftSplitManager.java b/plugin/trino-redshift/src/main/java/io/trino/plugin/redshift/RedshiftSplitManager.java
index 1192e462e45a..e6cec0c0bf56 100644
--- a/plugin/trino-redshift/src/main/java/io/trino/plugin/redshift/RedshiftSplitManager.java
+++ b/plugin/trino-redshift/src/main/java/io/trino/plugin/redshift/RedshiftSplitManager.java
@@ -112,12 +112,13 @@ public ConnectorSplitSource getSplits(ConnectorTransactionHandle transaction, Co
         Connection connection;
         PreparedStatement statement;
-        String unloadOutputPath = unloadLocation.orElseThrow() + "/" + session.getQueryId() + "-" + UUID.randomUUID() + "/";
+        String queryFragmentId = session.getQueryId() + "-" + UUID.randomUUID();
+        String unloadOutputPath = unloadLocation.orElseThrow() + "/" + queryFragmentId + "/";
         try {
             connection = jdbcClient.getConnection(session);
             String redshiftSelectSql = buildRedshiftSelectSql(session, connection, jdbcTableHandle, columns);
-            // Query containing \\b is unsupported with unload command. See https://github.com/aws/amazon-redshift-jdbc-driver/issues/124
+            // Query containing \b is unsupported with unload command. See https://github.com/aws/amazon-redshift-jdbc-driver/issues/124
             if (redshiftSelectSql.contains("\\b")) {
                 log.debug("Unload query contains unsupported characters. Falling back to using JDBC");
                 return fallbackSplitSource;
             }
@@ -127,7 +128,7 @@ public ConnectorSplitSource getSplits(ConnectorTransactionHandle transaction, Co
         catch (SQLException e) {
             throw new RuntimeException(e);
         }
-        return new RedshiftUnloadSplitSource(executor, connection, statement, unloadOutputPath, fileSystemFactory.create(session));
+        return new RedshiftUnloadSplitSource(executor, connection, statement, queryFragmentId, unloadOutputPath, fileSystemFactory.create(session));
     }
 
     private String buildRedshiftSelectSql(ConnectorSession session, Connection connection, JdbcTableHandle table, List<JdbcColumnHandle> columns)
@@ -146,7 +147,7 @@ private PreparedStatement buildUnloadSql(ConnectorSession session, Connection co
         String unloadSql = "UNLOAD ('%s') TO '%s' IAM_ROLE '%s' FORMAT PARQUET MAXFILESIZE 64MB MANIFEST VERBOSE".formatted(
                 escapeSingleQuote(redshiftSelectSql),
                 unloadOutputPath,
-                unloadAuthorization.orElseThrow());
+                unloadAuthorization.orElse("DEFAULT"));
         return queryBuilder.prepareStatement(jdbcClient, session, connection, new PreparedQuery(unloadSql, List.of()), Optional.of(columns.size()));
     }
diff --git a/plugin/trino-redshift/src/main/java/io/trino/plugin/redshift/RedshiftUnloadSplitSource.java b/plugin/trino-redshift/src/main/java/io/trino/plugin/redshift/RedshiftUnloadSplitSource.java
index 12af12e1cef2..ab61249ccf63 100644
--- a/plugin/trino-redshift/src/main/java/io/trino/plugin/redshift/RedshiftUnloadSplitSource.java
+++ b/plugin/trino-redshift/src/main/java/io/trino/plugin/redshift/RedshiftUnloadSplitSource.java
@@ -50,7 +50,7 @@ public class RedshiftUnloadSplitSource
     private static final Logger log = Logger.get(RedshiftUnloadSplitSource.class);
     private static final ObjectMapper OBJECT_MAPPER = new ObjectMapperProvider().get();
 
-    private final CompletableFuture<Boolean> resultSetFuture;
+    private final CompletableFuture<Void> resultSetFuture;
     private final Connection connection;
     private final Statement statement;
     private final String unloadOutputPath;
@@ -59,7 +59,7 @@ public class RedshiftUnloadSplitSource
     private Optional<String> manifestLocation;
     private boolean finished;
 
-    public RedshiftUnloadSplitSource(ExecutorService executor, Connection connection, PreparedStatement statement, String unloadOutputPath, TrinoFileSystem fileSystem)
+    public RedshiftUnloadSplitSource(ExecutorService executor, Connection connection, PreparedStatement statement, String queryFragmentId, String unloadOutputPath, TrinoFileSystem fileSystem)
     {
         requireNonNull(executor, "executor is null");
         this.connection = requireNonNull(connection, "connection is null");
@@ -67,12 +67,14 @@ public RedshiftUnloadSplitSource(ExecutorService executor, Connection connection
         this.unloadOutputPath = requireNonNull(unloadOutputPath, "unloadOutputPath is null");
         verify(unloadOutputPath.endsWith("/"), "unloadOutputPath must end with '/'.");
         this.fileSystem = requireNonNull(fileSystem, "fileSystem is null");
-        resultSetFuture = CompletableFuture.supplyAsync(() -> {
+        resultSetFuture = CompletableFuture.runAsync(() -> {
             log.debug("Executing: %s", statement);
             try {
                 // Exclusively set readOnly to false to avoid query failing with "ERROR: transaction is read-only".
                 connection.setReadOnly(false);
-                return statement.execute();
+                long beginTs = System.currentTimeMillis();
+                statement.execute(); // Return value of `statement.execute()` is not useful as it always returns false.
+ log.info("UNLOAD command for %s query took %sms", queryFragmentId, System.currentTimeMillis() - beginTs); } catch (SQLException e) { if (e instanceof RedshiftException && e.getMessage().contains("The S3 bucket addressed by the query is in a different region from this cluster")) { diff --git a/plugin/trino-redshift/src/test/java/io/trino/plugin/redshift/TestRedshiftUnload.java b/plugin/trino-redshift/src/test/java/io/trino/plugin/redshift/TestRedshiftUnload.java index 8391e495b28a..d7c4c1e147e7 100644 --- a/plugin/trino-redshift/src/test/java/io/trino/plugin/redshift/TestRedshiftUnload.java +++ b/plugin/trino-redshift/src/test/java/io/trino/plugin/redshift/TestRedshiftUnload.java @@ -23,6 +23,7 @@ import java.util.List; import java.util.Map; +import java.util.stream.Stream; import static com.google.common.collect.MoreCollectors.onlyElement; import static io.trino.plugin.redshift.RedshiftQueryRunner.IAM_ROLE; @@ -80,7 +81,7 @@ void testUnloadFlow() .collect(onlyElement()) .getInfo()) .getSplitInfo(); - assertThat(splitInfo.get("path")).matches("%s/.*/0001_part_00.parquet".formatted(S3_UNLOAD_ROOT)); + Stream.of(splitInfo.get("path").split(", ")).forEach(path -> assertThat(path).matches("%s/.*/.*.parquet".formatted(S3_UNLOAD_ROOT))); }, results -> assertThat(results.getRowCount()).isEqualTo(5)); } @@ -91,7 +92,7 @@ void testUnloadFlowFallbackToJdbc() // Fallback to JDBC as limit clause is not supported by UNLOAD assertQueryStats( getSession(), - "SELECT nationkey, name FROM nation WHERE regionkey = 0 LIMIT 2", + "SELECT nationkey, name FROM nation WHERE nationkey = 6 LIMIT 1", queryStats -> { OperatorStats operatorStats = queryStats.getOperatorSummaries() .stream() @@ -99,6 +100,6 @@ void testUnloadFlowFallbackToJdbc() .collect(onlyElement()); assertThat(operatorStats.getInfo()).isNull(); }, - results -> assertThat(results.getRowCount()).isEqualTo(2)); + results -> assertThat(results.getRowCount()).isEqualTo(1)); } }