Support s3_data_dir and s3_data_naming #39

Merged · 11 commits · Nov 21, 2022
56 changes: 38 additions & 18 deletions README.md
@@ -47,16 +47,19 @@ stored login info. You can configure the AWS profile name to use via `aws_profil

A dbt profile can be configured to run against AWS Athena using the following configuration:

| Option | Description | Required? | Example |
|---------------- |-------------------------------------------------------------------------------- |----------- |-------------------- |
| s3_staging_dir | S3 location to store Athena query results and metadata | Required | `s3://bucket/dbt/` |
| region_name | AWS region of your Athena instance | Required | `eu-west-1` |
| schema | Specify the schema (Athena database) to build models into (lowercase **only**) | Required | `dbt` |
| database | Specify the database (Data catalog) to build models into (lowercase **only**) | Required | `awsdatacatalog` |
| poll_interval | Interval in seconds to use for polling the status of query results in Athena | Optional | `5` |
| aws_profile_name| Profile to use from your AWS shared credentials file. | Optional | `my-profile` |
| work_group| Identifier of Athena workgroup | Optional | `my-custom-workgroup` |
| num_retries| Number of times to retry a failing query | Optional | `3` |
| Option | Description | Required? | Example |
|---------------- |--------------------------------------------------------------------------------|----------- |-----------------------|
| s3_staging_dir | S3 location to store Athena query results and metadata | Required | `s3://bucket/dbt/` |
| s3_data_dir | Prefix for storing tables, if different from the connection's `s3_staging_dir` | Optional | `s3://bucket2/dbt/` |
| s3_data_naming | How to generate table paths in `s3_data_dir` | Optional | `schema_table_unique` |
| region_name | AWS region of your Athena instance | Required | `eu-west-1` |
| schema | Specify the schema (Athena database) to build models into (lowercase **only**) | Required | `dbt` |
| database | Specify the database (Data catalog) to build models into (lowercase **only**) | Required | `awsdatacatalog` |
| poll_interval | Interval in seconds to use for polling the status of query results in Athena | Optional | `5` |
| aws_profile_name| Profile to use from your AWS shared credentials file. | Optional | `my-profile` |
| work_group| Identifier of Athena workgroup | Optional | `my-custom-workgroup` |
| num_retries     | Number of times to retry a failing query                                        | Optional   | `3` |


**Example profiles.yml entry:**
```yaml
@@ -66,6 +69,8 @@ athena:
dev:
type: athena
s3_staging_dir: s3://athena-query-results/dbt/
s3_data_dir: s3://your_s3_bucket/dbt/
s3_data_naming: schema_table
region_name: eu-west-1
schema: dbt
database: awsdatacatalog
@@ -84,9 +89,7 @@ _Additional information_
#### Table Configuration

* `external_location` (`default=none`)
* The location where Athena saves your table in Amazon S3
* If `none` then it will default to `{s3_staging_dir}/tables`
* If you are using a static value, when your table/partition is recreated underlying data will be cleaned up and overwritten by new data
* If set, the full S3 path in which the table will be saved.
* `partitioned_by` (`default=none`)
* An array list of columns by which the table will be partitioned
* Limited to creation of 100 partitions (_currently_)
Expand All @@ -104,11 +107,23 @@ _Additional information_
* `table_properties`: table properties to add to the table, valid for Iceberg only
* `strict_location` (`default=True`): when working with Iceberg it is possible to rename tables, but to do so tables must not share the same location. Setting `strict_location` to *false* creates each table at a unique location.

More information: [CREATE TABLE AS][create-table-as]
#### Table location
The location in which a table is saved is determined by:

1. If `external_location` is defined, that value is used.
2. If `s3_data_dir` is defined, the path is determined by that and `s3_data_naming`.
3. If `s3_data_dir` is not defined, data is stored under `s3_staging_dir/tables/`.

Here are all the options available for `s3_data_naming`:
* `uuid`: `{s3_data_dir}/{uuid4()}/`
* `table`: `{s3_data_dir}/{table}/`
* `table_unique`: `{s3_data_dir}/{table}/{uuid4()}/`
* `schema_table`: `{s3_data_dir}/{schema}/{table}/`
* `schema_table_unique`: `{s3_data_dir}/{schema}/{table}/{uuid4()}/`

It's possible to set `s3_data_naming` globally in the target profile, override it in the table config,
or set it for groups of models in `dbt_project.yml`, as in the sketch below.
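
A minimal sketch of a folder-level override in `dbt_project.yml` (project and folder names are illustrative, not part of this PR):

```yaml
# dbt_project.yml (hypothetical project layout)
models:
  my_project:
    staging:
      # every model under models/staging/ writes to {s3_data_dir}/{table}/{uuid4()}/
      +s3_data_naming: table_unique
```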

[run_started_at]: https://docs.getdbt.com/reference/dbt-jinja-functions/run_started_at
[invocation_id]: https://docs.getdbt.com/reference/dbt-jinja-functions/invocation_id
[create-table-as]: https://docs.aws.amazon.com/athena/latest/ug/create-table-as.html

#### Supported functionality

@@ -129,7 +144,6 @@ To get started just add this as your model:
materialized='table',
format='iceberg',
partitioned_by=['bucket(5, user_id)'],
strict_location=false,
table_properties={
'optimize_rewrite_delete_file_threshold': '2'
}
@@ -193,6 +207,12 @@ You can run the tests using `make`:
make run_tests
```

### Helpful Resources

* [Athena CREATE TABLE AS](https://docs.aws.amazon.com/athena/latest/ug/create-table-as.html)
* [dbt run_started_at](https://docs.getdbt.com/reference/dbt-jinja-functions/run_started_at)
* [dbt invocation_id](https://docs.getdbt.com/reference/dbt-jinja-functions/invocation_id)

### Community

* [fishtown-analytics/dbt][fishtown-analytics/dbt]
4 changes: 4 additions & 0 deletions dbt/adapters/athena/connections.py
@@ -45,6 +45,8 @@ class AthenaCredentials(Credentials):
poll_interval: float = 1.0
_ALIASES = {"catalog": "database"}
num_retries: Optional[int] = 5
s3_data_dir: Optional[str] = None
s3_data_naming: Optional[str] = "schema_table_unique"

@property
def type(self) -> str:
Expand All @@ -64,6 +66,8 @@ def _connection_keys(self) -> Tuple[str, ...]:
"poll_interval",
"aws_profile_name",
"endpoing_url",
"s3_data_dir",
"s3_data_naming",
)


45 changes: 30 additions & 15 deletions dbt/adapters/athena/impl.py
@@ -46,26 +46,41 @@ def convert_datetime_type(cls, agate_table: agate.Table, col_idx: int) -> str:
return "timestamp"

@available
def s3_uuid_table_location(self):
def s3_table_prefix(self, s3_data_dir: str) -> str:
"""
Returns the root location for storing tables in S3.
This is `s3_data_dir`, if set, and `s3_staging_dir/tables/` if not.
We generate a value here even if `s3_data_dir` is not set,
since creating a seed table requires a non-default location.
"""
conn = self.connections.get_thread_connection()
client = conn.handle
return f"{client.s3_staging_dir}tables/{str(uuid4())}/"
creds = conn.credentials
if s3_data_dir is not None:
return s3_data_dir
else:
return path.join(creds.s3_staging_dir, "tables")

@available
def s3_unique_location(self, external_location, strict_location, staging_dir, relation_name):
def s3_table_location(self, s3_data_dir: str, s3_data_naming: str, schema_name: str, table_name: str) -> str:
"""
Generate a unique not overlapping location.
Returns either a UUID or database/table prefix for storing a table,
depending on the value of `s3_data_naming`.
"""
unique_id = str(uuid4())
if external_location is not None:
if not strict_location:
if external_location.endswith("/"):
external_location = external_location[:-1]
external_location = f"{external_location}_{unique_id}/"
else:
base_path = path.join(staging_dir, f"{relation_name}_{unique_id}")
external_location = f"{base_path}/"
return external_location
mapping = {
"uuid": path.join(self.s3_table_prefix(s3_data_dir), str(uuid4())) + "/",
"table": path.join(self.s3_table_prefix(s3_data_dir), table_name) + "/",
"table_unique": path.join(self.s3_table_prefix(s3_data_dir), table_name, str(uuid4())) + "/",
"schema_table": path.join(self.s3_table_prefix(s3_data_dir), schema_name, table_name) + "/",
"schema_table_unique": path.join(self.s3_table_prefix(s3_data_dir), schema_name, table_name, str(uuid4()))
+ "/",
}
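# Illustrative resolution (example values are assumptions, not part of this PR):
#   s3_table_location("s3://bucket2/dbt/", "schema_table", "analytics", "orders")
#   -> "s3://bucket2/dbt/analytics/orders/"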

table_location = mapping.get(s3_data_naming)

if table_location is None:
raise ValueError(f"Unknown value for s3_data_naming: {s3_data_naming}")

return table_location

@available
def clean_up_partitions(self, database_name: str, table_name: str, where_condition: str):
@@ -50,20 +50,23 @@

{% macro create_iceberg_table_definition(relation, dest_columns) -%}
{%- set external_location = config.get('external_location', default=none) -%}
{%- set strict_location = config.get('strict_location', default=true) -%}
{%- set partitioned_by = config.get('partitioned_by', default=none) -%}
{%- set table_properties = config.get('table_properties', default={}) -%}
{%- set _ = table_properties.update({'table_type': 'ICEBERG'}) -%}
{%- set table_properties_formatted = [] -%}
{%- set dest_columns_with_type = [] -%}
{%- set s3_data_dir = config.get('s3_data_dir', default=target.s3_data_dir) -%}
{%- set s3_data_naming = config.get('s3_data_naming', default=target.s3_data_naming) -%}

{%- for k in table_properties -%}
{% set _ = table_properties_formatted.append("'" + k + "'='" + table_properties[k] + "'") -%}
{%- endfor -%}

{%- set table_properties_csv= table_properties_formatted | join(', ') -%}

{%- set external_location = adapter.s3_unique_location(external_location, strict_location, target.s3_staging_dir, relation.name) -%}
{%- if external_location is none %}
{%- set external_location = adapter.s3_table_location(s3_data_dir, s3_data_naming, relation.schema, relation.identifier) -%}
{%- endif %}

{%- for col in dest_columns -%}
{% set dtype = iceberg_data_type(col.dtype) -%}
@@ -6,13 +6,17 @@
{%- set field_delimiter = config.get('field_delimiter', default=none) -%}
{%- set format = config.get('format', default='parquet') -%}
{%- set write_compression = config.get('write_compression', default=none) -%}
{%- set s3_data_dir = config.get('s3_data_dir', default=target.s3_data_dir) -%}
{%- set s3_data_naming = config.get('s3_data_naming', default=target.s3_data_naming) -%}
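{#- Both settings fall back to the connection-level values when unset on the model; see the README "Table location" section -#}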

create table
{{ relation }}

with (
{%- if external_location is not none and not temporary %}
external_location='{{ external_location }}',
{%- else -%}
external_location='{{ adapter.s3_table_location(s3_data_dir, s3_data_naming, relation.schema, relation.identifier) }}',
{%- endif %}
{%- if partitioned_by is not none %}
partitioned_by=ARRAY{{ partitioned_by | tojson | replace('\"', '\'') }},
@@ -10,6 +10,8 @@
{% macro athena__create_csv_table(model, agate_table) %}
{%- set column_override = model['config'].get('column_types', {}) -%}
{%- set quote_seed_column = model['config'].get('quote_columns', None) -%}
{%- set s3_data_dir = config.get('s3_data_dir', default=target.s3_data_dir) -%}
{%- set s3_data_naming = model['config'].get('s3_data_naming', target.s3_data_naming) -%}
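{#- Illustrative: with s3_data_naming left at its connection default ("schema_table_unique"), seed data lands under {s3_data_dir}/{schema}/{table}/{uuid4()}/ -#}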

{% set sql %}
create external table {{ this.render() }} (
@@ -21,7 +23,7 @@
{%- endfor -%}
)
stored as parquet
location '{{ adapter.s3_uuid_table_location() }}'
location '{{ adapter.s3_table_location(s3_data_dir, s3_data_naming, model["schema"], model["alias"]) }}'
tblproperties ('classification'='parquet')
{% endset %}
