Add support for fetching Redshift query results using Redshift unload command #24117

Merged · 3 commits · Dec 31, 2024
2 changes: 2 additions & 0 deletions .github/workflows/ci.yml
@@ -740,6 +740,7 @@ jobs:
REDSHIFT_IAM_ROLES: ${{ vars.REDSHIFT_IAM_ROLES }}
REDSHIFT_VPC_SECURITY_GROUP_IDS: ${{ vars.REDSHIFT_VPC_SECURITY_GROUP_IDS }}
REDSHIFT_S3_TPCH_TABLES_ROOT: ${{ vars.REDSHIFT_S3_TPCH_TABLES_ROOT }}
REDSHIFT_S3_UNLOAD_ROOT: ${{ vars.REDSHIFT_S3_UNLOAD_ROOT }}
if: >-
contains(matrix.modules, 'trino-redshift') &&
(contains(matrix.profile, 'cloud-tests') || contains(matrix.profile, 'fte-tests')) &&
@@ -752,6 +753,7 @@ jobs:
-Dtest.redshift.jdbc.password="${REDSHIFT_PASSWORD}" \
-Dtest.redshift.jdbc.endpoint="${REDSHIFT_ENDPOINT}:${REDSHIFT_PORT}/" \
-Dtest.redshift.s3.tpch.tables.root="${REDSHIFT_S3_TPCH_TABLES_ROOT}" \
-Dtest.redshift.s3.unload.root="${REDSHIFT_S3_UNLOAD_ROOT}" \
-Dtest.redshift.iam.role="${REDSHIFT_IAM_ROLES}" \
-Dtest.redshift.aws.region="${AWS_REGION}" \
-Dtest.redshift.aws.access-key="${AWS_ACCESS_KEY_ID}" \
SplitOperatorInfo.java
@@ -15,14 +15,17 @@

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.trino.spi.Mergeable;
import io.trino.spi.connector.CatalogHandle;

import java.util.List;
import java.util.Map;

import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static java.util.Objects.requireNonNull;

public class SplitOperatorInfo
implements OperatorInfo
implements OperatorInfo, Mergeable<SplitOperatorInfo>
{
private final CatalogHandle catalogHandle;
private final Map<String, String> splitInfo;
@@ -53,4 +56,18 @@ public CatalogHandle getCatalogHandle()
{
return catalogHandle;
}

@Override
public SplitOperatorInfo mergeWith(SplitOperatorInfo other)
{
return mergeWith(List.of(other));
}

@Override
public SplitOperatorInfo mergeWith(List<SplitOperatorInfo> others)
{
return new SplitOperatorInfo(
catalogHandle,
splitInfo.entrySet().stream().collect(toImmutableMap(Map.Entry::getKey, e -> e.getValue() + " (" + others.size() + " more)")));
}
}
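
A minimal sketch (not from the PR) of the merge semantics above, assuming the
`(catalogHandle, splitInfo)` constructor implied by the fields and a
`TEST_CATALOG_HANDLE` test fixture: the receiving operator's entries are kept,
and each value is annotated with how many other operators were merged in; the
other operators' values themselves are discarded.

```java
SplitOperatorInfo a = new SplitOperatorInfo(TEST_CATALOG_HANDLE, Map.of("some_info", "some_value"));
SplitOperatorInfo b = new SplitOperatorInfo(TEST_CATALOG_HANDLE, Map.of("some_info", "other_value"));
SplitOperatorInfo c = new SplitOperatorInfo(TEST_CATALOG_HANDLE, Map.of("some_info", "third_value"));

// Keeps a's value and appends only the count of merged operators:
// {some_info=some_value (2 more)}
SplitOperatorInfo merged = a.mergeWith(List.of(b, c));
```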
@@ -241,7 +241,8 @@ public void testAdd()
assertThat(actual.getPeakRevocableMemoryReservation()).isEqualTo(DataSize.ofBytes(24));
assertThat(actual.getPeakTotalMemoryReservation()).isEqualTo(DataSize.ofBytes(25));
assertThat(actual.getSpilledDataSize()).isEqualTo(DataSize.ofBytes(3 * 26));
assertThat(actual.getInfo()).isNull();
assertThat(actual.getInfo()).isInstanceOf(SplitOperatorInfo.class);
assertThat(((SplitOperatorInfo) actual.getInfo()).getSplitInfo().get("some_info")).isEqualTo("some_value (2 more)");
}

@Test
37 changes: 37 additions & 0 deletions docs/src/main/sphinx/connector/redshift.md
@@ -64,6 +64,43 @@ documentation](https://docs.aws.amazon.com/redshift/latest/mgmt/jdbc20-configura
```{include} jdbc-authentication.fragment
```

### UNLOAD configuration

This feature enables using Amazon S3 to efficiently transfer data out of Redshift
instead of the default single-threaded JDBC-based implementation.
The connector automatically triggers the appropriate `UNLOAD` command
on Redshift to extract the output from Redshift to the configured
S3 bucket in the form of Parquet files. These Parquet files are read in parallel
from S3 to reduce the latency of reading from Redshift tables. The Parquet
files are removed when Trino finishes executing the query. It is recommended
to define a custom lifecycle policy on the S3 bucket used for unloading the
Redshift query results.
This feature is supported only when the Redshift cluster and the configured S3
bucket are in the same AWS region.

Review comments on this paragraph:

Member:
Please describe what this actually does for the user. It seems like it is only a
performance improvement, right? And if so, why is it called unload?

@mayankvadariya (Contributor, Author), Dec 24, 2024:
This feature uses the UNLOAD approach
(https://docs.aws.amazon.com/redshift/latest/dg/r_UNLOAD.html) for performance
improvements. Additionally, it requires additional configs to enable this
feature. Please suggest text improvements.

Member:
So let's move that into the performance section and explain more there.

Member:
I assume we need to explain that they need an S3 account and whatever else, and
when this should and should not be used. I am not aware of this info, so I can't
really reword appropriately without more details.

The following table describes the configuration properties for using the
`UNLOAD` command in the Redshift connector. `redshift.unload-location` must be set
to use `UNLOAD`.

:::{list-table} UNLOAD configuration properties
:widths: 30, 60
:header-rows: 1

* - Property name
- Description
* - `redshift.unload-location`
  - A writable location in Amazon S3 to be used for temporarily unloading
    Redshift query results.
* - `redshift.unload-iam-role`
  - Optional. Fully specified ARN of the IAM role attached to the Redshift cluster.
    The provided role is used in the `UNLOAD` command. The IAM role must have access
    to the Redshift cluster and write access to the S3 bucket. The default IAM role
    attached to the Redshift cluster is used when this property is not configured.
:::

Additionally, define the appropriate [S3 configurations](/object-storage/file-system-s3),
other than `fs.native-s3.enabled`, that are required to read the Parquet files from
the S3 bucket.
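
For example, a catalog properties file that enables `UNLOAD` might look like the
following sketch; every value is an illustrative placeholder, not a default:

```properties
connector.name=redshift
connection-url=jdbc:redshift://example-cluster.abc123.us-east-2.redshift.amazonaws.com:5439/database
connection-user=example_user
connection-password=example_password
redshift.unload-location=s3://example-bucket/redshift-unload
redshift.unload-iam-role=arn:aws:iam::123456789012:role/example-redshift-role
s3.region=us-east-2
```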

### Multiple Redshift databases or clusters

The Redshift connector can only access a single database within
Expand Down
81 changes: 76 additions & 5 deletions plugin/trino-redshift/pom.xml
@@ -20,6 +20,11 @@
<version>2.1.0.30</version>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>

<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
@@ -30,21 +35,61 @@
<artifactId>guice</artifactId>
</dependency>

<dependency>
<groupId>io.airlift</groupId>
<artifactId>bootstrap</artifactId>
</dependency>

<dependency>
<groupId>io.airlift</groupId>
<artifactId>configuration</artifactId>
</dependency>

<dependency>
<groupId>io.airlift</groupId>
<artifactId>json</artifactId>
</dependency>

<dependency>
<groupId>io.airlift</groupId>
<artifactId>log</artifactId>
</dependency>

<dependency>
<groupId>io.airlift</groupId>
<artifactId>units</artifactId>
</dependency>

<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-base-jdbc</artifactId>
</dependency>

<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-filesystem</artifactId>
</dependency>

<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-filesystem-s3</artifactId>
</dependency>

<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-matching</artifactId>
</dependency>

<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-memory-context</artifactId>
</dependency>

<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-parquet</artifactId>
</dependency>

<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-plugin-toolkit</artifactId>
@@ -55,6 +100,16 @@
<artifactId>jakarta.validation-api</artifactId>
</dependency>

<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
</dependency>

<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-column</artifactId>
</dependency>

<dependency>
<groupId>org.jdbi</groupId>
<artifactId>jdbi3-core</artifactId>
@@ -116,19 +171,31 @@

<dependency>
<groupId>io.airlift</groupId>
<artifactId>log</artifactId>
<artifactId>log-manager</artifactId>
<scope>runtime</scope>
</dependency>

<dependency>
<groupId>io.airlift</groupId>
<artifactId>log-manager</artifactId>
<groupId>software.amazon.awssdk</groupId>
<artifactId>auth</artifactId>
<scope>runtime</scope>
</dependency>

<dependency>
<groupId>io.airlift</groupId>
<artifactId>units</artifactId>
<groupId>software.amazon.awssdk</groupId>
<artifactId>aws-core</artifactId>
<scope>runtime</scope>
</dependency>

<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>regions</artifactId>
<scope>runtime</scope>
</dependency>

<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>s3</artifactId>
<scope>runtime</scope>
</dependency>

@@ -236,9 +303,11 @@
<exclude>**/TestRedshiftAutomaticJoinPushdown.java</exclude>
<exclude>**/TestRedshiftCastPushdown.java</exclude>
<exclude>**/TestRedshiftConnectorTest.java</exclude>
<exclude>**/TestRedshiftUnload.java</exclude>
<exclude>**/TestRedshiftConnectorSmokeTest.java</exclude>
<exclude>**/TestRedshiftTableStatisticsReader.java</exclude>
<exclude>**/TestRedshiftTypeMapping.java</exclude>
<exclude>**/TestRedshiftUnloadTypeMapping.java</exclude>
<exclude>**/Test*FailureRecoveryTest.java</exclude>
<exclude>**/Test*FailureRecoverySmokeTest.java</exclude>
</excludes>
@@ -265,6 +334,8 @@
<!-- JDBC operations performed on the ephemeral AWS Redshift cluster. -->
<include>**/TestRedshiftCastPushdown.java</include>
<include>**/TestRedshiftConnectorSmokeTest.java</include>
<include>**/TestRedshiftUnloadTypeMapping.java</include>
<include>**/TestRedshiftUnload.java</include>
</includes>
</configuration>
</plugin>
RedshiftClientModule.java
@@ -15,29 +15,44 @@

import com.amazon.redshift.Driver;
import com.google.inject.Binder;
import com.google.inject.Key;
import com.google.inject.Provides;
import com.google.inject.Scopes;
import com.google.inject.Singleton;
import io.airlift.configuration.AbstractConfigurationAwareModule;
import io.opentelemetry.api.OpenTelemetry;
import io.trino.filesystem.s3.S3FileSystemModule;
import io.trino.plugin.base.metrics.FileFormatDataSourceStats;
import io.trino.plugin.jdbc.BaseJdbcConfig;
import io.trino.plugin.jdbc.ConnectionFactory;
import io.trino.plugin.jdbc.DecimalModule;
import io.trino.plugin.jdbc.DriverConnectionFactory;
import io.trino.plugin.jdbc.ForBaseJdbc;
import io.trino.plugin.jdbc.ForJdbcDynamicFiltering;
import io.trino.plugin.jdbc.JdbcClient;
import io.trino.plugin.jdbc.JdbcConnector;
import io.trino.plugin.jdbc.JdbcJoinPushdownSupportModule;
import io.trino.plugin.jdbc.JdbcMetadataConfig;
import io.trino.plugin.jdbc.JdbcQueryEventListener;
import io.trino.plugin.jdbc.JdbcRecordSetProvider;
import io.trino.plugin.jdbc.JdbcSplitManager;
import io.trino.plugin.jdbc.JdbcStatisticsConfig;
import io.trino.plugin.jdbc.RemoteQueryCancellationModule;
import io.trino.plugin.jdbc.credential.CredentialProvider;
import io.trino.plugin.jdbc.ptf.Query;
import io.trino.spi.connector.Connector;
import io.trino.spi.connector.ConnectorRecordSetProvider;
import io.trino.spi.connector.ConnectorSplitManager;
import io.trino.spi.function.table.ConnectorTableFunction;

import java.util.Properties;

import static com.google.inject.Scopes.SINGLETON;
import static com.google.inject.multibindings.Multibinder.newSetBinder;
import static com.google.inject.multibindings.OptionalBinder.newOptionalBinder;
import static io.airlift.configuration.ConditionalModule.conditionalModule;
import static io.airlift.configuration.ConfigBinder.configBinder;
import static io.trino.plugin.jdbc.JdbcModule.bindSessionPropertiesProvider;

public class RedshiftClientModule
extends AbstractConfigurationAwareModule
@@ -50,10 +65,28 @@ public void setup(Binder binder)
configBinder(binder).bindConfigDefaults(JdbcMetadataConfig.class, config -> config.setBulkListColumns(true));
newSetBinder(binder, ConnectorTableFunction.class).addBinding().toProvider(Query.class).in(SINGLETON);
configBinder(binder).bindConfig(JdbcStatisticsConfig.class);
bindSessionPropertiesProvider(binder, RedshiftSessionProperties.class);

install(new DecimalModule());
install(new JdbcJoinPushdownSupportModule());
install(new RemoteQueryCancellationModule());
binder.bind(ConnectorRecordSetProvider.class).to(JdbcRecordSetProvider.class).in(Scopes.SINGLETON);

install(conditionalModule(
RedshiftConfig.class,
config -> config.getUnloadLocation().isPresent(),
unloadBinder -> {
install(new S3FileSystemModule());
unloadBinder.bind(JdbcSplitManager.class).in(Scopes.SINGLETON);
unloadBinder.bind(Connector.class).to(RedshiftUnloadConnector.class).in(Scopes.SINGLETON);
unloadBinder.bind(FileFormatDataSourceStats.class).in(Scopes.SINGLETON);

newSetBinder(unloadBinder, JdbcQueryEventListener.class).addBinding().to(RedshiftUnloadJdbcQueryEventListener.class).in(Scopes.SINGLETON);

newOptionalBinder(unloadBinder, Key.get(ConnectorSplitManager.class, ForJdbcDynamicFiltering.class))
.setBinding().to(RedshiftSplitManager.class).in(SINGLETON);
},
jdbcBinder -> jdbcBinder.bind(Connector.class).to(JdbcConnector.class).in(Scopes.SINGLETON)));
}

@Singleton
RedshiftConfig.java
@@ -17,6 +17,7 @@
import io.airlift.configuration.ConfigDescription;
import io.airlift.configuration.DefunctConfig;
import jakarta.validation.constraints.Min;
import jakarta.validation.constraints.Pattern;

import java.util.Optional;

@@ -27,6 +28,8 @@
public class RedshiftConfig
{
private Integer fetchSize;
private String unloadLocation;
private String unloadIamRole;

public Optional<@Min(0) Integer> getFetchSize()
{
@@ -40,4 +43,30 @@ public RedshiftConfig setFetchSize(Integer fetchSize)
this.fetchSize = fetchSize;
return this;
}

public Optional<@Pattern(regexp = "^s3://[^/]+(/[^/]+)?$", message = "Path shouldn't end with trailing slash") String> getUnloadLocation()
{
return Optional.ofNullable(unloadLocation);
}

@Config("redshift.unload-location")
@ConfigDescription("A writeable location in Amazon S3, to be used for unloading Redshift query results")
public RedshiftConfig setUnloadLocation(String unloadLocation)
{
this.unloadLocation = unloadLocation;
return this;
}

public Optional<String> getUnloadIamRole()
{
return Optional.ofNullable(unloadIamRole);
}

@Config("redshift.unload-iam-role")
@ConfigDescription("Fully specified ARN of the IAM Role attached to the Redshift cluster and having access to S3")
public RedshiftConfig setUnloadIamRole(String unloadIamRole)
{
this.unloadIamRole = unloadIamRole;
return this;
}
}
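
The `@Pattern` constraint above accepts an S3 bucket with at most one prefix
segment and rejects trailing slashes. A quick standalone check (not part of the
PR) of that validation pattern:

```java
import java.util.regex.Pattern;

public class UnloadLocationPatternCheck
{
    // Same regular expression as the @Pattern constraint on getUnloadLocation().
    private static final Pattern UNLOAD_LOCATION = Pattern.compile("^s3://[^/]+(/[^/]+)?$");

    public static void main(String[] args)
    {
        System.out.println(UNLOAD_LOCATION.matcher("s3://my-bucket").matches());         // true
        System.out.println(UNLOAD_LOCATION.matcher("s3://my-bucket/unload").matches());  // true
        System.out.println(UNLOAD_LOCATION.matcher("s3://my-bucket/unload/").matches()); // false: trailing slash
        System.out.println(UNLOAD_LOCATION.matcher("s3://my-bucket/a/b").matches());     // false: nested prefix
    }
}
```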