Skip to content

Commit

Permalink
fix: PR Comments
Browse files Browse the repository at this point in the history
  • Loading branch information
sagar-salvi-apptware committed Oct 21, 2024
1 parent 18819fe commit 54e5725
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,7 @@ def __init__(
format_queries=True,
)
self.report.sql_aggregator = self.aggregator.report
self.gcs_uris_regex = re.compile(r"uris=\[([^\]]+)\]")

def get_time_window(self) -> Tuple[datetime, datetime]:
if self.redundant_run_skip_handler:
Expand Down Expand Up @@ -938,8 +939,9 @@ def gen_lineage_workunits_for_external_table(
return

# Expect URIs in `uris=[""]` format
uris_match = re.search(r"uris=\[([^\]]+)\]", ddl)
uris_match = self.gcs_uris_regex.search(ddl)
if not uris_match:
logger.warning(f"Unable to parse GCS URI from the provided DDL {ddl}.")
return

uris_str = uris_match.group(1)
Expand Down Expand Up @@ -1009,7 +1011,6 @@ def get_lineage_for_external_table(
dataset_urn
)
for gcs_dataset_urn in gcs_urns:
assert graph
schema_metadata_for_gcs: Optional[
SchemaMetadataClass
] = graph.get_schema_metadata(gcs_dataset_urn)
Expand All @@ -1021,6 +1022,10 @@ def get_lineage_for_external_table(
schema_metadata_for_gcs,
)
if not fine_grained_lineage:
logger.warning(
f"Failed to retrieve fine-grained lineage for dataset {dataset_urn} and GCS {gcs_dataset_urn}. "
f"Check schema metadata: {schema_metadata} and GCS metadata: {schema_metadata_for_gcs}."
)
continue

fine_grained_lineages.extend(fine_grained_lineage)
Expand Down
31 changes: 31 additions & 0 deletions metadata-ingestion/tests/unit/bigquery/test_bigquery_lineage.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,12 +210,33 @@ def fake_schema_metadata(entity_urn: str) -> models.SchemaMetadataClass:
graph=pipeline_context.graph,
)

expected_schema_field_urns = [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:gcs,bigquery_data/customer1,PROD),age)",
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:gcs,bigquery_data/customer1,PROD),firstname)",
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:gcs,bigquery_data/customer1,PROD),lastname)",
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:gcs,bigquery_data/customer2,PROD),age)",
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:gcs,bigquery_data/customer2,PROD),firstname)",
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:gcs,bigquery_data/customer2,PROD),lastname)",
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:gcs,bigquery_data/customer3/my_table,PROD),age)",
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:gcs,bigquery_data/customer3/my_table,PROD),firstname)",
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:gcs,bigquery_data/customer3/my_table,PROD),lastname)",
]
assert upstream_lineage
assert len(upstream_lineage.upstreams) == 3
assert (
upstream_lineage.fineGrainedLineages
and len(upstream_lineage.fineGrainedLineages) == 9
)
# Extracting column URNs from upstream_lineage.upstreams
actual_schema_field_urns = [
fine_grained_lineage.upstreams[0]
if fine_grained_lineage.upstreams is not None
else []
for fine_grained_lineage in upstream_lineage.fineGrainedLineages
]
assert all(
urn in expected_schema_field_urns for urn in actual_schema_field_urns
), "Some expected column URNs are missing from fine grained lineage."


def test_lineage_for_external_bq_table_no_column_lineage(mock_datahub_graph):
Expand Down Expand Up @@ -259,8 +280,18 @@ def fake_schema_metadata(entity_urn: str) -> Optional[models.SchemaMetadataClass
graph=pipeline_context.graph,
)

expected_dataset_urns = [
"urn:li:dataset:(urn:li:dataPlatform:gcs,bigquery_data/customer1,PROD)",
"urn:li:dataset:(urn:li:dataPlatform:gcs,bigquery_data/customer2,PROD)",
"urn:li:dataset:(urn:li:dataPlatform:gcs,bigquery_data/customer3/my_table,PROD)",
]
assert upstream_lineage
assert len(upstream_lineage.upstreams) == 3
# Extracting dataset URNs from upstream_lineage.upstreams
actual_dataset_urns = [upstream.dataset for upstream in upstream_lineage.upstreams]
assert all(
urn in actual_dataset_urns for urn in expected_dataset_urns
), "Some expected dataset URNs are missing from upstream lineage."
assert upstream_lineage.fineGrainedLineages is None


Expand Down

0 comments on commit 54e5725

Please sign in to comment.