Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP - Glue integration #529

Closed
Closed
Show file tree
Hide file tree
Changes from 23 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
5b9dcac
fix: indentation error of generate_schema_baseline_test macro
Arun-kc Jul 29, 2023
e05cd0d
feat: add incremental_strategy for dbt-glue
Arun-kc Aug 13, 2023
6f3eb70
Merge branch 'elementary-data:master' into ele-47-add-integration-wit…
Arun-kc Aug 13, 2023
9ded65c
Merge branch 'ele-47-add-integration-with-s3' of https://github.com/A…
Arun-kc Aug 13, 2023
4cce0c1
feat: add glue incremental materialization
Arun-kc Aug 14, 2023
1cf8b8f
feat: add glue table materialization
Arun-kc Aug 15, 2023
f2d0e38
Merge branch 'elementary-data:master' into ele-47-add-integration-wit…
Arun-kc Aug 20, 2023
d2580c2
Merge branch 'elementary-data:master' into ele-47-add-integration-wit…
Arun-kc Aug 22, 2023
1b24722
feat: add glue__replace_table_data
Arun-kc Aug 23, 2023
ee85c5d
Merge branch 'elementary-data:master' into ele-47-add-integration-wit…
Arun-kc Aug 23, 2023
afc20cd
Merge branch 'elementary-data:master' into ele-47-add-integration-wit…
Arun-kc Aug 31, 2023
9d27bf9
Merge branch 'elementary-data:master' into ele-47-add-integration-wit…
Arun-kc Sep 2, 2023
4445b12
fix: update glue incremental materialization
Arun-kc Sep 2, 2023
b65eea4
update create_table_like macro with glue
Arun-kc Sep 2, 2023
0a0aa76
add glue to replace_table_data
Arun-kc Sep 2, 2023
452f0f2
fixed glue__replace_table_data
Arun-kc Sep 3, 2023
3587f5c
add glue to make_temp_relation
Arun-kc Sep 3, 2023
0f5887e
update edr_quote_column with glue
Arun-kc Sep 3, 2023
74bb6c4
add glue to insert_rows
Arun-kc Sep 3, 2023
a890dc3
Merge branch 'elementary-data:master' into ele-47-add-integration-wit…
Arun-kc Sep 3, 2023
169d494
fix: explicit cast in query_table_metrics
Arun-kc Sep 4, 2023
82907bb
add glue timestamp cast
Arun-kc Sep 7, 2023
62e366f
Merge branch 'elementary-data:master' into ele-47-add-integration-wit…
Arun-kc Sep 7, 2023
b93b90c
change models on_schema_change
Arun-kc Sep 8, 2023
685d52e
Merge branch 'ele-47-add-integration-with-s3' of https://github.com/A…
Arun-kc Sep 8, 2023
87a09fb
change model on_schema_change
Arun-kc Sep 8, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 47 additions & 23 deletions macros/commands/generate_schema_baseline_test.sql
Original file line number Diff line number Diff line change
@@ -1,26 +1,26 @@
{% macro generate_schema_baseline_test(name=none, include_sources=True, include_models=False, fail_on_added=False, enforce_types=False) %}
{% macro generate_schema_baseline_test(name=none, include_sources=True, include_models=False, fail_on_added=False, enforce_types=False, convert_to_lower=False) %}
{% if name %}
{{ generate_schema_baseline_test_for_node(name, fail_on_added=fail_on_added, enforce_types=enforce_types) }}
{{ generate_schema_baseline_test_for_node(name, fail_on_added=fail_on_added, enforce_types=enforce_types, convert_to_lower=convert_to_lower) }}
{% else %}
{{ generate_schema_baseline_test_for_all_nodes(include_sources=include_sources, include_models=include_models,
fail_on_added=fail_on_added, enforce_types=enforce_types) }}
fail_on_added=fail_on_added, enforce_types=enforce_types, convert_to_lower=convert_to_lower) }}
{% endif %}
{% endmacro %}

{% macro generate_schema_baseline_test_for_all_nodes(include_sources=True, include_models=False, fail_on_added=False, enforce_types=False) %}
{% macro generate_schema_baseline_test_for_all_nodes(include_sources=True, include_models=False, fail_on_added=False, enforce_types=False, convert_to_lower=False) %}
{% set nodes = elementary.get_nodes_from_graph() %}
{% for node in nodes %}
{% if node.package_name != 'elementary' and
((include_sources and node.resource_type == 'source') or
(include_models and node.resource_type == 'model')) %}
{% do print("Generating schema changes from baseline test for {} '{}':".format(node.resource_type, node.name)) %}
{{ generate_schema_baseline_test_for_node(node, fail_on_added=fail_on_added, enforce_types=enforce_types) }}
{{ generate_schema_baseline_test_for_node(node, fail_on_added=fail_on_added, enforce_types=enforce_types, convert_to_lower=convert_to_lower) }}
{% do print('----------------------------------') %}
{% endif %}
{% endfor %}
{% endmacro %}

{% macro generate_schema_baseline_test_for_node(node, fail_on_added=False, enforce_types=False) %}
{% macro generate_schema_baseline_test_for_node(node, fail_on_added=False, enforce_types=False, convert_to_lower=False) %}
{% if node is string %}
{% set node_name = node %}
{% set node = elementary.get_node_by_name(node_name) %}
Expand Down Expand Up @@ -52,13 +52,52 @@
{% do test_params.update({"enforce_types": "true"}) %}
{% endif %}

{# Common yaml for sources and models #}
{% set common_yaml %}
{# Full yaml for sources and models #}
{% set full_yaml %}
{%- if node.resource_type == 'source' %}
{{generate_schema_baseline_test_for_source(node, columns, test_params, convert_to_lower)}}
{% else %}
{{generate_schema_baseline_test_for_model(node, columns, test_params, convert_to_lower)}}
{% endif -%}
{% endset %}

{% do print(full_yaml) %}
{% endmacro %}

{% macro generate_schema_baseline_test_for_source(node, columns, test_params, convert_to_lower) %}
sources:
- name: {{ node.source_name }}
tables:
- name: {{ node.name }}
columns:
{%- for column in columns %}
- name: {{ column.name }}
{%- if convert_to_lower %}
data_type: {{ column.dtype|lower }}
{% else %}
data_type: {{ column.dtype }}
{% endif -%}
{% endfor %}
tests:
- elementary.schema_changes_from_baseline
{%- if test_params %}:
{%- for param, param_val in test_params.items() %}
{{param}}: {{param_val}}
{%- endfor -%}
{% endif -%}
{% endmacro %}

{% macro generate_schema_baseline_test_for_model(node, columns, test_params, convert_to_lower) %}
models:
- name: {{ node.name }}
columns:
{%- for column in columns %}
- name: {{ column.name }}
{%- if convert_to_lower %}
data_type: {{ column.dtype|lower }}
{% else %}
data_type: {{ column.dtype }}
{% endif -%}
{% endfor %}
tests:
- elementary.schema_changes_from_baseline
Expand All @@ -67,19 +106,4 @@
{{param}}: {{param_val}}
{%- endfor -%}
{% endif -%}
{% endset %}

{% set full_yaml %}
{%- if node.resource_type == 'source' %}
sources:
- name: {{ node.source_name }}
tables:
{{- common_yaml }}
{% else %}
models:
{{- common_yaml }}
{% endif -%}
{% endset %}

{% do print(full_yaml) %}
{% endmacro %}
11 changes: 11 additions & 0 deletions macros/edr/materializations/model/incremental.sql
Original file line number Diff line number Diff line change
Expand Up @@ -61,3 +61,14 @@
{% do elementary.cache_metrics(metrics) %}
{% do return(relations) %}
{% endmaterialization %}

{% materialization incremental, adapter="glue", supported_languages=["sql", "python"] %}
{% set relations = dbt.materialization_incremental_glue.call_macro() %}
{% if not elementary.is_elementary_enabled() %}
{% do return(relations) %}
{% endif %}

{% set metrics = elementary.query_metrics() %}
{% do elementary.cache_metrics(metrics) %}
{% do return(relations) %}
{% endmaterialization %}
2 changes: 1 addition & 1 deletion macros/edr/materializations/model/metrics.sql
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{% macro query_table_metrics() %}
{% set query %}
select
{{ modules.datetime.datetime.utcnow().timestamp() }} as build_timestamp,
cast({{ modules.datetime.datetime.utcnow().timestamp() }} as float) as build_timestamp,
count(*) as row_count
from {{ this }}
{% endset %}
Expand Down
11 changes: 11 additions & 0 deletions macros/edr/materializations/model/table.sql
Original file line number Diff line number Diff line change
Expand Up @@ -61,3 +61,14 @@
{% do elementary.cache_metrics(metrics) %}
{% do return(relations) %}
{% endmaterialization %}

{% materialization table, adapter="glue" %}
{% set relations = dbt.materialization_table_glue() %}
{% if not elementary.is_elementary_enabled() %}
{% do return(relations) %}
{% endif %}

{% set metrics = elementary.query_metrics() %}
{% do elementary.cache_metrics(metrics) %}
{% do return(relations) %}
{% endmaterialization %}
2 changes: 2 additions & 0 deletions macros/utils/cross_db_utils/quote_column.sql
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
{%- macro edr_quote_column(column_name) -%}
{% if adapter.quote(column_name[1:-1]) == column_name %}
{{ return(column_name) }}
{% elif target.type == 'glue' %}
{{ return(column_name) }}
{% else %}
{% set quoted_column = adapter.quote(column_name) %}
{{ return(quoted_column) }}
Expand Down
20 changes: 20 additions & 0 deletions macros/utils/table_operations/create_table_like.sql
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
{% macro create_table_like(relation, like_relation, temporary=False, like_columns=none) %}
{{ return(adapter.dispatch('create_table_like', 'elementary')(relation, like_relation, temporary, like_columns)) }}
{% endmacro %}

{% macro default__create_table_like(relation, like_relation, temporary=False, like_columns=none) %}
{% set empty_table_query %}
SELECT
{% if like_columns %}
Expand All @@ -13,3 +17,19 @@
{% endset %}
{% do elementary.run_query(dbt.create_table_as(temporary, relation, empty_table_query)) %}
{% endmacro %}

{% macro glue__create_table_like(relation, like_relation, temporary=False, like_columns=none) %}
{% set empty_table_query %}
SELECT
{% if like_columns %}
{% for column in like_columns %}
{{ column }}{{ ", " if not loop.last }}
{% endfor %}
{% else %}
*
{% endif %}
FROM {{ like_relation }}
WHERE 1 = 0
{% endset %}
{% do dbt.glue_exec_query(dbt.create_table_as(temporary, relation, empty_table_query)) %}
{% endmacro %}
24 changes: 19 additions & 5 deletions macros/utils/table_operations/insert_rows.sql
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,21 @@
{% set queries_len = insert_rows_queries | length %}
{% for insert_query in insert_rows_queries %}
{% do elementary.file_log("[{}/{}] Running insert query.".format(loop.index, queries_len)) %}
{% do elementary.run_query(insert_query) %}
{% if target.type == 'glue' %}
{% do dbt.glue_exec_query(insert_query) %}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same reason as #r1318786335

{% else %}
{% do elementary.run_query(insert_query) %}
{% endif %}
{% endfor %}
{% elif insert_rows_method == 'chunk' %}
{% set rows_chunks = elementary.split_list_to_chunks(rows, chunk_size) %}
{% for rows_chunk in rows_chunks %}
{% set insert_rows_query = elementary.get_chunk_insert_query(table_relation, columns, rows_chunk) %}
{% do elementary.run_query(insert_rows_query) %}
{% if target.type == 'glue' %}
{% do dbt.glue_exec_query(insert_rows_query) %}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same reason as #r1318786335

{% else %}
{% do elementary.run_query(insert_rows_query) %}
{% endif %}
{% endfor %}
{% else %}
{% do exceptions.raise_compiler_error("Specified invalid value for 'insert_rows_method' var.") %}
Expand Down Expand Up @@ -57,7 +65,7 @@
{% do rendered_column_values.append(column_value) %}
{% else %}
{% set column_value = elementary.insensitive_get_dict_value(row, column.name) %}
{% do rendered_column_values.append(elementary.render_value(column_value)) %}
{% do rendered_column_values.append(elementary.render_value(column_value, column.data_type)) %}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Passing column data_type in order to cast timestamp columns for glue

{% endif %}
{% endfor %}
{% set row_sql = "({})".format(rendered_column_values | join(",")) %}
Expand Down Expand Up @@ -92,7 +100,7 @@
{% for row in rows -%}
({%- for column in columns -%}
{%- set column_value = elementary.insensitive_get_dict_value(row, column.name, none) -%}
{{ elementary.render_value(column_value) }}
{{ elementary.render_value(column_value, column.data_type) }}
{{- "," if not loop.last else "" -}}
{%- endfor -%}) {{- "," if not loop.last else "" -}}
{%- endfor -%}
Expand All @@ -116,10 +124,16 @@
{{- return(string_value | replace("'", "''")) -}}
{%- endmacro -%}

{%- macro render_value(value) -%}
{%- macro glue__escape_special_chars(string_value) -%}
{{- return(string_value | replace("'", '"')) -}}
{%- endmacro -%}

{%- macro render_value(value, data_type=none) -%}
{%- if value is defined and value is not none -%}
{%- if value is number -%}
{{- value -}}
{%- elif target.type == 'glue' and data_type=='timestamp' -%}
cast('{{- value -}}' as timestamp)
{%- elif value is string -%}
'{{- elementary.escape_special_chars(value) -}}'
{%- elif value is mapping or value is sequence -%}
Expand Down
9 changes: 9 additions & 0 deletions macros/utils/table_operations/make_temp_relation.sql
Original file line number Diff line number Diff line change
Expand Up @@ -53,3 +53,12 @@
{% do return(tmp_relation) %}
{% endmacro %}

{% macro glue__make_temp_table_relation(base_relation, suffix) %}
{% set tmp_identifier = elementary.table_name_with_suffix(base_relation.identifier, suffix) %}
{% set tmp_relation = api.Relation.create(
identifier=tmp_identifier,
schema=base_relation.schema,
database=base_relation.database,
type='table') %}
{% do return(tmp_relation) %}
{% endmacro %}
12 changes: 12 additions & 0 deletions macros/utils/table_operations/replace_table_data.sql
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,15 @@

{% do adapter.drop_relation(intermediate_relation) %}
{% endmacro %}

{# Glue - truncate and insert (non-atomic) #}
{% macro glue__replace_table_data(relation, rows) %}
{% set intermediate_relation = elementary.create_intermediate_relation(relation, rows, temporary=True) %}
{% do dbt.glue_exec_query(dbt.get_insert_overwrite_sql(intermediate_relation, relation)) %}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ideally should be using run_query, but not able to pass the flag as 'False' for DDL and DML statements which in turn is causing issues at run_query when it tries to change the case of columns into lowercase. So using glue_exec_query for now.

Note: When DDL and DML statements are passed to run_query in dbt it will return none


{% set query %}
drop table if exists {{ intermediate_relation }}
{% endset %}

{% do dbt.glue_exec_query(query) %}
{% endmacro %}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
{{
config(
materialized='incremental',
incremental_strategy='insert_overwrite',
unique_key='id',
on_schema_change='append_new_columns',
full_refresh=elementary.get_config_var('elementary_full_refresh'),
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
{{
config(
materialized='incremental',
incremental_strategy='insert_overwrite',
unique_key = 'column_state_id',
on_schema_change = 'append_new_columns',
full_refresh=elementary.get_config_var('elementary_full_refresh'),
Expand Down
3 changes: 2 additions & 1 deletion models/edr/dbt_artifacts/dbt_exposures.sql
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
{{
config(
materialized='incremental',
incremental_strategy='insert_overwrite',
transient=False,
post_hook='{{ elementary.upload_dbt_exposures() }}',
unique_key='unique_id',
on_schema_change='sync_all_columns',
on_schema_change='append_new_columns',
full_refresh=elementary.get_config_var('elementary_full_refresh')
)
}}
Expand Down
1 change: 1 addition & 0 deletions models/edr/dbt_artifacts/dbt_invocations.sql
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
{{
config(
materialized = 'incremental',
incremental_strategy='insert_overwrite',
transient=False,
unique_key = 'invocation_id',
on_schema_change = 'append_new_columns',
Expand Down
3 changes: 2 additions & 1 deletion models/edr/dbt_artifacts/dbt_metrics.sql
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
{{
config(
materialized='incremental',
incremental_strategy='insert_overwrite',
transient=False,
post_hook='{{ elementary.upload_dbt_metrics() }}',
unique_key='unique_id',
on_schema_change='sync_all_columns',
on_schema_change='append_new_columns',
full_refresh=elementary.get_config_var('elementary_full_refresh')
)
}}
Expand Down
3 changes: 2 additions & 1 deletion models/edr/dbt_artifacts/dbt_models.sql
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
{{
config(
materialized='incremental',
incremental_strategy='insert_overwrite',
transient=False,
post_hook='{{ elementary.upload_dbt_models() }}',
unique_key='unique_id',
on_schema_change='sync_all_columns',
on_schema_change='append_new_columns',
full_refresh=elementary.get_config_var('elementary_full_refresh')
)
}}
Expand Down
1 change: 1 addition & 0 deletions models/edr/dbt_artifacts/dbt_run_results.sql
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
{{
config(
materialized = 'incremental',
incremental_strategy='insert_overwrite',
transient=False,
unique_key = 'model_execution_id',
on_schema_change = 'append_new_columns',
Expand Down
3 changes: 2 additions & 1 deletion models/edr/dbt_artifacts/dbt_seeds.sql
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
{{
config(
materialized='incremental',
incremental_strategy='insert_overwrite',
transient=False,
post_hook='{{ elementary.upload_dbt_seeds() }}',
unique_key='unique_id',
on_schema_change='sync_all_columns',
on_schema_change='append_new_columns',
full_refresh=elementary.get_config_var('elementary_full_refresh')
)
}}
Expand Down
3 changes: 2 additions & 1 deletion models/edr/dbt_artifacts/dbt_snapshots.sql
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
{{
config(
materialized='incremental',
incremental_strategy='insert_overwrite',
transient=False,
post_hook='{{ elementary.upload_dbt_snapshots() }}',
unique_key='unique_id',
on_schema_change='sync_all_columns',
on_schema_change='append_new_columns',
full_refresh=elementary.get_config_var('elementary_full_refresh')
)
}}
Expand Down
3 changes: 2 additions & 1 deletion models/edr/dbt_artifacts/dbt_sources.sql
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
{{
config(
materialized='incremental',
incremental_strategy='insert_overwrite',
transient=False,
post_hook='{{ elementary.upload_dbt_sources() }}',
unique_key='unique_id',
on_schema_change='sync_all_columns',
on_schema_change='append_new_columns',
full_refresh=elementary.get_config_var('elementary_full_refresh')
)
}}
Expand Down
1 change: 1 addition & 0 deletions models/edr/dbt_artifacts/dbt_tests.sql
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
{{
config(
materialized='incremental',
incremental_strategy='insert_overwrite',
transient=False,
post_hook='{{ elementary.upload_dbt_tests() }}',
unique_key='unique_id',
Expand Down
Loading
Loading