From 2a510095a4fdad10254c0bc00108d38c539aad97 Mon Sep 17 00:00:00 2001 From: sagar-salvi-apptware Date: Mon, 14 Oct 2024 21:30:34 +0530 Subject: [PATCH] fix: PR comments --- .../ingestion/source/bigquery_v2/bigquery_report.py | 1 + .../source/bigquery_v2/bigquery_schema_gen.py | 7 ++++--- .../datahub/ingestion/source/bigquery_v2/lineage.py | 10 +++++++++- 3 files changed, 14 insertions(+), 4 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_report.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_report.py index e3c3258385caeb..c3a6afb5f8ef64 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_report.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_report.py @@ -185,6 +185,7 @@ class BigQueryV2Report( usage_start_time: Optional[datetime] = None usage_end_time: Optional[datetime] = None stateful_usage_ingestion_enabled: bool = False + num_skipped_external_table_lineage: int = 0 queries_extractor: Optional[BigQueryQueriesExtractorReport] = None diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema_gen.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema_gen.py index feee81182811c2..71b00135c2fbb5 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema_gen.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema_gen.py @@ -199,6 +199,9 @@ def __init__( self.snapshots_by_ref: FileBackedDict[BigqueryTableSnapshot] = FileBackedDict() # Add External BQ table self.external_tables: Dict[str, BigqueryTable] = defaultdict() + self.bq_external_table_pattern = ( + r".*create\s+external\s+table\s+`?(?:project_id\.)?.*`?" + ) bq_project = ( self.config.project_on_behalf @@ -893,11 +896,9 @@ def gen_dataset_workunits( # Added for bigquery to gcs lineage extraction if ( isinstance(table, BigqueryTable) - and table.table_type is not None and table.table_type == "EXTERNAL" and table.ddl is not None - and f"CREATE EXTERNAL TABLE `{project_id}.{dataset_name}.{table.name}`" - in table.ddl + and re.search(self.bq_external_table_pattern, table.ddl, re.IGNORECASE) ): self.external_tables[dataset_urn] = table diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/lineage.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/lineage.py index 3af4cc114c7472..b542992a7924a0 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/lineage.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/lineage.py @@ -941,11 +941,19 @@ def gen_lineage_workunits_for_external_table( # Expect URIs in `uris=[""]` format uris_match = self.gcs_uris_regex.search(ddl) if not uris_match: + self.report.num_skipped_external_table_lineage += 1 logger.warning(f"Unable to parse GCS URI from the provided DDL {ddl}.") return uris_str = uris_match.group(1) - source_uris = json.loads(f"[{uris_str}]") + try: + source_uris = json.loads(f"[{uris_str}]") + except json.JSONDecodeError as e: + self.report.num_skipped_external_table_lineage += 1 + logger.warning( + f"Json load failed on loading source uri with error: {e}. The field value was: {uris_str}" + ) + return lineage_info = self.get_lineage_for_external_table( dataset_urn=dataset_urn,