From b612545220d9329696eaa26d6b42439cdf01fb95 Mon Sep 17 00:00:00 2001 From: siddiquebagwan-gslab Date: Wed, 25 Oct 2023 15:26:06 +0530 Subject: [PATCH 1/9] feat(ingestion/redshift): support auto_incremental_lineage (#9010) --- docs/how/updating-datahub.md | 2 ++ metadata-ingestion/setup.py | 10 +++------- .../datahub/ingestion/source/redshift/config.py | 8 +++++++- .../datahub/ingestion/source/redshift/redshift.py | 15 +++++++++++++-- .../tests/unit/test_redshift_config.py | 6 ++++++ 5 files changed, 31 insertions(+), 10 deletions(-) create mode 100644 metadata-ingestion/tests/unit/test_redshift_config.py diff --git a/docs/how/updating-datahub.md b/docs/how/updating-datahub.md index 57193ea69f2bec..8813afee65be91 100644 --- a/docs/how/updating-datahub.md +++ b/docs/how/updating-datahub.md @@ -4,6 +4,8 @@ This file documents any backwards-incompatible changes in DataHub and assists pe ## Next +- #9010 - In Redshift source's config `incremental_lineage` is set default to off. + ### Breaking Changes - #8810 - Removed support for SQLAlchemy 1.3.x. Only SQLAlchemy 1.4.x is supported now. diff --git a/metadata-ingestion/setup.py b/metadata-ingestion/setup.py index 417588a4336555..72b0e776a0da59 100644 --- a/metadata-ingestion/setup.py +++ b/metadata-ingestion/setup.py @@ -355,13 +355,9 @@ | {"psycopg2-binary", "pymysql>=1.0.2"}, "pulsar": {"requests"}, "redash": {"redash-toolbelt", "sql-metadata"} | sqllineage_lib, - "redshift": sql_common - | redshift_common - | usage_common - | sqlglot_lib - | {"redshift-connector"}, - "redshift-legacy": sql_common | redshift_common, - "redshift-usage-legacy": sql_common | usage_common | redshift_common, + "redshift": sql_common | redshift_common | usage_common | {"redshift-connector"} | sqlglot_lib, + "redshift-legacy": sql_common | redshift_common | sqlglot_lib, + "redshift-usage-legacy": sql_common | redshift_common | sqlglot_lib | usage_common, "s3": {*s3_base, *data_lake_profiling}, "gcs": {*s3_base, *data_lake_profiling}, "sagemaker": aws_common, diff --git a/metadata-ingestion/src/datahub/ingestion/source/redshift/config.py b/metadata-ingestion/src/datahub/ingestion/source/redshift/config.py index 2789b800940db2..79b044841e0541 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/redshift/config.py +++ b/metadata-ingestion/src/datahub/ingestion/source/redshift/config.py @@ -133,7 +133,13 @@ class RedshiftConfig( ) extract_column_level_lineage: bool = Field( - default=True, description="Whether to extract column level lineage." + default=True, + description="Whether to extract column level lineage. This config works with rest-sink only.", + ) + + incremental_lineage: bool = Field( + default=False, + description="When enabled, emits lineage as incremental to existing lineage already in DataHub. When disabled, re-states lineage on each run. This config works with rest-sink only.", ) @root_validator(pre=True) diff --git a/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py b/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py index a1b6333a3775d4..26237a6ce12e0d 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py +++ b/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py @@ -1,5 +1,6 @@ import logging from collections import defaultdict +from functools import partial from typing import Dict, Iterable, List, Optional, Type, Union import humanfriendly @@ -25,6 +26,7 @@ platform_name, support_status, ) +from datahub.ingestion.api.incremental_lineage_helper import auto_incremental_lineage from datahub.ingestion.api.source import ( CapabilityReport, MetadataWorkUnitProcessor, @@ -369,6 +371,11 @@ def gen_database_container(self, database: str) -> Iterable[MetadataWorkUnit]: def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]: return [ *super().get_workunit_processors(), + partial( + auto_incremental_lineage, + self.ctx.graph, + self.config.incremental_lineage, + ), StaleEntityRemovalHandler.create( self, self.config, self.ctx ).workunit_processor, @@ -942,7 +949,9 @@ def generate_lineage(self, database: str) -> Iterable[MetadataWorkUnit]: ) if lineage_info: yield from gen_lineage( - dataset_urn, lineage_info, self.config.incremental_lineage + dataset_urn, + lineage_info, + incremental_lineage=False, # incremental lineage generation is taken care by auto_incremental_lineage ) for schema in self.db_views[database]: @@ -956,7 +965,9 @@ def generate_lineage(self, database: str) -> Iterable[MetadataWorkUnit]: ) if lineage_info: yield from gen_lineage( - dataset_urn, lineage_info, self.config.incremental_lineage + dataset_urn, + lineage_info, + incremental_lineage=False, # incremental lineage generation is taken care by auto_incremental_lineage ) def add_config_to_report(self): diff --git a/metadata-ingestion/tests/unit/test_redshift_config.py b/metadata-ingestion/tests/unit/test_redshift_config.py new file mode 100644 index 00000000000000..8a165e7f5f3fe3 --- /dev/null +++ b/metadata-ingestion/tests/unit/test_redshift_config.py @@ -0,0 +1,6 @@ +from datahub.ingestion.source.redshift.config import RedshiftConfig + + +def test_incremental_lineage_default_to_false(): + config = RedshiftConfig(host_port="localhost:5439", database="test") + assert config.incremental_lineage is False From 9cccd22c04bf357b574f4d9d7dae3aee633bf7d3 Mon Sep 17 00:00:00 2001 From: Pedro Silva Date: Wed, 25 Oct 2023 11:01:49 +0100 Subject: [PATCH 2/9] feat(auth): Add backwards compatible field resolver (#9096) --- .../com/datahub/authorization/EntityFieldType.java | 13 +++++++++++++ .../authorization/DefaultEntitySpecResolver.java | 13 +++++++------ .../DataPlatformInstanceFieldResolverProvider.java | 10 +++++----- .../DomainFieldResolverProvider.java | 5 +++-- .../EntityFieldResolverProvider.java | 6 ++++-- .../EntityTypeFieldResolverProvider.java | 7 +++++-- .../EntityUrnFieldResolverProvider.java | 7 +++++-- .../GroupMembershipFieldResolverProvider.java | 5 +++-- .../OwnerFieldResolverProvider.java | 5 +++-- ...taPlatformInstanceFieldResolverProviderTest.java | 2 +- .../GroupMembershipFieldResolverProviderTest.java | 2 +- 11 files changed, 50 insertions(+), 25 deletions(-) diff --git a/metadata-auth/auth-api/src/main/java/com/datahub/authorization/EntityFieldType.java b/metadata-auth/auth-api/src/main/java/com/datahub/authorization/EntityFieldType.java index 46763f29a70405..1258d958f20923 100644 --- a/metadata-auth/auth-api/src/main/java/com/datahub/authorization/EntityFieldType.java +++ b/metadata-auth/auth-api/src/main/java/com/datahub/authorization/EntityFieldType.java @@ -4,6 +4,19 @@ * List of entity field types to fetch for a given entity */ public enum EntityFieldType { + + /** + * Type of the entity (e.g. dataset, chart) + * @deprecated + */ + @Deprecated + RESOURCE_URN, + /** + * Urn of the entity + * @deprecated + */ + @Deprecated + RESOURCE_TYPE, /** * Type of the entity (e.g. dataset, chart) */ diff --git a/metadata-service/auth-impl/src/main/java/com/datahub/authorization/DefaultEntitySpecResolver.java b/metadata-service/auth-impl/src/main/java/com/datahub/authorization/DefaultEntitySpecResolver.java index 4ad14ed59c9c07..65b0329a9c4f25 100644 --- a/metadata-service/auth-impl/src/main/java/com/datahub/authorization/DefaultEntitySpecResolver.java +++ b/metadata-service/auth-impl/src/main/java/com/datahub/authorization/DefaultEntitySpecResolver.java @@ -1,15 +1,16 @@ package com.datahub.authorization; -import com.datahub.authorization.fieldresolverprovider.DataPlatformInstanceFieldResolverProvider; -import com.datahub.authorization.fieldresolverprovider.EntityTypeFieldResolverProvider; -import com.datahub.authorization.fieldresolverprovider.OwnerFieldResolverProvider; import com.datahub.authentication.Authentication; +import com.datahub.authorization.fieldresolverprovider.DataPlatformInstanceFieldResolverProvider; import com.datahub.authorization.fieldresolverprovider.DomainFieldResolverProvider; -import com.datahub.authorization.fieldresolverprovider.EntityUrnFieldResolverProvider; import com.datahub.authorization.fieldresolverprovider.EntityFieldResolverProvider; +import com.datahub.authorization.fieldresolverprovider.EntityTypeFieldResolverProvider; +import com.datahub.authorization.fieldresolverprovider.EntityUrnFieldResolverProvider; import com.datahub.authorization.fieldresolverprovider.GroupMembershipFieldResolverProvider; +import com.datahub.authorization.fieldresolverprovider.OwnerFieldResolverProvider; import com.google.common.collect.ImmutableList; import com.linkedin.entity.client.EntityClient; +import com.linkedin.util.Pair; import java.util.List; import java.util.Map; import java.util.stream.Collectors; @@ -34,7 +35,7 @@ public ResolvedEntitySpec resolve(EntitySpec entitySpec) { private Map getFieldResolvers(EntitySpec entitySpec) { return _entityFieldResolverProviders.stream() - .collect(Collectors.toMap(EntityFieldResolverProvider::getFieldType, - hydrator -> hydrator.getFieldResolver(entitySpec))); + .flatMap(resolver -> resolver.getFieldTypes().stream().map(fieldType -> Pair.of(fieldType, resolver))) + .collect(Collectors.toMap(Pair::getKey, pair -> pair.getValue().getFieldResolver(entitySpec))); } } diff --git a/metadata-service/auth-impl/src/main/java/com/datahub/authorization/fieldresolverprovider/DataPlatformInstanceFieldResolverProvider.java b/metadata-service/auth-impl/src/main/java/com/datahub/authorization/fieldresolverprovider/DataPlatformInstanceFieldResolverProvider.java index 27cb8fcee8138a..cbb237654e9693 100644 --- a/metadata-service/auth-impl/src/main/java/com/datahub/authorization/fieldresolverprovider/DataPlatformInstanceFieldResolverProvider.java +++ b/metadata-service/auth-impl/src/main/java/com/datahub/authorization/fieldresolverprovider/DataPlatformInstanceFieldResolverProvider.java @@ -1,8 +1,5 @@ package com.datahub.authorization.fieldresolverprovider; -import static com.linkedin.metadata.Constants.DATA_PLATFORM_INSTANCE_ASPECT_NAME; -import static com.linkedin.metadata.Constants.DATA_PLATFORM_INSTANCE_ENTITY_NAME; - import com.datahub.authentication.Authentication; import com.datahub.authorization.EntityFieldType; import com.datahub.authorization.EntitySpec; @@ -14,10 +11,13 @@ import com.linkedin.entity.EnvelopedAspect; import com.linkedin.entity.client.EntityClient; import java.util.Collections; +import java.util.List; import java.util.Objects; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import static com.linkedin.metadata.Constants.*; + /** * Provides field resolver for domain given resourceSpec */ @@ -29,8 +29,8 @@ public class DataPlatformInstanceFieldResolverProvider implements EntityFieldRes private final Authentication _systemAuthentication; @Override - public EntityFieldType getFieldType() { - return EntityFieldType.DATA_PLATFORM_INSTANCE; + public List getFieldTypes() { + return Collections.singletonList(EntityFieldType.DATA_PLATFORM_INSTANCE); } @Override diff --git a/metadata-service/auth-impl/src/main/java/com/datahub/authorization/fieldresolverprovider/DomainFieldResolverProvider.java b/metadata-service/auth-impl/src/main/java/com/datahub/authorization/fieldresolverprovider/DomainFieldResolverProvider.java index 25c2165f02b940..15d821b75c0bdd 100644 --- a/metadata-service/auth-impl/src/main/java/com/datahub/authorization/fieldresolverprovider/DomainFieldResolverProvider.java +++ b/metadata-service/auth-impl/src/main/java/com/datahub/authorization/fieldresolverprovider/DomainFieldResolverProvider.java @@ -14,6 +14,7 @@ import java.util.Collections; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.stream.Collectors; @@ -37,8 +38,8 @@ public class DomainFieldResolverProvider implements EntityFieldResolverProvider private final Authentication _systemAuthentication; @Override - public EntityFieldType getFieldType() { - return EntityFieldType.DOMAIN; + public List getFieldTypes() { + return Collections.singletonList(EntityFieldType.DOMAIN); } @Override diff --git a/metadata-service/auth-impl/src/main/java/com/datahub/authorization/fieldresolverprovider/EntityFieldResolverProvider.java b/metadata-service/auth-impl/src/main/java/com/datahub/authorization/fieldresolverprovider/EntityFieldResolverProvider.java index a76db0ecb51024..227d403a9cd1d1 100644 --- a/metadata-service/auth-impl/src/main/java/com/datahub/authorization/fieldresolverprovider/EntityFieldResolverProvider.java +++ b/metadata-service/auth-impl/src/main/java/com/datahub/authorization/fieldresolverprovider/EntityFieldResolverProvider.java @@ -3,6 +3,7 @@ import com.datahub.authorization.FieldResolver; import com.datahub.authorization.EntityFieldType; import com.datahub.authorization.EntitySpec; +import java.util.List; /** @@ -11,9 +12,10 @@ public interface EntityFieldResolverProvider { /** - * Field that this hydrator is hydrating + * List of fields that this hydrator is hydrating. + * @return */ - EntityFieldType getFieldType(); + List getFieldTypes(); /** * Return resolver for fetching the field values given the entity diff --git a/metadata-service/auth-impl/src/main/java/com/datahub/authorization/fieldresolverprovider/EntityTypeFieldResolverProvider.java b/metadata-service/auth-impl/src/main/java/com/datahub/authorization/fieldresolverprovider/EntityTypeFieldResolverProvider.java index 187f6969049477..addac84c68b185 100644 --- a/metadata-service/auth-impl/src/main/java/com/datahub/authorization/fieldresolverprovider/EntityTypeFieldResolverProvider.java +++ b/metadata-service/auth-impl/src/main/java/com/datahub/authorization/fieldresolverprovider/EntityTypeFieldResolverProvider.java @@ -3,16 +3,19 @@ import com.datahub.authorization.FieldResolver; import com.datahub.authorization.EntityFieldType; import com.datahub.authorization.EntitySpec; +import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableList; import java.util.Collections; +import java.util.List; /** * Provides field resolver for entity type given entitySpec */ public class EntityTypeFieldResolverProvider implements EntityFieldResolverProvider { + @Override - public EntityFieldType getFieldType() { - return EntityFieldType.TYPE; + public List getFieldTypes() { + return ImmutableList.of(EntityFieldType.TYPE, EntityFieldType.RESOURCE_TYPE); } @Override diff --git a/metadata-service/auth-impl/src/main/java/com/datahub/authorization/fieldresolverprovider/EntityUrnFieldResolverProvider.java b/metadata-service/auth-impl/src/main/java/com/datahub/authorization/fieldresolverprovider/EntityUrnFieldResolverProvider.java index 2f5c4a7c6c9615..32960de687839a 100644 --- a/metadata-service/auth-impl/src/main/java/com/datahub/authorization/fieldresolverprovider/EntityUrnFieldResolverProvider.java +++ b/metadata-service/auth-impl/src/main/java/com/datahub/authorization/fieldresolverprovider/EntityUrnFieldResolverProvider.java @@ -3,16 +3,19 @@ import com.datahub.authorization.FieldResolver; import com.datahub.authorization.EntityFieldType; import com.datahub.authorization.EntitySpec; +import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableList; import java.util.Collections; +import java.util.List; /** * Provides field resolver for entity urn given entitySpec */ public class EntityUrnFieldResolverProvider implements EntityFieldResolverProvider { + @Override - public EntityFieldType getFieldType() { - return EntityFieldType.URN; + public List getFieldTypes() { + return ImmutableList.of(EntityFieldType.URN, EntityFieldType.RESOURCE_URN); } @Override diff --git a/metadata-service/auth-impl/src/main/java/com/datahub/authorization/fieldresolverprovider/GroupMembershipFieldResolverProvider.java b/metadata-service/auth-impl/src/main/java/com/datahub/authorization/fieldresolverprovider/GroupMembershipFieldResolverProvider.java index 8db029632d7e25..b1202d9f4bbd34 100644 --- a/metadata-service/auth-impl/src/main/java/com/datahub/authorization/fieldresolverprovider/GroupMembershipFieldResolverProvider.java +++ b/metadata-service/auth-impl/src/main/java/com/datahub/authorization/fieldresolverprovider/GroupMembershipFieldResolverProvider.java @@ -13,6 +13,7 @@ import com.linkedin.identity.NativeGroupMembership; import com.linkedin.metadata.Constants; import com.linkedin.identity.GroupMembership; +import java.util.Collections; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -35,8 +36,8 @@ public class GroupMembershipFieldResolverProvider implements EntityFieldResolver private final Authentication _systemAuthentication; @Override - public EntityFieldType getFieldType() { - return EntityFieldType.GROUP_MEMBERSHIP; + public List getFieldTypes() { + return Collections.singletonList(EntityFieldType.GROUP_MEMBERSHIP); } @Override diff --git a/metadata-service/auth-impl/src/main/java/com/datahub/authorization/fieldresolverprovider/OwnerFieldResolverProvider.java b/metadata-service/auth-impl/src/main/java/com/datahub/authorization/fieldresolverprovider/OwnerFieldResolverProvider.java index bdd652d1d38717..3c27f9e6ce8d79 100644 --- a/metadata-service/auth-impl/src/main/java/com/datahub/authorization/fieldresolverprovider/OwnerFieldResolverProvider.java +++ b/metadata-service/auth-impl/src/main/java/com/datahub/authorization/fieldresolverprovider/OwnerFieldResolverProvider.java @@ -12,6 +12,7 @@ import com.linkedin.entity.client.EntityClient; import com.linkedin.metadata.Constants; import java.util.Collections; +import java.util.List; import java.util.stream.Collectors; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -28,8 +29,8 @@ public class OwnerFieldResolverProvider implements EntityFieldResolverProvider { private final Authentication _systemAuthentication; @Override - public EntityFieldType getFieldType() { - return EntityFieldType.OWNER; + public List getFieldTypes() { + return Collections.singletonList(EntityFieldType.OWNER); } @Override diff --git a/metadata-service/auth-impl/src/test/java/com/datahub/authorization/fieldresolverprovider/DataPlatformInstanceFieldResolverProviderTest.java b/metadata-service/auth-impl/src/test/java/com/datahub/authorization/fieldresolverprovider/DataPlatformInstanceFieldResolverProviderTest.java index b2343bbb015094..5c7d87f1c05a96 100644 --- a/metadata-service/auth-impl/src/test/java/com/datahub/authorization/fieldresolverprovider/DataPlatformInstanceFieldResolverProviderTest.java +++ b/metadata-service/auth-impl/src/test/java/com/datahub/authorization/fieldresolverprovider/DataPlatformInstanceFieldResolverProviderTest.java @@ -56,7 +56,7 @@ public void setup() { @Test public void shouldReturnDataPlatformInstanceType() { - assertEquals(EntityFieldType.DATA_PLATFORM_INSTANCE, dataPlatformInstanceFieldResolverProvider.getFieldType()); + assertEquals(EntityFieldType.DATA_PLATFORM_INSTANCE, dataPlatformInstanceFieldResolverProvider.getFieldTypes().get(0)); } @Test diff --git a/metadata-service/auth-impl/src/test/java/com/datahub/authorization/fieldresolverprovider/GroupMembershipFieldResolverProviderTest.java b/metadata-service/auth-impl/src/test/java/com/datahub/authorization/fieldresolverprovider/GroupMembershipFieldResolverProviderTest.java index 54675045b4413a..af547f14cd3fcd 100644 --- a/metadata-service/auth-impl/src/test/java/com/datahub/authorization/fieldresolverprovider/GroupMembershipFieldResolverProviderTest.java +++ b/metadata-service/auth-impl/src/test/java/com/datahub/authorization/fieldresolverprovider/GroupMembershipFieldResolverProviderTest.java @@ -53,7 +53,7 @@ public void setup() { @Test public void shouldReturnGroupsMembershipType() { - assertEquals(EntityFieldType.GROUP_MEMBERSHIP, groupMembershipFieldResolverProvider.getFieldType()); + assertEquals(EntityFieldType.GROUP_MEMBERSHIP, groupMembershipFieldResolverProvider.getFieldTypes().get(0)); } @Test From dd5d997390d489c777aac25dbbd3f47c4bab8340 Mon Sep 17 00:00:00 2001 From: Andrew Sikowitz Date: Wed, 25 Oct 2023 10:54:55 -0400 Subject: [PATCH 3/9] build(gradle): Support IntelliJ 2023.2.3 (#9034) --- metadata-models/build.gradle | 1 + 1 file changed, 1 insertion(+) diff --git a/metadata-models/build.gradle b/metadata-models/build.gradle index 53e7765152aefe..bd8052283e168f 100644 --- a/metadata-models/build.gradle +++ b/metadata-models/build.gradle @@ -23,6 +23,7 @@ dependencies { } } api project(':li-utils') + api project(path: ':li-utils', configuration: "dataTemplate") dataModel project(':li-utils') compileOnly externalDependency.lombok From 8a80e858a7b6bf67105e082475ada57a27c37c67 Mon Sep 17 00:00:00 2001 From: Andrew Sikowitz Date: Wed, 25 Oct 2023 13:06:12 -0400 Subject: [PATCH 4/9] build(ingest): Bump avro pin: security vulnerability (#9042) --- metadata-ingestion/scripts/avro_codegen.py | 3 +- metadata-ingestion/scripts/modeldocgen.py | 4 +- metadata-ingestion/setup.py | 8 +- .../ingestion/extractor/schema_util.py | 109 ++++++++++++------ .../src/datahub/ingestion/source/kafka.py | 19 ++- .../src/datahub/utilities/mapping.py | 4 +- .../unit/data_lake/test_schema_inference.py | 6 +- 7 files changed, 99 insertions(+), 54 deletions(-) diff --git a/metadata-ingestion/scripts/avro_codegen.py b/metadata-ingestion/scripts/avro_codegen.py index a9b9b4b20f5ac8..021ebd4a31eb3a 100644 --- a/metadata-ingestion/scripts/avro_codegen.py +++ b/metadata-ingestion/scripts/avro_codegen.py @@ -152,7 +152,8 @@ def add_name(self, name_attr, space_attr, new_schema): return encoded -autogen_header = """# flake8: noqa +autogen_header = """# mypy: ignore-errors +# flake8: noqa # This file is autogenerated by /metadata-ingestion/scripts/avro_codegen.py # Do not modify manually! diff --git a/metadata-ingestion/scripts/modeldocgen.py b/metadata-ingestion/scripts/modeldocgen.py index ffa80515dbafd3..81b26145e620c9 100644 --- a/metadata-ingestion/scripts/modeldocgen.py +++ b/metadata-ingestion/scripts/modeldocgen.py @@ -351,8 +351,8 @@ def strip_types(field_path: str) -> str: field_objects = [] for f in entity_fields: field = avro.schema.Field( - type=f["type"], - name=f["name"], + f["type"], + f["name"], has_default=False, ) field_objects.append(field) diff --git a/metadata-ingestion/setup.py b/metadata-ingestion/setup.py index 72b0e776a0da59..0b8661b0df5f5a 100644 --- a/metadata-ingestion/setup.py +++ b/metadata-ingestion/setup.py @@ -32,7 +32,7 @@ "expandvars>=0.6.5", "avro-gen3==0.7.11", # "avro-gen3 @ git+https://github.com/acryldata/avro_gen@master#egg=avro-gen3", - "avro>=1.10.2,<1.11", + "avro>=1.11.3,<1.12", "python-dateutil>=2.8.0", "tabulate", "progressbar2", @@ -355,7 +355,11 @@ | {"psycopg2-binary", "pymysql>=1.0.2"}, "pulsar": {"requests"}, "redash": {"redash-toolbelt", "sql-metadata"} | sqllineage_lib, - "redshift": sql_common | redshift_common | usage_common | {"redshift-connector"} | sqlglot_lib, + "redshift": sql_common + | redshift_common + | usage_common + | {"redshift-connector"} + | sqlglot_lib, "redshift-legacy": sql_common | redshift_common | sqlglot_lib, "redshift-usage-legacy": sql_common | redshift_common | sqlglot_lib | usage_common, "s3": {*s3_base, *data_lake_profiling}, diff --git a/metadata-ingestion/src/datahub/ingestion/extractor/schema_util.py b/metadata-ingestion/src/datahub/ingestion/extractor/schema_util.py index 4acf99a50e50ed..df0b732833fbe1 100644 --- a/metadata-ingestion/src/datahub/ingestion/extractor/schema_util.py +++ b/metadata-ingestion/src/datahub/ingestion/extractor/schema_util.py @@ -1,6 +1,18 @@ import json import logging -from typing import Any, Callable, Dict, Generator, List, Optional, Type, Union +from typing import ( + Any, + Callable, + Dict, + Iterable, + List, + Mapping, + Optional, + Type, + Union, + cast, + overload, +) import avro.schema @@ -54,6 +66,8 @@ avro.schema.PrimitiveSchema, ] +SchemaOrField = Union[avro.schema.Schema, avro.schema.Field] + FieldStack = List[avro.schema.Field] # The latest avro code contains this type definition in a compatibility module, @@ -124,16 +138,22 @@ def __init__( self._meta_mapping_processor = meta_mapping_processor self._schema_tags_field = schema_tags_field self._tag_prefix = tag_prefix + # Map of avro schema type to the conversion handler - self._avro_type_to_mce_converter_map: Dict[ - avro.schema.Schema, - Callable[[ExtendedAvroNestedSchemas], Generator[SchemaField, None, None]], + # TODO: Clean up this type... perhaps refactor + self._avro_type_to_mce_converter_map: Mapping[ + Union[ + Type[avro.schema.Schema], + Type[avro.schema.Field], + Type[avro.schema.LogicalSchema], + ], + Callable[[SchemaOrField], Iterable[SchemaField]], ] = { avro.schema.RecordSchema: self._gen_from_non_field_nested_schemas, avro.schema.UnionSchema: self._gen_from_non_field_nested_schemas, avro.schema.ArraySchema: self._gen_from_non_field_nested_schemas, avro.schema.MapSchema: self._gen_from_non_field_nested_schemas, - avro.schema.Field: self._gen_nested_schema_from_field, + avro.schema.Field: self._gen_nested_schema_from_field, # type: ignore avro.schema.PrimitiveSchema: self._gen_non_nested_to_mce_fields, avro.schema.FixedSchema: self._gen_non_nested_to_mce_fields, avro.schema.EnumSchema: self._gen_non_nested_to_mce_fields, @@ -142,20 +162,22 @@ def __init__( @staticmethod def _get_type_name( - avro_schema: avro.schema.Schema, logical_if_present: bool = False + avro_schema: SchemaOrField, logical_if_present: bool = False ) -> str: logical_type_name: Optional[str] = None if logical_if_present: - logical_type_name = getattr( - avro_schema, "logical_type", None - ) or avro_schema.props.get("logicalType") + logical_type_name = cast( + Optional[str], + getattr(avro_schema, "logical_type", None) + or avro_schema.props.get("logicalType"), + ) return logical_type_name or str( getattr(avro_schema.type, "type", avro_schema.type) ) @staticmethod def _get_column_type( - avro_schema: avro.schema.Schema, logical_type: Optional[str] + avro_schema: SchemaOrField, logical_type: Optional[str] ) -> SchemaFieldDataType: type_name: str = AvroToMceSchemaConverter._get_type_name(avro_schema) TypeClass: Optional[Type] = AvroToMceSchemaConverter.field_type_mapping.get( @@ -186,7 +208,7 @@ def _get_column_type( ) return dt - def _is_nullable(self, schema: avro.schema.Schema) -> bool: + def _is_nullable(self, schema: SchemaOrField) -> bool: if isinstance(schema, avro.schema.Field): return self._is_nullable(schema.type) if isinstance(schema, avro.schema.UnionSchema): @@ -208,7 +230,7 @@ def _strip_namespace(name_or_fullname: str) -> str: return name_or_fullname.rsplit(".", maxsplit=1)[-1] @staticmethod - def _get_simple_native_type(schema: ExtendedAvroNestedSchemas) -> str: + def _get_simple_native_type(schema: SchemaOrField) -> str: if isinstance(schema, (avro.schema.RecordSchema, avro.schema.Field)): # For Records, fields, always return the name. return AvroToMceSchemaConverter._strip_namespace(schema.name) @@ -226,7 +248,7 @@ def _get_simple_native_type(schema: ExtendedAvroNestedSchemas) -> str: return schema.type @staticmethod - def _get_type_annotation(schema: ExtendedAvroNestedSchemas) -> str: + def _get_type_annotation(schema: SchemaOrField) -> str: simple_native_type = AvroToMceSchemaConverter._get_simple_native_type(schema) if simple_native_type.startswith("__struct_"): simple_native_type = "struct" @@ -237,10 +259,24 @@ def _get_type_annotation(schema: ExtendedAvroNestedSchemas) -> str: else: return f"[type={simple_native_type}]" + @staticmethod + @overload + def _get_underlying_type_if_option_as_union( + schema: SchemaOrField, default: SchemaOrField + ) -> SchemaOrField: + ... + + @staticmethod + @overload + def _get_underlying_type_if_option_as_union( + schema: SchemaOrField, default: Optional[SchemaOrField] = None + ) -> Optional[SchemaOrField]: + ... + @staticmethod def _get_underlying_type_if_option_as_union( - schema: AvroNestedSchemas, default: Optional[AvroNestedSchemas] = None - ) -> AvroNestedSchemas: + schema: SchemaOrField, default: Optional[SchemaOrField] = None + ) -> Optional[SchemaOrField]: if isinstance(schema, avro.schema.UnionSchema) and len(schema.schemas) == 2: (first, second) = schema.schemas if first.type == AVRO_TYPE_NULL: @@ -258,8 +294,8 @@ class SchemaFieldEmissionContextManager: def __init__( self, - schema: avro.schema.Schema, - actual_schema: avro.schema.Schema, + schema: SchemaOrField, + actual_schema: SchemaOrField, converter: "AvroToMceSchemaConverter", description: Optional[str] = None, default_value: Optional[str] = None, @@ -275,7 +311,7 @@ def __enter__(self): self._converter._prefix_name_stack.append(type_annotation) return self - def emit(self) -> Generator[SchemaField, None, None]: + def emit(self) -> Iterable[SchemaField]: if ( not isinstance( self._actual_schema, @@ -307,7 +343,7 @@ def emit(self) -> Generator[SchemaField, None, None]: description = self._description if not description and actual_schema.props.get("doc"): - description = actual_schema.props.get("doc") + description = cast(Optional[str], actual_schema.props.get("doc")) if self._default_value is not None: description = f"{description if description else ''}\nField default value: {self._default_value}" @@ -320,12 +356,12 @@ def emit(self) -> Generator[SchemaField, None, None]: native_data_type = native_data_type[ slice(len(type_prefix), len(native_data_type) - 1) ] - native_data_type = actual_schema.props.get( - "native_data_type", native_data_type + native_data_type = cast( + str, actual_schema.props.get("native_data_type", native_data_type) ) field_path = self._converter._get_cur_field_path() - merged_props = {} + merged_props: Dict[str, Any] = {} merged_props.update(self._schema.other_props) merged_props.update(schema.other_props) @@ -363,12 +399,13 @@ def emit(self) -> Generator[SchemaField, None, None]: meta_terms_aspect = meta_aspects.get(Constants.ADD_TERM_OPERATION) - logical_type_name: Optional[str] = ( + logical_type_name: Optional[str] = cast( + Optional[str], # logicalType nested inside type getattr(actual_schema, "logical_type", None) or actual_schema.props.get("logicalType") # bare logicalType - or self._actual_schema.props.get("logicalType") + or self._actual_schema.props.get("logicalType"), ) field = SchemaField( @@ -392,14 +429,12 @@ def emit(self) -> Generator[SchemaField, None, None]: def __exit__(self, exc_type, exc_val, exc_tb): self._converter._prefix_name_stack.pop() - def _get_sub_schemas( - self, schema: ExtendedAvroNestedSchemas - ) -> Generator[avro.schema.Schema, None, None]: + def _get_sub_schemas(self, schema: SchemaOrField) -> Iterable[SchemaOrField]: """Responsible for generation for appropriate sub-schemas for every nested AVRO type.""" def gen_items_from_list_tuple_or_scalar( val: Any, - ) -> Generator[avro.schema.Schema, None, None]: + ) -> Iterable[avro.schema.Schema]: if isinstance(val, (list, tuple)): for i in val: yield i @@ -433,7 +468,7 @@ def gen_items_from_list_tuple_or_scalar( def _gen_nested_schema_from_field( self, field: avro.schema.Field, - ) -> Generator[SchemaField, None, None]: + ) -> Iterable[SchemaField]: """Handles generation of MCE SchemaFields for an AVRO Field type.""" # NOTE: Here we only manage the field stack and trigger MCE Field generation from this field's type. # The actual emitting of a field happens when @@ -447,7 +482,7 @@ def _gen_nested_schema_from_field( def _gen_from_last_field( self, schema_to_recurse: Optional[AvroNestedSchemas] = None - ) -> Generator[SchemaField, None, None]: + ) -> Iterable[SchemaField]: """Emits the field most-recent field, optionally triggering sub-schema generation under the field.""" last_field_schema = self._fields_stack[-1] # Generate the custom-description for the field. @@ -467,8 +502,8 @@ def _gen_from_last_field( yield from self._to_mce_fields(sub_schema) def _gen_from_non_field_nested_schemas( - self, schema: AvroNestedSchemas - ) -> Generator[SchemaField, None, None]: + self, schema: SchemaOrField + ) -> Iterable[SchemaField]: """Handles generation of MCE SchemaFields for all standard AVRO nested types.""" # Handle recursive record definitions recurse: bool = True @@ -511,8 +546,8 @@ def _gen_from_non_field_nested_schemas( yield from self._to_mce_fields(sub_schema) def _gen_non_nested_to_mce_fields( - self, schema: AvroNonNestedSchemas - ) -> Generator[SchemaField, None, None]: + self, schema: SchemaOrField + ) -> Iterable[SchemaField]: """Handles generation of MCE SchemaFields for non-nested AVRO types.""" with AvroToMceSchemaConverter.SchemaFieldEmissionContextManager( schema, @@ -521,9 +556,7 @@ def _gen_non_nested_to_mce_fields( ) as non_nested_emitter: yield from non_nested_emitter.emit() - def _to_mce_fields( - self, avro_schema: avro.schema.Schema - ) -> Generator[SchemaField, None, None]: + def _to_mce_fields(self, avro_schema: SchemaOrField) -> Iterable[SchemaField]: # Invoke the relevant conversion handler for the schema element type. schema_type = ( type(avro_schema) @@ -541,7 +574,7 @@ def to_mce_fields( meta_mapping_processor: Optional[OperationProcessor] = None, schema_tags_field: Optional[str] = None, tag_prefix: Optional[str] = None, - ) -> Generator[SchemaField, None, None]: + ) -> Iterable[SchemaField]: """ Converts a key or value type AVRO schema string to appropriate MCE SchemaFields. :param avro_schema_string: String representation of the AVRO schema. diff --git a/metadata-ingestion/src/datahub/ingestion/source/kafka.py b/metadata-ingestion/src/datahub/ingestion/source/kafka.py index d5039360da5677..23770ff3cf8122 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/kafka.py +++ b/metadata-ingestion/src/datahub/ingestion/source/kafka.py @@ -3,7 +3,7 @@ import logging from dataclasses import dataclass, field from enum import Enum -from typing import Any, Dict, Iterable, List, Optional, Type +from typing import Any, Dict, Iterable, List, Optional, Type, cast import avro.schema import confluent_kafka @@ -316,13 +316,20 @@ def _extract_record( avro_schema = avro.schema.parse( schema_metadata.platformSchema.documentSchema ) - description = avro_schema.doc + description = getattr(avro_schema, "doc", None) # set the tags all_tags: List[str] = [] - for tag in avro_schema.other_props.get( - self.source_config.schema_tags_field, [] - ): - all_tags.append(self.source_config.tag_prefix + tag) + try: + schema_tags = cast( + Iterable[str], + avro_schema.other_props.get( + self.source_config.schema_tags_field, [] + ), + ) + for tag in schema_tags: + all_tags.append(self.source_config.tag_prefix + tag) + except TypeError: + pass if self.source_config.enable_meta_mapping: meta_aspects = self.meta_processor.process(avro_schema.other_props) diff --git a/metadata-ingestion/src/datahub/utilities/mapping.py b/metadata-ingestion/src/datahub/utilities/mapping.py index eb2d975ee607f2..f91c01d901ac1e 100644 --- a/metadata-ingestion/src/datahub/utilities/mapping.py +++ b/metadata-ingestion/src/datahub/utilities/mapping.py @@ -4,7 +4,7 @@ import re import time from functools import reduce -from typing import Any, Dict, List, Match, Optional, Union, cast +from typing import Any, Dict, List, Mapping, Match, Optional, Union, cast from datahub.emitter import mce_builder from datahub.emitter.mce_builder import OwnerType @@ -111,7 +111,7 @@ def __init__( self.owner_source_type = owner_source_type self.match_nested_props = match_nested_props - def process(self, raw_props: Dict[str, Any]) -> Dict[str, Any]: + def process(self, raw_props: Mapping[str, Any]) -> Dict[str, Any]: # Defining the following local variables - # operations_map - the final resulting map when operations are processed. # Against each operation the values to be applied are stored. diff --git a/metadata-ingestion/tests/unit/data_lake/test_schema_inference.py b/metadata-ingestion/tests/unit/data_lake/test_schema_inference.py index cbd5be9e7d832b..4a69deb572fbd7 100644 --- a/metadata-ingestion/tests/unit/data_lake/test_schema_inference.py +++ b/metadata-ingestion/tests/unit/data_lake/test_schema_inference.py @@ -1,14 +1,14 @@ import tempfile from typing import List, Type -import avro.schema import pandas as pd import ujson from avro import schema as avro_schema from avro.datafile import DataFileWriter from avro.io import DatumWriter -from datahub.ingestion.source.schema_inference import avro, csv_tsv, json, parquet +from datahub.ingestion.source.schema_inference import csv_tsv, json, parquet +from datahub.ingestion.source.schema_inference.avro import AvroInferrer from datahub.metadata.com.linkedin.pegasus2avro.schema import ( BooleanTypeClass, NumberTypeClass, @@ -123,7 +123,7 @@ def test_infer_schema_avro(): file.seek(0) - fields = avro.AvroInferrer().infer_schema(file) + fields = AvroInferrer().infer_schema(file) fields.sort(key=lambda x: x.fieldPath) assert_field_paths_match(fields, expected_field_paths_avro) From b9508e6dd50c5d0eaf8eddb21c5bdf55bec1646a Mon Sep 17 00:00:00 2001 From: siddiquebagwan-gslab Date: Wed, 25 Oct 2023 23:48:15 +0530 Subject: [PATCH 5/9] fix(ingestion/redshift): fix schema field data type mappings --- .../src/datahub/ingestion/source/redshift/redshift.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py b/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py index 26237a6ce12e0d..c7d01021773b12 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py +++ b/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py @@ -218,6 +218,9 @@ class RedshiftSource(StatefulIngestionSourceBase, TestableSource): ] = { "BYTES": BytesType, "BOOL": BooleanType, + "BOOLEAN": BooleanType, + "DOUBLE": NumberType, + "DOUBLE PRECISION": NumberType, "DECIMAL": NumberType, "NUMERIC": NumberType, "BIGNUMERIC": NumberType, @@ -244,6 +247,13 @@ class RedshiftSource(StatefulIngestionSourceBase, TestableSource): "CHARACTER": StringType, "CHAR": StringType, "TIMESTAMP WITHOUT TIME ZONE": TimeType, + "REAL": NumberType, + "VARCHAR": StringType, + "TIMESTAMPTZ": TimeType, + "GEOMETRY": NullType, + "HLLSKETCH": NullType, + "TIMETZ": TimeType, + "VARBYTE": StringType, } def get_platform_instance_id(self) -> str: From 1c77bca0c68878ca5cb86f741ca77ce0aa497272 Mon Sep 17 00:00:00 2001 From: Younghoon YUN Date: Thu, 26 Oct 2023 05:01:47 +0900 Subject: [PATCH 6/9] fix(datahub-protobuf): add check if nested field is reserved (#9058) --- .../src/main/java/datahub/protobuf/model/ProtobufField.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/metadata-integration/java/datahub-protobuf/src/main/java/datahub/protobuf/model/ProtobufField.java b/metadata-integration/java/datahub-protobuf/src/main/java/datahub/protobuf/model/ProtobufField.java index 42884241d9f7cd..d890c373f12994 100644 --- a/metadata-integration/java/datahub-protobuf/src/main/java/datahub/protobuf/model/ProtobufField.java +++ b/metadata-integration/java/datahub-protobuf/src/main/java/datahub/protobuf/model/ProtobufField.java @@ -259,7 +259,9 @@ private FieldDescriptorProto getNestedTypeFields(List pathList, Descrip messageType = messageType.getNestedType(value); } - if (pathList.get(pathSize - 2) == DescriptorProto.FIELD_FIELD_NUMBER) { + if (pathList.get(pathSize - 2) == DescriptorProto.FIELD_FIELD_NUMBER + && pathList.get(pathSize - 1) != DescriptorProto.RESERVED_RANGE_FIELD_NUMBER + && pathList.get(pathSize - 1) != DescriptorProto.RESERVED_NAME_FIELD_NUMBER) { return messageType.getField(pathList.get(pathSize - 1)); } else { return null; From 32f5dcb1544e5a47efbb48d39b215d3bdc33535b Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Wed, 25 Oct 2023 13:16:49 -0700 Subject: [PATCH 7/9] fix(ingest): better handling around sink errors (#9003) --- .../src/datahub/ingestion/run/pipeline.py | 10 +++++- .../datahub/ingestion/sink/datahub_kafka.py | 33 ++++++++----------- 2 files changed, 22 insertions(+), 21 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/run/pipeline.py b/metadata-ingestion/src/datahub/ingestion/run/pipeline.py index 07b55e0e25a89e..f2735c24ca19dc 100644 --- a/metadata-ingestion/src/datahub/ingestion/run/pipeline.py +++ b/metadata-ingestion/src/datahub/ingestion/run/pipeline.py @@ -390,7 +390,15 @@ def run(self) -> None: record_envelopes = self.extractor.get_records(wu) for record_envelope in self.transform(record_envelopes): if not self.dry_run: - self.sink.write_record_async(record_envelope, callback) + try: + self.sink.write_record_async( + record_envelope, callback + ) + except Exception as e: + # In case the sink's error handling is bad, we still want to report the error. + self.sink.report.report_failure( + f"Failed to write record: {e}" + ) except RuntimeError: raise diff --git a/metadata-ingestion/src/datahub/ingestion/sink/datahub_kafka.py b/metadata-ingestion/src/datahub/ingestion/sink/datahub_kafka.py index 39054c256a7fd5..38ddadaafc862c 100644 --- a/metadata-ingestion/src/datahub/ingestion/sink/datahub_kafka.py +++ b/metadata-ingestion/src/datahub/ingestion/sink/datahub_kafka.py @@ -9,7 +9,6 @@ MetadataChangeEvent, MetadataChangeProposal, ) -from datahub.metadata.schema_classes import MetadataChangeProposalClass class KafkaSinkConfig(KafkaEmitterConfig): @@ -58,27 +57,21 @@ def write_record_async( ], write_callback: WriteCallback, ) -> None: - record = record_envelope.record - if isinstance(record, MetadataChangeEvent): - self.emitter.emit_mce_async( + callback = _KafkaCallback( + self.report, record_envelope, write_callback + ).kafka_callback + try: + record = record_envelope.record + self.emitter.emit( record, - callback=_KafkaCallback( - self.report, record_envelope, write_callback - ).kafka_callback, - ) - elif isinstance( - record, (MetadataChangeProposalWrapper, MetadataChangeProposalClass) - ): - self.emitter.emit_mcp_async( - record, - callback=_KafkaCallback( - self.report, record_envelope, write_callback - ).kafka_callback, - ) - else: - raise ValueError( - f"The datahub-kafka sink only supports MetadataChangeEvent/MetadataChangeProposal[Wrapper] classes, not {type(record)}" + callback=callback, ) + except Exception as err: + # In case we throw an exception while trying to emit the record, + # catch it and report the failure. This might happen if the schema + # registry is down or otherwise misconfigured, in which case we'd + # fail when serializing the record. + callback(err, f"Failed to write record: {err}") def close(self) -> None: self.emitter.flush() From 6c932e8afeb4ac71c9b6b31e9fde3876c9e947cf Mon Sep 17 00:00:00 2001 From: Andrew Sikowitz Date: Wed, 25 Oct 2023 16:17:09 -0400 Subject: [PATCH 8/9] feat(ingest/bigquery): Attempt to support raw dataset pattern (#9109) --- docs/how/updating-datahub.md | 8 +-- .../source/bigquery_v2/bigquery_config.py | 18 ++++++- .../tests/unit/test_bigquery_source.py | 53 +++++++++++++++++++ 3 files changed, 74 insertions(+), 5 deletions(-) diff --git a/docs/how/updating-datahub.md b/docs/how/updating-datahub.md index 8813afee65be91..4d1535f28fa0a9 100644 --- a/docs/how/updating-datahub.md +++ b/docs/how/updating-datahub.md @@ -53,10 +53,10 @@ into for example, using `datahub put` command. Policies can be also removed and re-created via UI. - #9077 - The BigQuery ingestion source by default sets `match_fully_qualified_names: true`. This means that any `dataset_pattern` or `schema_pattern` specified will be matched on the fully -qualified dataset name, i.e. `.`. If this is not the case, please -update your pattern (e.g. prepend your old dataset pattern with `.*\.` which matches the project part), -or set `match_fully_qualified_names: false` in your recipe. However, note that -setting this to `false` is deprecated and this flag will be removed entirely in a future release. +qualified dataset name, i.e. `.`. We attempt to support the old +pattern format by prepending `.*\\.` to dataset patterns lacking a period, so in most cases this +should not cause any issues. However, if you have a complex dataset pattern, we recommend you +manually convert it to the fully qualified format to avoid any potential issues. ### Potential Downtime diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py index a6a740385cf5c3..62031927697507 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py @@ -299,7 +299,7 @@ def backward_compatibility_configs_set(cls, values: Dict) -> Dict: "use project_id_pattern whenever possible. project_id will be deprecated, please use project_id_pattern only if possible." ) - dataset_pattern = values.get("dataset_pattern") + dataset_pattern: Optional[AllowDenyPattern] = values.get("dataset_pattern") schema_pattern = values.get("schema_pattern") if ( dataset_pattern == AllowDenyPattern.allow_all() @@ -329,6 +329,22 @@ def backward_compatibility_configs_set(cls, values: Dict) -> Dict: "Please update `dataset_pattern` to match against fully qualified schema name `.` and set config `match_fully_qualified_names : True`." "The config option `match_fully_qualified_names` is deprecated and will be removed in a future release." ) + elif match_fully_qualified_names and dataset_pattern is not None: + adjusted = False + for lst in [dataset_pattern.allow, dataset_pattern.deny]: + for i, pattern in enumerate(lst): + if "." not in pattern: + if pattern.startswith("^"): + lst[i] = r"^.*\." + pattern[1:] + else: + lst[i] = r".*\." + pattern + adjusted = True + if adjusted: + logger.warning( + "`dataset_pattern` was adjusted to match against fully qualified schema names," + " of the form `.`." + ) + return values def get_table_pattern(self, pattern: List[str]) -> str: diff --git a/metadata-ingestion/tests/unit/test_bigquery_source.py b/metadata-ingestion/tests/unit/test_bigquery_source.py index 5a11a933c85954..4cfa5c48d23771 100644 --- a/metadata-ingestion/tests/unit/test_bigquery_source.py +++ b/metadata-ingestion/tests/unit/test_bigquery_source.py @@ -53,6 +53,59 @@ def test_bigquery_uri_on_behalf(): assert config.get_sql_alchemy_url() == "bigquery://test-project-on-behalf" +def test_bigquery_dataset_pattern(): + config = BigQueryV2Config.parse_obj( + { + "dataset_pattern": { + "allow": [ + "test-dataset", + "test-project.test-dataset", + ".*test-dataset", + ], + "deny": [ + "^test-dataset-2$", + "project\\.second_dataset", + ], + }, + } + ) + assert config.dataset_pattern.allow == [ + r".*\.test-dataset", + r"test-project.test-dataset", + r".*test-dataset", + ] + assert config.dataset_pattern.deny == [ + r"^.*\.test-dataset-2$", + r"project\.second_dataset", + ] + + config = BigQueryV2Config.parse_obj( + { + "dataset_pattern": { + "allow": [ + "test-dataset", + "test-project.test-dataset", + ".*test-dataset", + ], + "deny": [ + "^test-dataset-2$", + "project\\.second_dataset", + ], + }, + "match_fully_qualified_names": False, + } + ) + assert config.dataset_pattern.allow == [ + r"test-dataset", + r"test-project.test-dataset", + r".*test-dataset", + ] + assert config.dataset_pattern.deny == [ + r"^test-dataset-2$", + r"project\.second_dataset", + ] + + def test_bigquery_uri_with_credential(): expected_credential_json = { "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs", From 2ebf33eb13d14c17bc6cb0eaee3a97dba33ea338 Mon Sep 17 00:00:00 2001 From: Zachary McNellis Date: Wed, 25 Oct 2023 16:25:41 -0400 Subject: [PATCH 9/9] docs(observability): Column Assertion user guide (#9106) Co-authored-by: John Joyce --- docs-website/sidebars.js | 1 + .../observe/column-assertions.md | 358 ++++++++++++++++++ 2 files changed, 359 insertions(+) create mode 100644 docs/managed-datahub/observe/column-assertions.md diff --git a/docs-website/sidebars.js b/docs-website/sidebars.js index b2b3df4dfb33c4..31d69aec46d8b9 100644 --- a/docs-website/sidebars.js +++ b/docs-website/sidebars.js @@ -446,6 +446,7 @@ module.exports = { "docs/managed-datahub/observe/freshness-assertions", "docs/managed-datahub/observe/volume-assertions", "docs/managed-datahub/observe/custom-sql-assertions", + "docs/managed-datahub/observe/column-assertions", ], }, { diff --git a/docs/managed-datahub/observe/column-assertions.md b/docs/managed-datahub/observe/column-assertions.md new file mode 100644 index 00000000000000..99a764f7716766 --- /dev/null +++ b/docs/managed-datahub/observe/column-assertions.md @@ -0,0 +1,358 @@ +--- +description: This page provides an overview of working with DataHub Column Assertions +--- +import FeatureAvailability from '@site/src/components/FeatureAvailability'; + + +# Column Assertions + + + +> ⚠️ The **Column Assertions** feature is currently in private beta, part of the **Acryl Observe** module, and may only +> be available to a limited set of design partners. +> +> If you are interested in trying it and providing feedback, please reach out to your Acryl Customer Success +> representative. + +## Introduction + +Can you remember a time when an important warehouse table column changed dramatically, with little or no notice? Perhaps the number of null values suddenly spiked, or a new value was added to a fixed set of possible values. If the answer is yes, how did you initially find out? We'll take a guess - someone looking at an internal reporting dashboard or worse, a user using your your product, sounded an alarm when a number looked a bit out of the ordinary. + +There are many reasons why important columns in your Snowflake, Redshift, or BigQuery tables may change - application code bugs, new feature rollouts, etc. Oftentimes, these changes break important assumptions made about the data used in building key downstream data products like reporting dashboards or data-driven product features. + +What if you could reduce the time to detect these incidents, so that the people responsible for the data were made aware of data issues before anyone else? With Acryl DataHub Column Assertions, you can. + +With Acryl DataHub, you can define **Column Value** assertions to ensure each value in a column matches specific constraints, and **Column Metric** assertions to ensure that computed metrics from columns align with your expectations. As soon as things go wrong, your team will be the first to know, before the data issue becomes a larger data incident. + +In this guide, we'll cover the basics of Column Assertions - what they are, how to configure them, and more - so that you and your team can start building trust in your most important data assets. + +Let's dive in! + +## Support + +Column Assertions are currently supported for: + +1. Snowflake +2. Redshift +3. BigQuery + +Note that an Ingestion Source _must_ be configured with the data platform of your choice in +Acryl DataHub's **Ingestion** tab. + +> Note that Column Assertions are not yet supported if you are connecting to your warehouse +> using the DataHub CLI or a Remote Ingestion Executor. + +## What is a Column Assertion? + +A **Column Assertion** is a highly configurable Data Quality rule used to monitor specific columns of a Data Warehouse table for unexpected changes. + +Column Assertions are defined to validate a specific column, and can be used to + +1. Validate that the values of the column match some constraints (regex, allowed values, max, min, etc) across rows OR +2. Validate that specific column aggregation metrics match some expectations across rows. + +Column Assertions can be particularly useful for documenting and enforcing column-level "contracts", i.e. formal specifications about the expected contents of a particular column that can be used for coordinating among producers and consumers of the data. + +### Anatomy of Column Assertion + +Column Assertions can be divided into two main types: **Column Value** and **Column Metric** Assertions. + +A **Column Value Assertion** is used to monitor the value of a specific column in a table, and ensure that every row +adheres to a specific condition. In comparison, a **Column Metric Assertion** is used to compute a metric for that column, +and ensure that the value of that metric adheres to a specific condition. + +At the most basic level, both types consist of a few important parts: + +1. An **Evaluation Schedule** +2. A **Column Selection** +3. A **Evaluation Criteria** +4. A **Row Evaluation Type** + +In this section, we'll give an overview of each. + +#### 1. Evaluation Schedule + +The **Evaluation Schedule**: This defines how often to evaluate the Column Assertion against the given warehouse table. +This should usually be configured to match the expected change frequency of the table, although it can also be less +frequently depending on your requirements. You can also specify specific days of the week, hours in the day, or even +minutes in an hour. + +#### 2. Column Selection + +The **Column Selection**: This defines the column that should be monitored by the Column Assertion. You can choose from +any of the columns from the table listed in the dropdown. Note that columns of struct / object type are not currently supported. + +#### 3. Evaluation Criteria + +The **Evaluation Criteria**: This defines the condition that must be satisfied in order for the Column +Assertion to pass. + +For **Column Value Assertions**, you will be able to choose from a set of operators that can be applied to the column +value. The options presented will vary based on the data type of the selected column. For example, if you've selected a numeric column, you +can verify that the column value is greater than a particular value. For string types, you can check that the column value +matches a particular regex pattern. Additionally, you are able to control the behavior of the check in the presence of NULL values. If the +**Allow Nulls** option is _disabled_, then any null values encountered will be reported as a failure when evaluating the +assertion. If **Allow Nulls** is enabled, then nulls will be ignored; the condition will be evaluated for rows where the column value is non-null. + +For **Column Metric Assertions**, you will be able to choose from a list of common column metrics - MAX, MIN, MEAN, NULL COUNT, etc - and then compare these metric values to an expected value. The list of metrics will vary based on the type of the selected column. For example +if you've selected a numeric column, you can choose to compute the MEAN value of the column, and then assert that it is greater than a +specific number. For string types, you can choose to compute the MAX LENGTH of the string across all column values, and then assert that it +is less than a specific number. + +#### 4. Row Selection Set + +The **Row Selection Set**: This defines which rows in the table the Column Assertion will be evaluated across. You can choose +from the following options: + +- **All Table Rows**: Evaluate the Column Assertion across all rows in the table. This is the default option. Note that +this may not be desirable for large tables. + +- **Only Rows That Have Changed**: Evaluate the Column Assertion only against rows that have changed since the last +evaluation of the assertion. If you choose this option, you will need to specify a **High Watermark Column** to help determine which rows +have changed. A **High Watermark Column** is a column that contains a constantly incrementing value - a date, a time, or +another always-increasing number - that can be used to find the "new rows" that were added since previous evaluation. When selected, a query will be issued to the table to find only the rows that have changed since the previous assertion evaluation. + +## Creating a Column Assertion + +### Prerequisites + +1. **Permissions**: To create or delete Column Assertions for a specific entity on DataHub, you'll need to be granted the + `Edit Assertions` and `Edit Monitors` privileges for the entity. This is granted to Entity owners by default. + +2. **Data Platform Connection**: In order to create a Column Assertion, you'll need to have an **Ingestion Source** + configured to your Data Platform: Snowflake, BigQuery, or Redshift under the **Ingestion** tab. + +Once these are in place, you're ready to create your Column Assertions! + +### Steps + +1. Navigate to the Table that you want to monitor +2. Click the **Validations** tab + +

+ +

+ +3. Click **+ Create Assertion** + +

+ +

+ +4. Choose **Column** + +5. Configure the evaluation **schedule**. This is the frequency at which the assertion will be evaluated to produce a + pass or fail result, and the times when the column values will be checked. + +6. Configure the **column assertion type**. You can choose from **Column Value** or **Column Metric**. + **Column Value** assertions are used to monitor the value of a specific column in a table, and ensure that every row + adheres to a specific condition. **Column Metric** assertions are used to compute a metric for that column, and then compare the value of that metric to your expectations. + +

+ +

+ +7. Configure the **column selection**. This defines the column that should be monitored by the Column Assertion. + You can choose from any of the columns from the table listed in the dropdown. + +

+ +

+ +8. Configure the **evaluation criteria**. This step varies based on the type of assertion you chose in the previous step. + + - **Column Value Assertions**: You will be able to choose from a set of operators that can be applied to the column + value. The options presented will vary based on the data type of the selected column. For example with numeric types, you + can check that the column value is greater than a specific value. For string types, you can check that the column value + matches a particular regex pattern. You will also be able to control the behavior of null values in the column. If the + **Allow Nulls** option is _disabled_, any null values encountered will be reported as a failure when evaluating the + assertion. + + - **Column Metric Assertions**: You will be able to choose from a list of common metrics and then specify the operator + and value to compare against. The list of metrics will vary based on the data type of the selected column. For example + with numeric types, you can choose to compute the average value of the column, and then assert that it is greater than a + specific number. For string types, you can choose to compute the max length of all column values, and then assert that it + is less than a specific number. + +9. Configure the **row evaluation type**. This defines which rows in the table the Column Assertion should evaluate. You can choose + from the following options: + + - **All Table Rows**: Evaluate the Column Assertion against all rows in the table. This is the default option. Note that + this may not be desirable for large tables. + + - **Only Rows That Have Changed**: Evaluate the Column Assertion only against rows that have changed since the last + evaluation. If you choose this option, you will need to specify a **High Watermark Column** to help determine which rows + have changed. A **High Watermark Column** is a column that contains a constantly-incrementing value - a date, a time, or + another always-increasing number. When selected, a query will be issued to the table find only the rows which have changed since the last assertion run. + +

+ +

+ +10. (Optional) Click **Advanced** to further customize the Column Assertion. The options listed here will vary based on the + type of assertion you chose in the previous step. + + - **Invalid Values Threshold**: For **Column Value** assertions, you can configure the number of invalid values + (i.e. rows) that are allowed to fail before the assertion is marked as failing. This is useful if you want to allow a limited number + of invalid values in the column. By default this is 0, meaning the assertion will fail if any rows have an invalid column value. + + - **Source**: For **Column Metric** assertions, you can choose the mechanism that will be used to obtain the column + metric. **Query** will issue a query to the dataset to compute the metric. **DataHub Dataset Profile** will use the + DataHub Dataset Profile metadata to compute the metric. Note that this option requires that dataset profiling + statistics are up-to-date as of the assertion run time. + + - **Additional Filters**: You can choose to add additional filters to the query that will be used to evaluate the + assertion. This is useful if you want to limit the assertion to a subset of rows in the table. Note this option will not + be available if you choose **DataHub Dataset Profile** as the **source**. + +11. Click **Next** +12. Configure actions that should be taken when the Column Assertion passes or fails + +

+ +

+ +- **Raise incident**: Automatically raise a new DataHub `Column` Incident for the Table whenever the Column Assertion is failing. This + may indicate that the Table is unfit for consumption. Configure Slack Notifications under **Settings** to be notified when + an incident is created due to an Assertion failure. +- **Resolve incident**: Automatically resolved any incidents that were raised due to failures in this Column Assertion. Note that + any other incidents will not be impacted. + +10. Click **Save**. + +And that's it! DataHub will now begin to monitor your Column Assertion for the table. + +To view the time of the next Column Assertion evaluation, simply click **Column** and then click on your +new Assertion: + +

+ +

+ +Once your assertion has run, you will begin to see Success or Failure status for the Table + +

+ +

+ +## Stopping a Column Assertion + +In order to temporarily stop the evaluation of a Column Assertion: + +1. Navigate to the **Validations** tab of the table with the assertion +2. Click **Column** to open the Column Assertions list +3. Click the three-dot menu on the right side of the assertion you want to disable +4. Click **Stop** + +

+ +

+ +To resume the Column Assertion, simply click **Turn On**. + +

+ +

+ +## Creating Column Assertions via API + +Under the hood, Acryl DataHub implements Column Assertion Monitoring using two "entity" concepts: + +- **Assertion**: The specific expectation for the column metric. e.g. "The value of an integer column is greater than 10 for all rows in the table." This is the "what". + +- **Monitor**: The process responsible for evaluating the Assertion on a given evaluation schedule and using specific + mechanisms. This is the "how". + +Note that to create or delete Assertions and Monitors for a specific entity on DataHub, you'll need the +`Edit Assertions` and `Edit Monitors` privileges for it. + +#### GraphQL + +In order to create a Column Assertion that is being monitored on a specific **Evaluation Schedule**, you'll need to use 2 +GraphQL mutation queries to create a Column Assertion entity and create an Assertion Monitor entity responsible for evaluating it. + +Start by creating the Column Assertion entity using the `createFieldAssertion` query and hang on to the 'urn' field of the Assertion entity +you get back. Then continue by creating a Monitor entity using the `createAssertionMonitor`. + +##### Examples + +To create a Column Assertion Entity that checks that the value of an integer column is greater than 10: + +```json +mutation createFieldAssertion { + createFieldAssertion( + input: { + entityUrn: "", + type: FIELD_VALUES, + fieldValuesAssertion: { + field: { + path: "", + type: "NUMBER", + nativeType: "NUMBER(38,0)" + }, + operator: GREATER_THAN, + parameters: { + value: { + type: NUMBER, + value: "10" + } + }, + failThreshold: { + type: COUNT, + value: 0 + }, + excludeNulls: true + } + } + ) { + urn +} +} +``` + +To create an Assertion Monitor Entity that evaluates the column assertion every 8 hours using all rows in the table: + +```json +mutation createAssertionMonitor { + createAssertionMonitor( + input: { + entityUrn: "", + assertionUrn: "", + schedule: { + cron: "0 */8 * * *", + timezone: "America/Los_Angeles" + }, + parameters: { + type: DATASET_FIELD, + datasetFieldParameters: { + sourceType: ALL_ROWS_QUERY + } + } + } + ) { + urn + } +} +``` + +This entity defines _when_ to run the check (Using CRON format - every 8th hour) and _how_ to run the check (using a query against all rows of the table). + +After creating the monitor, the new assertion will start to be evaluated every 8 hours in your selected timezone. + +You can delete assertions along with their monitors using GraphQL mutations: `deleteAssertion` and `deleteMonitor`. + +### Tips + +:::info +**Authorization** + +Remember to always provide a DataHub Personal Access Token when calling the GraphQL API. To do so, just add the 'Authorization' header as follows: + +``` +Authorization: Bearer +``` + +**Exploring GraphQL API** + +Also, remember that you can play with an interactive version of the Acryl GraphQL API at `https://your-account-id.acryl.io/api/graphiql` +:::