diff --git a/README.md b/README.md index e345c03d..0171f768 100644 --- a/README.md +++ b/README.md @@ -38,9 +38,10 @@ - [Timestamp strategy](#timestamp-strategy) - [Check strategy](#check-strategy) - [Hard-deletes](#hard-deletes) - - [AWS Lakeformation integration](#aws-lakeformation-integration) - [Working example](#working-example) - [Snapshots Known issues](#snapshots-known-issues) + - [AWS Lakeformation integration](#aws-lakeformation-integration) + - [Python Models](#python-models) - [Contracts](#contracts) - [Contributing](#contributing) - [Contributors ✨](#contributors-) @@ -278,7 +279,7 @@ athena: } ``` -> Notes: +> Notes: > > - `lf_tags` and `lf_tags_columns` configs support only attaching lf tags to corresponding resources. > We recommend managing LF Tags permissions somewhere outside dbt. For example, you may use @@ -553,7 +554,113 @@ To use the check strategy refer to the [dbt docs](https://docs.getdbt.com/docs/b The materialization also supports invalidating hard deletes. Check the [docs](https://docs.getdbt.com/docs/build/snapshots#hard-deletes-opt-in) to understand usage. -### AWS Lakeformation integration +### Working example + +seed file - employent_indicators_november_2022_csv_tables.csv + +```csv +Series_reference,Period,Data_value,Suppressed +MEIM.S1WA,1999.04,80267, +MEIM.S1WA,1999.05,70803, +MEIM.S1WA,1999.06,65792, +MEIM.S1WA,1999.07,66194, +MEIM.S1WA,1999.08,67259, +MEIM.S1WA,1999.09,69691, +MEIM.S1WA,1999.1,72475, +MEIM.S1WA,1999.11,79263, +MEIM.S1WA,1999.12,86540, +MEIM.S1WA,2000.01,82552, +MEIM.S1WA,2000.02,81709, +MEIM.S1WA,2000.03,84126, +MEIM.S1WA,2000.04,77089, +MEIM.S1WA,2000.05,73811, +MEIM.S1WA,2000.06,70070, +MEIM.S1WA,2000.07,69873, +MEIM.S1WA,2000.08,71468, +MEIM.S1WA,2000.09,72462, +MEIM.S1WA,2000.1,74897, +``` + +model.sql + +```sql +{{ config( + materialized='table' +) }} + +select row_number() over() as id + , * + , cast(from_unixtime(to_unixtime(now())) as timestamp(6)) as refresh_timestamp +from {{ ref('employment_indicators_november_2022_csv_tables') }} +``` + +timestamp strategy - model_snapshot_1 + +```sql +{% snapshot model_snapshot_1 %} + +{{ + config( + strategy='timestamp', + updated_at='refresh_timestamp', + unique_key='id' + ) +}} + +select * +from {{ ref('model') }} {% endsnapshot %} +``` + +invalidate hard deletes - model_snapshot_2 + +```sql +{% snapshot model_snapshot_2 %} + +{{ + config + ( + unique_key='id', + strategy='timestamp', + updated_at='refresh_timestamp', + invalidate_hard_deletes=True, + ) +}} +select * +from {{ ref('model') }} {% endsnapshot %} +``` + +check strategy - model_snapshot_3 + +```sql +{% snapshot model_snapshot_3 %} + +{{ + config + ( + unique_key='id', + strategy='check', + check_cols=['series_reference','data_value'] + ) +}} +select * +from {{ ref('model') }} {% endsnapshot %} +``` + +### Snapshots Known issues + +- Incremental Iceberg models - Sync all columns on schema change can't remove columns used as partitioning. + The only way, from a dbt perspective, is to do a full-refresh of the incremental model. + +- Tables, schemas and database should only be lowercase + +- In order to avoid potential conflicts, make sure [`dbt-athena-adapter`](https://github.com/Tomme/dbt-athena) is not + installed in the target environment. + See for more details. + +- Snapshot does not support dropping columns from the source table. If you drop a column make sure to drop the column + from the snapshot as well. Another workaround is to NULL the column in the snapshot definition to preserve history + +## AWS Lakeformation integration The adapter implements AWS Lakeformation tags management in the following way: @@ -568,7 +675,7 @@ It's important to understand the following points: - dbt does not manage lf-tags for database - dbt does not manage lakeformation permissions -That's why you should handle this by yourself manually or using some automation tools like terraform, AWS CDK etc. +That's why you should handle this by yourself manually or using some automation tools like terraform, AWS CDK etc. You may find the following links useful to manage that: @@ -582,23 +689,23 @@ The adapter supports python models using [`spark`](https://docs.aws.amazon.com/a ### Setup -- A spark enabled work group created in athena +- A Spark-enabled workgroup created in Athena - Spark execution role granted access to Athena, Glue and S3 -- The spark work group is added to the ~/.dbt/profiles.yml file and the profile is referenced in dbt_project.yml - that will be created. It is recommended to keep this same as threads. +- The Spark workgroup is added to the `~/.dbt/profiles.yml` file and the profile + is referenced in `dbt_project.yml` that will be created. It is recommended to keep this same as threads. -### Spark specific table configuration +### Spark-specific table configuration - `timeout` (`default=43200`) - - Time out in seconds for each python model execution. Defaults to 12 hours/43200 seconds. + - Time out in seconds for each Python model execution. Defaults to 12 hours/43200 seconds. - `spark_encryption` (`default=false`) - If this flag is set to true, encrypts data in transit between Spark nodes and also encrypts data at rest stored locally by Spark. - `spark_cross_account_catalog` (`default=false`) - - In spark, you can query the external account catalog and for that the consumer account has to be configured to + - In Spark, you can query the external account catalog and for that the consumer account has to be configured to access the producer catalog. - If this flag is set to true, "/" can be used as the glue catalog separator. - Ex: 999999999999/mydatabase.cloudfront_logs (*where *999999999999* is the external catalog id*) + Ex: 999999999999/mydatabase.cloudfront_logs (*where *999999999999* is the external catalog ID*) - `spark_requester_pays` (`default=false`) - When an Amazon S3 bucket is configured as requester pays, the account of the user running the query is charged for data access and data transfer fees associated with the query. @@ -608,11 +715,12 @@ The adapter supports python models using [`spark`](https://docs.aws.amazon.com/a - A session is created for each unique engine configuration defined in the models that are part of the invocation. - A session's idle timeout is set to 10 minutes. Within the timeout period, if there is a new calculation - (spark python model) ready for execution and the engine configuration matches, the process will reuse the same session. -- Number of python models running at a time depends on the `threads`. Number of sessions created for the entire run - depends on number of unique engine configurations and availability of session to maintain threads concurrency. -- For iceberg table, it is recommended to use table_properties configuration to set the format_version to 2. This is to - maintain compatability between iceberg tables created by Trino with those created by Spark. + (Spark Python model) ready for execution and the engine configuration matches, the process will reuse the same session. +- The number of Python models running at a time depends on the `threads`. The number of sessions created for the + entire run depends on the number of unique engine configurations and the availability of sessions to maintain + thread concurrency. +- For Iceberg tables, it is recommended to use `table_properties` configuration to set the `format_version` to 2. + This is to maintain compatibility between Iceberg tables created by Trino with those created by Spark. ### Example models @@ -714,118 +822,22 @@ def model(dbt, spark_session): return df.withColumn("udf_test_col", udf_with_import(col("alpha"))) ``` -#### Known issues in python models - -- Incremental models do not fully utilize spark capabilities. They depend partially on existing sql based logic which - runs on trino. -- Snapshots materializations are not supported. +### Known issues in Python models + +- Python models cannot + [reference Athena SQL views](https://docs.aws.amazon.com/athena/latest/ug/notebooks-spark.html). +- Third-party Python libraries can be used, but they must be [included in the pre-installed list]([pre-installed list]) + or [imported manually]([imported manually]). +- Python models can only reference or write to tables with names meeting the + regular expression: `^[0-9a-zA-Z_]+$`. Dashes and special characters are not + supported by Spark, even though Athena supports them. +- Incremental models do not fully utilize Spark capabilities. They depend partially on existing SQL-based logic which + runs on Trino. +- Snapshot materializations are not supported. - Spark can only reference tables within the same catalog. -### Working example - -seed file - employent_indicators_november_2022_csv_tables.csv - -```csv -Series_reference,Period,Data_value,Suppressed -MEIM.S1WA,1999.04,80267, -MEIM.S1WA,1999.05,70803, -MEIM.S1WA,1999.06,65792, -MEIM.S1WA,1999.07,66194, -MEIM.S1WA,1999.08,67259, -MEIM.S1WA,1999.09,69691, -MEIM.S1WA,1999.1,72475, -MEIM.S1WA,1999.11,79263, -MEIM.S1WA,1999.12,86540, -MEIM.S1WA,2000.01,82552, -MEIM.S1WA,2000.02,81709, -MEIM.S1WA,2000.03,84126, -MEIM.S1WA,2000.04,77089, -MEIM.S1WA,2000.05,73811, -MEIM.S1WA,2000.06,70070, -MEIM.S1WA,2000.07,69873, -MEIM.S1WA,2000.08,71468, -MEIM.S1WA,2000.09,72462, -MEIM.S1WA,2000.1,74897, -``` - -model.sql - -```sql -{{ config( - materialized='table' -) }} - -select row_number() over() as id - , * - , cast(from_unixtime(to_unixtime(now())) as timestamp(6)) as refresh_timestamp -from {{ ref('employment_indicators_november_2022_csv_tables') }} -``` - -timestamp strategy - model_snapshot_1 - -```sql -{% snapshot model_snapshot_1 %} - -{{ - config( - strategy='timestamp', - updated_at='refresh_timestamp', - unique_key='id' - ) -}} - -select * -from {{ ref('model') }} {% endsnapshot %} -``` - -invalidate hard deletes - model_snapshot_2 - -```sql -{% snapshot model_snapshot_2 %} - -{{ - config - ( - unique_key='id', - strategy='timestamp', - updated_at='refresh_timestamp', - invalidate_hard_deletes=True, - ) -}} -select * -from {{ ref('model') }} {% endsnapshot %} -``` - -check strategy - model_snapshot_3 - -```sql -{% snapshot model_snapshot_3 %} - -{{ - config - ( - unique_key='id', - strategy='check', - check_cols=['series_reference','data_value'] - ) -}} -select * -from {{ ref('model') }} {% endsnapshot %} -``` - -### Snapshots Known issues - -- Incremental Iceberg models - Sync all columns on schema change can't remove columns used as partitioning. - The only way, from a dbt perspective, is to do a full-refresh of the incremental model. - -- Tables, schemas and database should only be lowercase - -- In order to avoid potential conflicts, make sure [`dbt-athena-adapter`](https://github.com/Tomme/dbt-athena) is not - installed in the target environment. - See for more details. - -- Snapshot does not support dropping columns from the source table. If you drop a column make sure to drop the column - from the snapshot as well. Another workaround is to NULL the column in the snapshot definition to preserve history +[pre-installed list]: https://docs.aws.amazon.com/athena/latest/ug/notebooks-spark-preinstalled-python-libraries.html +[imported manually]: https://docs.aws.amazon.com/athena/latest/ug/notebooks-import-files-libraries.html ## Contracts