Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/lazy describe extended dbt spark #1151

Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
146 changes: 58 additions & 88 deletions dbt/adapters/spark/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,107 +171,77 @@ def _get_relation_information(self, row: "agate.Row") -> RelationInfo:

return _schema, name, information

def _get_relation_information_using_describe(self, row: "agate.Row") -> RelationInfo:
"""Relation info fetched using SHOW TABLES and an auxiliary DESCRIBE statement"""
try:
_schema, name, _ = row
except ValueError:
raise DbtRuntimeError(
f'Invalid value from "show tables ...", got {len(row)} values, expected 3'
)

table_name = f"{_schema}.{name}"
def list_relations_without_caching(self, schema_relation: BaseRelation) -> List[BaseRelation]:
    """List the relations of a schema from a plain SHOW TABLES macro call.

    Distinct Spark compute engines may not support the same SQL featureset, so
    only the SHOW TABLES macro is used here. Every relation is created with an
    empty ``information`` string; as a consequence it is always typed as a table
    and all provider flags are False at this point. ``set_relation_information``
    can populate that metadata afterwards.

    Args:
        schema_relation: Relation handed to the macro as ``schema_relation``.

    Returns:
        One relation per row returned by the macro, or an empty list when the
        macro raises ``DbtRuntimeError`` (the error is logged at debug level).
    """
    kwargs = {"schema_relation": schema_relation}
    try:
        # Iceberg behavior: every returned row is unpacked into three values below.
        show_table_rows = self.execute_macro(
            LIST_RELATIONS_SHOW_TABLES_MACRO_NAME, kwargs=kwargs
        )
        relations = []
        for row in show_table_rows:
            _schema, name, _ = row
            # No DESCRIBE is issued here, so there is no metadata text to inspect yet.
            information = ""

            rel_type: RelationType = (
                RelationType.View if "Type: VIEW" in information else RelationType.Table
            )
            is_delta: bool = "Provider: delta" in information
            is_hudi: bool = "Provider: hudi" in information
            is_iceberg: bool = "Provider: iceberg" in information

            relation: BaseRelation = self.Relation.create(
                schema=_schema,
                identifier=name,
                type=rel_type,
                information=information,
                is_delta=is_delta,
                is_iceberg=is_iceberg,
                is_hudi=is_hudi,
            )
            relations.append(relation)
        return relations
    except DbtRuntimeError as e:
        description = "Error while retrieving information about"
        logger.debug(f"{description} {schema_relation}: {e.msg}")
        return []

def set_relation_information(self, relation: BaseRelation):
    """Return ``relation`` with its ``information`` text and format flags populated.

    A relation that already carries a non-empty ``information`` is returned
    unchanged. Otherwise the rows obtained from the parent class's
    ``get_columns_in_relation`` are folded into ``"<type>: <value>\\n"`` lines
    (rows whose first value starts with ``#`` are skipped), and a new relation
    is created from that text.

    Args:
        relation: Relation to enrich; only ``information``, ``schema`` and
            ``identifier`` are read from it.

    Returns:
        ``relation`` itself when it already has information, else a newly
        created relation with the same schema and identifier.
    """
    if relation.information:
        return relation

    # NOTE(review): assumes each row exposes ``values()`` yielding exactly three
    # items (as agate rows of a DESCRIBE result would) - confirm against the parent.
    rows: List[agate.Row] = super().get_columns_in_relation(relation)
    info_lines = []
    for info_row in rows:
        info_type, info_value, _ = info_row.values()
        if not info_type.startswith("#"):
            info_lines.append(f"{info_type}: {info_value}\n")
    information = "".join(info_lines)

    rel_type: RelationType = (
        RelationType.View if "Type: VIEW" in information else RelationType.Table
    )
    return self.Relation.create(
        schema=relation.schema,
        identifier=relation.identifier,
        type=rel_type,
        information=information,
        is_delta="Provider: delta" in information,
        is_iceberg="Provider: iceberg" in information,
        is_hudi="Provider: hudi" in information,
    )

return _schema, name, information

def _build_spark_relation_list(
    self,
    row_list: "agate.Table",
    relation_info_func: Callable[["agate.Row"], RelationInfo],
) -> List[BaseRelation]:
    """Build one relation per row, deriving type and provider flags from its info text.

    ``relation_info_func`` is applied to every row and must return a
    ``(schema, name, information)`` triple; the information string is then
    searched for the ``Type: VIEW`` and ``Provider: ...`` markers.
    """

    def _to_relation(row: "agate.Row") -> BaseRelation:
        schema_name, identifier, info_text = relation_info_func(row)
        is_view = "Type: VIEW" in info_text
        return self.Relation.create(
            schema=schema_name,
            identifier=identifier,
            type=RelationType.View if is_view else RelationType.Table,
            information=info_text,
            is_delta="Provider: delta" in info_text,
            is_iceberg="Provider: iceberg" in info_text,
            is_hudi="Provider: hudi" in info_text,
        )

    return [_to_relation(row) for row in row_list]

def list_relations_without_caching(self, schema_relation: BaseRelation) -> List[BaseRelation]:
    """List a schema's relations, falling back to SHOW TABLES when needed.

    Spark compute engines differ in the SQL they accept, so the extended listing
    macro is tried first. If it fails because v2 tables are not supported, the
    plain SHOW TABLES macro plus a per-table DESCRIBE is used instead. Any other
    failure yields an empty list.
    """
    macro_kwargs = {"schema_relation": schema_relation}

    try:
        # Default compute engine behavior: show tables extended
        extended_rows = self.execute_macro(LIST_RELATIONS_MACRO_NAME, kwargs=macro_kwargs)
        return self._build_spark_relation_list(
            row_list=extended_rows,
            relation_info_func=self._get_relation_information,
        )
    except DbtRuntimeError as primary_error:
        errmsg = getattr(primary_error, "msg", "")

        # A missing database simply has no relations; nothing is logged for it.
        if f"Database '{schema_relation}' not found" in errmsg:
            return []

        if "SHOW TABLE EXTENDED is not supported for v2 tables" not in errmsg:
            logger.debug(
                f"Error while retrieving information about {schema_relation}: {errmsg}"
            )
            return []

        # Iceberg compute engine behavior: show table
        # this happens with spark-iceberg with v2 iceberg tables
        # https://issues.apache.org/jira/browse/SPARK-33393
        try:
            # Iceberg behavior: 3-row result of relations obtained
            plain_rows = self.execute_macro(
                LIST_RELATIONS_SHOW_TABLES_MACRO_NAME, kwargs=macro_kwargs
            )
            return self._build_spark_relation_list(
                row_list=plain_rows,
                relation_info_func=self._get_relation_information_using_describe,
            )
        except DbtRuntimeError as fallback_error:
            description = "Error while retrieving information about"
            logger.debug(f"{description} {schema_relation}: {fallback_error.msg}")
            return []

def get_relation(self, database: str, schema: str, identifier: str) -> Optional[BaseRelation]:
    """Look up a relation and return it with its metadata populated.

    The database argument is dropped when the relation class's default include
    policy does not include a database. The lookup itself is delegated to the
    parent class; a found relation is passed through
    ``set_relation_information`` and that result is what gets returned.

    Returns:
        The enriched relation, or ``None`` when the parent lookup finds nothing.
    """
    if not self.Relation.get_default_include_policy().database:
        database = None  # type: ignore

    relation = super().get_relation(database, schema, identifier)
    if not relation:
        return None
    # set_relation_information may build a new relation object rather than
    # mutate its argument, so its return value must be propagated to the caller.
    return self.set_relation_information(relation)

def parse_describe_extended(
self, relation: BaseRelation, raw_rows: AttrDict
Expand Down
Loading