Fix Delete Condition (#12)
* removed delete_placeholder to use delete_condition properly

* added delete_condition example

* fixed list options

* fixed options render

* Increment version for build

* fixed the dictionary of options

* fixed the missing commas

* added heartbeat_table to options

* added parse_json_columns

* Fix typo in macro name

---------

Co-authored-by: Tanya Shemet <[email protected]>
navado and tanyshak authored May 16, 2023
1 parent 1ab62a1 commit ce69d21
Showing 10 changed files with 70 additions and 45 deletions.
2 changes: 1 addition & 1 deletion dbt/adapters/upsolver/__version__.py
@@ -1 +1 @@
version = "0.2.0"
version = "0.2.1"
28 changes: 12 additions & 16 deletions dbt/adapters/upsolver/impl.py
@@ -12,6 +12,7 @@
import agate
import datetime
import re
import dbt

logger = AdapterLogger("Upsolver")
LIST_RELATION_MACRO_NAME = "list_relation_without_caching"
@@ -44,10 +45,12 @@ def drop_schema(self, relation: UpsolverRelation) -> None:

@available
def get_connection_from_sql(self, sql):
connection_identifier = re.search('"(.*)"', sql).group().split('.')[2] \
.translate(str.maketrans({'\"':'', '\'':''}))

return connection_identifier
try:
connection_identifier = re.search('"(.*)"', sql).group().split('.')[2] \
.translate(str.maketrans({'\"':'', '\'':''}))
return connection_identifier
except Exception:
raise dbt.exceptions.ParsingError(f"Error while parsing connection name from sql:\n{sql}")

@available
def get_columns_names_with_types(self, list_dict):
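The reworked get_connection_from_sql above wraps the regex lookup in try/except so a statement it cannot parse surfaces as dbt's ParsingError instead of an unhandled AttributeError. A minimal standalone sketch of the same extraction logic, with a hypothetical SQL string and a plain ValueError standing in for dbt.exceptions.ParsingError:

    import re

    def connection_from_sql(sql: str) -> str:
        # Take the third dot-separated part of the first quoted reference in the SQL,
        # then strip the surrounding quote characters.
        try:
            quoted = re.search('"(.*)"', sql).group()         # '"db"."schema"."my_connection"'
            identifier = quoted.split('.')[2]                 # '"my_connection"'
            return identifier.translate(str.maketrans({'"': '', "'": ''}))
        except Exception:
            raise ValueError(f"Error while parsing connection name from sql:\n{sql}")

    # hypothetical input, for illustration only
    print(connection_from_sql('CREATE JOB copy_job FROM "default_glue_catalog"."database_16e61b"."my_connection"'))
    # -> my_connection
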
@@ -77,8 +80,11 @@ def enrich_options(self, config_options, source, options_type):
for option, value in config_options.items():
find_value = options.get(option.lower(), None)
if find_value:
if options[option.lower()]['type'] == 'list' and isinstance(value, str):
value = f"('{value}')"
if options[option.lower()]['type'] == 'list':
if not isinstance(value, str):
value = tuple(i for i in value)
else:
value = f"('{value}')"
enriched_options[option] = find_value
enriched_options[option]['value'] = value
else:
@@ -103,16 +109,6 @@ def get_options(self, source, options_type):
options = Copy_options[source.lower()][options_type]
return options

@available
def get_delete_placeholder(self, sql, delete_condition):
if delete_condition:
delete_placeholder= re.search('(nettotal < 0 [as|AS,\s]*\w*)', sql)[1] \
.lower().replace(" ", "") \
.replace(f"{delete_condition.lower().replace(' ', '')}as", "")
return delete_placeholder
else:
return False

def list_relations_without_caching(
self,
schema_relation: UpsolverRelation,
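The list handling added to enrich_options earlier in this diff is what the "fixed list options" and "fixed options render" commits refer to: a list-typed option supplied as a Python list is rendered as a tuple, while a bare string is wrapped in parentheses. A rough sketch of just that branch, using a one-entry stand-in for the option catalogue:

    # one-entry stand-in for the real option definitions in copy_options.py
    OPTIONS = {"table_include_list": {"type": "list", "editable": True, "optional": True}}

    def render_option(name, value):
        # Mirror the new enrich_options branch: lists become tuples, strings get wrapped.
        spec = OPTIONS[name.lower()]
        if spec["type"] == "list":
            if not isinstance(value, str):
                value = tuple(i for i in value)   # ['a', 'b'] -> ('a', 'b')
            else:
                value = f"('{value}')"            # 'a' -> "('a')"
        return value

    print(render_option("table_include_list", ["db.orders", "db.customers"]))  # ('db.orders', 'db.customers')
    print(render_option("table_include_list", "db.orders"))                    # ('db.orders')
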
41 changes: 27 additions & 14 deletions dbt/adapters/upsolver/options/copy_options.py
@@ -16,34 +16,31 @@
"comment": {"type": "text", "editable": True, "optional": True}
}
},
"my_sql": {
"mysql": {
"source_options": {
"table_include_list": {"type": "text", "editable": True, "optional": True},
"column_exclude_list": {"type": "text", "editable": True, "optional": True}
"table_include_list": {"type": "list", "editable": True, "optional": True},
"column_exclude_list": {"type": "list", "editable": True, "optional": True}
},
"job_options": {
"skip_snapshots": {"value": "text", "editable": True, "optional": True},
"skip_snapshots": {"type": "boolean", "editable": True, "optional": True},
"end_at": {"type": "value", "editable": True, "optional": True},
"compute_cluster": {"type": "identifier", "editable": True, "optional": True},
"comment": {"type": "text", "editable": True, "optional": True}
}
},
"postgre_sql": {
"postgres": {
"source_options": {
"bucket": {"type": "text", "editable": False, "optional": False},
"prefix": {"type": "text", "editable": False, "optional": True}
"table_include_list": {"type": "list", "editable": False, "optional": False},
"column_exclude_list": {"type": "list", "editable": False, "optional": True}
},
"job_options": {
"table_include_list": {"type": "text", "editable": False, "optional": True},
"heartbeat_table": {"type": "text", "editable": False, "optional": True},
"skip_snapshots": {"type": "boolean", "editable": False, "optional": True},
"column_exclude_list": {"type": "text", "editable": False, "optional": True},
"publication_name": {"type": "text", "editable": False, "optional": False},
"file_pattern": {"type": "text", "editable": False, "optional": True},
"delete_files_after_load": {"type": "boolean", "editable": False, "optional": True},
"end_at": {"type": "value", "editable": True, "optional": True},
"compute_cluster": {"type": "identifier", "editable": True, "optional": True},
"run_parallelism": {"type": "integer", "editable": True, "optional": True},
"comment": {"type": "text", "editable": True, "optional": True}
"comment": {"type": "text", "editable": True, "optional": True},
"parse_json_columns": {"type": "boolean", "editable": False, "optional": False}
}
},
"s3": {
@@ -60,5 +57,21 @@
"compression": {"type": "value", "editable": False, "optional": True},
"comment": {"type": "text", "editable": True, "optional": True}
}
}
},
"kinesis": {
"source_options": {
"stream": {"type": "text", "editable": False, "optional": False}
},
"job_options": {
"reader_shards": {"type": "integer", "editable": True, "optional": True},
"store_raw_data": {"type": "boolean", "editable": False, "optional": True},
"start_from": {"type": "value", "editable": False, "optional": True},
"end_at": {"type": "value", "editable": False, "optional": True},
"compute_cluster": {"type": "identifier", "editable": True, "optional": True},
"run_parallelism": {"type": "integer", "editable": False, "optional": True},
"content_type": {"type": "value", "editable": True, "optional": True},
"compression": {"type": "value", "editable": False, "optional": True},
"comment": {"type": "text", "editable": True, "optional": True}
}
}
}
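These nested dictionaries are keyed first by source (note the renamed mysql and postgres keys) and then by option group, which is how get_options in impl.py resolves them via Copy_options[source.lower()][options_type]. A small illustrative lookup; the import path is an assumption based on the file location:

    # assumed module path, based on dbt/adapters/upsolver/options/copy_options.py
    from dbt.adapters.upsolver.options.copy_options import Copy_options

    postgres_job_options = Copy_options["postgres"]["job_options"]
    print(sorted(postgres_job_options))   # now includes 'heartbeat_table' and 'parse_json_columns'

    mysql_source_options = Copy_options["mysql"]["source_options"]
    print(mysql_source_options["table_include_list"]["type"])   # 'list'
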
2 changes: 1 addition & 1 deletion dbt/adapters/upsolver/options/table_options.py
@@ -4,7 +4,7 @@
"storage_location": {"type": "text", "editable": False, "optional": True},
"compute_cluster": {"type": "identifier", "editable": True, "optional": True},
"compression": {"type": "value", "editable": True, "optional": True},
"compaction_processes": {"type": "list", "editable": True, "optional": True},
"compaction_processes": {"type": "integer", "editable": True, "optional": True},
"disable_compaction": {"type": "boolean", "editable": True, "optional": True},
"retention_date_partition": {"type": "text", "editable": True, "optional": True},
"table_data_retention": {"type": "text", "editable": True, "optional": True},
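compaction_processes is now typed as an integer rather than a list. With the new list handling in enrich_options, a list-typed option whose value is a plain integer would be fed to tuple(), which fails because an int is not iterable, so the corrected type presumably keeps this option out of that branch. A tiny check of that behaviour with hypothetical values:

    table_options = {"compaction_processes": 2, "disable_compaction": False}   # hypothetical config

    # Under the old 'list' typing, the new enrich_options branch would do tuple(i for i in 2):
    try:
        tuple(i for i in table_options["compaction_processes"])
    except TypeError as exc:
        print(exc)   # 'int' object is not iterable
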
5 changes: 0 additions & 5 deletions dbt/include/upsolver/macros/materializations/connection.sql
@@ -18,11 +18,6 @@
{{ run_hooks(pre_hooks, inside_transaction=False) }}
{{ run_hooks(pre_hooks, inside_transaction=True) }}

{{ log("Options: " ~ connection_options ) }}
{{ log("Enriched options: " ~ enriched_options ) }}
{{ log("Enriched options: " ~ enriched_options ) }}


{% if old_relation %}
{% call statement('main') %}
ALTER {{ connection_type }} CONNECTION {{target_relation.identifier}}
@@ -1,4 +1,4 @@
{% macro get_create_incert_job_sql(job_identifier, table, sync, options, map_columns_by_name) -%}
{% macro get_create_insert_job_sql(job_identifier, table, sync, options, map_columns_by_name) -%}

{% set enriched_options = adapter.enrich_options(options, 'upsolver_data_lake', 'transformation_options') %}

@@ -1,7 +1,6 @@
{% macro get_create_merge_job_sql(job_identifier, table, sync, options, primary_key, delete_condition) -%}

{% set enriched_options = adapter.enrich_options(options, 'upsolver_data_lake', 'transformation_options') %}
{% set delete_placeholder = adapter.get_delete_placeholder(sql, delete_condition) %}

CREATE
{% if sync %}
@@ -20,12 +19,9 @@
{% endfor %}
)
{% endif %}
{% if delete_placeholder %}
WHEN MATCHED AND {{ delete_placeholder }} THEN DELETE
{% if delete_condition %}
WHEN MATCHED AND {{ delete_condition}} THEN DELETE
{% endif %}
WHEN MATCHED THEN REPLACE
WHEN NOT MATCHED THEN INSERT MAP_COLUMNS_BY_NAME
{% if delete_placeholder %}
EXCEPT {{ delete_placeholder }}
{% endif %}
{%- endmacro %}
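With get_delete_placeholder removed from impl.py, the macro now interpolates delete_condition directly into the MERGE job and no longer emits the trailing EXCEPT clause. A rough Python sketch of the clause-building the template performs; the function name and standalone form are illustrative, not the adapter's API:

    from typing import Optional

    def merge_tail(delete_condition: Optional[str]) -> str:
        # Add the conditional DELETE clause only when a delete_condition is configured.
        clauses = []
        if delete_condition:
            clauses.append(f"WHEN MATCHED AND {delete_condition} THEN DELETE")
        clauses.append("WHEN MATCHED THEN REPLACE")
        clauses.append("WHEN NOT MATCHED THEN INSERT MAP_COLUMNS_BY_NAME")
        return "\n".join(clauses)

    print(merge_tail("nettotal > 1000"))   # condition taken from the dell_test2.sql example below
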
@@ -42,7 +42,7 @@
{{ get_create_merge_job_sql(job_identifier, table_relation, sync,
options, primary_key, delete_condition) }}
{% elif incremental_strategy == 'insert' %}
{{ get_create_incert_job_sql(job_identifier,
{{ get_create_insert_job_sql(job_identifier,
table_relation, sync, options,
map_columns_by_name) }}

15 changes: 15 additions & 0 deletions examples/upsert_records_new/models/dell_test2.sql
@@ -0,0 +1,15 @@
{{ config( materialized='incremental',
incremental_strategy='merge',
map_columns_by_name=True,
sync=True,
options={'START_FROM': 'NOW',
'ADD_MISSING_COLUMNS': True,
'RUN_INTERVAL': '1 MINUTE'},
primary_key=[{'field':'orderid', 'type':'string'}],
delete_condition='nettotal > 1000' )
}}

SELECT
*
FROM {{ source('upsert_records_new', 'orders_raw_data_for_upsert_2') }}
WHERE $event_time BETWEEN run_start_time() AND run_end_time()
10 changes: 10 additions & 0 deletions examples/upsert_records_new/models/sources.yaml
@@ -0,0 +1,10 @@
version: 2

sources:
- name: upsert_records_new
database: default_glue_catalog
schema: database_16e61b
tables:
- name: insert_orders_upsert_2
- name: merge_orders_upsert_2
- name: orders_raw_data_for_upsert_2
