From 82a35cf91218eb83ebff2685650ac5e26d919ec5 Mon Sep 17 00:00:00 2001 From: Bernardo Meireles Date: Mon, 16 Dec 2024 14:04:16 +0000 Subject: [PATCH] Move reflections to rest API calls --- dbt/adapters/dremio/api/__init__.py | 1 + dbt/adapters/dremio/api/rest/endpoints.py | 47 +++++++++- dbt/adapters/dremio/api/rest/url_builder.py | 62 +++++++++++-- dbt/adapters/dremio/connections.py | 90 ++++++++++++++++-- dbt/adapters/dremio/impl.py | 5 + dbt/include/dremio/dbt_project.yml | 4 - .../dremio/macros/adapters/metadata.sql | 48 ---------- .../reflection/create_reflection.sql | 93 +++---------------- .../materializations/reflection/helpers.sql | 11 +-- .../reflection/reflection.sql | 23 +++-- 10 files changed, 218 insertions(+), 166 deletions(-) diff --git a/dbt/adapters/dremio/api/__init__.py b/dbt/adapters/dremio/api/__init__.py index 66db7a8..095c905 100644 --- a/dbt/adapters/dremio/api/__init__.py +++ b/dbt/adapters/dremio/api/__init__.py @@ -14,6 +14,7 @@ # __init__.py from .rest.endpoints import ( + create_reflection, delete_catalog, sql_endpoint, job_status, diff --git a/dbt/adapters/dremio/api/rest/endpoints.py b/dbt/adapters/dremio/api/rest/endpoints.py index f26c418..5f91147 100644 --- a/dbt/adapters/dremio/api/rest/endpoints.py +++ b/dbt/adapters/dremio/api/rest/endpoints.py @@ -66,6 +66,26 @@ def _post( return _check_error(response, details) +def _put( + url, + request_headers=None, + json=None, + details="", + ssl_verify=True, + timeout=None, +): + if isinstance(json, str): + json = jsonlib.loads(json) + response = session.put( + url, + headers=request_headers, + timeout=timeout, + verify=ssl_verify, + json=json, + ) + return _check_error(response, details) + + def _delete(url, request_headers, details="", ssl_verify=True): response = session.delete(url, headers=request_headers, verify=ssl_verify) return _check_error(response, details) @@ -149,7 +169,6 @@ def _check_error(response, details=""): def login(api_parameters: Parameters, timeout=10): - if isinstance(api_parameters.authentication, DremioPatAuthentication): return api_parameters @@ -251,3 +270,29 @@ def delete_catalog(api_parameters, cid): api_parameters.authentication.get_headers(), ssl_verify=api_parameters.authentication.verify_ssl, ) + +def get_reflection(api_parameters, dataset_id): + url = UrlBuilder.get_reflection_url(api_parameters, dataset_id) + return _get( + url, + api_parameters.authentication.get_headers(), + ssl_verify=api_parameters.authentication.verify_ssl, + ) + +def create_reflection(api_parameters: Parameters, name: str, type: str, payload): + url = UrlBuilder.create_reflection_url(api_parameters) + return _post( + url, + api_parameters.authentication.get_headers(), + json=payload, + ssl_verify=api_parameters.authentication.verify_ssl, + ) + +def update_reflection(api_parameters: Parameters, dataset_id: str, payload): + url = UrlBuilder.update_reflection_url(api_parameters, dataset_id) + return _put( + url, + api_parameters.authentication.get_headers(), + json=payload, + ssl_verify=api_parameters.authentication.verify_ssl, + ) \ No newline at end of file diff --git a/dbt/adapters/dremio/api/rest/url_builder.py b/dbt/adapters/dremio/api/rest/url_builder.py index c47a6ee..a92327b 100644 --- a/dbt/adapters/dremio/api/rest/url_builder.py +++ b/dbt/adapters/dremio/api/rest/url_builder.py @@ -34,6 +34,12 @@ class UrlBuilder: SOFTWARE_CATALOG_ENDPOINT = "/api/v3/catalog" CLOUD_CATALOG_ENDPOINT = CLOUD_PROJECT_ENDPOINT + "/{}/catalog" + SOFTWARE_REFLECTIONS_ENDPOINT = "/api/v3/reflection" + CLOUD_REFLECTIONS_ENDPOINT = CLOUD_PROJECT_ENDPOINT + "/{}/reflection" + + SOFTWARE_DATASET_ENDPOIT = "/api/v3/dataset" + CLOUD_DATASET_ENDPOINT = CLOUD_PROJECT_ENDPOINT + "/{}/dataset" + # https://docs.dremio.com/software/rest-api/jobs/get-job/ OFFSET_DEFAULT = 0 LIMIT_DEFAULT = 100 @@ -56,10 +62,10 @@ def sql_url(cls, parameters: Parameters): def job_status_url(cls, parameters: Parameters, job_id): if type(parameters) is CloudParameters: return ( - parameters.base_url - + UrlBuilder.CLOUD_JOB_ENDPOINT.format(parameters.cloud_project_id) - + "/" - + job_id + parameters.base_url + + UrlBuilder.CLOUD_JOB_ENDPOINT.format(parameters.cloud_project_id) + + "/" + + job_id ) return parameters.base_url + UrlBuilder.SOFTWARE_JOB_ENDPOINT + "/" + job_id @@ -75,11 +81,11 @@ def job_cancel_url(cls, parameters: Parameters, job_id): @classmethod def job_results_url( - cls, - parameters: Parameters, - job_id, - offset=OFFSET_DEFAULT, - limit=LIMIT_DEFAULT, + cls, + parameters: Parameters, + job_id, + offset=OFFSET_DEFAULT, + limit=LIMIT_DEFAULT, ): url_path = parameters.base_url if type(parameters) is CloudParameters: @@ -139,3 +145,41 @@ def catalog_item_by_path_url(cls, parameters: Parameters, path_list): joined_path_str = "/".join(quoted_path_list).replace('"', "") endpoint = f"/by-path/{joined_path_str}" return url_path + endpoint + + @classmethod + def create_reflection_url(cls, parameters: Parameters): + url_path = parameters.base_url + if type(parameters) is CloudParameters: + url_path += UrlBuilder.CLOUD_REFLECTIONS_ENDPOINT.format( + parameters.cloud_project_id + ) + else: + url_path += UrlBuilder.SOFTWARE_REFLECTIONS_ENDPOINT + + return url_path + + @classmethod + def update_reflection_url(cls, parameters: Parameters, dataset_id): + url_path = parameters.base_url + if type(parameters) is CloudParameters: + url_path += UrlBuilder.CLOUD_REFLECTIONS_ENDPOINT.format( + parameters.cloud_project_id + ) + else: + url_path += UrlBuilder.SOFTWARE_REFLECTIONS_ENDPOINT + + endpoint = "/{}".format(dataset_id) + return url_path + endpoint + + @classmethod + def get_reflection_url(cls, parameters: Parameters, dataset_id): + url_path = parameters.base_url + if type(parameters) is CloudParameters: + url_path += UrlBuilder.CLOUD_DATASET_ENDPOINT.format( + parameters.cloud_project_id + ) + else: + url_path += UrlBuilder.SOFTWARE_DATASET_ENDPOIT + + endpoint = "/{}/reflection".format(dataset_id) + return url_path + endpoint diff --git a/dbt/adapters/dremio/connections.py b/dbt/adapters/dremio/connections.py index 3096efa..16ab584 100644 --- a/dbt/adapters/dremio/connections.py +++ b/dbt/adapters/dremio/connections.py @@ -30,6 +30,9 @@ from dbt.adapters.contracts.connection import AdapterResponse from dbt.adapters.dremio.api.rest.endpoints import ( + create_reflection, + update_reflection, + get_reflection, delete_catalog, create_catalog_api, get_catalog_item, @@ -133,7 +136,7 @@ def add_commit_query(self): # Auto_begin may not be relevant with the rest_api def add_query( - self, sql, auto_begin=True, bindings=None, abridge_sql_log=False, fetch=False + self, sql, auto_begin=True, bindings=None, abridge_sql_log=False, fetch=False ): connection = self.get_thread_connection() if auto_begin and connection.transaction_open is False: @@ -174,11 +177,11 @@ def get_response(cls, cursor: DremioCursor) -> AdapterResponse: return AdapterResponse(_message=message, rows_affected=rows) def execute( - self, - sql: str, - auto_begin: bool = False, - fetch: bool = False, - limit: Optional[int] = None, + self, + sql: str, + auto_begin: bool = False, + fetch: bool = False, + limit: Optional[int] = None, ) -> Tuple[AdapterResponse, agate.Table]: sql = self._add_query_comment(sql) _, cursor = self.add_query(sql, auto_begin, fetch=fetch) @@ -231,6 +234,76 @@ def create_catalog(self, relation): self._create_folders(database, schema, api_parameters) return + def dbt_reflection_integration(self, name: str, type: str, anchor, display, dimensions, date_dimensions, measures, + computations, partition_by, partition_method, localsort_by): + thread_connection = self.get_thread_connection() + connection = self.open(thread_connection) + api_parameters = connection.handle.get_parameters() + + database = anchor.database + schema = anchor.schema + path = self._create_path_list(database, schema) + identifier = anchor.identifier + + path.append(identifier) + + catalog_info = get_catalog_item( + api_parameters, + catalog_id=None, + catalog_path=path, + ) + + dataset_id = catalog_info.get("id") + + payload = { + "type": type, + "name": name, + "datasetId": dataset_id, + "enabled": True, + "arrowCachingEnabled": False, + "partitionDistributionStrategy": partition_method.upper(), + "entityType": "reflection" + } + + if display: + payload["displayFields"] = [{"name": field} for field in display] + + if dimensions: + if not date_dimensions: + date_dimensions = [] + + payload["dimensionFields"] = [ + {"name": dimension} if dimension not in date_dimensions else {"name": dimension, "granularity": "DATE"} + for dimension in dimensions] + + if measures and computations: + payload["measureFields"] = [{"name": measure, "measureTypeList": computation.split(',')} for + measure, computation in zip(measures, computations)] + + if partition_by: + payload["partitionFields"] = [{"name": partition} for partition in partition_by] + + if localsort_by: + payload["sortFields"] = [{"name": sort} for sort in localsort_by] + + dataset_info = get_reflection(api_parameters, dataset_id) + reflections_info = dataset_info.get("data") + + updated = False + for reflection in reflections_info: + if reflection.get("name") == name: + logger.debug(f"Reflection {name} already exists. Updating it") + payload["tag"] = reflection.get("tag") + logger.info( + update_reflection(api_parameters, reflection.get("id"), payload)) + updated = True + break + + if not updated: + logger.debug(f"Reflection {name} does not exist. Creating it") + logger.info( + create_reflection(api_parameters, name, type, payload)) + def _make_new_space_json(self, name) -> json: python_dict = {"entityType": "space", "name": name} return json.dumps(python_dict) @@ -263,6 +336,7 @@ def _create_folders(self, database, schema, api_parameters): def _create_path_list(self, database, schema): path = [database] - folders = schema.split(".") - path.extend(folders) + if schema != 'no_schema': + folders = schema.split(".") + path.extend(folders) return path diff --git a/dbt/adapters/dremio/impl.py b/dbt/adapters/dremio/impl.py index c5175ab..e778fc5 100644 --- a/dbt/adapters/dremio/impl.py +++ b/dbt/adapters/dremio/impl.py @@ -20,6 +20,7 @@ from typing import List from typing import Optional +from dbt.adapters.base.meta import available from dbt.adapters.base.relation import BaseRelation from dbt.adapters.capability import ( @@ -177,6 +178,10 @@ def run_sql_for_tests(self, sql, fetch, conn): finally: conn.transaction_open = False + @available + def dbt_reflection_integration(self, name: str, type: str, anchor, display, dimensions, date_dimensions, measures, computations, partition_by, partition_method, localsort_by) -> None: + self.connections.dbt_reflection_integration(name, type, anchor, display, dimensions, date_dimensions, measures, computations, partition_by, partition_method, localsort_by) + COLUMNS_EQUAL_SQL = """ with diff_count as ( diff --git a/dbt/include/dremio/dbt_project.yml b/dbt/include/dremio/dbt_project.yml index 5d055c1..95c0385 100644 --- a/dbt/include/dremio/dbt_project.yml +++ b/dbt/include/dremio/dbt_project.yml @@ -22,7 +22,3 @@ quoting: identifier: true macro-paths: ["macros"] - -vars: - "dremio:reflections_enabled": false - "dremio:exact_search_enabled": false diff --git a/dbt/include/dremio/macros/adapters/metadata.sql b/dbt/include/dremio/macros/adapters/metadata.sql index b6a316e..4a48382 100644 --- a/dbt/include/dremio/macros/adapters/metadata.sql +++ b/dbt/include/dremio/macros/adapters/metadata.sql @@ -136,16 +136,11 @@ limitations under the License.*/ {%- endmacro -%} {% macro dremio__get_catalog_relations_result_sql(relations) %} - {%- if var('dremio:reflections_enabled', default=false) %} - {{get_catalog_reflections(relations)}} - {% else %} - select * from t join columns on (t.table_schema = columns.table_schema and t.table_name = columns.table_name) order by "column_index" - {% endif %} {%- endmacro -%} {% macro get_catalog_reflections(relations) %} @@ -241,49 +236,6 @@ limitations under the License.*/ {%- set schema_name = database + (('.' + schema) if schema != 'no_schema' else '') -%} {% call statement('list_relations_without_caching', fetch_result=True) -%} - - {%- if var('dremio:reflections_enabled', default=false) -%} - - with cte1 as ( - select - dataset_name - ,reflection_name - ,type - ,case when substr(dataset_name, 1, 1) = '"' - then strpos(dataset_name, '".') + 1 - else strpos(dataset_name, '.') - end as first_dot - ,length(dataset_name) - - case when substr(dataset_name, length(dataset_name)) = '"' - then strpos(reverse(dataset_name), '".') - else strpos(reverse(dataset_name), '.') - 1 - end as last_dot - ,length(dataset_name) as length - {%- if target.cloud_host and not target.software_host %} - from sys.project.reflections - {%- elif target.software_host and not target.cloud_host %} - from sys.reflections - {%- endif %} - ) - , cte2 as ( - select - replace(substr(dataset_name, 1, first_dot - 1), '"', '') as table_catalog - ,reflection_name as table_name - ,replace(case when first_dot < last_dot - then substr(dataset_name, first_dot + 1, last_dot - first_dot - 1) - else 'no_schema' end, '"', '') as table_schema - ,'materialized_view' as table_type - from cte1 - ) - select table_catalog, table_name, table_schema, table_type - from cte2 - where ilike(table_catalog, '{{ database }}') - and ilike(table_schema, '{{ schema }}') - - union all - - {%- endif %} - select (case when position('.' in table_schema) > 0 then substring(table_schema, 1, position('.' in table_schema) - 1) else table_schema diff --git a/dbt/include/dremio/macros/materializations/reflection/create_reflection.sql b/dbt/include/dremio/macros/materializations/reflection/create_reflection.sql index 065e2f8..cc91c90 100644 --- a/dbt/include/dremio/macros/materializations/reflection/create_reflection.sql +++ b/dbt/include/dremio/macros/materializations/reflection/create_reflection.sql @@ -37,85 +37,18 @@ ADD EXTERNAL REFLECTION name USING target #} -{%- macro create_reflection(reflection_type, anchor, reflection, external_target=none, - display=none, dimensions=none, by_day_dimensions=none, measures=none) %} - alter dataset {{ anchor }} - create {{ reflection_type }} reflection {{ reflection.include(database=False, schema=False) }} - using - {%- if reflection_type == 'raw' %} - {{ display_clause(display) }} - {%- elif reflection_type == 'aggregate' %} - {{ dimensions_clause(dimensions=dimensions, by_day_dimensions=by_day_dimensions) }} - {{ measures_clause(measures) }} - {%- else -%} - {{ external_target }} - {% endif -%} - {%- if reflection_type in ['raw', 'aggregate'] %} - {{ partition_method() }} {{ config_cols("partition by") }} - {{ config_cols("localsort by") }} - {{ config_cols("distribute by") }} - {{ arrow_cache_clause() }} - {%- endif -%} -{% endmacro -%} - -{%- macro display_clause(display=none) %} - {%- set cols = config.get('display', validator=validation.any[list, basestring]) or display -%} - {%- if cols is not none %} - {%- if cols is string -%} - {%- set cols = [cols] -%} - {%- endif -%} - display ( - {%- for item in cols -%} - {{ adapter.quote(item) }} - {%- if not loop.last -%},{%- endif -%} - {%- endfor -%} - ) - {%- endif %} -{% endmacro -%} - -{%- macro dimensions_clause(dimensions=none, by_day_dimensions=none) %} - {%- set cols = config.get('dimensions', validator=validation.any[list, basestring]) or dimensions -%} - {%- set by_day_cols = config.get('by_day_dimensions', validator=validation.any[list, basestring]) or by_day_dimensions -%} - {%- if cols is not none %} - {%- if cols is string -%} - {%- set cols = [cols] -%} - {%- endif -%} - {%- if by_day_cols is string -%} - {%- set by_day_cols = [by_day_cols] -%} - {%- endif -%} - dimensions ( - {%- for item in cols -%} - {{ adapter.quote(item) ~ (' by day' if item in by_day_cols else "") }} - {%- if not loop.last -%},{%- endif -%} - {%- endfor -%} - ) - {%- endif %} -{% endmacro -%} +{%- macro create_reflection(reflection_name, reflection_type, anchor, + display=none, dimensions=none, date_dimensions=none, measures=none, computations=none, partition_by=none, partition_method=none, localsort_by=none) %} + + {%- if reflection_type == 'raw' %} + {% set reflection_type = 'RAW' %} + {%- elif reflection_type in ['aggregate', 'aggregation'] %} + {% set reflection_type = 'AGGREGATION' %} + {%- else -%} + {% do exceptions.CompilationError("invalid reflection type") %} + {%- endif -%} -{%- macro measures_clause(measures=none) %} - {%- set cols = config.get('measures', validator=validation.any[list, basestring]) or measures -%} - {%- set comp_cols = config.get('computations', validator=validation.any[list, basestring]) or [] -%} - {%- if cols is not none %} - {%- if cols is string -%} - {%- set cols = [cols] -%} - {%- endif -%} - {%- if comp_cols is string -%} - {%- set comp_cols = [comp_cols] -%} - {%- endif -%} - measures ( - {%- for item in cols -%} - {%- set computations = (' (' ~ comp_cols[loop.index0] ~ ')') - if loop.index0 < comp_cols | length and comp_cols[loop.index0] is not none else '' -%} - {{ adapter.quote(item) ~ computations }} - {%- if not loop.last -%},{%- endif -%} - {%- endfor -%} - ) - {%- endif %} -{% endmacro -%} + {% do adapter.dbt_reflection_integration(reflection_name, reflection_type, anchor, display, dimensions, date_dimensions, measures, computations, partition_by, partition_method, localsort_by) %} -{%- macro arrow_cache_clause() -%} - {%- set arrow_cache = config.get('arrow_cache', validator=validation.any[boolean]) -%} - {%- if arrow_cache is not none and arrow_cache -%} - arrow cache - {%- endif -%} -{% endmacro -%} + SELECT 1 +{% endmacro -%} \ No newline at end of file diff --git a/dbt/include/dremio/macros/materializations/reflection/helpers.sql b/dbt/include/dremio/macros/materializations/reflection/helpers.sql index a2559ec..9a0a773 100644 --- a/dbt/include/dremio/macros/materializations/reflection/helpers.sql +++ b/dbt/include/dremio/macros/materializations/reflection/helpers.sql @@ -12,17 +12,8 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.*/ -{% macro drop_reflection_if_exists(relation, reflection) %} - {% if reflection is not none and reflection.type == 'materialized_view' %} - {% call statement('drop reflection') -%} - alter dataset {{ relation }} - drop reflection {{ reflection.include(database=False, schema=False) }} - {%- endcall %} - {% endif %} -{% endmacro %} - {% macro dbt_dremio_validate_get_reflection_type(raw_reflection_type) %} - {% set accepted_types = ['raw', 'aggregate', 'external'] %} + {% set accepted_types = ['raw', 'aggregate', 'aggregation', 'external'] %} {% set invalid_reflection_type_msg -%} Invalid reflection type provided: {{ raw_reflection_type }} Expected one of: {{ accepted_types | join(', ') }} diff --git a/dbt/include/dremio/macros/materializations/reflection/reflection.sql b/dbt/include/dremio/macros/materializations/reflection/reflection.sql index 9778fec..4b63a9b 100644 --- a/dbt/include/dremio/macros/materializations/reflection/reflection.sql +++ b/dbt/include/dremio/macros/materializations/reflection/reflection.sql @@ -17,13 +17,22 @@ limitations under the License.*/ {% do exceptions.CompilationError("reflections are disabled, set 'dremio:reflections_enabled' variable to true to enable them") %} {%- endif -%} + {% set reflection_name = config.get('name', validator=validation.any[basetring]) or 'Unnamed Reflection' %} {% set raw_reflection_type = config.get('reflection_type', validator=validation.any[basestring]) or 'raw' %} {% set raw_anchor = config.get('anchor', validator=validation.any[list, basestring]) %} {% set raw_external_target = config.get('external_target', validator=validation.any[list, basestring]) %} {% set identifier = model['alias'] %} {%- set display = config.get('display', validator=validation.any[list, basestring]) -%} {%- set dimensions = config.get('dimensions', validator=validation.any[list, basestring]) -%} + {%- set date_dimensions = config.get('date_dimensions', validator=validation.any[list, basestring]) -%} {%- set measures = config.get('measures', validator=validation.any[list, basestring]) -%} + {%- set computations = config.get('computations', validator=validation.any[list, basestring]) -%} + {%- set partition_by = config.get('partition_by', validator=validation.any[basestring]) -%} + {%- set partition_method = config.get('partition_method', validator=validation.any[basestring]) or 'striped' -%} + {%- set localsort_by = config.get('localsort_by', validator=validation.any[basestring]) -%} + + + {% set relation = this %} {% if model.refs | length + model.sources | length == 1 %} {% if model.refs | length == 1 %} @@ -60,28 +69,30 @@ limitations under the License.*/ {%- set reflection_type = dbt_dremio_validate_get_reflection_type(raw_reflection_type) -%} {% if (reflection_type == 'raw' and display is none) - or (reflection_type == 'aggregate' and (dimensions is none or measures is none)) %} + or (reflection_type in ['aggregate', 'aggregation'] and (dimensions is none or measures is none)) %} {% set columns = adapter.get_columns_in_relation(anchor) %} {% if reflection_type == 'raw' %} {% set display = columns | map(attribute='name') | list %} - {% elif reflection_type == 'aggregate' %} + {% elif reflection_type in ['aggregate', 'aggregation'] %} {% if dimensions is none %} {% set dimensions = columns | rejectattr('dtype', 'in', ['decimal', 'float', 'double']) | map(attribute='name') | list %} - {% set by_day_dimensions = columns | selectattr('dtype', 'in', ['timestamp']) | map(attribute='name') | list %} + {% set date_dimensions = columns | selectattr('dtype', 'in', ['timestamp']) | map(attribute='name') | list %} {% endif %} {% if measures is none %} {% set measures = columns | selectattr('dtype', 'in', ['decimal', 'float', 'double']) | map(attribute='name') | list %} {% endif %} + {% if computations is none %} + {{ log("computations is null") }} + {% endif %} {% endif %} {% endif %} {{ run_hooks(pre_hooks) }} - {{ drop_reflection_if_exists(anchor, old_relation) }} -- build model {% call statement('main') -%} - {{ create_reflection(reflection_type, anchor, target_relation, external_target, - display=display, dimensions=dimensions, by_day_dimensions=by_day_dimensions, measures=measures) }} + {{ create_reflection(reflection_name, reflection_type, anchor, + display=display, dimensions=dimensions, date_dimensions=date_dimensions, measures=measures, computations=computations, partition_by=partition_by, partition_method=partition_method, localsort_by=localsort_by) }} {%- endcall %} {{ run_hooks(post_hooks) }}