Skip to content

Commit

Permalink
fix: pr comment
Browse files Browse the repository at this point in the history
  • Loading branch information
sagar-salvi-apptware committed Dec 27, 2024
1 parent c421a73 commit d700bc3
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import functools
import logging
import os
- from typing import Iterable, List, Optional, Set
+ from typing import Iterable, List, Optional

from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.decorators import (
Expand Down Expand Up @@ -274,11 +274,6 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:

self.report.set_ingestion_stage("*", QUERIES_EXTRACTION)

- discovered_tables: Set[str] = set()
- discovered_tables.update(self.bq_schema_extractor.view_snapshot_refs)
- if self.config.include_table_lineage:
- discovered_tables.update(self.bq_schema_extractor.table_refs)
-
with BigQueryQueriesExtractor(
connection=self.config.get_bigquery_client(),
schema_api=self.bq_schema_extractor.schema_api,
Expand All @@ -295,18 +290,15 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
filters=self.filters,
identifiers=self.identifiers,
schema_resolver=self.sql_parser_schema_resolver,
- discovered_tables=discovered_tables,
+ discovered_tables=self.bq_schema_extractor.table_refs,
) as queries_extractor:
self.report.queries_extractor = queries_extractor.report
yield from queries_extractor.get_workunits_internal()

else:
if self.config.include_usage_statistics:
yield from self.usage_extractor.get_usage_workunits(
- [p.id for p in projects],
- self.bq_schema_extractor.table_refs.union(
- self.bq_schema_extractor.view_snapshot_refs
- ),
+ [p.id for p in projects], self.bq_schema_extractor.table_refs
)

if self.config.include_table_lineage:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,6 @@ def __init__(

# Global store of table identifiers for lineage filtering
self.table_refs: Set[str] = set()
- self.view_snapshot_refs: Set[str] = set()

# Maps project -> view_ref, so we can find all views in a project
self.view_refs_by_project: Dict[str, Set[str]] = defaultdict(set)
Expand Down Expand Up @@ -655,7 +654,7 @@ def _process_view(
return

table_ref = str(BigQueryTableRef(table_identifier).get_sanitized_table_ref())
- self.view_snapshot_refs.add(table_ref)
+ self.table_refs.add(table_ref)
if self.config.lineage_parse_view_ddl and view.view_definition:
self.view_refs_by_project[project_id].add(table_ref)
self.view_definitions[table_ref] = view.view_definition

Check warning on line 660 in metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema_gen.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema_gen.py#L656-L660

Added lines #L656 - L660 were not covered by tests
Expand Down Expand Up @@ -700,7 +699,7 @@ def _process_snapshot(
)

table_ref = str(BigQueryTableRef(table_identifier).get_sanitized_table_ref())
- self.view_snapshot_refs.add(table_ref)
+ self.table_refs.add(table_ref)
if snapshot.base_table_identifier:
self.snapshot_refs_by_project[project_id].add(table_ref)
self.snapshots_by_ref[table_ref] = snapshot

Check warning on line 705 in metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema_gen.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema_gen.py#L701-L705

Added lines #L701 - L705 were not covered by tests
Expand Down

0 comments on commit d700bc3

Please sign in to comment.