Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Updating dispatching methods to ensure dbt-synapse adapter can use ad… #178

Merged
merged 17 commits into from
May 30, 2024
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions .github/workflows/integration-tests-azure.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,12 @@ jobs:
name: Regular
strategy:
fail-fast: false
max-parallel: 1
matrix:
python_version: ["3.8", "3.9", "3.10", "3.11"]
profile: ["ci_azure_auto"]
python_version: ["3.11"]
prdpsvs marked this conversation as resolved.
Show resolved Hide resolved
msodbc_version: ["17", "18"]
max-parallel: 1

runs-on: ubuntu-latest
container:
image: ghcr.io/${{ github.repository }}:CI-${{ matrix.python_version }}-msodbc${{ matrix.msodbc_version }}
Expand Down
2 changes: 1 addition & 1 deletion dbt/adapters/fabric/__version__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version = "1.8.5"
version = "1.8.6"
4 changes: 2 additions & 2 deletions dbt/include/fabric/macros/adapters/columns.sql
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@

{% macro fabric__alter_column_type(relation, column_name, new_column_type) %}

{%- set table_name= tmp_relation.include(database=False).include(schema=False)-%}
{%- set schema_name = tmp_relation.include(database=False).include(identifier=False) -%}
{%- set table_name= relation.include(database=False).include(schema=False)-%}
{%- set schema_name = relation.include(database=False).include(identifier=False) -%}
prdpsvs marked this conversation as resolved.
Show resolved Hide resolved

{% set generate_tmp_relation_script %}
SELECT TRIM(REPLACE(STRING_AGG(ColumnName + ' ', ',-'), '-', CHAR(10))) AS ColumnDef
Expand Down
16 changes: 14 additions & 2 deletions dbt/include/fabric/macros/adapters/metadata.sql
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,18 @@
information_schema
{%- endmacro %}

{% macro get_use_database_sql(database) %}
{{ return(adapter.dispatch('get_use_database_sql', 'dbt')(database)) }}
{% endmacro %}

{%- macro default__get_use_database_sql(database) -%}
prdpsvs marked this conversation as resolved.
Show resolved Hide resolved
{%- endmacro -%}


{%- macro fabric__get_use_database_sql(database) -%}
USE [{{database}}];
prdpsvs marked this conversation as resolved.
Show resolved Hide resolved
{%- endmacro -%}

{% macro fabric__list_schemas(database) %}
{% call statement('list_schemas', fetch_result=True, auto_begin=False) -%}
select name as [schema]
Expand All @@ -27,7 +39,7 @@

{% macro fabric__list_relations_without_caching(schema_relation) -%}
{% call statement('list_relations_without_caching', fetch_result=True) -%}
USE [{{ schema_relation.database }}];
{{ get_use_database_sql(schema_relation.database) }}
with base as (
select
DB_NAME() as [database],
Expand All @@ -51,7 +63,7 @@

{% macro fabric__get_relation_without_caching(schema_relation) -%}
{% call statement('get_relation_without_caching', fetch_result=True) -%}
USE [{{ schema_relation.database }}];
{{ get_use_database_sql(schema_relation.database) }}
with base as (
select
DB_NAME() as [database],
Expand Down
52 changes: 22 additions & 30 deletions dbt/include/fabric/macros/adapters/relation.sql
Original file line number Diff line number Diff line change
Expand Up @@ -6,51 +6,43 @@
{{ return(temp_relation) }}
{% endmacro %}

{% macro fabric__drop_relation(relation) -%}
{% call statement('drop_relation', auto_begin=False) -%}
{{ fabric__drop_relation_script(relation) }}
{%- endcall %}
{% endmacro %}

{% macro fabric__drop_relation_script(relation) -%}

{% if relation.type == 'view' -%}
{% macro fabric__get_drop_sql(relation) -%}
{% if relation.type == 'view' -%}
{% call statement('find_references', fetch_result=true) %}
USE [{{ relation.database }}];
select
sch.name as schema_name,
obj.name as view_name
from sys.sql_expression_dependencies refs
inner join sys.objects obj
on refs.referencing_id = obj.object_id
inner join sys.schemas sch
on obj.schema_id = sch.schema_id
where refs.referenced_database_name = '{{ relation.database }}'
and refs.referenced_schema_name = '{{ relation.schema }}'
and refs.referenced_entity_name = '{{ relation.identifier }}'
and refs.referencing_class = 1
and obj.type = 'V'
{{ get_use_database_sql(relation.database) }}
select
sch.name as schema_name,
obj.name as view_name
from sys.sql_expression_dependencies refs
inner join sys.objects obj
on refs.referencing_id = obj.object_id
inner join sys.schemas sch
on obj.schema_id = sch.schema_id
where refs.referenced_database_name = '{{ relation.database }}'
and refs.referenced_schema_name = '{{ relation.schema }}'
and refs.referenced_entity_name = '{{ relation.identifier }}'
and refs.referencing_class = 1
and obj.type = 'V'
{% endcall %}
{% set references = load_result('find_references')['data'] %}
{% for reference in references -%}
-- dropping referenced view {{ reference[0] }}.{{ reference[1] }}
{{ fabric__drop_relation_script(relation.incorporate(
type="view",
path={"schema": reference[0], "identifier": reference[1]})) }}
-- dropping referenced view {{ reference[0] }}.{{ reference[1] }}
{% do adapter.drop_relation(relation.incorporate(
prdpsvs marked this conversation as resolved.
Show resolved Hide resolved
type="view",
path={"schema": reference[0], "identifier": reference[1]})) %}
{% endfor %}
{% elif relation.type == 'table'%}
{% set object_id_type = 'U' %}

{%- else -%}
{{ exceptions.raise_not_implemented('Invalid relation being dropped: ' ~ relation) }}
{% endif %}
USE [{{ relation.database }}];
{{ get_use_database_sql(relation.database) }}
EXEC('DROP {{ relation.type }} IF EXISTS {{ relation.include(database=False) }};');
{% endmacro %}

{% macro fabric__rename_relation(from_relation, to_relation) -%}
{% call statement('rename_relation') -%}
USE [{{ from_relation.database }}];
{{ get_use_database_sql(from_relation.database) }}
EXEC sp_rename '{{ from_relation.schema }}.{{ from_relation.identifier }}', '{{ to_relation.identifier }}'
{%- endcall %}
{% endmacro %}
Expand Down
4 changes: 2 additions & 2 deletions dbt/include/fabric/macros/adapters/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

{% macro fabric__create_schema_with_authorization(relation, schema_authorization) -%}
{% call statement('create_schema') -%}
USE [{{ relation.database }}];
{{ get_use_database_sql(relation.database) }}
IF NOT EXISTS (SELECT * FROM sys.schemas WHERE name = '{{ relation.schema }}')
BEGIN
EXEC('CREATE SCHEMA [{{ relation.schema }}] AUTHORIZATION [{{ schema_authorization }}]')
Expand All @@ -27,7 +27,7 @@
identifier=row[1],
type=row[3]
) -%}
{% do drop_relation(schema_relation) %}
{% do adapter.drop_relation(schema_relation) %}
{%- endfor %}

{% call statement('drop_schema') -%}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,15 @@

{%- set full_refresh_mode = (should_full_refresh()) -%}
{% set target_relation = this.incorporate(type='table') %}
{%- set relations_list = fabric__get_relation_without_caching(target_relation) -%}
{%- set relation = adapter.get_relation(database=this.database, schema=this.schema, identifier=this.identifier) -%}
prdpsvs marked this conversation as resolved.
Show resolved Hide resolved
prdpsvs marked this conversation as resolved.
Show resolved Hide resolved

{%- set existing_relation = none %}
{% if (relations_list|length == 1) and (relations_list[0][2] == target_relation.schema)
and (relations_list[0][1] == target_relation.identifier) and (relations_list[0][3] == target_relation.type)%}
{% if (relation.type == target_relation.type) and (relation.identifier == target_relation.identifier) and (relation.schema == target_relation.schema) and (relation.database == target_relation.database) %}
prdpsvs marked this conversation as resolved.
Show resolved Hide resolved
{% set existing_relation = target_relation %}
{% elif (relations_list|length == 1) and (relations_list[0][2] == target_relation.schema)
and (relations_list[0][1] == target_relation.identifier) and (relations_list[0][3] != target_relation.type) %}
{% set existing_relation = get_or_create_relation(relations_list[0][0], relations_list[0][2] , relations_list[0][1] , relations_list[0][3])[1] %}
{% elif (relation.type != target_relation.type) and (relation.identifier == target_relation.identifier) and (relation.schema == target_relation.schema) and (relation.database == target_relation.database) %}
{% set existing_relation = get_or_create_relation(relation.database, relation.schema, relation.identifier, relation.type)[1] %}
{% endif %}

{{ log("Full refresh mode" ~ full_refresh_mode)}}
{{ log("existing relation : "~existing_relation ~ " type "~ existing_relation.type ~ " is view? "~existing_relation.is_view) }}
{{ log("target relation: " ~target_relation ~ " type "~ target_relation.type ~ " is view? "~target_relation.is_view) }}

-- configs
{%- set unique_key = config.get('unique_key') -%}
{% set incremental_strategy = config.get('incremental_strategy') or 'default' %}
Expand All @@ -28,35 +22,39 @@

{{ run_hooks(pre_hooks, inside_transaction=True) }}

{% if existing_relation is none %}
-- naming a temp relation
{% set tmp_relation_view = target_relation.incorporate(path={"identifier": target_relation.identifier ~ '__dbt_tmp_vw'}, type='view')-%}

-- Fabric & Synapse adapters use temp relation because of lack of CTE support for CTE in CTAS, Insert
-- drop temp relation if exists
{% do adapter.drop_relation(tmp_relation_view) %}

{% if existing_relation is none %}
{%- call statement('main') -%}
{{ fabric__create_table_as(False, target_relation, sql)}}
{{ get_create_table_as_sql(False, target_relation, sql)}}
{%- endcall -%}

{% elif existing_relation.is_view %}

{#-- Can't overwrite a view with a table - we must drop --#}
{{ log("Dropping relation " ~ target_relation ~ " because it is a view and this model is a table.") }}
{{ drop_relation_if_exists(existing_relation) }}
{% do adapter.drop_relation(existing_relation) %}

{%- call statement('main') -%}
{{ fabric__create_table_as(False, target_relation, sql)}}
{{ get_create_table_as_sql(False, target_relation, sql)}}
{%- endcall -%}

{% elif full_refresh_mode %}

{%- call statement('main') -%}
{{ fabric__create_table_as(False, target_relation, sql)}}
{{ get_create_table_as_sql(False, target_relation, sql)}}
{%- endcall -%}

{% else %}

{%- call statement('create_tmp_relation') -%}
{{ fabric__create_table_as(True, temp_relation, sql)}}
{{ get_create_table_as_sql(True, temp_relation, sql)}}
{%- endcall -%}
{% do adapter.expand_target_column_types(
from_relation=temp_relation,
to_relation=target_relation) %}
from_relation=temp_relation,
to_relation=target_relation) %}
{#-- Process schema changes. Returns dict of changes if successful. Use source columns for upserting/merging --#}
{% set dest_columns = process_schema_changes(on_schema_change, temp_relation, existing_relation) %}
{% if not dest_columns %}
Expand All @@ -72,14 +70,12 @@
{%- endcall -%}
{% endif %}

{% do drop_relation_if_exists(temp_relation) %}
{% do adapter.drop_relation(tmp_relation_view) %}
{% do adapter.drop_relation(temp_relation) %}
{{ run_hooks(post_hooks, inside_transaction=True) }}

{% set target_relation = target_relation.incorporate(type='table') %}

{% set should_revoke = should_revoke(existing_relation, full_refresh_mode) %}
{% do apply_grants(target_relation, grant_config, should_revoke=should_revoke) %}

{% do persist_docs(target_relation, model) %}
{% do adapter.commit() %}
{{ return({'relations': [target_relation]}) }}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
{%- set target_relation = this.incorporate(type='table') -%}

{% call statement('main') %}
{{ fabric__drop_relation_script(target_relation) }}
{% do adapter.drop_relation(target_relation) %}
{{ create_or_replace_clone(target_relation, defer_relation) }}
{% endcall %}
{{ return({'relations': [target_relation]}) }}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
{% macro build_columns_constraints(relation) %}
{{ return(adapter.dispatch('build_columns_constraints', 'dbt')(relation)) }}
{% endmacro %}

{%- macro default__build_columns_constraints(relation) -%}
prdpsvs marked this conversation as resolved.
Show resolved Hide resolved
{%- endmacro -%}

{% macro fabric__build_columns_constraints(relation) %}
{# loop through user_provided_columns to create DDL with data types and constraints #}
{%- set raw_column_constraints = adapter.render_raw_columns_constraints(raw_columns=model['columns']) -%}
Expand All @@ -8,6 +15,13 @@
)
{% endmacro %}

{% macro build_model_constraints(relation) %}
{{ return(adapter.dispatch('build_model_constraints', 'dbt')(relation)) }}
{% endmacro %}

{%- macro default__build_model_constraints(relation) -%}
prdpsvs marked this conversation as resolved.
Show resolved Hide resolved
{%- endmacro -%}

{% macro fabric__build_model_constraints(relation) %}
{# loop through user_provided_columns to create DDL with data types and constraints #}
{%- set raw_model_constraints = adapter.render_raw_model_constraints(raw_constraints=model['constraints']) -%}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,19 +1,14 @@
{% macro fabric__create_table_as(temporary, relation, sql) -%}

{% set tmp_relation = relation.incorporate(
path={"identifier": relation.identifier.replace("#", "") ~ '_temp_view'},
type='view')-%}
{% do run_query(fabric__drop_relation_script(tmp_relation)) %}
{% set tmp_relation = relation.incorporate(path={"identifier": relation.identifier ~ '__dbt_tmp_vw'}, type='view')-%}
{{ get_create_view_as_sql(tmp_relation, sql) }}

{% set contract_config = config.get('contract') %}

{{ fabric__create_view_as(tmp_relation, sql) }}
{% set contract_config = config.get('contract') %}
{% if contract_config.enforced %}

CREATE TABLE [{{relation.database}}].[{{relation.schema}}].[{{relation.identifier}}]
{{ fabric__build_columns_constraints(relation) }}
{{ build_columns_constraints(relation) }}
{{ get_assert_columns_equivalent(sql) }}

{% set listColumns %}
{% for column in model['columns'] %}
{{ "["~column~"]" }}{{ ", " if not loop.last }}
Expand All @@ -24,9 +19,7 @@
({{listColumns}}) SELECT {{listColumns}} FROM [{{tmp_relation.database}}].[{{tmp_relation.schema}}].[{{tmp_relation.identifier}}];

{%- else %}
EXEC('CREATE TABLE [{{relation.database}}].[{{relation.schema}}].[{{relation.identifier}}] AS (SELECT * FROM [{{tmp_relation.database}}].[{{tmp_relation.schema}}].[{{tmp_relation.identifier}}]);');
EXEC('CREATE TABLE [{{relation.database}}].[{{relation.schema}}].[{{relation.identifier}}] AS (SELECT * FROM [{{tmp_relation.database}}].[{{tmp_relation.schema}}].[{{tmp_relation.identifier}}]);');
{% endif %}

{{ fabric__drop_relation_script(tmp_relation) }}

{% do adapter.drop_relation(tmp_relation)%}
{% endmacro %}
27 changes: 16 additions & 11 deletions dbt/include/fabric/macros/materializations/models/table/table.sql
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,9 @@

-- Load target relation
{%- set target_relation = this.incorporate(type='table') %}
-- Load existing relation
{%- set relation = fabric__get_relation_without_caching(this) %}

{% set existing_relation = none %}
{% if (relation|length == 1) %}
{% set existing_relation = get_or_create_relation(relation[0][0], relation[0][2] , relation[0][1] , relation[0][3])[1] %}
{% endif %}
{%- set existing_relation = adapter.get_relation(database=this.database, schema=this.schema, identifier=this.identifier) -%}

{%- set backup_relation = none %}
{{log("Existing Relation type is "~ existing_relation.type)}}
{% if (existing_relation != none and existing_relation.type == "table") %}
{%- set backup_relation = make_backup_relation(target_relation, 'table') -%}
{% elif (existing_relation != none and existing_relation.type == "view") %}
Expand All @@ -20,7 +13,7 @@

{% if (existing_relation != none) %}
-- drop the temp relations if they exist already in the database
{{ drop_relation_if_exists(backup_relation) }}
{% do adapter.drop_relation(backup_relation) %}
-- Rename target relation as backup relation
{{ adapter.rename_relation(existing_relation, backup_relation) }}
{% endif %}
Expand All @@ -32,13 +25,24 @@
-- `BEGIN` happens here:
{{ run_hooks(pre_hooks, inside_transaction=True) }}

-- naming a temp relation
{% set tmp_relation = target_relation.incorporate(path={"identifier": target_relation.identifier ~ '__dbt_tmp_vw'}, type='view')-%}
prdpsvs marked this conversation as resolved.
Show resolved Hide resolved

-- Fabric & Synapse adapters use temp relation because of lack of CTE support for CTE in CTAS, Insert
-- drop temp relation if exists
{% do adapter.drop_relation(tmp_relation) %}

-- build model
{% call statement('main') -%}
{{ get_create_table_as_sql(False, target_relation, sql) }}
{%- endcall %}

-- drop temp relation if exists
{% do adapter.drop_relation(tmp_relation) %}

-- cleanup
{{ run_hooks(post_hooks, inside_transaction=True) }}

{% do apply_grants(target_relation, grant_config, should_revoke=should_revoke) %}
{% do persist_docs(target_relation, model) %}
-- `COMMIT` happens here
Expand All @@ -48,10 +52,11 @@
-- finally, drop the foreign key references if exists
{{ drop_fk_indexes_on_table(backup_relation) }}
-- drop existing/backup relation after the commit
{{ drop_relation_if_exists(backup_relation) }}
{% do adapter.drop_relation(backup_relation) %}
{% endif %}

-- Add constraints including FK relation.
{{ fabric__build_model_constraints(target_relation) }}
{{ build_model_constraints(target_relation) }}
{{ run_hooks(post_hooks, inside_transaction=False) }}
{{ return({'relations': [target_relation]}) }}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

{%- set temp_view_sql = sql.replace("'", "''") -%}

USE [{{ relation.database }}];
{{ get_use_database_sql(relation.database) }}
{% set contract_config = config.get('contract') %}
{% if contract_config.enforced %}
{{ get_assert_columns_equivalent(sql) }}
Expand Down
Loading