From d700bc340ee35b82bad9f2dadfa95dcfebf455a4 Mon Sep 17 00:00:00 2001 From: sagar-salvi-apptware Date: Fri, 27 Dec 2024 18:43:48 +0530 Subject: [PATCH] fix: pr comment --- .../ingestion/source/bigquery_v2/bigquery.py | 14 +++----------- .../source/bigquery_v2/bigquery_schema_gen.py | 5 ++--- 2 files changed, 5 insertions(+), 14 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py index 696906dd61f50..8ab89ee3b9739 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py @@ -2,7 +2,7 @@ import functools import logging import os -from typing import Iterable, List, Optional, Set +from typing import Iterable, List, Optional from datahub.ingestion.api.common import PipelineContext from datahub.ingestion.api.decorators import ( @@ -274,11 +274,6 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: self.report.set_ingestion_stage("*", QUERIES_EXTRACTION) - discovered_tables: Set[str] = set() - discovered_tables.update(self.bq_schema_extractor.view_snapshot_refs) - if self.config.include_table_lineage: - discovered_tables.update(self.bq_schema_extractor.table_refs) - with BigQueryQueriesExtractor( connection=self.config.get_bigquery_client(), schema_api=self.bq_schema_extractor.schema_api, @@ -295,7 +290,7 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: filters=self.filters, identifiers=self.identifiers, schema_resolver=self.sql_parser_schema_resolver, - discovered_tables=discovered_tables, + discovered_tables=self.bq_schema_extractor.table_refs, ) as queries_extractor: self.report.queries_extractor = queries_extractor.report yield from queries_extractor.get_workunits_internal() @@ -303,10 +298,7 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: else: if self.config.include_usage_statistics: yield from self.usage_extractor.get_usage_workunits( - [p.id for p in projects], - self.bq_schema_extractor.table_refs.union( - self.bq_schema_extractor.view_snapshot_refs - ), + [p.id for p in projects], self.bq_schema_extractor.table_refs ) if self.config.include_table_lineage: diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema_gen.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema_gen.py index b425d77af120d..05acf3d46993e 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema_gen.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema_gen.py @@ -195,7 +195,6 @@ def __init__( # Global store of table identifiers for lineage filtering self.table_refs: Set[str] = set() - self.view_snapshot_refs: Set[str] = set() # Maps project -> view_ref, so we can find all views in a project self.view_refs_by_project: Dict[str, Set[str]] = defaultdict(set) @@ -655,7 +654,7 @@ def _process_view( return table_ref = str(BigQueryTableRef(table_identifier).get_sanitized_table_ref()) - self.view_snapshot_refs.add(table_ref) + self.table_refs.add(table_ref) if self.config.lineage_parse_view_ddl and view.view_definition: self.view_refs_by_project[project_id].add(table_ref) self.view_definitions[table_ref] = view.view_definition @@ -700,7 +699,7 @@ def _process_snapshot( ) table_ref = str(BigQueryTableRef(table_identifier).get_sanitized_table_ref()) - self.view_snapshot_refs.add(table_ref) + self.table_refs.add(table_ref) if snapshot.base_table_identifier: self.snapshot_refs_by_project[project_id].add(table_ref) self.snapshots_by_ref[table_ref] = snapshot