Skip to content

Commit

Permalink
feat(ingest/bigquery): Support for View Labels (datahub-project#10648)
Browse files Browse the repository at this point in the history
Co-authored-by: Ethan Cartwright <[email protected]>
Co-authored-by: Andrew Sikowitz <[email protected]>
  • Loading branch information
3 people authored Jun 17, 2024
1 parent 933d249 commit c58be15
Show file tree
Hide file tree
Showing 6 changed files with 86 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1064,11 +1064,19 @@ def gen_view_dataset_workunits(
project_id: str,
dataset_name: str,
) -> Iterable[MetadataWorkUnit]:
tags_to_add = None
if table.labels and self.config.capture_view_label_as_tag:
tags_to_add = [
make_tag_urn(f"{k}:{v}")
for k, v in table.labels.items()
if is_tag_allowed(self.config.capture_view_label_as_tag, k)
]
yield from self.gen_dataset_workunits(
table=table,
columns=columns,
project_id=project_id,
dataset_name=dataset_name,
tags_to_add=tags_to_add,
sub_types=[DatasetSubTypes.VIEW],
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,11 @@ class BigQueryV2Config(
description="Capture BigQuery table labels as DataHub tag",
)

capture_view_label_as_tag: Union[bool, AllowDenyPattern] = Field(
default=False,
description="Capture BigQuery view labels as DataHub tag",
)

capture_dataset_label_as_tag: Union[bool, AllowDenyPattern] = Field(
default=False,
description="Capture BigQuery dataset labels as DataHub tag",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from typing import Optional
import re
from typing import Dict, Optional


def unquote_and_decode_unicode_escape_seq(
Expand All @@ -17,3 +18,10 @@ def unquote_and_decode_unicode_escape_seq(
cleaned_string = string.encode().decode("unicode-escape")

return cleaned_string


def parse_labels(labels_str: str) -> Dict[str, str]:
pattern = r'STRUCT\("([^"]+)", "([^"]+)"\)'

# Map of BigQuery label keys to label values
return dict(re.findall(pattern, labels_str))
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
)

from datahub.ingestion.source.bigquery_v2.bigquery_audit import BigqueryTableIdentifier
from datahub.ingestion.source.bigquery_v2.bigquery_helper import parse_labels
from datahub.ingestion.source.bigquery_v2.bigquery_report import (
BigQuerySchemaApiPerfReport,
BigQueryV2Report,
Expand Down Expand Up @@ -54,9 +55,7 @@ def from_time_partitioning(
cls, time_partitioning: TimePartitioning
) -> "PartitionInfo":
return cls(
field=time_partitioning.field
if time_partitioning.field
else "_PARTITIONTIME",
field=time_partitioning.field or "_PARTITIONTIME",
type=time_partitioning.type_,
expiration_ms=time_partitioning.expiration_ms,
require_partition_filter=time_partitioning.require_partition_filter,
Expand Down Expand Up @@ -107,6 +106,7 @@ class BigqueryTable(BaseTable):
class BigqueryView(BaseView):
columns: List[BigqueryColumn] = field(default_factory=list)
materialized: bool = False
labels: Optional[Dict[str, str]] = None


@dataclass
Expand Down Expand Up @@ -245,9 +245,11 @@ def get_tables_for_dataset(
BigqueryQuery.tables_for_dataset.format(
project_id=project_id,
dataset_name=dataset_name,
table_filter=f" and t.table_name in ({filter_clause})"
if filter_clause
else "",
table_filter=(
f" and t.table_name in ({filter_clause})"
if filter_clause
else ""
),
),
)
else:
Expand All @@ -257,9 +259,11 @@ def get_tables_for_dataset(
BigqueryQuery.tables_for_dataset_without_partition_data.format(
project_id=project_id,
dataset_name=dataset_name,
table_filter=f" and t.table_name in ({filter_clause})"
if filter_clause
else "",
table_filter=(
f" and t.table_name in ({filter_clause})"
if filter_clause
else ""
),
),
)

Expand Down Expand Up @@ -297,20 +301,22 @@ def _make_bigquery_table(
return BigqueryTable(
name=table.table_name,
created=table.created,
last_altered=datetime.fromtimestamp(
table.get("last_altered") / 1000, tz=timezone.utc
)
if table.get("last_altered") is not None
else None,
last_altered=(
datetime.fromtimestamp(
table.get("last_altered") / 1000, tz=timezone.utc
)
if table.get("last_altered") is not None
else None
),
size_in_bytes=table.get("bytes"),
rows_count=table.get("row_count"),
comment=table.comment,
ddl=table.ddl,
expires=expiration,
labels=table_basic.labels if table_basic else None,
partition_info=PartitionInfo.from_table_info(table_basic)
if table_basic
else None,
partition_info=(
PartitionInfo.from_table_info(table_basic) if table_basic else None
),
clustering_fields=table_basic.clustering_fields if table_basic else None,
max_partition_id=table.get("max_partition_id"),
max_shard_id=shard,
Expand Down Expand Up @@ -361,16 +367,17 @@ def _make_bigquery_view(view: bigquery.Row) -> BigqueryView:
return BigqueryView(
name=view.table_name,
created=view.created,
last_altered=datetime.fromtimestamp(
view.get("last_altered") / 1000, tz=timezone.utc
)
if view.get("last_altered") is not None
else None,
last_altered=(
datetime.fromtimestamp(view.get("last_altered") / 1000, tz=timezone.utc)
if view.get("last_altered") is not None
else None
),
comment=view.comment,
view_definition=view.view_definition,
materialized=view.table_type == BigqueryTableType.MATERIALIZED_VIEW,
size_in_bytes=view.get("size_bytes"),
rows_count=view.get("row_count"),
labels=parse_labels(view.labels) if hasattr(view, "labels") else None,
)

def get_policy_tags_for_column(
Expand Down Expand Up @@ -441,14 +448,16 @@ def get_columns_for_dataset(
with self.report.get_columns_for_dataset:
try:
cur = self.get_query_result(
BigqueryQuery.columns_for_dataset.format(
project_id=project_id, dataset_name=dataset_name
)
if not run_optimized_column_query
else BigqueryQuery.optimized_columns_for_dataset.format(
project_id=project_id,
dataset_name=dataset_name,
column_limit=column_limit,
(
BigqueryQuery.columns_for_dataset.format(
project_id=project_id, dataset_name=dataset_name
)
if not run_optimized_column_query
else BigqueryQuery.optimized_columns_for_dataset.format(
project_id=project_id,
dataset_name=dataset_name,
column_limit=column_limit,
)
),
)
except Exception as e:
Expand Down Expand Up @@ -578,11 +587,13 @@ def _make_bigquery_table_snapshot(snapshot: bigquery.Row) -> BigqueryTableSnapsh
return BigqueryTableSnapshot(
name=snapshot.table_name,
created=snapshot.created,
last_altered=datetime.fromtimestamp(
snapshot.get("last_altered") / 1000, tz=timezone.utc
)
if snapshot.get("last_altered") is not None
else None,
last_altered=(
datetime.fromtimestamp(
snapshot.get("last_altered") / 1000, tz=timezone.utc
)
if snapshot.get("last_altered") is not None
else None
),
comment=snapshot.comment,
ddl=snapshot.ddl,
snapshot_time=snapshot.snapshot_time,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,17 +117,21 @@ class BigqueryQuery:
t.table_type as table_type,
t.creation_time as created,
ts.last_modified_time as last_altered,
tos.OPTION_VALUE as comment,
tos_description.OPTION_VALUE as comment,
tos_labels.OPTION_VALUE as labels,
t.is_insertable_into,
t.ddl as view_definition,
ts.row_count,
ts.size_bytes
FROM
`{{project_id}}`.`{{dataset_name}}`.INFORMATION_SCHEMA.TABLES t
join `{{project_id}}`.`{{dataset_name}}`.__TABLES__ as ts on ts.table_id = t.TABLE_NAME
left join `{{project_id}}`.`{{dataset_name}}`.INFORMATION_SCHEMA.TABLE_OPTIONS as tos on t.table_schema = tos.table_schema
and t.TABLE_NAME = tos.TABLE_NAME
and tos.OPTION_NAME = "description"
left join `{{project_id}}`.`{{dataset_name}}`.INFORMATION_SCHEMA.TABLE_OPTIONS as tos_description on t.table_schema = tos_description.table_schema
and t.TABLE_NAME = tos_description.TABLE_NAME
and tos_description.OPTION_NAME = "description"
left join `{{project_id}}`.`{{dataset_name}}`.INFORMATION_SCHEMA.TABLE_OPTIONS as tos_labels on t.table_schema = tos_labels.table_schema
and t.TABLE_NAME = tos_labels.TABLE_NAME
and tos_labels.OPTION_NAME = "labels"
WHERE
table_type in ('{BigqueryTableType.VIEW}', '{BigqueryTableType.MATERIALIZED_VIEW}')
order by
Expand All @@ -142,14 +146,18 @@ class BigqueryQuery:
t.table_name as table_name,
t.table_type as table_type,
t.creation_time as created,
tos.OPTION_VALUE as comment,
tos_description.OPTION_VALUE as comment,
tos_labels.OPTION_VALUE as labels,
t.is_insertable_into,
t.ddl as view_definition
FROM
`{{project_id}}`.`{{dataset_name}}`.INFORMATION_SCHEMA.TABLES t
left join `{{project_id}}`.`{{dataset_name}}`.INFORMATION_SCHEMA.TABLE_OPTIONS as tos on t.table_schema = tos.table_schema
and t.TABLE_NAME = tos.TABLE_NAME
and tos.OPTION_NAME = "description"
left join `{{project_id}}`.`{{dataset_name}}`.INFORMATION_SCHEMA.TABLE_OPTIONS as tos_description on t.table_schema = tos_description.table_schema
and t.TABLE_NAME = tos_description.TABLE_NAME
and tos_description.OPTION_NAME = "description"
left join `{{project_id}}`.`{{dataset_name}}`.INFORMATION_SCHEMA.TABLE_OPTIONS as tos_labels on t.table_schema = tos_labels.table_schema
and t.TABLE_NAME = tos_labels.TABLE_NAME
and tos_labels.OPTION_NAME = "labels"
WHERE
table_type in ('{BigqueryTableType.VIEW}', '{BigqueryTableType.MATERIALIZED_VIEW}')
order by
Expand Down
2 changes: 2 additions & 0 deletions metadata-ingestion/tests/unit/test_bigquery_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -820,6 +820,7 @@ def bigquery_view_1() -> BigqueryView:
comment="comment1",
view_definition="CREATE VIEW 1",
materialized=False,
labels=None,
)


Expand All @@ -833,6 +834,7 @@ def bigquery_view_2() -> BigqueryView:
comment="comment2",
view_definition="CREATE VIEW 2",
materialized=True,
labels=None,
)


Expand Down

0 comments on commit c58be15

Please sign in to comment.