From 7e69247a7f0138af4c1dfc89daa709326bf80566 Mon Sep 17 00:00:00 2001 From: Tamas Nemeth Date: Fri, 26 Apr 2024 17:35:03 +0200 Subject: [PATCH] fix(ingest/profiling): Filter tables early based on profile pattern filter (#10378) --- .../source/sql/sql_generic_profiler.py | 16 ++++- .../datahub/ingestion/source/sql/sql_utils.py | 25 +++++++ .../tests/unit/test_sql_utils.py | 66 ++++++++++++++++++- 3 files changed, 105 insertions(+), 2 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_generic_profiler.py b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_generic_profiler.py index 365539df7a83be..968989e2548d13 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_generic_profiler.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_generic_profiler.py @@ -17,6 +17,7 @@ from datahub.ingestion.source.sql.sql_common import SQLSourceReport from datahub.ingestion.source.sql.sql_config import SQLCommonConfig from datahub.ingestion.source.sql.sql_generic import BaseTable, BaseView +from datahub.ingestion.source.sql.sql_utils import check_table_with_profile_pattern from datahub.ingestion.source.state.profiling_state_handler import ProfilingHandler from datahub.metadata.com.linkedin.pegasus2avro.dataset import DatasetProfile from datahub.metadata.com.linkedin.pegasus2avro.timeseries import PartitionType @@ -36,6 +37,10 @@ class DetailedProfilerReportMixin: default_factory=int_top_k_dict ) + profiling_skipped_table_profile_pattern: TopKDict[str, int] = field( + default_factory=int_top_k_dict + ) + profiling_skipped_other: TopKDict[str, int] = field(default_factory=int_top_k_dict) num_tables_not_eligible_profiling: Dict[str, int] = field( @@ -272,8 +277,17 @@ def is_dataset_eligible_for_profiling( threshold_time = datetime.now(timezone.utc) - timedelta( self.config.profiling.profile_if_updated_since_days ) - schema_name = dataset_name.rsplit(".", 1)[0] + + if not check_table_with_profile_pattern( + 
# Separator between the dot-delimited name parts of a pattern: a "." that may
# be preceded by a regex-escaping backslash ("\.").
CHECK_TABLE_TABLE_PART_SEPARATOR_PATTERN = re.compile(r"\\?\.")


def _table_level_prefix(pattern: str, segments: List[str]) -> Optional[str]:
    """Return ``pattern`` cut right after its table segment (``segments[-2]``).

    Segments are located left to right so that a table name which also occurs
    inside an earlier part (e.g. ``orders_db.orders.id``) is matched at its own
    position instead of at the first occurrence anywhere in the pattern.

    Returns ``None`` when the table segment cannot be found in ``pattern``.
    """
    cursor = 0
    for segment in segments[:-2]:
        found = pattern.find(segment, cursor)
        # A segment may be missing from the raw pattern when wildcard removal
        # glued two pieces together; in that case just keep the cursor.
        if found != -1:
            cursor = found + len(segment)

    table_segment = segments[-2]
    start = pattern.find(table_segment, cursor)
    if start == -1:
        # Fall back to an unrestricted search before giving up.
        start = pattern.find(table_segment)
    if start == -1:
        return None
    return pattern[: start + len(table_segment)]


def check_table_with_profile_pattern(
    profile_pattern: AllowDenyPattern, table_name: str
) -> bool:
    """Check ``table_name`` against ``profile_pattern`` at table granularity.

    An allow pattern that has exactly one more dot-separated part than
    ``table_name`` is treated as a column-level pattern: it is truncated after
    its table part and terminated with ``$``. All other allow patterns, and
    all deny patterns, are used unchanged.

    Args:
        profile_pattern: Allow/deny pattern configured for profiling.
        table_name: Dot-separated table name to check.

    Returns:
        The result of ``AllowDenyPattern.allowed(table_name)`` on the adjusted
        pattern.
    """
    table_part_count = len(table_name.split("."))
    allow_list: List[str] = []

    for pattern in profile_pattern.allow:
        # Remove wildcards first so their "." is not counted as a separator.
        stripped = pattern.replace(".*", "").replace(".+", "")
        segments = CHECK_TABLE_TABLE_PART_SEPARATOR_PATTERN.split(stripped)

        table_prefix: Optional[str] = None
        if len(segments) == table_part_count + 1:
            table_prefix = _table_level_prefix(pattern, segments)

        # Keep the pattern untouched when it is not column-level or when its
        # table part could not be located.
        allow_list.append(pattern if table_prefix is None else table_prefix + "$")

    # NOTE(review): only allow/deny are carried over; if AllowDenyPattern has
    # further options on profile_pattern they are not propagated - confirm.
    table_allow_deny_pattern = AllowDenyPattern(
        allow=allow_list, deny=profile_pattern.deny
    )
    return table_allow_deny_pattern.allowed(table_name)
# Rows are (allow_pattern, table_name, expected_result): the pattern is used
# as the only entry of the allow list.
test_profile_pattern_matching_on_table_allow_list_test_data = [
    ("db.table.column", "db.table", True),
    ("db.table.column2", "db.table", True),
    ("db.table..*", "db.table", True),
    ("db.*", "db.table", True),
    ("db.*", "db.schema.table", True),
    ("db.schema.*", "db.schema.table", True),
    ("db\\.schema\\..*", "db.schema.table", True),
    ("db\\.schema\\.table\\.column_prefix.*", "db.schema.table", True),
    ("db\\.schema\\.table\\.column", "db.schema.table", True),
    ("db\\.schema\\.table2\\.column", "db.schema.table", False),
    ("db2\\.schema.*", "db.schema.table", False),
    ("db\\.schema\\.table\\..*", "db.table2", False),
]


@pytest.mark.parametrize(
    "allow_pattern, table_name, result",
    test_profile_pattern_matching_on_table_allow_list_test_data,
)
def test_profile_pattern_matching_on_table_allow_list(
    allow_pattern: str, table_name: str, result: bool
) -> None:
    """Check table-level evaluation of a single allow pattern."""
    pattern = AllowDenyPattern(allow=[allow_pattern])
    assert check_table_with_profile_pattern(pattern, table_name) == result


# Rows are (deny_pattern, table_name, expected_result): the pattern is used
# as the only entry of the deny list.
test_profile_pattern_matching_on_table_deny_list_test_data = [
    ("db.table.column", "db.table", True),
    ("db.table.column2", "db.table", True),
    ("db.table..*", "db.table", True),
    ("db.*", "db.table", False),
    ("db.*", "db.schema.table", False),
    ("db.schema.*", "db.schema.table", False),
    ("db\\.schema\\..*", "db.schema.table", False),
    ("db\\.schema\\.table\\.column_prefix.*", "db.schema.table", True),
    ("db\\.schema\\.table\\.column", "db.schema.table", True),
    ("db\\.schema\\.table2\\.column", "db.schema.table", True),
    ("db2\\.schema.*", "db.schema.table", True),
    ("db\\.schema\\.table\\..*", "db.table2", True),
]


@pytest.mark.parametrize(
    "deny_pattern, table_name, result",
    test_profile_pattern_matching_on_table_deny_list_test_data,
)
def test_profile_pattern_matching_on_table_deny_list(
    deny_pattern: str, table_name: str, result: bool
) -> None:
    """Check table-level evaluation of a single deny pattern."""
    pattern = AllowDenyPattern(deny=[deny_pattern])
    assert check_table_with_profile_pattern(pattern, table_name) == result