From 45d1a43b3951831c1e354e218c4627e7ad0baec0 Mon Sep 17 00:00:00 2001 From: sagar-salvi-apptware Date: Wed, 2 Oct 2024 15:43:31 +0530 Subject: [PATCH] fix: PR Comments --- .../ingestion/source/bigquery_v2/lineage.py | 9 ++++-- .../unit/bigquery/test_bigquery_lineage.py | 31 +++++++++++++++++++ 2 files changed, 38 insertions(+), 2 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/lineage.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/lineage.py index 7076de855beb2e..3af4cc114c7472 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/lineage.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/lineage.py @@ -255,6 +255,7 @@ def __init__( format_queries=True, ) self.report.sql_aggregator = self.aggregator.report + self.gcs_uris_regex = re.compile(r"uris=\[([^\]]+)\]") def get_time_window(self) -> Tuple[datetime, datetime]: if self.redundant_run_skip_handler: @@ -938,8 +939,9 @@ def gen_lineage_workunits_for_external_table( return # Expect URIs in `uris=[""]` format - uris_match = re.search(r"uris=\[([^\]]+)\]", ddl) + uris_match = self.gcs_uris_regex.search(ddl) if not uris_match: + logger.warning(f"Unable to parse GCS URI from the provided DDL {ddl}.") return uris_str = uris_match.group(1) @@ -1009,7 +1011,6 @@ def get_lineage_for_external_table( dataset_urn ) for gcs_dataset_urn in gcs_urns: - assert graph schema_metadata_for_gcs: Optional[ SchemaMetadataClass ] = graph.get_schema_metadata(gcs_dataset_urn) @@ -1021,6 +1022,10 @@ def get_lineage_for_external_table( schema_metadata_for_gcs, ) if not fine_grained_lineage: + logger.warning( + f"Failed to retrieve fine-grained lineage for dataset {dataset_urn} and GCS {gcs_dataset_urn}. " + f"Check schema metadata: {schema_metadata} and GCS metadata: {schema_metadata_for_gcs}." + ) continue fine_grained_lineages.extend(fine_grained_lineage) diff --git a/metadata-ingestion/tests/unit/bigquery/test_bigquery_lineage.py b/metadata-ingestion/tests/unit/bigquery/test_bigquery_lineage.py index 8cebc5f5365e75..6bdd67a9c4cb85 100644 --- a/metadata-ingestion/tests/unit/bigquery/test_bigquery_lineage.py +++ b/metadata-ingestion/tests/unit/bigquery/test_bigquery_lineage.py @@ -210,12 +210,33 @@ def fake_schema_metadata(entity_urn: str) -> models.SchemaMetadataClass: graph=pipeline_context.graph, ) + expected_schema_field_urns = [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:gcs,bigquery_data/customer1,PROD),age)", + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:gcs,bigquery_data/customer1,PROD),firstname)", + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:gcs,bigquery_data/customer1,PROD),lastname)", + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:gcs,bigquery_data/customer2,PROD),age)", + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:gcs,bigquery_data/customer2,PROD),firstname)", + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:gcs,bigquery_data/customer2,PROD),lastname)", + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:gcs,bigquery_data/customer3/my_table,PROD),age)", + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:gcs,bigquery_data/customer3/my_table,PROD),firstname)", + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:gcs,bigquery_data/customer3/my_table,PROD),lastname)", + ] assert upstream_lineage assert len(upstream_lineage.upstreams) == 3 assert ( upstream_lineage.fineGrainedLineages and len(upstream_lineage.fineGrainedLineages) == 9 ) + # Extracting column URNs from upstream_lineage.upstreams + actual_schema_field_urns = [ + fine_grained_lineage.upstreams[0] + if fine_grained_lineage.upstreams is not None + else [] + for fine_grained_lineage in upstream_lineage.fineGrainedLineages + ] + assert all( + urn in expected_schema_field_urns for urn in actual_schema_field_urns + ), "Some expected column URNs are missing from fine grained lineage." def test_lineage_for_external_bq_table_no_column_lineage(mock_datahub_graph): @@ -259,8 +280,18 @@ def fake_schema_metadata(entity_urn: str) -> Optional[models.SchemaMetadataClass graph=pipeline_context.graph, ) + expected_dataset_urns = [ + "urn:li:dataset:(urn:li:dataPlatform:gcs,bigquery_data/customer1,PROD)", + "urn:li:dataset:(urn:li:dataPlatform:gcs,bigquery_data/customer2,PROD)", + "urn:li:dataset:(urn:li:dataPlatform:gcs,bigquery_data/customer3/my_table,PROD)", + ] assert upstream_lineage assert len(upstream_lineage.upstreams) == 3 + # Extracting dataset URNs from upstream_lineage.upstreams + actual_dataset_urns = [upstream.dataset for upstream in upstream_lineage.upstreams] + assert all( + urn in actual_dataset_urns for urn in expected_dataset_urns + ), "Some expected dataset URNs are missing from upstream lineage." assert upstream_lineage.fineGrainedLineages is None