feat(ingest/dbt): add only_include_if_in_catalog flag for dbt core (d…
hsheth2 authored Sep 13, 2024
1 parent cb7ac88 commit b35e3db
Showing 3 changed files with 45 additions and 11 deletions.
@@ -472,6 +472,7 @@ def _parse_into_dbt_node(self, node: Dict) -> DBTNode:
             upstream_nodes=upstream_nodes,
             materialization=materialization,
             catalog_type=catalog_type,
+            missing_from_catalog=False,  # This doesn't really apply to dbt Cloud.
             meta=meta,
             query_tag={},  # TODO: Get this from the dbt API.
             tags=tags,
@@ -154,7 +154,7 @@ class DBTSourceReport(StaleEntityRemovalSourceReport):
         default_factory=LossyList
     )
 
-    in_manifest_but_missing_catalog: LossyList[str] = field(default_factory=LossyList)
+    nodes_filtered: LossyList[str] = field(default_factory=LossyList)
 
 
 class EmitDirective(ConfigEnum):
@@ -528,6 +528,7 @@ class DBTNode:
     materialization: Optional[str]  # table, view, ephemeral, incremental, snapshot
     # see https://docs.getdbt.com/reference/artifacts/manifest-json
     catalog_type: Optional[str]
+    missing_from_catalog: bool  # indicates if the node was missing from the catalog.json
 
     owner: Optional[str]
 
@@ -853,6 +854,9 @@ def get_column_type(
             TypeClass = resolve_postgres_modified_type(column_type)
         elif dbt_adapter == "vertica":
             TypeClass = resolve_vertica_modified_type(column_type)
+        elif dbt_adapter == "snowflake":
+            # Snowflake types are uppercase, so we check that.
+            TypeClass = _field_type_mapping.get(column_type.upper())
 
     # if still not found, report the warning
     if TypeClass is None:
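Note on the snowflake branch above: the plain lookup against the shared type map has already failed by this point, so the added branch retries it with the type name uppercased, presumably because the map's Snowflake entries are keyed the way Snowflake reports types (NUMBER, VARCHAR, ...). A minimal standalone sketch of that retry, using a hypothetical toy mapping rather than the real _field_type_mapping:

from typing import Dict, Optional

# Hypothetical stand-in for the real _field_type_mapping; uppercase keys only.
TOY_TYPE_MAPPING: Dict[str, str] = {
    "NUMBER": "NumberType",
    "VARCHAR": "StringType",
    "BOOLEAN": "BooleanType",
}

def lookup_snowflake_type(column_type: Optional[str]) -> Optional[str]:
    # Retry the lookup with the uppercased type name, mirroring the added branch.
    if not column_type:
        return None
    return TOY_TYPE_MAPPING.get(column_type.upper())

assert lookup_snowflake_type("number") == "NumberType"
assert lookup_snowflake_type("geography") is None  # still unmapped, so the caller warns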
@@ -1034,6 +1038,7 @@ def _filter_nodes(self, all_nodes: List[DBTNode]) -> List[DBTNode]:
             key = node.dbt_name
 
             if not self.config.node_name_pattern.allowed(key):
+                self.report.nodes_filtered.append(key)
                 continue
 
             nodes.append(node)
48 changes: 38 additions & 10 deletions metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_core.py
@@ -58,6 +58,12 @@ class DBTCoreConfig(DBTCommonConfig):
         "See https://docs.getdbt.com/reference/artifacts/run-results-json.",
     )
 
+    only_include_if_in_catalog: bool = Field(
+        default=False,
+        description="[experimental] If true, only include nodes that are also present in the catalog file. "
+        "This is useful if you only want to include models that have been built by the associated run.",
+    )
+
     # Because we now also collect model performance metadata, the "test_results" field was renamed to "run_results".
     _convert_test_results_path = pydantic_renamed_field(
         "test_results_path", "run_results_paths", transform=lambda x: [x] if x else []
@@ -156,6 +162,7 @@ def extract_dbt_entities(
     manifest_adapter: str,
     use_identifiers: bool,
     tag_prefix: str,
+    only_include_if_in_catalog: bool,
     report: DBTSourceReport,
 ) -> List[DBTNode]:
     sources_by_id = {x["unique_id"]: x for x in sources_results}
@@ -194,12 +201,22 @@ def extract_dbt_entities(
 
         # It's a source
         catalog_node = all_catalog_entities.get(key)
+        missing_from_catalog = catalog_node is None
         catalog_type = None
 
         if catalog_node is None:
-            if materialization not in {"test", "ephemeral"}:
+            if materialization in {"test", "ephemeral"}:
                 # Test and ephemeral nodes will never show up in the catalog.
-                report.in_manifest_but_missing_catalog.append(key)
+                missing_from_catalog = False
+            else:
+                if not only_include_if_in_catalog:
+                    report.warning(
+                        title="Node missing from catalog",
+                        message="Found a node in the manifest file but not in the catalog. "
+                        "This usually means the catalog file was not generated by `dbt docs generate` and so is incomplete. "
+                        "Some metadata, such as column types and descriptions, will be impacted.",
+                        context=key,
+                    )
         else:
             catalog_type = all_catalog_entities[key]["metadata"]["type"]
 
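The branching above reads as a small decision table: nodes found in the catalog are never flagged, test and ephemeral nodes are never expected in the catalog so they are not flagged either, and every other manifest-only node is flagged, with the per-node warning emitted only while the new flag is off (when the flag is on, such nodes are dropped later instead of warned about). A minimal sketch of that decision, with hypothetical names rather than the real extract_dbt_entities:

from typing import Tuple

def classify_manifest_node(
    materialization: str,
    in_catalog: bool,
    only_include_if_in_catalog: bool,
) -> Tuple[bool, bool]:
    """Return (missing_from_catalog, should_warn) for a manifest node.

    Mirrors the branching above: tests and ephemeral models never appear in
    catalog.json, so they are not treated as missing; other manifest-only
    nodes are flagged, and warned about only when the flag is disabled.
    """
    if in_catalog:
        return False, False
    if materialization in {"test", "ephemeral"}:
        return False, False
    return True, (not only_include_if_in_catalog)

# e.g. classify_manifest_node("table", in_catalog=False, only_include_if_in_catalog=False)
# -> (True, True): flagged as missing and a warning is reported.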
@@ -264,6 +281,7 @@ def extract_dbt_entities(
             upstream_nodes=upstream_nodes,
             materialization=materialization,
             catalog_type=catalog_type,
+            missing_from_catalog=missing_from_catalog,
             meta=meta,
             query_tag=query_tag_props,
             tags=tags,
@@ -291,14 +309,6 @@ def extract_dbt_entities(
 
         dbt_entities.append(dbtNode)
 
-    if report.in_manifest_but_missing_catalog:
-        # We still want this to show up as a warning, but don't want to spam the warnings section
-        # if there's a lot of them.
-        report.warning(
-            "in_manifest_but_missing_catalog",
-            f"Found {len(report.in_manifest_but_missing_catalog)} nodes in manifest but not in catalog. See in_manifest_but_missing_catalog for details.",
-        )
-
     return dbt_entities
 
 
@@ -535,6 +545,7 @@ def loadManifestAndCatalog(
             manifest_adapter,
             self.config.use_identifiers,
             self.config.tag_prefix,
+            self.config.only_include_if_in_catalog,
             self.report,
         )
 
@@ -588,6 +599,23 @@ def load_nodes(self) -> Tuple[List[DBTNode], Dict[str, Optional[str]]]:
 
         return all_nodes, additional_custom_props
 
+    def _filter_nodes(self, all_nodes: List[DBTNode]) -> List[DBTNode]:
+        nodes = super()._filter_nodes(all_nodes)
+
+        if not self.config.only_include_if_in_catalog:
+            return nodes
+
+        filtered_nodes = []
+        for node in nodes:
+            if node.missing_from_catalog:
+                # TODO: We need to do some additional testing of this flag to validate that it doesn't
+                # drop important things entirely (e.g. sources).
+                self.report.nodes_filtered.append(node.dbt_name)
+            else:
+                filtered_nodes.append(node)
+
+        return filtered_nodes
+
     def get_external_url(self, node: DBTNode) -> Optional[str]:
         if self.config.git_info and node.dbt_file_path:
             return self.config.git_info.get_url_for_file_path(node.dbt_file_path)
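The override above layers a second filter on top of the base class: super()._filter_nodes applies the node_name_pattern filter first, and only when only_include_if_in_catalog is set does the new pass drop catalog-missing nodes and record them in nodes_filtered. A small sketch of how the two stages compose, using a simplified node type instead of the real DBTNode:

from dataclasses import dataclass
from typing import List

@dataclass
class ToyNode:  # simplified stand-in for DBTNode
    dbt_name: str
    missing_from_catalog: bool

def filter_nodes(
    nodes: List[ToyNode],
    allowed_names: List[str],
    only_include_if_in_catalog: bool,
) -> List[ToyNode]:
    # Stage 1: name filtering (a plain allow-list stands in for the
    # node_name_pattern config used by the base class).
    kept = [n for n in nodes if n.dbt_name in allowed_names]
    # Stage 2: optionally drop nodes the catalog file does not know about.
    if only_include_if_in_catalog:
        kept = [n for n in kept if not n.missing_from_catalog]
    return kept

nodes = [
    ToyNode("model.jaffle.orders", missing_from_catalog=False),
    ToyNode("model.jaffle.stale_model", missing_from_catalog=True),
]
print(filter_nodes(nodes, ["model.jaffle.orders", "model.jaffle.stale_model"], True))
# -> only the orders model survives when the flag is enabled.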