Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Updating dispatching methods to ensure dbt-synapse adapter can use ad… #178

Merged
merged 17 commits into from
May 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions .github/workflows/integration-tests-azure.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,12 @@ jobs:
name: Regular
strategy:
fail-fast: false
max-parallel: 1
matrix:
python_version: ["3.8", "3.9", "3.10", "3.11"]
profile: ["ci_azure_auto"]
python_version: ["3.11"]
prdpsvs marked this conversation as resolved.
Show resolved Hide resolved
msodbc_version: ["17", "18"]
max-parallel: 1

runs-on: ubuntu-latest
container:
image: ghcr.io/${{ github.repository }}:CI-${{ matrix.python_version }}-msodbc${{ matrix.msodbc_version }}
Expand Down
2 changes: 1 addition & 1 deletion dbt/adapters/fabric/__version__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version = "1.8.5"
version = "1.8.6"
6 changes: 6 additions & 0 deletions dbt/adapters/fabric/fabric_connection_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,12 @@ def open(cls, connection: Connection) -> Connection:

con_str.append(f"Database={credentials.database}")

#Enabling trace flag
if credentials.trace_flag:
con_str.append("SQL_ATTR_TRACE=SQL_OPT_TRACE_ON")
else:
con_str.append("SQL_ATTR_TRACE=SQL_OPT_TRACE_OFF")

assert credentials.authentication is not None

if "ActiveDirectory" in credentials.authentication:
Expand Down
3 changes: 3 additions & 0 deletions dbt/adapters/fabric/fabric_credentials.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ class FabricCredentials(Credentials):
UID: Optional[str] = None
PWD: Optional[str] = None
windows_login: Optional[bool] = False
trace_flag: Optional[bool] = False
tenant_id: Optional[str] = None
client_id: Optional[str] = None
client_secret: Optional[str] = None
Expand All @@ -36,6 +37,7 @@ class FabricCredentials(Credentials):
"app_secret": "client_secret",
"TrustServerCertificate": "trust_cert",
"schema_auth": "schema_authorization",
"SQL_ATTR_TRACE": "trace_flag",
}

@property
Expand Down Expand Up @@ -63,6 +65,7 @@ def _connection_keys(self):
"retries",
"login_timeout",
"query_timeout",
"trace_flag",
)

@property
Expand Down
4 changes: 2 additions & 2 deletions dbt/include/fabric/macros/adapters/columns.sql
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@

{% macro fabric__alter_column_type(relation, column_name, new_column_type) %}

{%- set table_name= tmp_relation.include(database=False).include(schema=False)-%}
{%- set schema_name = tmp_relation.include(database=False).include(identifier=False) -%}
{%- set table_name= relation.identifier -%}
{%- set schema_name = relation.schema -%}

{% set generate_tmp_relation_script %}
SELECT TRIM(REPLACE(STRING_AGG(ColumnName + ' ', ',-'), '-', CHAR(10))) AS ColumnDef
Expand Down
12 changes: 10 additions & 2 deletions dbt/include/fabric/macros/adapters/metadata.sql
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,14 @@
information_schema
{%- endmacro %}

{% macro get_use_database_sql(database) %}
{{ return(adapter.dispatch('get_use_database_sql', 'dbt')(database)) }}
{% endmacro %}

{%- macro fabric__get_use_database_sql(database) -%}
USE [{{database}}];
prdpsvs marked this conversation as resolved.
Show resolved Hide resolved
{%- endmacro -%}

{% macro fabric__list_schemas(database) %}
{% call statement('list_schemas', fetch_result=True, auto_begin=False) -%}
select name as [schema]
Expand All @@ -27,7 +35,7 @@

{% macro fabric__list_relations_without_caching(schema_relation) -%}
{% call statement('list_relations_without_caching', fetch_result=True) -%}
USE [{{ schema_relation.database }}];
{{ get_use_database_sql(schema_relation.database) }}
with base as (
select
DB_NAME() as [database],
Expand All @@ -51,7 +59,7 @@

{% macro fabric__get_relation_without_caching(schema_relation) -%}
{% call statement('get_relation_without_caching', fetch_result=True) -%}
USE [{{ schema_relation.database }}];
{{ get_use_database_sql(schema_relation.database) }}
with base as (
select
DB_NAME() as [database],
Expand Down
53 changes: 23 additions & 30 deletions dbt/include/fabric/macros/adapters/relation.sql
Original file line number Diff line number Diff line change
Expand Up @@ -6,51 +6,44 @@
{{ return(temp_relation) }}
{% endmacro %}

{% macro fabric__drop_relation(relation) -%}
{% call statement('drop_relation', auto_begin=False) -%}
{{ fabric__drop_relation_script(relation) }}
{%- endcall %}
{% endmacro %}

{% macro fabric__drop_relation_script(relation) -%}

{% if relation.type == 'view' -%}
{% macro fabric__get_drop_sql(relation) -%}
{% if relation.type == 'view' -%}
{% call statement('find_references', fetch_result=true) %}
USE [{{ relation.database }}];
select
sch.name as schema_name,
obj.name as view_name
from sys.sql_expression_dependencies refs
inner join sys.objects obj
on refs.referencing_id = obj.object_id
inner join sys.schemas sch
on obj.schema_id = sch.schema_id
where refs.referenced_database_name = '{{ relation.database }}'
and refs.referenced_schema_name = '{{ relation.schema }}'
and refs.referenced_entity_name = '{{ relation.identifier }}'
and refs.referencing_class = 1
and obj.type = 'V'
{{ get_use_database_sql(relation.database) }}
select
sch.name as schema_name,
obj.name as view_name
from sys.sql_expression_dependencies refs
inner join sys.objects obj
on refs.referencing_id = obj.object_id
inner join sys.schemas sch
on obj.schema_id = sch.schema_id
where refs.referenced_database_name = '{{ relation.database }}'
and refs.referenced_schema_name = '{{ relation.schema }}'
and refs.referenced_entity_name = '{{ relation.identifier }}'
and refs.referencing_class = 1
and obj.type = 'V'
{% endcall %}
{% set references = load_result('find_references')['data'] %}
{% for reference in references -%}
-- dropping referenced view {{ reference[0] }}.{{ reference[1] }}
{{ fabric__drop_relation_script(relation.incorporate(
type="view",
path={"schema": reference[0], "identifier": reference[1]})) }}
-- dropping referenced view {{ reference[0] }}.{{ reference[1] }}
{% do adapter.drop_relation
(api.Relation.create(
identifier = reference[1], schema = reference[0], database = relation.database, type='view'
))%}
{% endfor %}
{% elif relation.type == 'table'%}
{% set object_id_type = 'U' %}

{%- else -%}
{{ exceptions.raise_not_implemented('Invalid relation being dropped: ' ~ relation) }}
{% endif %}
USE [{{ relation.database }}];
{{ get_use_database_sql(relation.database) }}
EXEC('DROP {{ relation.type }} IF EXISTS {{ relation.include(database=False) }};');
{% endmacro %}

{% macro fabric__rename_relation(from_relation, to_relation) -%}
{% call statement('rename_relation') -%}
USE [{{ from_relation.database }}];
{{ get_use_database_sql(from_relation.database) }}
EXEC sp_rename '{{ from_relation.schema }}.{{ from_relation.identifier }}', '{{ to_relation.identifier }}'
{%- endcall %}
{% endmacro %}
Expand Down
4 changes: 2 additions & 2 deletions dbt/include/fabric/macros/adapters/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

{% macro fabric__create_schema_with_authorization(relation, schema_authorization) -%}
{% call statement('create_schema') -%}
USE [{{ relation.database }}];
{{ get_use_database_sql(relation.database) }}
IF NOT EXISTS (SELECT * FROM sys.schemas WHERE name = '{{ relation.schema }}')
BEGIN
EXEC('CREATE SCHEMA [{{ relation.schema }}] AUTHORIZATION [{{ schema_authorization }}]')
Expand All @@ -27,7 +27,7 @@
identifier=row[1],
type=row[3]
) -%}
{% do drop_relation(schema_relation) %}
{% do adapter.drop_relation(schema_relation) %}
{%- endfor %}

{% call statement('drop_schema') -%}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,23 +1,16 @@

{% materialization incremental, adapter='fabric' -%}

{%- set full_refresh_mode = (should_full_refresh()) -%}
{% set target_relation = this.incorporate(type='table') %}
{%- set relations_list = fabric__get_relation_without_caching(target_relation) -%}
{%- set relation = load_cached_relation(this) -%}

{%- set existing_relation = none %}
{% if (relations_list|length == 1) and (relations_list[0][2] == target_relation.schema)
and (relations_list[0][1] == target_relation.identifier) and (relations_list[0][3] == target_relation.type)%}
{% if relation.type == 'table' %}
{% set existing_relation = target_relation %}
{% elif (relations_list|length == 1) and (relations_list[0][2] == target_relation.schema)
and (relations_list[0][1] == target_relation.identifier) and (relations_list[0][3] != target_relation.type) %}
{% set existing_relation = get_or_create_relation(relations_list[0][0], relations_list[0][2] , relations_list[0][1] , relations_list[0][3])[1] %}
{% elif relation.type == 'view' %}
{% set existing_relation = get_or_create_relation(relation.database, relation.schema, relation.identifier, relation.type)[1] %}
{% endif %}

{{ log("Full refresh mode" ~ full_refresh_mode)}}
{{ log("existing relation : "~existing_relation ~ " type "~ existing_relation.type ~ " is view? "~existing_relation.is_view) }}
{{ log("target relation: " ~target_relation ~ " type "~ target_relation.type ~ " is view? "~target_relation.is_view) }}

-- configs
{%- set unique_key = config.get('unique_key') -%}
{% set incremental_strategy = config.get('incremental_strategy') or 'default' %}
Expand All @@ -28,35 +21,39 @@

{{ run_hooks(pre_hooks, inside_transaction=True) }}

{% if existing_relation is none %}
-- naming a temp relation
{% set tmp_relation_view = target_relation.incorporate(path={"identifier": target_relation.identifier ~ '__dbt_tmp_vw'}, type='view')-%}

-- Fabric & Synapse adapters use temp relation because of lack of CTE support for CTE in CTAS, Insert
-- drop temp relation if exists
{% do adapter.drop_relation(tmp_relation_view) %}

{% if existing_relation is none %}
{%- call statement('main') -%}
{{ fabric__create_table_as(False, target_relation, sql)}}
{{ get_create_table_as_sql(False, target_relation, sql)}}
{%- endcall -%}

{% elif existing_relation.is_view %}

{#-- Can't overwrite a view with a table - we must drop --#}
{{ log("Dropping relation " ~ target_relation ~ " because it is a view and this model is a table.") }}
{{ drop_relation_if_exists(existing_relation) }}
{% do adapter.drop_relation(existing_relation) %}

{%- call statement('main') -%}
{{ fabric__create_table_as(False, target_relation, sql)}}
{{ get_create_table_as_sql(False, target_relation, sql)}}
{%- endcall -%}

{% elif full_refresh_mode %}

{%- call statement('main') -%}
{{ fabric__create_table_as(False, target_relation, sql)}}
{{ get_create_table_as_sql(False, target_relation, sql)}}
{%- endcall -%}

{% else %}

{%- call statement('create_tmp_relation') -%}
{{ fabric__create_table_as(True, temp_relation, sql)}}
{{ get_create_table_as_sql(True, temp_relation, sql)}}
{%- endcall -%}
{% do adapter.expand_target_column_types(
from_relation=temp_relation,
to_relation=target_relation) %}
from_relation=temp_relation,
to_relation=target_relation) %}
{#-- Process schema changes. Returns dict of changes if successful. Use source columns for upserting/merging --#}
{% set dest_columns = process_schema_changes(on_schema_change, temp_relation, existing_relation) %}
{% if not dest_columns %}
Expand All @@ -72,14 +69,12 @@
{%- endcall -%}
{% endif %}

{% do drop_relation_if_exists(temp_relation) %}
{% do adapter.drop_relation(tmp_relation_view) %}
{% do adapter.drop_relation(temp_relation) %}
{{ run_hooks(post_hooks, inside_transaction=True) }}

{% set target_relation = target_relation.incorporate(type='table') %}

{% set should_revoke = should_revoke(existing_relation, full_refresh_mode) %}
{% do apply_grants(target_relation, grant_config, should_revoke=should_revoke) %}

{% do persist_docs(target_relation, model) %}
{% do adapter.commit() %}
{{ return({'relations': [target_relation]}) }}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
{%- set target_relation = this.incorporate(type='table') -%}

{% call statement('main') %}
{{ fabric__drop_relation_script(target_relation) }}
{% do adapter.drop_relation(target_relation) %}
{{ create_or_replace_clone(target_relation, defer_relation) }}
{% endcall %}
{{ return({'relations': [target_relation]}) }}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
{% macro build_columns_constraints(relation) %}
{{ return(adapter.dispatch('build_columns_constraints', 'dbt')(relation)) }}
{% endmacro %}

{% macro fabric__build_columns_constraints(relation) %}
{# loop through user_provided_columns to create DDL with data types and constraints #}
{%- set raw_column_constraints = adapter.render_raw_columns_constraints(raw_columns=model['columns']) -%}
Expand All @@ -8,6 +12,10 @@
)
{% endmacro %}

{% macro build_model_constraints(relation) %}
{{ return(adapter.dispatch('build_model_constraints', 'dbt')(relation)) }}
{% endmacro %}

{% macro fabric__build_model_constraints(relation) %}
{# loop through user_provided_columns to create DDL with data types and constraints #}
{%- set raw_model_constraints = adapter.render_raw_model_constraints(raw_constraints=model['constraints']) -%}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,19 +1,14 @@
{% macro fabric__create_table_as(temporary, relation, sql) -%}

{% set tmp_relation = relation.incorporate(
path={"identifier": relation.identifier.replace("#", "") ~ '_temp_view'},
type='view')-%}
{% do run_query(fabric__drop_relation_script(tmp_relation)) %}
{% set tmp_relation = relation.incorporate(path={"identifier": relation.identifier ~ '__dbt_tmp_vw'}, type='view')-%}
{{ get_create_view_as_sql(tmp_relation, sql) }}

{% set contract_config = config.get('contract') %}

{{ fabric__create_view_as(tmp_relation, sql) }}
{% set contract_config = config.get('contract') %}
{% if contract_config.enforced %}

CREATE TABLE [{{relation.database}}].[{{relation.schema}}].[{{relation.identifier}}]
{{ fabric__build_columns_constraints(relation) }}
{{ build_columns_constraints(relation) }}
{{ get_assert_columns_equivalent(sql) }}

{% set listColumns %}
{% for column in model['columns'] %}
{{ "["~column~"]" }}{{ ", " if not loop.last }}
Expand All @@ -24,9 +19,7 @@
({{listColumns}}) SELECT {{listColumns}} FROM [{{tmp_relation.database}}].[{{tmp_relation.schema}}].[{{tmp_relation.identifier}}];

{%- else %}
EXEC('CREATE TABLE [{{relation.database}}].[{{relation.schema}}].[{{relation.identifier}}] AS (SELECT * FROM [{{tmp_relation.database}}].[{{tmp_relation.schema}}].[{{tmp_relation.identifier}}]);');
EXEC('CREATE TABLE [{{relation.database}}].[{{relation.schema}}].[{{relation.identifier}}] AS (SELECT * FROM [{{tmp_relation.database}}].[{{tmp_relation.schema}}].[{{tmp_relation.identifier}}]);');
{% endif %}

{{ fabric__drop_relation_script(tmp_relation) }}

{% do adapter.drop_relation(tmp_relation)%}
{% endmacro %}
Loading