fixup! Fetch Redshift query results unloaded to S3
mayankvadariya committed Dec 23, 2024
1 parent e1078ea commit 3ae7328
Showing 5 changed files with 21 additions and 27 deletions.
12 changes: 6 additions & 6 deletions docs/src/main/sphinx/connector/redshift.md
@@ -78,13 +78,13 @@
 One of the limitations is that the Redshift cluster and the S3 bucket must be in the same AWS region.
 
 The following table describes configuration properties for using
-`UNLOAD` command in Redshift connector. `redshift.unload-location` and
-`redshift.unload-iam-role` must be set to use `UNLOAD`.
+the `UNLOAD` command in the Redshift connector. `redshift.unload-location` must
+be set to use `UNLOAD`.
 
-| Property name              | Required | Description |
-|----------------------------|----------|-------------|
-| `redshift.unload-location` | optional | A writeable location in Amazon S3, to be used for temporarily unloading Redshift query results. |
-| `redshift.unload-iam-role` | optional | Fully specified ARN of the IAM Role attached to the Redshift cluster. Provided role will be used in `UNLOAD` command. IAM role must have access to Redshift cluster and write access to S3 bucket. |
+| Property name              | Required | Description |
+|----------------------------|----------|-------------|
+| `redshift.unload-location` | optional | A writeable location in Amazon S3, to be used for temporarily unloading Redshift query results. |
+| `redshift.unload-iam-role` | optional | Fully specified ARN of the IAM role attached to the Redshift cluster. The provided role is used in the `UNLOAD` command and must have access to the Redshift cluster and write access to the S3 bucket. Defaults to `DEFAULT`, which uses the IAM role attached to the Redshift cluster. |
 
 Additionally, define appropriate [S3 configurations](/object-storage/file-system-s3),
 except `fs.native-s3.enabled`, required to read Parquet files from the S3 bucket.
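
For reference, a minimal catalog configuration enabling `UNLOAD` under the new defaults might look like the following sketch; the connection URL, bucket, and role ARN are illustrative, and `redshift.unload-iam-role` is commented out because it now falls back to `DEFAULT`:

```properties
connector.name=redshift
connection-url=jdbc:redshift://example-cluster.abc123.us-east-1.redshift.amazonaws.com:5439/dev
redshift.unload-location=s3://example-bucket/redshift-unload
# Optional; when unset, the connector passes IAM_ROLE 'DEFAULT' to UNLOAD,
# which uses the IAM role attached to the Redshift cluster.
# redshift.unload-iam-role=arn:aws:iam::123456789012:role/example-redshift-role
```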
RedshiftConfig.java
@@ -16,14 +16,11 @@
 import io.airlift.configuration.Config;
 import io.airlift.configuration.ConfigDescription;
 import io.airlift.configuration.DefunctConfig;
-import jakarta.annotation.PostConstruct;
 import jakarta.validation.constraints.Min;
 import jakarta.validation.constraints.Pattern;
 
 import java.util.Optional;
 
-import static com.google.common.base.Preconditions.checkState;
-
 @DefunctConfig({
         "redshift.disable-automatic-fetch-size",
         "redshift.use-legacy-type-mapping",
@@ -73,11 +70,4 @@ public RedshiftConfig setUnloadIamRole(String unloadIamRole)
         this.unloadIamRole = unloadIamRole;
         return this;
     }
-
-    @PostConstruct
-    public void validate()
-    {
-        checkState(getUnloadIamRole().isPresent() == getUnloadLocation().isPresent(),
-                "Either 'redshift.unload-iam-role' and 'redshift.unload-location' must be set or neither of them must not be set");
-    }
 }
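
The removed `@PostConstruct` check had coupled the two properties; with the `DEFAULT` fallback, each is independently optional. A minimal sketch of the resulting config surface, with assumed field names and the binding annotations omitted:

```java
import java.util.Optional;

// Sketch only: both unload properties are independently optional after this
// change, so no cross-validation is needed. An absent IAM role is resolved to
// the literal "DEFAULT" where the UNLOAD statement is built.
class UnloadConfigSketch
{
    private String unloadLocation; // bound to redshift.unload-location in the real class
    private String unloadIamRole;  // bound to redshift.unload-iam-role in the real class

    public Optional<String> getUnloadLocation()
    {
        return Optional.ofNullable(unloadLocation);
    }

    public Optional<String> getUnloadIamRole()
    {
        return Optional.ofNullable(unloadIamRole);
    }
}
```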
@@ -112,12 +112,13 @@ public ConnectorSplitSource getSplits(ConnectorTransactionHandle transaction, Co
 
         Connection connection;
         PreparedStatement statement;
-        String unloadOutputPath = unloadLocation.orElseThrow() + "/" + session.getQueryId() + "-" + UUID.randomUUID() + "/";
+        String queryFragmentId = session.getQueryId() + "-" + UUID.randomUUID();
+        String unloadOutputPath = unloadLocation.orElseThrow() + "/" + queryFragmentId + "/";
         try {
             connection = jdbcClient.getConnection(session);
             String redshiftSelectSql = buildRedshiftSelectSql(session, connection, jdbcTableHandle, columns);
 
-            // Query containing \\b is unsupported with unload command. See https://github.com/aws/amazon-redshift-jdbc-driver/issues/124
+            // Queries containing \b are unsupported with the UNLOAD command. See https://github.com/aws/amazon-redshift-jdbc-driver/issues/124
             if (redshiftSelectSql.contains("\\b")) {
                 log.debug("Unload query contains unsupported characters. Falling back to using JDBC");
                 return fallbackSplitSource;
@@ -127,7 +128,7 @@ public ConnectorSplitSource getSplits(ConnectorTransactionHandle transaction, Co
         catch (SQLException e) {
             throw new RuntimeException(e);
         }
-        return new RedshiftUnloadSplitSource(executor, connection, statement, unloadOutputPath, fileSystemFactory.create(session));
+        return new RedshiftUnloadSplitSource(executor, connection, statement, queryFragmentId, unloadOutputPath, fileSystemFactory.create(session));
     }
 
     private String buildRedshiftSelectSql(ConnectorSession session, Connection connection, JdbcTableHandle table, List<JdbcColumnHandle> columns)
@@ -146,7 +147,7 @@ private PreparedStatement buildUnloadSql(ConnectorSession session, Connection co
         String unloadSql = "UNLOAD ('%s') TO '%s' IAM_ROLE '%s' FORMAT PARQUET MAXFILESIZE 64MB MANIFEST VERBOSE".formatted(
                 escapeSingleQuote(redshiftSelectSql),
                 unloadOutputPath,
-                unloadAuthorization.orElseThrow());
+                unloadAuthorization.orElse("DEFAULT"));
         return queryBuilder.prepareStatement(jdbcClient, session, connection, new PreparedQuery(unloadSql, List.of()), Optional.of(columns.size()));
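
With the `orElse("DEFAULT")` fallback, a catalog that sets only `redshift.unload-location` now yields a statement along these lines; the bucket, query, and query-fragment identifier are illustrative:

```sql
UNLOAD ('SELECT "nationkey", "name" FROM "nation"')
TO 's3://example-bucket/redshift-unload/<query-id>-<uuid>/'
IAM_ROLE 'DEFAULT'
FORMAT PARQUET MAXFILESIZE 64MB MANIFEST VERBOSE
```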
}

RedshiftUnloadSplitSource.java
@@ -50,7 +50,7 @@ public class RedshiftUnloadSplitSource
     private static final Logger log = Logger.get(RedshiftUnloadSplitSource.class);
     private static final ObjectMapper OBJECT_MAPPER = new ObjectMapperProvider().get();
 
-    private final CompletableFuture<Boolean> resultSetFuture;
+    private final CompletableFuture<Void> resultSetFuture;
     private final Connection connection;
     private final Statement statement;
     private final String unloadOutputPath;
@@ -59,20 +59,22 @@ public class RedshiftUnloadSplitSource
     private Optional<Location> manifestLocation;
     private boolean finished;
 
-    public RedshiftUnloadSplitSource(ExecutorService executor, Connection connection, PreparedStatement statement, String unloadOutputPath, TrinoFileSystem fileSystem)
+    public RedshiftUnloadSplitSource(ExecutorService executor, Connection connection, PreparedStatement statement, String queryFragmentId, String unloadOutputPath, TrinoFileSystem fileSystem)
     {
         requireNonNull(executor, "executor is null");
         this.connection = requireNonNull(connection, "connection is null");
         this.statement = requireNonNull(statement, "statement is null");
         this.unloadOutputPath = requireNonNull(unloadOutputPath, "unloadOutputPath is null");
         verify(unloadOutputPath.endsWith("/"), "unloadOutputPath must end with '/'.");
         this.fileSystem = requireNonNull(fileSystem, "fileSystem is null");
-        resultSetFuture = CompletableFuture.supplyAsync(() -> {
+        resultSetFuture = CompletableFuture.runAsync(() -> {
             log.debug("Executing: %s", statement);
             try {
                 // Exclusively set readOnly to false to avoid query failing with "ERROR: transaction is read-only".
                 connection.setReadOnly(false);
-                return statement.execute();
+                long beginTs = System.currentTimeMillis();
+                statement.execute(); // The return value of `statement.execute()` is not useful, as it always returns false.
+                log.info("UNLOAD command for %s query took %sms", queryFragmentId, System.currentTimeMillis() - beginTs);
             }
             catch (SQLException e) {
                 if (e instanceof RedshiftException && e.getMessage().contains("The S3 bucket addressed by the query is in a different region from this cluster")) {
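
Because the driver's boolean result is discarded, `runAsync` with a `CompletableFuture<Void>` states the intent more precisely than `supplyAsync`. A self-contained sketch of the pattern, with a `Runnable` standing in for the real `statement.execute()` call:

```java
import java.util.concurrent.CompletableFuture;

// Sketch only: run a side-effect-only task asynchronously and time it, rather
// than threading a meaningless Boolean through the future as supplyAsync would.
class RunAsyncSketch
{
    static CompletableFuture<Void> executeTimed(Runnable task, String label)
    {
        return CompletableFuture.runAsync(() -> {
            long beginTs = System.currentTimeMillis();
            task.run(); // stands in for statement.execute(); the UNLOAD return value is always false
            System.out.printf("%s took %dms%n", label, System.currentTimeMillis() - beginTs);
        });
    }

    public static void main(String[] args)
    {
        executeTimed(() -> { /* placeholder for the UNLOAD statement */ }, "UNLOAD").join();
    }
}
```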
@@ -23,6 +23,7 @@
 
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Stream;
 
 import static com.google.common.collect.MoreCollectors.onlyElement;
 import static io.trino.plugin.redshift.RedshiftQueryRunner.IAM_ROLE;
@@ -80,7 +81,7 @@ void testUnloadFlow()
                         .collect(onlyElement())
                         .getInfo())
                         .getSplitInfo();
-                    assertThat(splitInfo.get("path")).matches("%s/.*/0001_part_00.parquet".formatted(S3_UNLOAD_ROOT));
+                    Stream.of(splitInfo.get("path").split(", ")).forEach(path -> assertThat(path).matches("%s/.*/.*.parquet".formatted(S3_UNLOAD_ROOT)));
                 },
                 results -> assertThat(results.getRowCount()).isEqualTo(5));
     }
@@ -91,14 +92,14 @@ void testUnloadFlowFallbackToJdbc()
         // Fallback to JDBC as limit clause is not supported by UNLOAD
         assertQueryStats(
                 getSession(),
-                "SELECT nationkey, name FROM nation WHERE regionkey = 0 LIMIT 2",
+                "SELECT nationkey, name FROM nation WHERE nationkey = 6 LIMIT 1",
                 queryStats -> {
                     OperatorStats operatorStats = queryStats.getOperatorSummaries()
                             .stream()
                             .filter(summary -> summary.getOperatorType().startsWith("TableScanOperator"))
                             .collect(onlyElement());
                     assertThat(operatorStats.getInfo()).isNull();
                 },
-                results -> assertThat(results.getRowCount()).isEqualTo(2));
+                results -> assertThat(results.getRowCount()).isEqualTo(1));
     }
}
