From 107769d4ab3f70518f2d7f859924687ec6c44acd Mon Sep 17 00:00:00 2001 From: Rahul Goyal Date: Mon, 9 Dec 2024 17:26:05 +0530 Subject: [PATCH] Fixed issue --- dbt/adapters/spark/impl.py | 148 +++++++++++++++---------------------- 1 file changed, 59 insertions(+), 89 deletions(-) diff --git a/dbt/adapters/spark/impl.py b/dbt/adapters/spark/impl.py index d33ebde20..4d1434cf7 100644 --- a/dbt/adapters/spark/impl.py +++ b/dbt/adapters/spark/impl.py @@ -171,107 +171,77 @@ def _get_relation_information(self, row: "agate.Row") -> RelationInfo: return _schema, name, information - def _get_relation_information_using_describe(self, row: "agate.Row") -> RelationInfo: - """Relation info fetched using SHOW TABLES and an auxiliary DESCRIBE statement""" - try: - _schema, name, _ = row - except ValueError: - raise DbtRuntimeError( - f'Invalid value from "show tables ...", got {len(row)} values, expected 3' - ) - table_name = f"{_schema}.{name}" + def list_relations_without_caching(self, schema_relation: BaseRelation) -> List[BaseRelation]: + """Distinct Spark compute engines may not support the same SQL featureset. Thus, we must + try different methods to fetch relation information.""" try: - table_results = self.execute_macro( - DESCRIBE_TABLE_EXTENDED_MACRO_NAME, kwargs={"table_name": table_name} + relations = [] + kwargs = {"schema_relation": schema_relation} + # Iceberg behavior: 3-row result of relations obtained + show_table_rows = self.execute_macro( + LIST_RELATIONS_SHOW_TABLES_MACRO_NAME, kwargs=kwargs ) - except DbtRuntimeError as e: - logger.debug(f"Error while retrieving information about {table_name}: {e.msg}") - table_results = AttrDict() + for row in show_table_rows: + _schema, name, _ = row + information = "" + rel_type: RelationType = ( + RelationType.View if "Type: VIEW" in information else RelationType.Table + ) + is_delta: bool = "Provider: delta" in information + is_hudi: bool = "Provider: hudi" in information + is_iceberg: bool = "Provider: iceberg" in information + + relation: BaseRelation = self.Relation.create( + schema=_schema, + identifier=name, + type=rel_type, + information=information, + is_delta=is_delta, + is_iceberg=is_iceberg, + is_hudi=is_hudi, + ) + relations.append(relation) + return relations + except DbtRuntimeError as e: + description = "Error while retrieving information about" + logger.debug(f"{description} {schema_relation}: {e.msg}") + return [] + + def set_relation_information(self, relation: BaseRelation): + if relation.information: + return relation + rows: List[agate.Row] = super().get_columns_in_relation(relation) information = "" - for info_row in table_results: - info_type, info_value, _ = info_row + for info_row in rows: + info_type, info_value, _ = info_row.values() if not info_type.startswith("#"): information += f"{info_type}: {info_value}\n" + rel_type: RelationType = ( + RelationType.View if "Type: VIEW" in information else RelationType.Table + ) + is_delta: bool = "Provider: delta" in information + is_hudi: bool = "Provider: hudi" in information + is_iceberg: bool = "Provider: iceberg" in information + relation: BaseRelation = self.Relation.create( + schema=relation.schema, + identifier=relation.identifier, + type=rel_type, + information=information, + is_delta=is_delta, + is_iceberg=is_iceberg, + is_hudi=is_hudi, + ) + return relation - return _schema, name, information - - def _build_spark_relation_list( - self, - row_list: "agate.Table", - relation_info_func: Callable[["agate.Row"], RelationInfo], - ) -> List[BaseRelation]: - """Aggregate relations with format metadata included.""" - relations = [] - for row in row_list: - _schema, name, information = relation_info_func(row) - - rel_type: RelationType = ( - RelationType.View if "Type: VIEW" in information else RelationType.Table - ) - is_delta: bool = "Provider: delta" in information - is_hudi: bool = "Provider: hudi" in information - is_iceberg: bool = "Provider: iceberg" in information - - relation: BaseRelation = self.Relation.create( - schema=_schema, - identifier=name, - type=rel_type, - information=information, - is_delta=is_delta, - is_iceberg=is_iceberg, - is_hudi=is_hudi, - ) - relations.append(relation) - - return relations - - def list_relations_without_caching(self, schema_relation: BaseRelation) -> List[BaseRelation]: - """Distinct Spark compute engines may not support the same SQL featureset. Thus, we must - try different methods to fetch relation information.""" - - kwargs = {"schema_relation": schema_relation} - - try: - # Default compute engine behavior: show tables extended - show_table_extended_rows = self.execute_macro(LIST_RELATIONS_MACRO_NAME, kwargs=kwargs) - return self._build_spark_relation_list( - row_list=show_table_extended_rows, - relation_info_func=self._get_relation_information, - ) - except DbtRuntimeError as e: - errmsg = getattr(e, "msg", "") - if f"Database '{schema_relation}' not found" in errmsg: - return [] - # Iceberg compute engine behavior: show table - elif "SHOW TABLE EXTENDED is not supported for v2 tables" in errmsg: - # this happens with spark-iceberg with v2 iceberg tables - # https://issues.apache.org/jira/browse/SPARK-33393 - try: - # Iceberg behavior: 3-row result of relations obtained - show_table_rows = self.execute_macro( - LIST_RELATIONS_SHOW_TABLES_MACRO_NAME, kwargs=kwargs - ) - return self._build_spark_relation_list( - row_list=show_table_rows, - relation_info_func=self._get_relation_information_using_describe, - ) - except DbtRuntimeError as e: - description = "Error while retrieving information about" - logger.debug(f"{description} {schema_relation}: {e.msg}") - return [] - else: - logger.debug( - f"Error while retrieving information about {schema_relation}: {errmsg}" - ) - return [] def get_relation(self, database: str, schema: str, identifier: str) -> Optional[BaseRelation]: if not self.Relation.get_default_include_policy().database: database = None # type: ignore - return super().get_relation(database, schema, identifier) + relation = super().get_relation(database, schema, identifier) + self.set_relation_information(relation) if relation else None def parse_describe_extended( self, relation: BaseRelation, raw_rows: AttrDict @@ -549,4 +519,4 @@ def debug_query(self) -> None: diff_count.num_missing as num_mismatched from row_count_diff cross join diff_count -""".strip() +""".strip() \ No newline at end of file