From 245284ec6c6b754b22943ba42d7139ddd5772377 Mon Sep 17 00:00:00 2001 From: jayasimhankv <145704974+jayasimhankv@users.noreply.github.com> Date: Wed, 11 Oct 2023 17:40:20 -0500 Subject: [PATCH 1/6] fix(): Display generic not found page for corp groups that do not exist (#8880) Co-authored-by: Jay Kadambi --- .../java/com/linkedin/datahub/graphql/GmsGraphQLEngine.java | 3 ++- datahub-graphql-core/src/main/resources/entity.graphql | 5 +++++ datahub-web-react/src/app/entity/group/GroupProfile.tsx | 4 ++++ datahub-web-react/src/graphql/group.graphql | 1 + 4 files changed, 12 insertions(+), 1 deletion(-) diff --git a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/GmsGraphQLEngine.java b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/GmsGraphQLEngine.java index ebb5c7d62c7d3a..b99f712034fe03 100644 --- a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/GmsGraphQLEngine.java +++ b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/GmsGraphQLEngine.java @@ -1292,7 +1292,8 @@ private void configureCorpUserResolvers(final RuntimeWiring.Builder builder) { */ private void configureCorpGroupResolvers(final RuntimeWiring.Builder builder) { builder.type("CorpGroup", typeWiring -> typeWiring - .dataFetcher("relationships", new EntityRelationshipsResultResolver(graphClient))); + .dataFetcher("relationships", new EntityRelationshipsResultResolver(graphClient)) + .dataFetcher("exists", new EntityExistsResolver(entityService))); builder.type("CorpGroupInfo", typeWiring -> typeWiring .dataFetcher("admins", new LoadableTypeBatchResolver<>(corpUserType, diff --git a/datahub-graphql-core/src/main/resources/entity.graphql b/datahub-graphql-core/src/main/resources/entity.graphql index 0b15d7b875a9ca..b37a8f34fa0563 100644 --- a/datahub-graphql-core/src/main/resources/entity.graphql +++ b/datahub-graphql-core/src/main/resources/entity.graphql @@ -3788,6 +3788,11 @@ type CorpGroup implements Entity { Additional read only info about the group """ info: CorpGroupInfo @deprecated + + """ + Whether or not this entity exists on DataHub + """ + exists: Boolean } """ diff --git a/datahub-web-react/src/app/entity/group/GroupProfile.tsx b/datahub-web-react/src/app/entity/group/GroupProfile.tsx index d5e284af931df3..53d2062277dec0 100644 --- a/datahub-web-react/src/app/entity/group/GroupProfile.tsx +++ b/datahub-web-react/src/app/entity/group/GroupProfile.tsx @@ -11,6 +11,7 @@ import { RoutedTabs } from '../../shared/RoutedTabs'; import GroupInfoSidebar from './GroupInfoSideBar'; import { GroupAssets } from './GroupAssets'; import { ErrorSection } from '../../shared/error/ErrorSection'; +import NonExistentEntityPage from '../shared/entity/NonExistentEntityPage'; const messageStyle = { marginTop: '10%' }; @@ -110,6 +111,9 @@ export default function GroupProfile() { urn, }; + if (data?.corpGroup?.exists === false) { + return ; + } return ( <> {error && } diff --git a/datahub-web-react/src/graphql/group.graphql b/datahub-web-react/src/graphql/group.graphql index 9aa6e2b005f16c..1007721e51a4ec 100644 --- a/datahub-web-react/src/graphql/group.graphql +++ b/datahub-web-react/src/graphql/group.graphql @@ -3,6 +3,7 @@ query getGroup($urn: String!, $membersCount: Int!) { urn type name + exists origin { type externalType From 245c5c00087116d236acf7a9bbddbdb4dee15949 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sergio=20G=C3=B3mez=20Villamor?= Date: Thu, 12 Oct 2023 02:06:19 +0200 Subject: [PATCH 2/6] fix(ingest/looker): stop emitting tag owner (#8942) --- docs/how/updating-datahub.md | 2 + .../ingestion/source/looker/looker_common.py | 13 +----- .../looker/golden_looker_mces.json | 42 ------------------- .../looker/golden_test_allow_ingest.json | 42 ------------------- ...olden_test_external_project_view_mces.json | 42 ------------------- .../looker/golden_test_file_path_ingest.json | 42 ------------------- .../golden_test_independent_look_ingest.json | 42 ------------------- .../looker/golden_test_ingest.json | 42 ------------------- .../looker/golden_test_ingest_joins.json | 42 ------------------- .../golden_test_ingest_unaliased_joins.json | 42 ------------------- .../looker_mces_golden_deleted_stateful.json | 42 ------------------- .../looker/looker_mces_usage_history.json | 42 ------------------- .../lookml/lookml_mces_api_bigquery.json | 42 ------------------- .../lookml/lookml_mces_api_hive2.json | 42 ------------------- .../lookml/lookml_mces_badsql_parser.json | 42 ------------------- .../lookml/lookml_mces_offline.json | 42 ------------------- .../lookml_mces_offline_deny_pattern.json | 42 ------------------- ...lookml_mces_offline_platform_instance.json | 42 ------------------- .../lookml_mces_with_external_urls.json | 42 ------------------- .../lookml/lookml_reachable_views.json | 42 ------------------- 20 files changed, 3 insertions(+), 768 deletions(-) diff --git a/docs/how/updating-datahub.md b/docs/how/updating-datahub.md index 5d0ad5eaf8f7e1..9cd4ad5c6f02d0 100644 --- a/docs/how/updating-datahub.md +++ b/docs/how/updating-datahub.md @@ -7,6 +7,8 @@ This file documents any backwards-incompatible changes in DataHub and assists pe ### Breaking Changes - #8810 - Removed support for SQLAlchemy 1.3.x. Only SQLAlchemy 1.4.x is supported now. +- #8942 - Removed `urn:li:corpuser:datahub` owner for the `Measure`, `Dimension` and `Temporal` tags emitted + by Looker and LookML source connectors. - #8853 - The Airflow plugin no longer supports Airflow 2.0.x or Python 3.7. See the docs for more details. - #8853 - Introduced the Airflow plugin v2. If you're using Airflow 2.3+, the v2 plugin will be enabled by default, and so you'll need to switch your requirements to include `pip install 'acryl-datahub-airflow-plugin[plugin-v2]'`. To continue using the v1 plugin, set the `DATAHUB_AIRFLOW_PLUGIN_USE_V1_PLUGIN` environment variable to `true`. - #8943 The Unity Catalog ingestion source has a new option `include_metastore`, which will cause all urns to be changed when disabled. diff --git a/metadata-ingestion/src/datahub/ingestion/source/looker/looker_common.py b/metadata-ingestion/src/datahub/ingestion/source/looker/looker_common.py index 89b1e45695c578..30c38720dd96c4 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/looker/looker_common.py +++ b/metadata-ingestion/src/datahub/ingestion/source/looker/looker_common.py @@ -81,9 +81,6 @@ EnumTypeClass, FineGrainedLineageClass, GlobalTagsClass, - OwnerClass, - OwnershipClass, - OwnershipTypeClass, SchemaMetadataClass, StatusClass, SubTypesClass, @@ -453,17 +450,9 @@ def _get_schema( @staticmethod def _get_tag_mce_for_urn(tag_urn: str) -> MetadataChangeEvent: assert tag_urn in LookerUtil.tag_definitions - ownership = OwnershipClass( - owners=[ - OwnerClass( - owner="urn:li:corpuser:datahub", - type=OwnershipTypeClass.DATAOWNER, - ) - ] - ) return MetadataChangeEvent( proposedSnapshot=TagSnapshotClass( - urn=tag_urn, aspects=[ownership, LookerUtil.tag_definitions[tag_urn]] + urn=tag_urn, aspects=[LookerUtil.tag_definitions[tag_urn]] ) ) diff --git a/metadata-ingestion/tests/integration/looker/golden_looker_mces.json b/metadata-ingestion/tests/integration/looker/golden_looker_mces.json index dee85b40bb7a81..1da42b94e320cf 100644 --- a/metadata-ingestion/tests/integration/looker/golden_looker_mces.json +++ b/metadata-ingestion/tests/integration/looker/golden_looker_mces.json @@ -533,20 +533,6 @@ "com.linkedin.pegasus2avro.metadata.snapshot.TagSnapshot": { "urn": "urn:li:tag:Dimension", "aspects": [ - { - "com.linkedin.pegasus2avro.common.Ownership": { - "owners": [ - { - "owner": "urn:li:corpuser:datahub", - "type": "DATAOWNER" - } - ], - "lastModified": { - "time": 0, - "actor": "urn:li:corpuser:unknown" - } - } - }, { "com.linkedin.pegasus2avro.tag.TagProperties": { "name": "Dimension", @@ -566,20 +552,6 @@ "com.linkedin.pegasus2avro.metadata.snapshot.TagSnapshot": { "urn": "urn:li:tag:Temporal", "aspects": [ - { - "com.linkedin.pegasus2avro.common.Ownership": { - "owners": [ - { - "owner": "urn:li:corpuser:datahub", - "type": "DATAOWNER" - } - ], - "lastModified": { - "time": 0, - "actor": "urn:li:corpuser:unknown" - } - } - }, { "com.linkedin.pegasus2avro.tag.TagProperties": { "name": "Temporal", @@ -599,20 +571,6 @@ "com.linkedin.pegasus2avro.metadata.snapshot.TagSnapshot": { "urn": "urn:li:tag:Measure", "aspects": [ - { - "com.linkedin.pegasus2avro.common.Ownership": { - "owners": [ - { - "owner": "urn:li:corpuser:datahub", - "type": "DATAOWNER" - } - ], - "lastModified": { - "time": 0, - "actor": "urn:li:corpuser:unknown" - } - } - }, { "com.linkedin.pegasus2avro.tag.TagProperties": { "name": "Measure", diff --git a/metadata-ingestion/tests/integration/looker/golden_test_allow_ingest.json b/metadata-ingestion/tests/integration/looker/golden_test_allow_ingest.json index 72db36e63daf77..685a606a57c339 100644 --- a/metadata-ingestion/tests/integration/looker/golden_test_allow_ingest.json +++ b/metadata-ingestion/tests/integration/looker/golden_test_allow_ingest.json @@ -327,20 +327,6 @@ "com.linkedin.pegasus2avro.metadata.snapshot.TagSnapshot": { "urn": "urn:li:tag:Dimension", "aspects": [ - { - "com.linkedin.pegasus2avro.common.Ownership": { - "owners": [ - { - "owner": "urn:li:corpuser:datahub", - "type": "DATAOWNER" - } - ], - "lastModified": { - "time": 0, - "actor": "urn:li:corpuser:unknown" - } - } - }, { "com.linkedin.pegasus2avro.tag.TagProperties": { "name": "Dimension", @@ -360,20 +346,6 @@ "com.linkedin.pegasus2avro.metadata.snapshot.TagSnapshot": { "urn": "urn:li:tag:Temporal", "aspects": [ - { - "com.linkedin.pegasus2avro.common.Ownership": { - "owners": [ - { - "owner": "urn:li:corpuser:datahub", - "type": "DATAOWNER" - } - ], - "lastModified": { - "time": 0, - "actor": "urn:li:corpuser:unknown" - } - } - }, { "com.linkedin.pegasus2avro.tag.TagProperties": { "name": "Temporal", @@ -393,20 +365,6 @@ "com.linkedin.pegasus2avro.metadata.snapshot.TagSnapshot": { "urn": "urn:li:tag:Measure", "aspects": [ - { - "com.linkedin.pegasus2avro.common.Ownership": { - "owners": [ - { - "owner": "urn:li:corpuser:datahub", - "type": "DATAOWNER" - } - ], - "lastModified": { - "time": 0, - "actor": "urn:li:corpuser:unknown" - } - } - }, { "com.linkedin.pegasus2avro.tag.TagProperties": { "name": "Measure", diff --git a/metadata-ingestion/tests/integration/looker/golden_test_external_project_view_mces.json b/metadata-ingestion/tests/integration/looker/golden_test_external_project_view_mces.json index e5508bdb06b9e0..069788cb088ac2 100644 --- a/metadata-ingestion/tests/integration/looker/golden_test_external_project_view_mces.json +++ b/metadata-ingestion/tests/integration/looker/golden_test_external_project_view_mces.json @@ -327,20 +327,6 @@ "com.linkedin.pegasus2avro.metadata.snapshot.TagSnapshot": { "urn": "urn:li:tag:Dimension", "aspects": [ - { - "com.linkedin.pegasus2avro.common.Ownership": { - "owners": [ - { - "owner": "urn:li:corpuser:datahub", - "type": "DATAOWNER" - } - ], - "lastModified": { - "time": 0, - "actor": "urn:li:corpuser:unknown" - } - } - }, { "com.linkedin.pegasus2avro.tag.TagProperties": { "name": "Dimension", @@ -360,20 +346,6 @@ "com.linkedin.pegasus2avro.metadata.snapshot.TagSnapshot": { "urn": "urn:li:tag:Temporal", "aspects": [ - { - "com.linkedin.pegasus2avro.common.Ownership": { - "owners": [ - { - "owner": "urn:li:corpuser:datahub", - "type": "DATAOWNER" - } - ], - "lastModified": { - "time": 0, - "actor": "urn:li:corpuser:unknown" - } - } - }, { "com.linkedin.pegasus2avro.tag.TagProperties": { "name": "Temporal", @@ -393,20 +365,6 @@ "com.linkedin.pegasus2avro.metadata.snapshot.TagSnapshot": { "urn": "urn:li:tag:Measure", "aspects": [ - { - "com.linkedin.pegasus2avro.common.Ownership": { - "owners": [ - { - "owner": "urn:li:corpuser:datahub", - "type": "DATAOWNER" - } - ], - "lastModified": { - "time": 0, - "actor": "urn:li:corpuser:unknown" - } - } - }, { "com.linkedin.pegasus2avro.tag.TagProperties": { "name": "Measure", diff --git a/metadata-ingestion/tests/integration/looker/golden_test_file_path_ingest.json b/metadata-ingestion/tests/integration/looker/golden_test_file_path_ingest.json index b0f66e7b245c96..f1c932ebd5a707 100644 --- a/metadata-ingestion/tests/integration/looker/golden_test_file_path_ingest.json +++ b/metadata-ingestion/tests/integration/looker/golden_test_file_path_ingest.json @@ -335,20 +335,6 @@ "com.linkedin.pegasus2avro.metadata.snapshot.TagSnapshot": { "urn": "urn:li:tag:Dimension", "aspects": [ - { - "com.linkedin.pegasus2avro.common.Ownership": { - "owners": [ - { - "owner": "urn:li:corpuser:datahub", - "type": "DATAOWNER" - } - ], - "lastModified": { - "time": 0, - "actor": "urn:li:corpuser:unknown" - } - } - }, { "com.linkedin.pegasus2avro.tag.TagProperties": { "name": "Dimension", @@ -369,20 +355,6 @@ "com.linkedin.pegasus2avro.metadata.snapshot.TagSnapshot": { "urn": "urn:li:tag:Temporal", "aspects": [ - { - "com.linkedin.pegasus2avro.common.Ownership": { - "owners": [ - { - "owner": "urn:li:corpuser:datahub", - "type": "DATAOWNER" - } - ], - "lastModified": { - "time": 0, - "actor": "urn:li:corpuser:unknown" - } - } - }, { "com.linkedin.pegasus2avro.tag.TagProperties": { "name": "Temporal", @@ -403,20 +375,6 @@ "com.linkedin.pegasus2avro.metadata.snapshot.TagSnapshot": { "urn": "urn:li:tag:Measure", "aspects": [ - { - "com.linkedin.pegasus2avro.common.Ownership": { - "owners": [ - { - "owner": "urn:li:corpuser:datahub", - "type": "DATAOWNER" - } - ], - "lastModified": { - "time": 0, - "actor": "urn:li:corpuser:unknown" - } - } - }, { "com.linkedin.pegasus2avro.tag.TagProperties": { "name": "Measure", diff --git a/metadata-ingestion/tests/integration/looker/golden_test_independent_look_ingest.json b/metadata-ingestion/tests/integration/looker/golden_test_independent_look_ingest.json index 91e13debfa0283..9521c9af4bbdcc 100644 --- a/metadata-ingestion/tests/integration/looker/golden_test_independent_look_ingest.json +++ b/metadata-ingestion/tests/integration/looker/golden_test_independent_look_ingest.json @@ -550,20 +550,6 @@ "com.linkedin.pegasus2avro.metadata.snapshot.TagSnapshot": { "urn": "urn:li:tag:Dimension", "aspects": [ - { - "com.linkedin.pegasus2avro.common.Ownership": { - "owners": [ - { - "owner": "urn:li:corpuser:datahub", - "type": "DATAOWNER" - } - ], - "lastModified": { - "time": 0, - "actor": "urn:li:corpuser:unknown" - } - } - }, { "com.linkedin.pegasus2avro.tag.TagProperties": { "name": "Dimension", @@ -583,20 +569,6 @@ "com.linkedin.pegasus2avro.metadata.snapshot.TagSnapshot": { "urn": "urn:li:tag:Temporal", "aspects": [ - { - "com.linkedin.pegasus2avro.common.Ownership": { - "owners": [ - { - "owner": "urn:li:corpuser:datahub", - "type": "DATAOWNER" - } - ], - "lastModified": { - "time": 0, - "actor": "urn:li:corpuser:unknown" - } - } - }, { "com.linkedin.pegasus2avro.tag.TagProperties": { "name": "Temporal", @@ -616,20 +588,6 @@ "com.linkedin.pegasus2avro.metadata.snapshot.TagSnapshot": { "urn": "urn:li:tag:Measure", "aspects": [ - { - "com.linkedin.pegasus2avro.common.Ownership": { - "owners": [ - { - "owner": "urn:li:corpuser:datahub", - "type": "DATAOWNER" - } - ], - "lastModified": { - "time": 0, - "actor": "urn:li:corpuser:unknown" - } - } - }, { "com.linkedin.pegasus2avro.tag.TagProperties": { "name": "Measure", diff --git a/metadata-ingestion/tests/integration/looker/golden_test_ingest.json b/metadata-ingestion/tests/integration/looker/golden_test_ingest.json index e93079119e4f49..dbacd52fe83de5 100644 --- a/metadata-ingestion/tests/integration/looker/golden_test_ingest.json +++ b/metadata-ingestion/tests/integration/looker/golden_test_ingest.json @@ -327,20 +327,6 @@ "com.linkedin.pegasus2avro.metadata.snapshot.TagSnapshot": { "urn": "urn:li:tag:Dimension", "aspects": [ - { - "com.linkedin.pegasus2avro.common.Ownership": { - "owners": [ - { - "owner": "urn:li:corpuser:datahub", - "type": "DATAOWNER" - } - ], - "lastModified": { - "time": 0, - "actor": "urn:li:corpuser:unknown" - } - } - }, { "com.linkedin.pegasus2avro.tag.TagProperties": { "name": "Dimension", @@ -360,20 +346,6 @@ "com.linkedin.pegasus2avro.metadata.snapshot.TagSnapshot": { "urn": "urn:li:tag:Temporal", "aspects": [ - { - "com.linkedin.pegasus2avro.common.Ownership": { - "owners": [ - { - "owner": "urn:li:corpuser:datahub", - "type": "DATAOWNER" - } - ], - "lastModified": { - "time": 0, - "actor": "urn:li:corpuser:unknown" - } - } - }, { "com.linkedin.pegasus2avro.tag.TagProperties": { "name": "Temporal", @@ -393,20 +365,6 @@ "com.linkedin.pegasus2avro.metadata.snapshot.TagSnapshot": { "urn": "urn:li:tag:Measure", "aspects": [ - { - "com.linkedin.pegasus2avro.common.Ownership": { - "owners": [ - { - "owner": "urn:li:corpuser:datahub", - "type": "DATAOWNER" - } - ], - "lastModified": { - "time": 0, - "actor": "urn:li:corpuser:unknown" - } - } - }, { "com.linkedin.pegasus2avro.tag.TagProperties": { "name": "Measure", diff --git a/metadata-ingestion/tests/integration/looker/golden_test_ingest_joins.json b/metadata-ingestion/tests/integration/looker/golden_test_ingest_joins.json index a9c8efa7cdb980..aaa874d9ff3483 100644 --- a/metadata-ingestion/tests/integration/looker/golden_test_ingest_joins.json +++ b/metadata-ingestion/tests/integration/looker/golden_test_ingest_joins.json @@ -351,20 +351,6 @@ "com.linkedin.pegasus2avro.metadata.snapshot.TagSnapshot": { "urn": "urn:li:tag:Dimension", "aspects": [ - { - "com.linkedin.pegasus2avro.common.Ownership": { - "owners": [ - { - "owner": "urn:li:corpuser:datahub", - "type": "DATAOWNER" - } - ], - "lastModified": { - "time": 0, - "actor": "urn:li:corpuser:unknown" - } - } - }, { "com.linkedin.pegasus2avro.tag.TagProperties": { "name": "Dimension", @@ -384,20 +370,6 @@ "com.linkedin.pegasus2avro.metadata.snapshot.TagSnapshot": { "urn": "urn:li:tag:Temporal", "aspects": [ - { - "com.linkedin.pegasus2avro.common.Ownership": { - "owners": [ - { - "owner": "urn:li:corpuser:datahub", - "type": "DATAOWNER" - } - ], - "lastModified": { - "time": 0, - "actor": "urn:li:corpuser:unknown" - } - } - }, { "com.linkedin.pegasus2avro.tag.TagProperties": { "name": "Temporal", @@ -417,20 +389,6 @@ "com.linkedin.pegasus2avro.metadata.snapshot.TagSnapshot": { "urn": "urn:li:tag:Measure", "aspects": [ - { - "com.linkedin.pegasus2avro.common.Ownership": { - "owners": [ - { - "owner": "urn:li:corpuser:datahub", - "type": "DATAOWNER" - } - ], - "lastModified": { - "time": 0, - "actor": "urn:li:corpuser:unknown" - } - } - }, { "com.linkedin.pegasus2avro.tag.TagProperties": { "name": "Measure", diff --git a/metadata-ingestion/tests/integration/looker/golden_test_ingest_unaliased_joins.json b/metadata-ingestion/tests/integration/looker/golden_test_ingest_unaliased_joins.json index edd15624a14cd4..be8db0722aea33 100644 --- a/metadata-ingestion/tests/integration/looker/golden_test_ingest_unaliased_joins.json +++ b/metadata-ingestion/tests/integration/looker/golden_test_ingest_unaliased_joins.json @@ -343,20 +343,6 @@ "com.linkedin.pegasus2avro.metadata.snapshot.TagSnapshot": { "urn": "urn:li:tag:Dimension", "aspects": [ - { - "com.linkedin.pegasus2avro.common.Ownership": { - "owners": [ - { - "owner": "urn:li:corpuser:datahub", - "type": "DATAOWNER" - } - ], - "lastModified": { - "time": 0, - "actor": "urn:li:corpuser:unknown" - } - } - }, { "com.linkedin.pegasus2avro.tag.TagProperties": { "name": "Dimension", @@ -376,20 +362,6 @@ "com.linkedin.pegasus2avro.metadata.snapshot.TagSnapshot": { "urn": "urn:li:tag:Temporal", "aspects": [ - { - "com.linkedin.pegasus2avro.common.Ownership": { - "owners": [ - { - "owner": "urn:li:corpuser:datahub", - "type": "DATAOWNER" - } - ], - "lastModified": { - "time": 0, - "actor": "urn:li:corpuser:unknown" - } - } - }, { "com.linkedin.pegasus2avro.tag.TagProperties": { "name": "Temporal", @@ -409,20 +381,6 @@ "com.linkedin.pegasus2avro.metadata.snapshot.TagSnapshot": { "urn": "urn:li:tag:Measure", "aspects": [ - { - "com.linkedin.pegasus2avro.common.Ownership": { - "owners": [ - { - "owner": "urn:li:corpuser:datahub", - "type": "DATAOWNER" - } - ], - "lastModified": { - "time": 0, - "actor": "urn:li:corpuser:unknown" - } - } - }, { "com.linkedin.pegasus2avro.tag.TagProperties": { "name": "Measure", diff --git a/metadata-ingestion/tests/integration/looker/looker_mces_golden_deleted_stateful.json b/metadata-ingestion/tests/integration/looker/looker_mces_golden_deleted_stateful.json index aebc89b609a08b..05b74f163ad45f 100644 --- a/metadata-ingestion/tests/integration/looker/looker_mces_golden_deleted_stateful.json +++ b/metadata-ingestion/tests/integration/looker/looker_mces_golden_deleted_stateful.json @@ -327,20 +327,6 @@ "com.linkedin.pegasus2avro.metadata.snapshot.TagSnapshot": { "urn": "urn:li:tag:Dimension", "aspects": [ - { - "com.linkedin.pegasus2avro.common.Ownership": { - "owners": [ - { - "owner": "urn:li:corpuser:datahub", - "type": "DATAOWNER" - } - ], - "lastModified": { - "time": 0, - "actor": "urn:li:corpuser:unknown" - } - } - }, { "com.linkedin.pegasus2avro.tag.TagProperties": { "name": "Dimension", @@ -360,20 +346,6 @@ "com.linkedin.pegasus2avro.metadata.snapshot.TagSnapshot": { "urn": "urn:li:tag:Temporal", "aspects": [ - { - "com.linkedin.pegasus2avro.common.Ownership": { - "owners": [ - { - "owner": "urn:li:corpuser:datahub", - "type": "DATAOWNER" - } - ], - "lastModified": { - "time": 0, - "actor": "urn:li:corpuser:unknown" - } - } - }, { "com.linkedin.pegasus2avro.tag.TagProperties": { "name": "Temporal", @@ -393,20 +365,6 @@ "com.linkedin.pegasus2avro.metadata.snapshot.TagSnapshot": { "urn": "urn:li:tag:Measure", "aspects": [ - { - "com.linkedin.pegasus2avro.common.Ownership": { - "owners": [ - { - "owner": "urn:li:corpuser:datahub", - "type": "DATAOWNER" - } - ], - "lastModified": { - "time": 0, - "actor": "urn:li:corpuser:unknown" - } - } - }, { "com.linkedin.pegasus2avro.tag.TagProperties": { "name": "Measure", diff --git a/metadata-ingestion/tests/integration/looker/looker_mces_usage_history.json b/metadata-ingestion/tests/integration/looker/looker_mces_usage_history.json index 34bded3cf691e5..0778aa0050b007 100644 --- a/metadata-ingestion/tests/integration/looker/looker_mces_usage_history.json +++ b/metadata-ingestion/tests/integration/looker/looker_mces_usage_history.json @@ -279,20 +279,6 @@ "com.linkedin.pegasus2avro.metadata.snapshot.TagSnapshot": { "urn": "urn:li:tag:Dimension", "aspects": [ - { - "com.linkedin.pegasus2avro.common.Ownership": { - "owners": [ - { - "owner": "urn:li:corpuser:datahub", - "type": "DATAOWNER" - } - ], - "lastModified": { - "time": 0, - "actor": "urn:li:corpuser:unknown" - } - } - }, { "com.linkedin.pegasus2avro.tag.TagProperties": { "name": "Dimension", @@ -312,20 +298,6 @@ "com.linkedin.pegasus2avro.metadata.snapshot.TagSnapshot": { "urn": "urn:li:tag:Temporal", "aspects": [ - { - "com.linkedin.pegasus2avro.common.Ownership": { - "owners": [ - { - "owner": "urn:li:corpuser:datahub", - "type": "DATAOWNER" - } - ], - "lastModified": { - "time": 0, - "actor": "urn:li:corpuser:unknown" - } - } - }, { "com.linkedin.pegasus2avro.tag.TagProperties": { "name": "Temporal", @@ -345,20 +317,6 @@ "com.linkedin.pegasus2avro.metadata.snapshot.TagSnapshot": { "urn": "urn:li:tag:Measure", "aspects": [ - { - "com.linkedin.pegasus2avro.common.Ownership": { - "owners": [ - { - "owner": "urn:li:corpuser:datahub", - "type": "DATAOWNER" - } - ], - "lastModified": { - "time": 0, - "actor": "urn:li:corpuser:unknown" - } - } - }, { "com.linkedin.pegasus2avro.tag.TagProperties": { "name": "Measure", diff --git a/metadata-ingestion/tests/integration/lookml/lookml_mces_api_bigquery.json b/metadata-ingestion/tests/integration/lookml/lookml_mces_api_bigquery.json index 238f4c2580cdf2..5a0bd4e12fd3a2 100644 --- a/metadata-ingestion/tests/integration/lookml/lookml_mces_api_bigquery.json +++ b/metadata-ingestion/tests/integration/lookml/lookml_mces_api_bigquery.json @@ -2121,20 +2121,6 @@ "com.linkedin.pegasus2avro.metadata.snapshot.TagSnapshot": { "urn": "urn:li:tag:Dimension", "aspects": [ - { - "com.linkedin.pegasus2avro.common.Ownership": { - "owners": [ - { - "owner": "urn:li:corpuser:datahub", - "type": "DATAOWNER" - } - ], - "lastModified": { - "time": 0, - "actor": "urn:li:corpuser:unknown" - } - } - }, { "com.linkedin.pegasus2avro.tag.TagProperties": { "name": "Dimension", @@ -2154,20 +2140,6 @@ "com.linkedin.pegasus2avro.metadata.snapshot.TagSnapshot": { "urn": "urn:li:tag:Temporal", "aspects": [ - { - "com.linkedin.pegasus2avro.common.Ownership": { - "owners": [ - { - "owner": "urn:li:corpuser:datahub", - "type": "DATAOWNER" - } - ], - "lastModified": { - "time": 0, - "actor": "urn:li:corpuser:unknown" - } - } - }, { "com.linkedin.pegasus2avro.tag.TagProperties": { "name": "Temporal", @@ -2187,20 +2159,6 @@ "com.linkedin.pegasus2avro.metadata.snapshot.TagSnapshot": { "urn": "urn:li:tag:Measure", "aspects": [ - { - "com.linkedin.pegasus2avro.common.Ownership": { - "owners": [ - { - "owner": "urn:li:corpuser:datahub", - "type": "DATAOWNER" - } - ], - "lastModified": { - "time": 0, - "actor": "urn:li:corpuser:unknown" - } - } - }, { "com.linkedin.pegasus2avro.tag.TagProperties": { "name": "Measure", diff --git a/metadata-ingestion/tests/integration/lookml/lookml_mces_api_hive2.json b/metadata-ingestion/tests/integration/lookml/lookml_mces_api_hive2.json index 45d5d839e9d21c..1b0ee3216383cd 100644 --- a/metadata-ingestion/tests/integration/lookml/lookml_mces_api_hive2.json +++ b/metadata-ingestion/tests/integration/lookml/lookml_mces_api_hive2.json @@ -2121,20 +2121,6 @@ "com.linkedin.pegasus2avro.metadata.snapshot.TagSnapshot": { "urn": "urn:li:tag:Dimension", "aspects": [ - { - "com.linkedin.pegasus2avro.common.Ownership": { - "owners": [ - { - "owner": "urn:li:corpuser:datahub", - "type": "DATAOWNER" - } - ], - "lastModified": { - "time": 0, - "actor": "urn:li:corpuser:unknown" - } - } - }, { "com.linkedin.pegasus2avro.tag.TagProperties": { "name": "Dimension", @@ -2154,20 +2140,6 @@ "com.linkedin.pegasus2avro.metadata.snapshot.TagSnapshot": { "urn": "urn:li:tag:Temporal", "aspects": [ - { - "com.linkedin.pegasus2avro.common.Ownership": { - "owners": [ - { - "owner": "urn:li:corpuser:datahub", - "type": "DATAOWNER" - } - ], - "lastModified": { - "time": 0, - "actor": "urn:li:corpuser:unknown" - } - } - }, { "com.linkedin.pegasus2avro.tag.TagProperties": { "name": "Temporal", @@ -2187,20 +2159,6 @@ "com.linkedin.pegasus2avro.metadata.snapshot.TagSnapshot": { "urn": "urn:li:tag:Measure", "aspects": [ - { - "com.linkedin.pegasus2avro.common.Ownership": { - "owners": [ - { - "owner": "urn:li:corpuser:datahub", - "type": "DATAOWNER" - } - ], - "lastModified": { - "time": 0, - "actor": "urn:li:corpuser:unknown" - } - } - }, { "com.linkedin.pegasus2avro.tag.TagProperties": { "name": "Measure", diff --git a/metadata-ingestion/tests/integration/lookml/lookml_mces_badsql_parser.json b/metadata-ingestion/tests/integration/lookml/lookml_mces_badsql_parser.json index 187cedaefb6b21..b960ba581e6b57 100644 --- a/metadata-ingestion/tests/integration/lookml/lookml_mces_badsql_parser.json +++ b/metadata-ingestion/tests/integration/lookml/lookml_mces_badsql_parser.json @@ -2004,20 +2004,6 @@ "com.linkedin.pegasus2avro.metadata.snapshot.TagSnapshot": { "urn": "urn:li:tag:Dimension", "aspects": [ - { - "com.linkedin.pegasus2avro.common.Ownership": { - "owners": [ - { - "owner": "urn:li:corpuser:datahub", - "type": "DATAOWNER" - } - ], - "lastModified": { - "time": 0, - "actor": "urn:li:corpuser:unknown" - } - } - }, { "com.linkedin.pegasus2avro.tag.TagProperties": { "name": "Dimension", @@ -2037,20 +2023,6 @@ "com.linkedin.pegasus2avro.metadata.snapshot.TagSnapshot": { "urn": "urn:li:tag:Temporal", "aspects": [ - { - "com.linkedin.pegasus2avro.common.Ownership": { - "owners": [ - { - "owner": "urn:li:corpuser:datahub", - "type": "DATAOWNER" - } - ], - "lastModified": { - "time": 0, - "actor": "urn:li:corpuser:unknown" - } - } - }, { "com.linkedin.pegasus2avro.tag.TagProperties": { "name": "Temporal", @@ -2070,20 +2042,6 @@ "com.linkedin.pegasus2avro.metadata.snapshot.TagSnapshot": { "urn": "urn:li:tag:Measure", "aspects": [ - { - "com.linkedin.pegasus2avro.common.Ownership": { - "owners": [ - { - "owner": "urn:li:corpuser:datahub", - "type": "DATAOWNER" - } - ], - "lastModified": { - "time": 0, - "actor": "urn:li:corpuser:unknown" - } - } - }, { "com.linkedin.pegasus2avro.tag.TagProperties": { "name": "Measure", diff --git a/metadata-ingestion/tests/integration/lookml/lookml_mces_offline.json b/metadata-ingestion/tests/integration/lookml/lookml_mces_offline.json index c2c879e38f37bb..e29292a44c949d 100644 --- a/metadata-ingestion/tests/integration/lookml/lookml_mces_offline.json +++ b/metadata-ingestion/tests/integration/lookml/lookml_mces_offline.json @@ -2121,20 +2121,6 @@ "com.linkedin.pegasus2avro.metadata.snapshot.TagSnapshot": { "urn": "urn:li:tag:Dimension", "aspects": [ - { - "com.linkedin.pegasus2avro.common.Ownership": { - "owners": [ - { - "owner": "urn:li:corpuser:datahub", - "type": "DATAOWNER" - } - ], - "lastModified": { - "time": 0, - "actor": "urn:li:corpuser:unknown" - } - } - }, { "com.linkedin.pegasus2avro.tag.TagProperties": { "name": "Dimension", @@ -2154,20 +2140,6 @@ "com.linkedin.pegasus2avro.metadata.snapshot.TagSnapshot": { "urn": "urn:li:tag:Temporal", "aspects": [ - { - "com.linkedin.pegasus2avro.common.Ownership": { - "owners": [ - { - "owner": "urn:li:corpuser:datahub", - "type": "DATAOWNER" - } - ], - "lastModified": { - "time": 0, - "actor": "urn:li:corpuser:unknown" - } - } - }, { "com.linkedin.pegasus2avro.tag.TagProperties": { "name": "Temporal", @@ -2187,20 +2159,6 @@ "com.linkedin.pegasus2avro.metadata.snapshot.TagSnapshot": { "urn": "urn:li:tag:Measure", "aspects": [ - { - "com.linkedin.pegasus2avro.common.Ownership": { - "owners": [ - { - "owner": "urn:li:corpuser:datahub", - "type": "DATAOWNER" - } - ], - "lastModified": { - "time": 0, - "actor": "urn:li:corpuser:unknown" - } - } - }, { "com.linkedin.pegasus2avro.tag.TagProperties": { "name": "Measure", diff --git a/metadata-ingestion/tests/integration/lookml/lookml_mces_offline_deny_pattern.json b/metadata-ingestion/tests/integration/lookml/lookml_mces_offline_deny_pattern.json index c1ac54b0fb588d..04ecaecbd4afb2 100644 --- a/metadata-ingestion/tests/integration/lookml/lookml_mces_offline_deny_pattern.json +++ b/metadata-ingestion/tests/integration/lookml/lookml_mces_offline_deny_pattern.json @@ -584,20 +584,6 @@ "com.linkedin.pegasus2avro.metadata.snapshot.TagSnapshot": { "urn": "urn:li:tag:Dimension", "aspects": [ - { - "com.linkedin.pegasus2avro.common.Ownership": { - "owners": [ - { - "owner": "urn:li:corpuser:datahub", - "type": "DATAOWNER" - } - ], - "lastModified": { - "time": 0, - "actor": "urn:li:corpuser:unknown" - } - } - }, { "com.linkedin.pegasus2avro.tag.TagProperties": { "name": "Dimension", @@ -617,20 +603,6 @@ "com.linkedin.pegasus2avro.metadata.snapshot.TagSnapshot": { "urn": "urn:li:tag:Temporal", "aspects": [ - { - "com.linkedin.pegasus2avro.common.Ownership": { - "owners": [ - { - "owner": "urn:li:corpuser:datahub", - "type": "DATAOWNER" - } - ], - "lastModified": { - "time": 0, - "actor": "urn:li:corpuser:unknown" - } - } - }, { "com.linkedin.pegasus2avro.tag.TagProperties": { "name": "Temporal", @@ -650,20 +622,6 @@ "com.linkedin.pegasus2avro.metadata.snapshot.TagSnapshot": { "urn": "urn:li:tag:Measure", "aspects": [ - { - "com.linkedin.pegasus2avro.common.Ownership": { - "owners": [ - { - "owner": "urn:li:corpuser:datahub", - "type": "DATAOWNER" - } - ], - "lastModified": { - "time": 0, - "actor": "urn:li:corpuser:unknown" - } - } - }, { "com.linkedin.pegasus2avro.tag.TagProperties": { "name": "Measure", diff --git a/metadata-ingestion/tests/integration/lookml/lookml_mces_offline_platform_instance.json b/metadata-ingestion/tests/integration/lookml/lookml_mces_offline_platform_instance.json index f602ca37b31607..080931ae637bc8 100644 --- a/metadata-ingestion/tests/integration/lookml/lookml_mces_offline_platform_instance.json +++ b/metadata-ingestion/tests/integration/lookml/lookml_mces_offline_platform_instance.json @@ -2121,20 +2121,6 @@ "com.linkedin.pegasus2avro.metadata.snapshot.TagSnapshot": { "urn": "urn:li:tag:Dimension", "aspects": [ - { - "com.linkedin.pegasus2avro.common.Ownership": { - "owners": [ - { - "owner": "urn:li:corpuser:datahub", - "type": "DATAOWNER" - } - ], - "lastModified": { - "time": 0, - "actor": "urn:li:corpuser:unknown" - } - } - }, { "com.linkedin.pegasus2avro.tag.TagProperties": { "name": "Dimension", @@ -2154,20 +2140,6 @@ "com.linkedin.pegasus2avro.metadata.snapshot.TagSnapshot": { "urn": "urn:li:tag:Temporal", "aspects": [ - { - "com.linkedin.pegasus2avro.common.Ownership": { - "owners": [ - { - "owner": "urn:li:corpuser:datahub", - "type": "DATAOWNER" - } - ], - "lastModified": { - "time": 0, - "actor": "urn:li:corpuser:unknown" - } - } - }, { "com.linkedin.pegasus2avro.tag.TagProperties": { "name": "Temporal", @@ -2187,20 +2159,6 @@ "com.linkedin.pegasus2avro.metadata.snapshot.TagSnapshot": { "urn": "urn:li:tag:Measure", "aspects": [ - { - "com.linkedin.pegasus2avro.common.Ownership": { - "owners": [ - { - "owner": "urn:li:corpuser:datahub", - "type": "DATAOWNER" - } - ], - "lastModified": { - "time": 0, - "actor": "urn:li:corpuser:unknown" - } - } - }, { "com.linkedin.pegasus2avro.tag.TagProperties": { "name": "Measure", diff --git a/metadata-ingestion/tests/integration/lookml/lookml_mces_with_external_urls.json b/metadata-ingestion/tests/integration/lookml/lookml_mces_with_external_urls.json index 104bd365669e34..5826c4316b539d 100644 --- a/metadata-ingestion/tests/integration/lookml/lookml_mces_with_external_urls.json +++ b/metadata-ingestion/tests/integration/lookml/lookml_mces_with_external_urls.json @@ -2134,20 +2134,6 @@ "com.linkedin.pegasus2avro.metadata.snapshot.TagSnapshot": { "urn": "urn:li:tag:Dimension", "aspects": [ - { - "com.linkedin.pegasus2avro.common.Ownership": { - "owners": [ - { - "owner": "urn:li:corpuser:datahub", - "type": "DATAOWNER" - } - ], - "lastModified": { - "time": 0, - "actor": "urn:li:corpuser:unknown" - } - } - }, { "com.linkedin.pegasus2avro.tag.TagProperties": { "name": "Dimension", @@ -2167,20 +2153,6 @@ "com.linkedin.pegasus2avro.metadata.snapshot.TagSnapshot": { "urn": "urn:li:tag:Temporal", "aspects": [ - { - "com.linkedin.pegasus2avro.common.Ownership": { - "owners": [ - { - "owner": "urn:li:corpuser:datahub", - "type": "DATAOWNER" - } - ], - "lastModified": { - "time": 0, - "actor": "urn:li:corpuser:unknown" - } - } - }, { "com.linkedin.pegasus2avro.tag.TagProperties": { "name": "Temporal", @@ -2200,20 +2172,6 @@ "com.linkedin.pegasus2avro.metadata.snapshot.TagSnapshot": { "urn": "urn:li:tag:Measure", "aspects": [ - { - "com.linkedin.pegasus2avro.common.Ownership": { - "owners": [ - { - "owner": "urn:li:corpuser:datahub", - "type": "DATAOWNER" - } - ], - "lastModified": { - "time": 0, - "actor": "urn:li:corpuser:unknown" - } - } - }, { "com.linkedin.pegasus2avro.tag.TagProperties": { "name": "Measure", diff --git a/metadata-ingestion/tests/integration/lookml/lookml_reachable_views.json b/metadata-ingestion/tests/integration/lookml/lookml_reachable_views.json index 37a6c94c6952e3..53d1ec0229de16 100644 --- a/metadata-ingestion/tests/integration/lookml/lookml_reachable_views.json +++ b/metadata-ingestion/tests/integration/lookml/lookml_reachable_views.json @@ -681,20 +681,6 @@ "com.linkedin.pegasus2avro.metadata.snapshot.TagSnapshot": { "urn": "urn:li:tag:Dimension", "aspects": [ - { - "com.linkedin.pegasus2avro.common.Ownership": { - "owners": [ - { - "owner": "urn:li:corpuser:datahub", - "type": "DATAOWNER" - } - ], - "lastModified": { - "time": 0, - "actor": "urn:li:corpuser:unknown" - } - } - }, { "com.linkedin.pegasus2avro.tag.TagProperties": { "name": "Dimension", @@ -714,20 +700,6 @@ "com.linkedin.pegasus2avro.metadata.snapshot.TagSnapshot": { "urn": "urn:li:tag:Temporal", "aspects": [ - { - "com.linkedin.pegasus2avro.common.Ownership": { - "owners": [ - { - "owner": "urn:li:corpuser:datahub", - "type": "DATAOWNER" - } - ], - "lastModified": { - "time": 0, - "actor": "urn:li:corpuser:unknown" - } - } - }, { "com.linkedin.pegasus2avro.tag.TagProperties": { "name": "Temporal", @@ -747,20 +719,6 @@ "com.linkedin.pegasus2avro.metadata.snapshot.TagSnapshot": { "urn": "urn:li:tag:Measure", "aspects": [ - { - "com.linkedin.pegasus2avro.common.Ownership": { - "owners": [ - { - "owner": "urn:li:corpuser:datahub", - "type": "DATAOWNER" - } - ], - "lastModified": { - "time": 0, - "actor": "urn:li:corpuser:unknown" - } - } - }, { "com.linkedin.pegasus2avro.tag.TagProperties": { "name": "Measure", From 84bba4dc446ee97f8991689fd17bfa6d14232601 Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Thu, 12 Oct 2023 01:31:17 -0400 Subject: [PATCH 3/6] feat(ingest): add output schema inference for sql parser (#8989) --- .../src/datahub/utilities/sqlglot_lineage.py | 119 ++++++++++++++++-- .../integration/powerbi/test_m_parser.py | 93 ++++---------- .../test_bigquery_create_view_with_cte.json | 32 ++++- ..._bigquery_from_sharded_table_wildcard.json | 16 ++- .../test_bigquery_nested_subqueries.json | 16 ++- ..._bigquery_sharded_table_normalization.json | 16 ++- .../test_bigquery_star_with_replace.json | 24 +++- .../test_bigquery_view_from_union.json | 16 ++- .../goldens/test_create_view_as_select.json | 16 ++- .../test_expand_select_star_basic.json | 80 ++++++++++-- .../goldens/test_insert_as_select.json | 36 +++++- ...est_select_ambiguous_column_no_schema.json | 12 +- .../goldens/test_select_count.json | 8 +- .../test_select_from_struct_subfields.json | 16 ++- .../goldens/test_select_from_union.json | 16 ++- .../sql_parsing/goldens/test_select_max.json | 4 +- .../goldens/test_select_with_ctes.json | 8 +- .../test_select_with_full_col_name.json | 12 +- .../test_snowflake_case_statement.json | 16 ++- .../goldens/test_snowflake_column_cast.json | 63 ++++++++++ .../test_snowflake_column_normalization.json | 32 ++++- ...t_snowflake_ctas_column_normalization.json | 32 ++++- .../test_snowflake_default_normalization.json | 48 ++++++- .../unit/sql_parsing/test_sqlglot_lineage.py | 21 ++++ 24 files changed, 604 insertions(+), 148 deletions(-) create mode 100644 metadata-ingestion/tests/unit/sql_parsing/goldens/test_snowflake_column_cast.json diff --git a/metadata-ingestion/src/datahub/utilities/sqlglot_lineage.py b/metadata-ingestion/src/datahub/utilities/sqlglot_lineage.py index 81c43884fdf7d1..349eb40a5e865a 100644 --- a/metadata-ingestion/src/datahub/utilities/sqlglot_lineage.py +++ b/metadata-ingestion/src/datahub/utilities/sqlglot_lineage.py @@ -5,12 +5,13 @@ import logging import pathlib from collections import defaultdict -from typing import Dict, List, Optional, Set, Tuple, Union +from typing import Any, Dict, List, Optional, Set, Tuple, Union import pydantic.dataclasses import sqlglot import sqlglot.errors import sqlglot.lineage +import sqlglot.optimizer.annotate_types import sqlglot.optimizer.qualify import sqlglot.optimizer.qualify_columns from pydantic import BaseModel @@ -23,7 +24,17 @@ from datahub.ingestion.api.closeable import Closeable from datahub.ingestion.graph.client import DataHubGraph from datahub.ingestion.source.bigquery_v2.bigquery_audit import BigqueryTableIdentifier -from datahub.metadata.schema_classes import OperationTypeClass, SchemaMetadataClass +from datahub.metadata.schema_classes import ( + ArrayTypeClass, + BooleanTypeClass, + DateTypeClass, + NumberTypeClass, + OperationTypeClass, + SchemaFieldDataTypeClass, + SchemaMetadataClass, + StringTypeClass, + TimeTypeClass, +) from datahub.utilities.file_backed_collections import ConnectionWrapper, FileBackedDict from datahub.utilities.urns.dataset_urn import DatasetUrn @@ -90,8 +101,18 @@ def get_query_type_of_sql(expression: sqlglot.exp.Expression) -> QueryType: return QueryType.UNKNOWN +class _ParserBaseModel( + BaseModel, + arbitrary_types_allowed=True, + json_encoders={ + SchemaFieldDataTypeClass: lambda v: v.to_obj(), + }, +): + pass + + @functools.total_ordering -class _FrozenModel(BaseModel, frozen=True): +class _FrozenModel(_ParserBaseModel, frozen=True): def __lt__(self, other: "_FrozenModel") -> bool: for field in self.__fields__: self_v = getattr(self, field) @@ -146,29 +167,42 @@ class _ColumnRef(_FrozenModel): column: str -class ColumnRef(BaseModel): +class ColumnRef(_ParserBaseModel): table: Urn column: str -class _DownstreamColumnRef(BaseModel): +class _DownstreamColumnRef(_ParserBaseModel): table: Optional[_TableName] column: str + column_type: Optional[sqlglot.exp.DataType] -class DownstreamColumnRef(BaseModel): +class DownstreamColumnRef(_ParserBaseModel): table: Optional[Urn] column: str + column_type: Optional[SchemaFieldDataTypeClass] + native_column_type: Optional[str] + + @pydantic.validator("column_type", pre=True) + def _load_column_type( + cls, v: Optional[Union[dict, SchemaFieldDataTypeClass]] + ) -> Optional[SchemaFieldDataTypeClass]: + if v is None: + return None + if isinstance(v, SchemaFieldDataTypeClass): + return v + return SchemaFieldDataTypeClass.from_obj(v) -class _ColumnLineageInfo(BaseModel): +class _ColumnLineageInfo(_ParserBaseModel): downstream: _DownstreamColumnRef upstreams: List[_ColumnRef] logic: Optional[str] -class ColumnLineageInfo(BaseModel): +class ColumnLineageInfo(_ParserBaseModel): downstream: DownstreamColumnRef upstreams: List[ColumnRef] @@ -176,7 +210,7 @@ class ColumnLineageInfo(BaseModel): logic: Optional[str] = pydantic.Field(default=None, exclude=True) -class SqlParsingDebugInfo(BaseModel, arbitrary_types_allowed=True): +class SqlParsingDebugInfo(_ParserBaseModel): confidence: float = 0.0 tables_discovered: int = 0 @@ -190,7 +224,7 @@ def error(self) -> Optional[Exception]: return self.table_error or self.column_error -class SqlParsingResult(BaseModel): +class SqlParsingResult(_ParserBaseModel): query_type: QueryType = QueryType.UNKNOWN in_tables: List[Urn] @@ -541,6 +575,15 @@ def _schema_aware_fuzzy_column_resolve( ) from e logger.debug("Qualified sql %s", statement.sql(pretty=True, dialect=dialect)) + # Try to figure out the types of the output columns. + try: + statement = sqlglot.optimizer.annotate_types.annotate_types( + statement, schema=sqlglot_db_schema + ) + except sqlglot.errors.OptimizeError as e: + # This is not a fatal error, so we can continue. + logger.debug("sqlglot failed to annotate types: %s", e) + column_lineage = [] try: @@ -553,7 +596,6 @@ def _schema_aware_fuzzy_column_resolve( logger.debug("output columns: %s", [col[0] for col in output_columns]) output_col: str for output_col, original_col_expression in output_columns: - # print(f"output column: {output_col}") if output_col == "*": # If schema information is available, the * will be expanded to the actual columns. # Otherwise, we can't process it. @@ -613,12 +655,19 @@ def _schema_aware_fuzzy_column_resolve( output_col = _schema_aware_fuzzy_column_resolve(output_table, output_col) + # Guess the output column type. + output_col_type = None + if original_col_expression.type: + output_col_type = original_col_expression.type + if not direct_col_upstreams: logger.debug(f' "{output_col}" has no upstreams') column_lineage.append( _ColumnLineageInfo( downstream=_DownstreamColumnRef( - table=output_table, column=output_col + table=output_table, + column=output_col, + column_type=output_col_type, ), upstreams=sorted(direct_col_upstreams), # logic=column_logic.sql(pretty=True, dialect=dialect), @@ -673,6 +722,42 @@ def _try_extract_select( return statement +def _translate_sqlglot_type( + sqlglot_type: sqlglot.exp.DataType.Type, +) -> Optional[SchemaFieldDataTypeClass]: + TypeClass: Any + if sqlglot_type in sqlglot.exp.DataType.TEXT_TYPES: + TypeClass = StringTypeClass + elif sqlglot_type in sqlglot.exp.DataType.NUMERIC_TYPES or sqlglot_type in { + sqlglot.exp.DataType.Type.DECIMAL, + }: + TypeClass = NumberTypeClass + elif sqlglot_type in { + sqlglot.exp.DataType.Type.BOOLEAN, + sqlglot.exp.DataType.Type.BIT, + }: + TypeClass = BooleanTypeClass + elif sqlglot_type in { + sqlglot.exp.DataType.Type.DATE, + }: + TypeClass = DateTypeClass + elif sqlglot_type in sqlglot.exp.DataType.TEMPORAL_TYPES: + TypeClass = TimeTypeClass + elif sqlglot_type in { + sqlglot.exp.DataType.Type.ARRAY, + }: + TypeClass = ArrayTypeClass + elif sqlglot_type in { + sqlglot.exp.DataType.Type.UNKNOWN, + }: + return None + else: + logger.debug("Unknown sqlglot type: %s", sqlglot_type) + return None + + return SchemaFieldDataTypeClass(type=TypeClass()) + + def _translate_internal_column_lineage( table_name_urn_mapping: Dict[_TableName, str], raw_column_lineage: _ColumnLineageInfo, @@ -684,6 +769,16 @@ def _translate_internal_column_lineage( downstream=DownstreamColumnRef( table=downstream_urn, column=raw_column_lineage.downstream.column, + column_type=_translate_sqlglot_type( + raw_column_lineage.downstream.column_type.this + ) + if raw_column_lineage.downstream.column_type + else None, + native_column_type=raw_column_lineage.downstream.column_type.sql() + if raw_column_lineage.downstream.column_type + and raw_column_lineage.downstream.column_type.this + != sqlglot.exp.DataType.Type.UNKNOWN + else None, ), upstreams=[ ColumnRef( diff --git a/metadata-ingestion/tests/integration/powerbi/test_m_parser.py b/metadata-ingestion/tests/integration/powerbi/test_m_parser.py index e3cc6c8101650f..b6cb578217a2c2 100644 --- a/metadata-ingestion/tests/integration/powerbi/test_m_parser.py +++ b/metadata-ingestion/tests/integration/powerbi/test_m_parser.py @@ -17,7 +17,6 @@ ) from datahub.ingestion.source.powerbi.m_query import parser, resolver, tree_function from datahub.ingestion.source.powerbi.m_query.resolver import DataPlatformTable, Lineage -from datahub.utilities.sqlglot_lineage import ColumnLineageInfo, DownstreamColumnRef pytestmark = pytest.mark.integration_batch_2 @@ -742,75 +741,25 @@ def test_sqlglot_parser(): == "urn:li:dataset:(urn:li:dataPlatform:snowflake,sales_deployment.operations_analytics.transformed_prod.v_sme_unit_targets,PROD)" ) - assert lineage[0].column_lineage == [ - ColumnLineageInfo( - downstream=DownstreamColumnRef(table=None, column="client_director"), - upstreams=[], - logic=None, - ), - ColumnLineageInfo( - downstream=DownstreamColumnRef(table=None, column="tier"), - upstreams=[], - logic=None, - ), - ColumnLineageInfo( - downstream=DownstreamColumnRef(table=None, column='upper("manager")'), - upstreams=[], - logic=None, - ), - ColumnLineageInfo( - downstream=DownstreamColumnRef(table=None, column="team_type"), - upstreams=[], - logic=None, - ), - ColumnLineageInfo( - downstream=DownstreamColumnRef(table=None, column="date_target"), - upstreams=[], - logic=None, - ), - ColumnLineageInfo( - downstream=DownstreamColumnRef(table=None, column="monthid"), - upstreams=[], - logic=None, - ), - ColumnLineageInfo( - downstream=DownstreamColumnRef(table=None, column="target_team"), - upstreams=[], - logic=None, - ), - ColumnLineageInfo( - downstream=DownstreamColumnRef(table=None, column="seller_email"), - upstreams=[], - logic=None, - ), - ColumnLineageInfo( - downstream=DownstreamColumnRef(table=None, column="agent_key"), - upstreams=[], - logic=None, - ), - ColumnLineageInfo( - downstream=DownstreamColumnRef(table=None, column="sme_quota"), - upstreams=[], - logic=None, - ), - ColumnLineageInfo( - downstream=DownstreamColumnRef(table=None, column="revenue_quota"), - upstreams=[], - logic=None, - ), - ColumnLineageInfo( - downstream=DownstreamColumnRef(table=None, column="service_quota"), - upstreams=[], - logic=None, - ), - ColumnLineageInfo( - downstream=DownstreamColumnRef(table=None, column="bl_target"), - upstreams=[], - logic=None, - ), - ColumnLineageInfo( - downstream=DownstreamColumnRef(table=None, column="software_quota"), - upstreams=[], - logic=None, - ), + # TODO: None of these columns have upstreams? + # That doesn't seem right - we probably need to add fake schemas for the two tables above. + cols = [ + "client_director", + "tier", + 'upper("manager")', + "team_type", + "date_target", + "monthid", + "target_team", + "seller_email", + "agent_key", + "sme_quota", + "revenue_quota", + "service_quota", + "bl_target", + "software_quota", ] + for i, column in enumerate(cols): + assert lineage[0].column_lineage[i].downstream.table is None + assert lineage[0].column_lineage[i].downstream.column == column + assert lineage[0].column_lineage[i].upstreams == [] diff --git a/metadata-ingestion/tests/unit/sql_parsing/goldens/test_bigquery_create_view_with_cte.json b/metadata-ingestion/tests/unit/sql_parsing/goldens/test_bigquery_create_view_with_cte.json index e50d944ce72e36..f0175b4dc88927 100644 --- a/metadata-ingestion/tests/unit/sql_parsing/goldens/test_bigquery_create_view_with_cte.json +++ b/metadata-ingestion/tests/unit/sql_parsing/goldens/test_bigquery_create_view_with_cte.json @@ -12,7 +12,13 @@ { "downstream": { "table": "urn:li:dataset:(urn:li:dataPlatform:bigquery,my-proj-2.dataset.my_view,PROD)", - "column": "col5" + "column": "col5", + "column_type": { + "type": { + "com.linkedin.pegasus2avro.schema.StringType": {} + } + }, + "native_column_type": "TEXT" }, "upstreams": [ { @@ -24,7 +30,13 @@ { "downstream": { "table": "urn:li:dataset:(urn:li:dataPlatform:bigquery,my-proj-2.dataset.my_view,PROD)", - "column": "col1" + "column": "col1", + "column_type": { + "type": { + "com.linkedin.pegasus2avro.schema.StringType": {} + } + }, + "native_column_type": "TEXT" }, "upstreams": [ { @@ -36,7 +48,13 @@ { "downstream": { "table": "urn:li:dataset:(urn:li:dataPlatform:bigquery,my-proj-2.dataset.my_view,PROD)", - "column": "col2" + "column": "col2", + "column_type": { + "type": { + "com.linkedin.pegasus2avro.schema.StringType": {} + } + }, + "native_column_type": "TEXT" }, "upstreams": [ { @@ -48,7 +66,13 @@ { "downstream": { "table": "urn:li:dataset:(urn:li:dataPlatform:bigquery,my-proj-2.dataset.my_view,PROD)", - "column": "col3" + "column": "col3", + "column_type": { + "type": { + "com.linkedin.pegasus2avro.schema.StringType": {} + } + }, + "native_column_type": "TEXT" }, "upstreams": [ { diff --git a/metadata-ingestion/tests/unit/sql_parsing/goldens/test_bigquery_from_sharded_table_wildcard.json b/metadata-ingestion/tests/unit/sql_parsing/goldens/test_bigquery_from_sharded_table_wildcard.json index 78591286feb505..b7df5444987f23 100644 --- a/metadata-ingestion/tests/unit/sql_parsing/goldens/test_bigquery_from_sharded_table_wildcard.json +++ b/metadata-ingestion/tests/unit/sql_parsing/goldens/test_bigquery_from_sharded_table_wildcard.json @@ -8,7 +8,13 @@ { "downstream": { "table": null, - "column": "col1" + "column": "col1", + "column_type": { + "type": { + "com.linkedin.pegasus2avro.schema.StringType": {} + } + }, + "native_column_type": "TEXT" }, "upstreams": [ { @@ -20,7 +26,13 @@ { "downstream": { "table": null, - "column": "col2" + "column": "col2", + "column_type": { + "type": { + "com.linkedin.pegasus2avro.schema.StringType": {} + } + }, + "native_column_type": "TEXT" }, "upstreams": [ { diff --git a/metadata-ingestion/tests/unit/sql_parsing/goldens/test_bigquery_nested_subqueries.json b/metadata-ingestion/tests/unit/sql_parsing/goldens/test_bigquery_nested_subqueries.json index 0e93d31fbb6a66..67e306bebf5459 100644 --- a/metadata-ingestion/tests/unit/sql_parsing/goldens/test_bigquery_nested_subqueries.json +++ b/metadata-ingestion/tests/unit/sql_parsing/goldens/test_bigquery_nested_subqueries.json @@ -8,7 +8,13 @@ { "downstream": { "table": null, - "column": "col1" + "column": "col1", + "column_type": { + "type": { + "com.linkedin.pegasus2avro.schema.StringType": {} + } + }, + "native_column_type": "TEXT" }, "upstreams": [ { @@ -20,7 +26,13 @@ { "downstream": { "table": null, - "column": "col2" + "column": "col2", + "column_type": { + "type": { + "com.linkedin.pegasus2avro.schema.StringType": {} + } + }, + "native_column_type": "TEXT" }, "upstreams": [ { diff --git a/metadata-ingestion/tests/unit/sql_parsing/goldens/test_bigquery_sharded_table_normalization.json b/metadata-ingestion/tests/unit/sql_parsing/goldens/test_bigquery_sharded_table_normalization.json index 78591286feb505..b7df5444987f23 100644 --- a/metadata-ingestion/tests/unit/sql_parsing/goldens/test_bigquery_sharded_table_normalization.json +++ b/metadata-ingestion/tests/unit/sql_parsing/goldens/test_bigquery_sharded_table_normalization.json @@ -8,7 +8,13 @@ { "downstream": { "table": null, - "column": "col1" + "column": "col1", + "column_type": { + "type": { + "com.linkedin.pegasus2avro.schema.StringType": {} + } + }, + "native_column_type": "TEXT" }, "upstreams": [ { @@ -20,7 +26,13 @@ { "downstream": { "table": null, - "column": "col2" + "column": "col2", + "column_type": { + "type": { + "com.linkedin.pegasus2avro.schema.StringType": {} + } + }, + "native_column_type": "TEXT" }, "upstreams": [ { diff --git a/metadata-ingestion/tests/unit/sql_parsing/goldens/test_bigquery_star_with_replace.json b/metadata-ingestion/tests/unit/sql_parsing/goldens/test_bigquery_star_with_replace.json index 17a801a63e3ffc..b393b2445d6c4a 100644 --- a/metadata-ingestion/tests/unit/sql_parsing/goldens/test_bigquery_star_with_replace.json +++ b/metadata-ingestion/tests/unit/sql_parsing/goldens/test_bigquery_star_with_replace.json @@ -10,7 +10,13 @@ { "downstream": { "table": "urn:li:dataset:(urn:li:dataPlatform:bigquery,my-project.my-dataset.test_table,PROD)", - "column": "col1" + "column": "col1", + "column_type": { + "type": { + "com.linkedin.pegasus2avro.schema.StringType": {} + } + }, + "native_column_type": "TEXT" }, "upstreams": [ { @@ -22,7 +28,13 @@ { "downstream": { "table": "urn:li:dataset:(urn:li:dataPlatform:bigquery,my-project.my-dataset.test_table,PROD)", - "column": "col2" + "column": "col2", + "column_type": { + "type": { + "com.linkedin.pegasus2avro.schema.StringType": {} + } + }, + "native_column_type": "TEXT" }, "upstreams": [ { @@ -34,7 +46,13 @@ { "downstream": { "table": "urn:li:dataset:(urn:li:dataPlatform:bigquery,my-project.my-dataset.test_table,PROD)", - "column": "something" + "column": "something", + "column_type": { + "type": { + "com.linkedin.pegasus2avro.schema.StringType": {} + } + }, + "native_column_type": "TEXT" }, "upstreams": [ { diff --git a/metadata-ingestion/tests/unit/sql_parsing/goldens/test_bigquery_view_from_union.json b/metadata-ingestion/tests/unit/sql_parsing/goldens/test_bigquery_view_from_union.json index fd8a586ac74ac0..53fb94300e8042 100644 --- a/metadata-ingestion/tests/unit/sql_parsing/goldens/test_bigquery_view_from_union.json +++ b/metadata-ingestion/tests/unit/sql_parsing/goldens/test_bigquery_view_from_union.json @@ -11,7 +11,13 @@ { "downstream": { "table": "urn:li:dataset:(urn:li:dataPlatform:bigquery,my_view,PROD)", - "column": "col1" + "column": "col1", + "column_type": { + "type": { + "com.linkedin.pegasus2avro.schema.StringType": {} + } + }, + "native_column_type": "TEXT" }, "upstreams": [ { @@ -27,7 +33,13 @@ { "downstream": { "table": "urn:li:dataset:(urn:li:dataPlatform:bigquery,my_view,PROD)", - "column": "col2" + "column": "col2", + "column_type": { + "type": { + "com.linkedin.pegasus2avro.schema.StringType": {} + } + }, + "native_column_type": "TEXT" }, "upstreams": [ { diff --git a/metadata-ingestion/tests/unit/sql_parsing/goldens/test_create_view_as_select.json b/metadata-ingestion/tests/unit/sql_parsing/goldens/test_create_view_as_select.json index 1ca56840531e46..ff452467aa5bdd 100644 --- a/metadata-ingestion/tests/unit/sql_parsing/goldens/test_create_view_as_select.json +++ b/metadata-ingestion/tests/unit/sql_parsing/goldens/test_create_view_as_select.json @@ -10,7 +10,9 @@ { "downstream": { "table": "urn:li:dataset:(urn:li:dataPlatform:oracle,vsal,PROD)", - "column": "Department" + "column": "Department", + "column_type": null, + "native_column_type": null }, "upstreams": [ { @@ -22,14 +24,22 @@ { "downstream": { "table": "urn:li:dataset:(urn:li:dataPlatform:oracle,vsal,PROD)", - "column": "Employees" + "column": "Employees", + "column_type": { + "type": { + "com.linkedin.pegasus2avro.schema.NumberType": {} + } + }, + "native_column_type": "BIGINT" }, "upstreams": [] }, { "downstream": { "table": "urn:li:dataset:(urn:li:dataPlatform:oracle,vsal,PROD)", - "column": "Salary" + "column": "Salary", + "column_type": null, + "native_column_type": null }, "upstreams": [ { diff --git a/metadata-ingestion/tests/unit/sql_parsing/goldens/test_expand_select_star_basic.json b/metadata-ingestion/tests/unit/sql_parsing/goldens/test_expand_select_star_basic.json index e241bdd08e243c..eecb2265eaec55 100644 --- a/metadata-ingestion/tests/unit/sql_parsing/goldens/test_expand_select_star_basic.json +++ b/metadata-ingestion/tests/unit/sql_parsing/goldens/test_expand_select_star_basic.json @@ -8,7 +8,13 @@ { "downstream": { "table": null, - "column": "total_agg" + "column": "total_agg", + "column_type": { + "type": { + "com.linkedin.pegasus2avro.schema.NumberType": {} + } + }, + "native_column_type": "DOUBLE" }, "upstreams": [ { @@ -20,7 +26,13 @@ { "downstream": { "table": null, - "column": "orderkey" + "column": "orderkey", + "column_type": { + "type": { + "com.linkedin.pegasus2avro.schema.NumberType": {} + } + }, + "native_column_type": "DECIMAL" }, "upstreams": [ { @@ -32,7 +44,13 @@ { "downstream": { "table": null, - "column": "custkey" + "column": "custkey", + "column_type": { + "type": { + "com.linkedin.pegasus2avro.schema.NumberType": {} + } + }, + "native_column_type": "DECIMAL" }, "upstreams": [ { @@ -44,7 +62,13 @@ { "downstream": { "table": null, - "column": "orderstatus" + "column": "orderstatus", + "column_type": { + "type": { + "com.linkedin.pegasus2avro.schema.StringType": {} + } + }, + "native_column_type": "TEXT" }, "upstreams": [ { @@ -56,7 +80,13 @@ { "downstream": { "table": null, - "column": "totalprice" + "column": "totalprice", + "column_type": { + "type": { + "com.linkedin.pegasus2avro.schema.NumberType": {} + } + }, + "native_column_type": "FLOAT" }, "upstreams": [ { @@ -68,7 +98,13 @@ { "downstream": { "table": null, - "column": "orderdate" + "column": "orderdate", + "column_type": { + "type": { + "com.linkedin.pegasus2avro.schema.DateType": {} + } + }, + "native_column_type": "DATE" }, "upstreams": [ { @@ -80,7 +116,13 @@ { "downstream": { "table": null, - "column": "orderpriority" + "column": "orderpriority", + "column_type": { + "type": { + "com.linkedin.pegasus2avro.schema.StringType": {} + } + }, + "native_column_type": "TEXT" }, "upstreams": [ { @@ -92,7 +134,13 @@ { "downstream": { "table": null, - "column": "clerk" + "column": "clerk", + "column_type": { + "type": { + "com.linkedin.pegasus2avro.schema.StringType": {} + } + }, + "native_column_type": "TEXT" }, "upstreams": [ { @@ -104,7 +152,13 @@ { "downstream": { "table": null, - "column": "shippriority" + "column": "shippriority", + "column_type": { + "type": { + "com.linkedin.pegasus2avro.schema.NumberType": {} + } + }, + "native_column_type": "DECIMAL" }, "upstreams": [ { @@ -116,7 +170,13 @@ { "downstream": { "table": null, - "column": "comment" + "column": "comment", + "column_type": { + "type": { + "com.linkedin.pegasus2avro.schema.StringType": {} + } + }, + "native_column_type": "TEXT" }, "upstreams": [ { diff --git a/metadata-ingestion/tests/unit/sql_parsing/goldens/test_insert_as_select.json b/metadata-ingestion/tests/unit/sql_parsing/goldens/test_insert_as_select.json index d7264fd2db6b28..326db47e7ab333 100644 --- a/metadata-ingestion/tests/unit/sql_parsing/goldens/test_insert_as_select.json +++ b/metadata-ingestion/tests/unit/sql_parsing/goldens/test_insert_as_select.json @@ -18,21 +18,27 @@ { "downstream": { "table": "urn:li:dataset:(urn:li:dataPlatform:hive,query72,PROD)", - "column": "i_item_desc" + "column": "i_item_desc", + "column_type": null, + "native_column_type": null }, "upstreams": [] }, { "downstream": { "table": "urn:li:dataset:(urn:li:dataPlatform:hive,query72,PROD)", - "column": "w_warehouse_name" + "column": "w_warehouse_name", + "column_type": null, + "native_column_type": null }, "upstreams": [] }, { "downstream": { "table": "urn:li:dataset:(urn:li:dataPlatform:hive,query72,PROD)", - "column": "d_week_seq" + "column": "d_week_seq", + "column_type": null, + "native_column_type": null }, "upstreams": [ { @@ -44,7 +50,13 @@ { "downstream": { "table": "urn:li:dataset:(urn:li:dataPlatform:hive,query72,PROD)", - "column": "no_promo" + "column": "no_promo", + "column_type": { + "type": { + "com.linkedin.pegasus2avro.schema.NumberType": {} + } + }, + "native_column_type": "BIGINT" }, "upstreams": [ { @@ -56,7 +68,13 @@ { "downstream": { "table": "urn:li:dataset:(urn:li:dataPlatform:hive,query72,PROD)", - "column": "promo" + "column": "promo", + "column_type": { + "type": { + "com.linkedin.pegasus2avro.schema.NumberType": {} + } + }, + "native_column_type": "BIGINT" }, "upstreams": [ { @@ -68,7 +86,13 @@ { "downstream": { "table": "urn:li:dataset:(urn:li:dataPlatform:hive,query72,PROD)", - "column": "total_cnt" + "column": "total_cnt", + "column_type": { + "type": { + "com.linkedin.pegasus2avro.schema.NumberType": {} + } + }, + "native_column_type": "BIGINT" }, "upstreams": [] } diff --git a/metadata-ingestion/tests/unit/sql_parsing/goldens/test_select_ambiguous_column_no_schema.json b/metadata-ingestion/tests/unit/sql_parsing/goldens/test_select_ambiguous_column_no_schema.json index 10f5ee20b0c1f1..b5fd5eebeb1b19 100644 --- a/metadata-ingestion/tests/unit/sql_parsing/goldens/test_select_ambiguous_column_no_schema.json +++ b/metadata-ingestion/tests/unit/sql_parsing/goldens/test_select_ambiguous_column_no_schema.json @@ -9,21 +9,27 @@ { "downstream": { "table": null, - "column": "a" + "column": "a", + "column_type": null, + "native_column_type": null }, "upstreams": [] }, { "downstream": { "table": null, - "column": "b" + "column": "b", + "column_type": null, + "native_column_type": null }, "upstreams": [] }, { "downstream": { "table": null, - "column": "c" + "column": "c", + "column_type": null, + "native_column_type": null }, "upstreams": [] } diff --git a/metadata-ingestion/tests/unit/sql_parsing/goldens/test_select_count.json b/metadata-ingestion/tests/unit/sql_parsing/goldens/test_select_count.json index 9f6eeae46c2940..a67c944822138f 100644 --- a/metadata-ingestion/tests/unit/sql_parsing/goldens/test_select_count.json +++ b/metadata-ingestion/tests/unit/sql_parsing/goldens/test_select_count.json @@ -8,7 +8,13 @@ { "downstream": { "table": null, - "column": "COUNT(`fact_complaint_snapshot`.`etl_data_dt_id`)" + "column": "COUNT(`fact_complaint_snapshot`.`etl_data_dt_id`)", + "column_type": { + "type": { + "com.linkedin.pegasus2avro.schema.NumberType": {} + } + }, + "native_column_type": "BIGINT" }, "upstreams": [ { diff --git a/metadata-ingestion/tests/unit/sql_parsing/goldens/test_select_from_struct_subfields.json b/metadata-ingestion/tests/unit/sql_parsing/goldens/test_select_from_struct_subfields.json index 109de961804227..5ad847e252497a 100644 --- a/metadata-ingestion/tests/unit/sql_parsing/goldens/test_select_from_struct_subfields.json +++ b/metadata-ingestion/tests/unit/sql_parsing/goldens/test_select_from_struct_subfields.json @@ -8,7 +8,13 @@ { "downstream": { "table": null, - "column": "post_id" + "column": "post_id", + "column_type": { + "type": { + "com.linkedin.pegasus2avro.schema.NumberType": {} + } + }, + "native_column_type": "DECIMAL" }, "upstreams": [ { @@ -20,7 +26,9 @@ { "downstream": { "table": null, - "column": "id" + "column": "id", + "column_type": null, + "native_column_type": null }, "upstreams": [ { @@ -32,7 +40,9 @@ { "downstream": { "table": null, - "column": "min_metric" + "column": "min_metric", + "column_type": null, + "native_column_type": null }, "upstreams": [ { diff --git a/metadata-ingestion/tests/unit/sql_parsing/goldens/test_select_from_union.json b/metadata-ingestion/tests/unit/sql_parsing/goldens/test_select_from_union.json index 2340b2e95b0d0b..902aa010c8afc4 100644 --- a/metadata-ingestion/tests/unit/sql_parsing/goldens/test_select_from_union.json +++ b/metadata-ingestion/tests/unit/sql_parsing/goldens/test_select_from_union.json @@ -9,14 +9,26 @@ { "downstream": { "table": null, - "column": "label" + "column": "label", + "column_type": { + "type": { + "com.linkedin.pegasus2avro.schema.StringType": {} + } + }, + "native_column_type": "VARCHAR" }, "upstreams": [] }, { "downstream": { "table": null, - "column": "total_agg" + "column": "total_agg", + "column_type": { + "type": { + "com.linkedin.pegasus2avro.schema.NumberType": {} + } + }, + "native_column_type": "DOUBLE" }, "upstreams": [ { diff --git a/metadata-ingestion/tests/unit/sql_parsing/goldens/test_select_max.json b/metadata-ingestion/tests/unit/sql_parsing/goldens/test_select_max.json index 326c07d332c268..6ea88f45847ce7 100644 --- a/metadata-ingestion/tests/unit/sql_parsing/goldens/test_select_max.json +++ b/metadata-ingestion/tests/unit/sql_parsing/goldens/test_select_max.json @@ -8,7 +8,9 @@ { "downstream": { "table": null, - "column": "max_col" + "column": "max_col", + "column_type": null, + "native_column_type": null }, "upstreams": [ { diff --git a/metadata-ingestion/tests/unit/sql_parsing/goldens/test_select_with_ctes.json b/metadata-ingestion/tests/unit/sql_parsing/goldens/test_select_with_ctes.json index 3e02314d6e8c39..67e9fd2d21a0e4 100644 --- a/metadata-ingestion/tests/unit/sql_parsing/goldens/test_select_with_ctes.json +++ b/metadata-ingestion/tests/unit/sql_parsing/goldens/test_select_with_ctes.json @@ -9,7 +9,9 @@ { "downstream": { "table": null, - "column": "COL1" + "column": "COL1", + "column_type": null, + "native_column_type": null }, "upstreams": [ { @@ -21,7 +23,9 @@ { "downstream": { "table": null, - "column": "COL3" + "column": "COL3", + "column_type": null, + "native_column_type": null }, "upstreams": [ { diff --git a/metadata-ingestion/tests/unit/sql_parsing/goldens/test_select_with_full_col_name.json b/metadata-ingestion/tests/unit/sql_parsing/goldens/test_select_with_full_col_name.json index c12ad23b2f03b0..6ee3d2e61c39b7 100644 --- a/metadata-ingestion/tests/unit/sql_parsing/goldens/test_select_with_full_col_name.json +++ b/metadata-ingestion/tests/unit/sql_parsing/goldens/test_select_with_full_col_name.json @@ -8,7 +8,13 @@ { "downstream": { "table": null, - "column": "post_id" + "column": "post_id", + "column_type": { + "type": { + "com.linkedin.pegasus2avro.schema.NumberType": {} + } + }, + "native_column_type": "DECIMAL" }, "upstreams": [ { @@ -20,7 +26,9 @@ { "downstream": { "table": null, - "column": "id" + "column": "id", + "column_type": null, + "native_column_type": null }, "upstreams": [ { diff --git a/metadata-ingestion/tests/unit/sql_parsing/goldens/test_snowflake_case_statement.json b/metadata-ingestion/tests/unit/sql_parsing/goldens/test_snowflake_case_statement.json index 64cd80e9a2d697..a876824127ec11 100644 --- a/metadata-ingestion/tests/unit/sql_parsing/goldens/test_snowflake_case_statement.json +++ b/metadata-ingestion/tests/unit/sql_parsing/goldens/test_snowflake_case_statement.json @@ -8,7 +8,13 @@ { "downstream": { "table": null, - "column": "total_price_category" + "column": "total_price_category", + "column_type": { + "type": { + "com.linkedin.pegasus2avro.schema.StringType": {} + } + }, + "native_column_type": "VARCHAR" }, "upstreams": [ { @@ -20,7 +26,13 @@ { "downstream": { "table": null, - "column": "total_price_success" + "column": "total_price_success", + "column_type": { + "type": { + "com.linkedin.pegasus2avro.schema.NumberType": {} + } + }, + "native_column_type": "FLOAT" }, "upstreams": [ { diff --git a/metadata-ingestion/tests/unit/sql_parsing/goldens/test_snowflake_column_cast.json b/metadata-ingestion/tests/unit/sql_parsing/goldens/test_snowflake_column_cast.json new file mode 100644 index 00000000000000..7545e2b3269dc0 --- /dev/null +++ b/metadata-ingestion/tests/unit/sql_parsing/goldens/test_snowflake_column_cast.json @@ -0,0 +1,63 @@ +{ + "query_type": "SELECT", + "in_tables": [ + "urn:li:dataset:(urn:li:dataPlatform:snowflake,snowflake_sample_data.tpch_sf1.orders,PROD)" + ], + "out_tables": [], + "column_lineage": [ + { + "downstream": { + "table": null, + "column": "orderkey", + "column_type": { + "type": { + "com.linkedin.pegasus2avro.schema.NumberType": {} + } + }, + "native_column_type": "DECIMAL(20, 0)" + }, + "upstreams": [ + { + "table": "urn:li:dataset:(urn:li:dataPlatform:snowflake,snowflake_sample_data.tpch_sf1.orders,PROD)", + "column": "o_orderkey" + } + ] + }, + { + "downstream": { + "table": null, + "column": "total_cast_int", + "column_type": { + "type": { + "com.linkedin.pegasus2avro.schema.NumberType": {} + } + }, + "native_column_type": "INT" + }, + "upstreams": [ + { + "table": "urn:li:dataset:(urn:li:dataPlatform:snowflake,snowflake_sample_data.tpch_sf1.orders,PROD)", + "column": "o_totalprice" + } + ] + }, + { + "downstream": { + "table": null, + "column": "total_cast_float", + "column_type": { + "type": { + "com.linkedin.pegasus2avro.schema.NumberType": {} + } + }, + "native_column_type": "DECIMAL(16, 4)" + }, + "upstreams": [ + { + "table": "urn:li:dataset:(urn:li:dataPlatform:snowflake,snowflake_sample_data.tpch_sf1.orders,PROD)", + "column": "o_totalprice" + } + ] + } + ] +} \ No newline at end of file diff --git a/metadata-ingestion/tests/unit/sql_parsing/goldens/test_snowflake_column_normalization.json b/metadata-ingestion/tests/unit/sql_parsing/goldens/test_snowflake_column_normalization.json index 7b22a46757e392..84e6b053000f18 100644 --- a/metadata-ingestion/tests/unit/sql_parsing/goldens/test_snowflake_column_normalization.json +++ b/metadata-ingestion/tests/unit/sql_parsing/goldens/test_snowflake_column_normalization.json @@ -8,7 +8,13 @@ { "downstream": { "table": null, - "column": "total_agg" + "column": "total_agg", + "column_type": { + "type": { + "com.linkedin.pegasus2avro.schema.NumberType": {} + } + }, + "native_column_type": "DOUBLE" }, "upstreams": [ { @@ -20,7 +26,13 @@ { "downstream": { "table": null, - "column": "total_avg" + "column": "total_avg", + "column_type": { + "type": { + "com.linkedin.pegasus2avro.schema.NumberType": {} + } + }, + "native_column_type": "DOUBLE" }, "upstreams": [ { @@ -32,7 +44,13 @@ { "downstream": { "table": null, - "column": "total_min" + "column": "total_min", + "column_type": { + "type": { + "com.linkedin.pegasus2avro.schema.NumberType": {} + } + }, + "native_column_type": "FLOAT" }, "upstreams": [ { @@ -44,7 +62,13 @@ { "downstream": { "table": null, - "column": "total_max" + "column": "total_max", + "column_type": { + "type": { + "com.linkedin.pegasus2avro.schema.NumberType": {} + } + }, + "native_column_type": "FLOAT" }, "upstreams": [ { diff --git a/metadata-ingestion/tests/unit/sql_parsing/goldens/test_snowflake_ctas_column_normalization.json b/metadata-ingestion/tests/unit/sql_parsing/goldens/test_snowflake_ctas_column_normalization.json index c912d99a3a8a32..39c94cf83c561b 100644 --- a/metadata-ingestion/tests/unit/sql_parsing/goldens/test_snowflake_ctas_column_normalization.json +++ b/metadata-ingestion/tests/unit/sql_parsing/goldens/test_snowflake_ctas_column_normalization.json @@ -10,7 +10,13 @@ { "downstream": { "table": "urn:li:dataset:(urn:li:dataPlatform:snowflake,snowflake_sample_data.tpch_sf1.orders_normalized,PROD)", - "column": "Total_Agg" + "column": "Total_Agg", + "column_type": { + "type": { + "com.linkedin.pegasus2avro.schema.NumberType": {} + } + }, + "native_column_type": "DOUBLE" }, "upstreams": [ { @@ -22,7 +28,13 @@ { "downstream": { "table": "urn:li:dataset:(urn:li:dataPlatform:snowflake,snowflake_sample_data.tpch_sf1.orders_normalized,PROD)", - "column": "total_avg" + "column": "total_avg", + "column_type": { + "type": { + "com.linkedin.pegasus2avro.schema.NumberType": {} + } + }, + "native_column_type": "DOUBLE" }, "upstreams": [ { @@ -34,7 +46,13 @@ { "downstream": { "table": "urn:li:dataset:(urn:li:dataPlatform:snowflake,snowflake_sample_data.tpch_sf1.orders_normalized,PROD)", - "column": "TOTAL_MIN" + "column": "TOTAL_MIN", + "column_type": { + "type": { + "com.linkedin.pegasus2avro.schema.NumberType": {} + } + }, + "native_column_type": "FLOAT" }, "upstreams": [ { @@ -46,7 +64,13 @@ { "downstream": { "table": "urn:li:dataset:(urn:li:dataPlatform:snowflake,snowflake_sample_data.tpch_sf1.orders_normalized,PROD)", - "column": "total_max" + "column": "total_max", + "column_type": { + "type": { + "com.linkedin.pegasus2avro.schema.NumberType": {} + } + }, + "native_column_type": "FLOAT" }, "upstreams": [ { diff --git a/metadata-ingestion/tests/unit/sql_parsing/goldens/test_snowflake_default_normalization.json b/metadata-ingestion/tests/unit/sql_parsing/goldens/test_snowflake_default_normalization.json index 2af308ec606234..dbf5b1b9a44535 100644 --- a/metadata-ingestion/tests/unit/sql_parsing/goldens/test_snowflake_default_normalization.json +++ b/metadata-ingestion/tests/unit/sql_parsing/goldens/test_snowflake_default_normalization.json @@ -11,7 +11,13 @@ { "downstream": { "table": "urn:li:dataset:(urn:li:dataPlatform:snowflake,long_tail_companions.analytics.active_customer_ltv,PROD)", - "column": "user_fk" + "column": "user_fk", + "column_type": { + "type": { + "com.linkedin.pegasus2avro.schema.NumberType": {} + } + }, + "native_column_type": "DECIMAL(38, 0)" }, "upstreams": [ { @@ -23,7 +29,13 @@ { "downstream": { "table": "urn:li:dataset:(urn:li:dataPlatform:snowflake,long_tail_companions.analytics.active_customer_ltv,PROD)", - "column": "email" + "column": "email", + "column_type": { + "type": { + "com.linkedin.pegasus2avro.schema.StringType": {} + } + }, + "native_column_type": "VARCHAR(16777216)" }, "upstreams": [ { @@ -35,7 +47,13 @@ { "downstream": { "table": "urn:li:dataset:(urn:li:dataPlatform:snowflake,long_tail_companions.analytics.active_customer_ltv,PROD)", - "column": "last_purchase_date" + "column": "last_purchase_date", + "column_type": { + "type": { + "com.linkedin.pegasus2avro.schema.DateType": {} + } + }, + "native_column_type": "DATE" }, "upstreams": [ { @@ -47,7 +65,13 @@ { "downstream": { "table": "urn:li:dataset:(urn:li:dataPlatform:snowflake,long_tail_companions.analytics.active_customer_ltv,PROD)", - "column": "lifetime_purchase_amount" + "column": "lifetime_purchase_amount", + "column_type": { + "type": { + "com.linkedin.pegasus2avro.schema.NumberType": {} + } + }, + "native_column_type": "DECIMAL" }, "upstreams": [ { @@ -59,7 +83,13 @@ { "downstream": { "table": "urn:li:dataset:(urn:li:dataPlatform:snowflake,long_tail_companions.analytics.active_customer_ltv,PROD)", - "column": "lifetime_purchase_count" + "column": "lifetime_purchase_count", + "column_type": { + "type": { + "com.linkedin.pegasus2avro.schema.NumberType": {} + } + }, + "native_column_type": "BIGINT" }, "upstreams": [ { @@ -71,7 +101,13 @@ { "downstream": { "table": "urn:li:dataset:(urn:li:dataPlatform:snowflake,long_tail_companions.analytics.active_customer_ltv,PROD)", - "column": "average_purchase_amount" + "column": "average_purchase_amount", + "column_type": { + "type": { + "com.linkedin.pegasus2avro.schema.NumberType": {} + } + }, + "native_column_type": "DECIMAL" }, "upstreams": [ { diff --git a/metadata-ingestion/tests/unit/sql_parsing/test_sqlglot_lineage.py b/metadata-ingestion/tests/unit/sql_parsing/test_sqlglot_lineage.py index 2a965a9bb1e61e..bb6e5f15817547 100644 --- a/metadata-ingestion/tests/unit/sql_parsing/test_sqlglot_lineage.py +++ b/metadata-ingestion/tests/unit/sql_parsing/test_sqlglot_lineage.py @@ -608,4 +608,25 @@ def test_snowflake_default_normalization(): ) +def test_snowflake_column_cast(): + assert_sql_result( + """ +SELECT + o.o_orderkey::NUMBER(20,0) as orderkey, + CAST(o.o_totalprice AS INT) as total_cast_int, + CAST(o.o_totalprice AS NUMBER(16,4)) as total_cast_float +FROM snowflake_sample_data.tpch_sf1.orders o +LIMIT 10 +""", + dialect="snowflake", + schemas={ + "urn:li:dataset:(urn:li:dataPlatform:snowflake,snowflake_sample_data.tpch_sf1.orders,PROD)": { + "orderkey": "NUMBER(38,0)", + "totalprice": "NUMBER(12,2)", + }, + }, + expected_file=RESOURCE_DIR / "test_snowflake_column_cast.json", + ) + + # TODO: Add a test for setting platform_instance or env From dd418de76d96fb41c9064261cdba37bc2af85309 Mon Sep 17 00:00:00 2001 From: Tamas Nemeth Date: Thu, 12 Oct 2023 13:10:59 +0200 Subject: [PATCH 4/6] fix(ingest/bigquery): Fix shard regexp to match without underscore as well (#8934) --- .../ingestion/source/bigquery_v2/bigquery.py | 1 + .../source/bigquery_v2/bigquery_audit.py | 27 ++++++++++++++----- .../ingestion/source/bigquery_v2/queries.py | 8 +++--- .../ingestion/source_config/bigquery.py | 8 +++++- .../tests/unit/test_bigquery_source.py | 10 ++++--- .../unit/test_bigqueryv2_usage_source.py | 4 +-- 6 files changed, 41 insertions(+), 17 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py index b4a04d96b532b4..e577c2bac8bbde 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py @@ -1057,6 +1057,7 @@ def gen_schema_fields(self, columns: List[BigqueryColumn]) -> List[SchemaField]: ): field.description = col.comment schema_fields[idx] = field + break else: tags = [] if col.is_partition_column: diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_audit.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_audit.py index b0ac77201b415b..88060a9cdc91df 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_audit.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_audit.py @@ -20,7 +20,13 @@ logger: logging.Logger = logging.getLogger(__name__) -_BIGQUERY_DEFAULT_SHARDED_TABLE_REGEX = "((.+)[_$])?(\\d{8})$" +# Regexp for sharded tables. +# A sharded table is a table that has a suffix of the form _yyyymmdd or yyyymmdd, where yyyymmdd is a date. +# The regexp checks for valid dates in the suffix (e.g. 20200101, 20200229, 20201231) and if the date is not valid +# then it is not a sharded table. +_BIGQUERY_DEFAULT_SHARDED_TABLE_REGEX = ( + "((.+\\D)[_$]?)?(\\d\\d\\d\\d(?:0[1-9]|1[0-2])(?:0[1-9]|[12][0-9]|3[01]))$" +) @dataclass(frozen=True, order=True) @@ -40,7 +46,7 @@ class BigqueryTableIdentifier: _BQ_SHARDED_TABLE_SUFFIX: str = "_yyyymmdd" @staticmethod - def get_table_and_shard(table_name: str) -> Tuple[str, Optional[str]]: + def get_table_and_shard(table_name: str) -> Tuple[Optional[str], Optional[str]]: """ Args: table_name: @@ -53,16 +59,25 @@ def get_table_and_shard(table_name: str) -> Tuple[str, Optional[str]]: In case of non-sharded tables, returns (, None) In case of sharded tables, returns (, shard) """ + new_table_name = table_name match = re.match( BigqueryTableIdentifier._BIGQUERY_DEFAULT_SHARDED_TABLE_REGEX, table_name, re.IGNORECASE, ) if match: - table_name = match.group(2) - shard = match.group(3) - return table_name, shard - return table_name, None + shard: str = match[3] + if shard: + if table_name.endswith(shard): + new_table_name = table_name[: -len(shard)] + + new_table_name = ( + new_table_name.rstrip("_") if new_table_name else new_table_name + ) + if new_table_name.endswith("."): + new_table_name = table_name + return (new_table_name, shard) if new_table_name else (None, shard) + return new_table_name, None @classmethod def from_string_name(cls, table: str) -> "BigqueryTableIdentifier": diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/queries.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/queries.py index a87cb8c1cbfa54..67fcc33cdf2182 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/queries.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/queries.py @@ -51,8 +51,8 @@ class BigqueryQuery: p.max_partition_id, p.active_billable_bytes, p.long_term_billable_bytes, - REGEXP_EXTRACT(t.table_name, r".*_(\\d+)$") as table_suffix, - REGEXP_REPLACE(t.table_name, r"_(\\d+)$", "") as table_base + REGEXP_EXTRACT(t.table_name, r"(?:(?:.+\\D)[_$]?)(\\d\\d\\d\\d(?:0[1-9]|1[012])(?:0[1-9]|[12][0-9]|3[01]))$") as table_suffix, + REGEXP_REPLACE(t.table_name, r"(?:[_$]?)(\\d\\d\\d\\d(?:0[1-9]|1[012])(?:0[1-9]|[12][0-9]|3[01]))$", "") as table_base FROM `{{project_id}}`.`{{dataset_name}}`.INFORMATION_SCHEMA.TABLES t @@ -92,8 +92,8 @@ class BigqueryQuery: tos.OPTION_VALUE as comment, t.is_insertable_into, t.ddl, - REGEXP_EXTRACT(t.table_name, r".*_(\\d+)$") as table_suffix, - REGEXP_REPLACE(t.table_name, r"_(\\d+)$", "") as table_base + REGEXP_EXTRACT(t.table_name, r"(?:(?:.+\\D)[_$]?)(\\d\\d\\d\\d(?:0[1-9]|1[012])(?:0[1-9]|[12][0-9]|3[01]))$") as table_suffix, + REGEXP_REPLACE(t.table_name, r"(?:[_$]?)(\\d\\d\\d\\d(?:0[1-9]|1[012])(?:0[1-9]|[12][0-9]|3[01]))$", "") as table_base FROM `{{project_id}}`.`{{dataset_name}}`.INFORMATION_SCHEMA.TABLES t diff --git a/metadata-ingestion/src/datahub/ingestion/source_config/bigquery.py b/metadata-ingestion/src/datahub/ingestion/source_config/bigquery.py index 8ca1296d819c1c..0a73bb5203e72b 100644 --- a/metadata-ingestion/src/datahub/ingestion/source_config/bigquery.py +++ b/metadata-ingestion/src/datahub/ingestion/source_config/bigquery.py @@ -4,7 +4,13 @@ from datahub.configuration.common import ConfigModel, ConfigurationError -_BIGQUERY_DEFAULT_SHARDED_TABLE_REGEX: str = "((.+)[_$])?(\\d{8})$" +# Regexp for sharded tables. +# A sharded table is a table that has a suffix of the form _yyyymmdd or yyyymmdd, where yyyymmdd is a date. +# The regexp checks for valid dates in the suffix (e.g. 20200101, 20200229, 20201231) and if the date is not valid +# then it is not a sharded table. +_BIGQUERY_DEFAULT_SHARDED_TABLE_REGEX: str = ( + "((.+\\D)[_$]?)?(\\d\\d\\d\\d(?:0[1-9]|1[0-2])(?:0[1-9]|[12][0-9]|3[01]))$" +) class BigQueryBaseConfig(ConfigModel): diff --git a/metadata-ingestion/tests/unit/test_bigquery_source.py b/metadata-ingestion/tests/unit/test_bigquery_source.py index e9e91361f49f47..5a11a933c85954 100644 --- a/metadata-ingestion/tests/unit/test_bigquery_source.py +++ b/metadata-ingestion/tests/unit/test_bigquery_source.py @@ -765,11 +765,14 @@ def test_gen_view_dataset_workunits( ("project.dataset.table_20231215", "project.dataset.table", "20231215"), ("project.dataset.table_2023", "project.dataset.table_2023", None), # incorrectly handled special case where dataset itself is a sharded table if full name is specified - ("project.dataset.20231215", "project.dataset.20231215", None), + ("project.dataset.20231215", "project.dataset.20231215", "20231215"), + ("project1.dataset2.20231215", "project1.dataset2.20231215", "20231215"), # Cases with Just the table name as input ("table", "table", None), - ("table20231215", "table20231215", None), + ("table20231215", "table", "20231215"), ("table_20231215", "table", "20231215"), + ("table2_20231215", "table2", "20231215"), + ("table220231215", "table220231215", None), ("table_1624046611000_name", "table_1624046611000_name", None), ("table_1624046611000", "table_1624046611000", None), # Special case where dataset itself is a sharded table @@ -801,7 +804,6 @@ def test_get_table_and_shard_default( ("project.dataset.2023", "project.dataset.2023", None), # Cases with Just the table name as input ("table", "table", None), - ("table20231215", "table20231215", None), ("table_20231215", "table", "20231215"), ("table_2023", "table", "2023"), ("table_1624046611000_name", "table_1624046611000_name", None), @@ -842,7 +844,7 @@ def test_get_table_and_shard_custom_shard_pattern( "project.dataset.table_1624046611000_name", ), ("project.dataset.table_1624046611000", "project.dataset.table_1624046611000"), - ("project.dataset.table20231215", "project.dataset.table20231215"), + ("project.dataset.table20231215", "project.dataset.table"), ("project.dataset.table_*", "project.dataset.table"), ("project.dataset.table_2023*", "project.dataset.table"), ("project.dataset.table_202301*", "project.dataset.table"), diff --git a/metadata-ingestion/tests/unit/test_bigqueryv2_usage_source.py b/metadata-ingestion/tests/unit/test_bigqueryv2_usage_source.py index 4cf42da4395f94..44fd840f28d594 100644 --- a/metadata-ingestion/tests/unit/test_bigqueryv2_usage_source.py +++ b/metadata-ingestion/tests/unit/test_bigqueryv2_usage_source.py @@ -144,10 +144,10 @@ def test_bigquery_table_sanitasitation(): assert new_table_ref.dataset == "dataset-4567" table_ref = BigQueryTableRef( - BigqueryTableIdentifier("project-1234", "dataset-4567", "foo_20222110") + BigqueryTableIdentifier("project-1234", "dataset-4567", "foo_20221210") ) new_table_identifier = table_ref.table_identifier - assert new_table_identifier.table == "foo_20222110" + assert new_table_identifier.table == "foo_20221210" assert new_table_identifier.is_sharded_table() assert new_table_identifier.get_table_display_name() == "foo" assert new_table_identifier.project_id == "project-1234" From c381806110ae995dd2164305394ee4e1d131e033 Mon Sep 17 00:00:00 2001 From: Tamas Nemeth Date: Thu, 12 Oct 2023 13:56:30 +0200 Subject: [PATCH 5/6] feat(ingestion): Adding config option to auto lowercase dataset urns (#8928) --- .../datahub/configuration/source_common.py | 7 ++ .../src/datahub/ingestion/api/source.py | 24 +++++++ .../datahub/ingestion/api/source_helpers.py | 20 +++++- .../ingestion/source/bigquery_v2/bigquery.py | 3 - .../source/bigquery_v2/bigquery_config.py | 5 -- .../src/datahub/ingestion/source/kafka.py | 11 ++- .../ingestion/source/sql/sql_config.py | 11 ++- .../datahub/ingestion/source/unity/config.py | 6 +- .../src/datahub/utilities/urns/urn_iter.py | 33 +++++++-- .../api/source_helpers/test_source_helpers.py | 70 +++++++++++++++++++ 10 files changed, 170 insertions(+), 20 deletions(-) diff --git a/metadata-ingestion/src/datahub/configuration/source_common.py b/metadata-ingestion/src/datahub/configuration/source_common.py index a9f891ddb7b1e1..80b6ceb576c1cc 100644 --- a/metadata-ingestion/src/datahub/configuration/source_common.py +++ b/metadata-ingestion/src/datahub/configuration/source_common.py @@ -54,6 +54,13 @@ class DatasetSourceConfigMixin(PlatformInstanceConfigMixin, EnvConfigMixin): """ +class LowerCaseDatasetUrnConfigMixin(ConfigModel): + convert_urns_to_lowercase: bool = Field( + default=False, + description="Whether to convert dataset urns to lowercase.", + ) + + class DatasetLineageProviderConfigBase(EnvConfigMixin): """ Any non-Dataset source that produces lineage to Datasets should inherit this class. diff --git a/metadata-ingestion/src/datahub/ingestion/api/source.py b/metadata-ingestion/src/datahub/ingestion/api/source.py index 0bcc220cad49bf..b86844b1c4c831 100644 --- a/metadata-ingestion/src/datahub/ingestion/api/source.py +++ b/metadata-ingestion/src/datahub/ingestion/api/source.py @@ -29,6 +29,7 @@ from datahub.ingestion.api.report import Report from datahub.ingestion.api.source_helpers import ( auto_browse_path_v2, + auto_lowercase_urns, auto_materialize_referenced_tags, auto_status_aspect, auto_workunit_reporter, @@ -192,7 +193,30 @@ def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]: self.ctx.pipeline_config.flags.generate_browse_path_v2_dry_run ) + auto_lowercase_dataset_urns: Optional[MetadataWorkUnitProcessor] = None + if ( + self.ctx.pipeline_config + and self.ctx.pipeline_config.source + and self.ctx.pipeline_config.source.config + and ( + ( + hasattr( + self.ctx.pipeline_config.source.config, + "convert_urns_to_lowercase", + ) + and self.ctx.pipeline_config.source.config.convert_urns_to_lowercase + ) + or ( + hasattr(self.ctx.pipeline_config.source.config, "get") + and self.ctx.pipeline_config.source.config.get( + "convert_urns_to_lowercase" + ) + ) + ) + ): + auto_lowercase_dataset_urns = auto_lowercase_urns return [ + auto_lowercase_dataset_urns, auto_status_aspect, auto_materialize_referenced_tags, browse_path_processor, diff --git a/metadata-ingestion/src/datahub/ingestion/api/source_helpers.py b/metadata-ingestion/src/datahub/ingestion/api/source_helpers.py index 7fc15cf829678b..2ce9e07bc57bc8 100644 --- a/metadata-ingestion/src/datahub/ingestion/api/source_helpers.py +++ b/metadata-ingestion/src/datahub/ingestion/api/source_helpers.py @@ -35,7 +35,7 @@ from datahub.utilities.urns.dataset_urn import DatasetUrn from datahub.utilities.urns.tag_urn import TagUrn from datahub.utilities.urns.urn import guess_entity_type -from datahub.utilities.urns.urn_iter import list_urns +from datahub.utilities.urns.urn_iter import list_urns, lowercase_dataset_urns if TYPE_CHECKING: from datahub.ingestion.api.source import SourceReport @@ -70,7 +70,6 @@ def auto_status_aspect( for wu in stream: urn = wu.get_urn() all_urns.add(urn) - if not wu.is_primary_source: # If this is a non-primary source, we pretend like we've seen the status # aspect so that we don't try to emit a removal for it. @@ -173,6 +172,23 @@ def auto_materialize_referenced_tags( ).as_workunit() +def auto_lowercase_urns( + stream: Iterable[MetadataWorkUnit], +) -> Iterable[MetadataWorkUnit]: + """Lowercase all dataset urns""" + + for wu in stream: + try: + old_urn = wu.get_urn() + lowercase_dataset_urns(wu.metadata) + wu.id = wu.id.replace(old_urn, wu.get_urn()) + + yield wu + except Exception as e: + logger.warning(f"Failed to lowercase urns for {wu}: {e}", exc_info=True) + yield wu + + def auto_browse_path_v2( stream: Iterable[MetadataWorkUnit], *, diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py index e577c2bac8bbde..552612f877b9aa 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py @@ -16,7 +16,6 @@ make_dataplatform_instance_urn, make_dataset_urn, make_tag_urn, - set_dataset_urn_to_lower, ) from datahub.emitter.mcp import MetadataChangeProposalWrapper from datahub.emitter.mcp_builder import BigQueryDatasetKey, ContainerKey, ProjectIdKey @@ -218,8 +217,6 @@ def __init__(self, ctx: PipelineContext, config: BigQueryV2Config): if self.config.enable_legacy_sharded_table_support: BigqueryTableIdentifier._BQ_SHARDED_TABLE_SUFFIX = "" - set_dataset_urn_to_lower(self.config.convert_urns_to_lowercase) - self.bigquery_data_dictionary = BigQuerySchemaApi( self.report.schema_api_perf, self.config.get_bigquery_client() ) diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py index 483355a85ac05c..944814b6936a45 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py @@ -206,11 +206,6 @@ def validate_column_lineage(cls, v: bool, values: Dict[str, Any]) -> bool: description="This flag enables the data lineage extraction from Data Lineage API exposed by Google Data Catalog. NOTE: This extractor can't build views lineage. It's recommended to enable the view's DDL parsing. Read the docs to have more information about: https://cloud.google.com/data-catalog/docs/concepts/about-data-lineage", ) - convert_urns_to_lowercase: bool = Field( - default=False, - description="Convert urns to lowercase.", - ) - enable_legacy_sharded_table_support: bool = Field( default=True, description="Use the legacy sharded table urn suffix added.", diff --git a/metadata-ingestion/src/datahub/ingestion/source/kafka.py b/metadata-ingestion/src/datahub/ingestion/source/kafka.py index 566304e1999b79..d5039360da5677 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/kafka.py +++ b/metadata-ingestion/src/datahub/ingestion/source/kafka.py @@ -18,7 +18,10 @@ from datahub.configuration.common import AllowDenyPattern from datahub.configuration.kafka import KafkaConsumerConnectionConfig -from datahub.configuration.source_common import DatasetSourceConfigMixin +from datahub.configuration.source_common import ( + DatasetSourceConfigMixin, + LowerCaseDatasetUrnConfigMixin, +) from datahub.emitter import mce_builder from datahub.emitter.mce_builder import ( make_data_platform_urn, @@ -76,7 +79,11 @@ class KafkaTopicConfigKeys(str, Enum): UNCLEAN_LEADER_ELECTION_CONFIG = "unclean.leader.election.enable" -class KafkaSourceConfig(StatefulIngestionConfigBase, DatasetSourceConfigMixin): +class KafkaSourceConfig( + StatefulIngestionConfigBase, + DatasetSourceConfigMixin, + LowerCaseDatasetUrnConfigMixin, +): connection: KafkaConsumerConnectionConfig = KafkaConsumerConnectionConfig() topic_patterns: AllowDenyPattern = AllowDenyPattern(allow=[".*"], deny=["^_.*"]) diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_config.py b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_config.py index 677d32c8bac08f..08cc74aec39775 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_config.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_config.py @@ -7,7 +7,10 @@ from pydantic import Field from datahub.configuration.common import AllowDenyPattern, ConfigModel -from datahub.configuration.source_common import DatasetSourceConfigMixin +from datahub.configuration.source_common import ( + DatasetSourceConfigMixin, + LowerCaseDatasetUrnConfigMixin, +) from datahub.configuration.validate_field_deprecation import pydantic_field_deprecated from datahub.ingestion.source.ge_profiling_config import GEProfilingConfig from datahub.ingestion.source.state.stale_entity_removal_handler import ( @@ -21,7 +24,11 @@ logger: logging.Logger = logging.getLogger(__name__) -class SQLCommonConfig(StatefulIngestionConfigBase, DatasetSourceConfigMixin): +class SQLCommonConfig( + StatefulIngestionConfigBase, + DatasetSourceConfigMixin, + LowerCaseDatasetUrnConfigMixin, +): options: dict = pydantic.Field( default_factory=dict, description="Any options specified here will be passed to [SQLAlchemy.create_engine](https://docs.sqlalchemy.org/en/14/core/engines.html#sqlalchemy.create_engine) as kwargs.", diff --git a/metadata-ingestion/src/datahub/ingestion/source/unity/config.py b/metadata-ingestion/src/datahub/ingestion/source/unity/config.py index 51390873712d39..a57ee398488556 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/unity/config.py +++ b/metadata-ingestion/src/datahub/ingestion/source/unity/config.py @@ -7,7 +7,10 @@ from pydantic import Field from datahub.configuration.common import AllowDenyPattern, ConfigModel -from datahub.configuration.source_common import DatasetSourceConfigMixin +from datahub.configuration.source_common import ( + DatasetSourceConfigMixin, + LowerCaseDatasetUrnConfigMixin, +) from datahub.configuration.validate_field_removal import pydantic_removed_field from datahub.configuration.validate_field_rename import pydantic_renamed_field from datahub.ingestion.source.state.stale_entity_removal_handler import ( @@ -91,6 +94,7 @@ class UnityCatalogSourceConfig( BaseUsageConfig, DatasetSourceConfigMixin, StatefulProfilingConfigMixin, + LowerCaseDatasetUrnConfigMixin, ): token: str = pydantic.Field(description="Databricks personal access token") workspace_url: str = pydantic.Field( diff --git a/metadata-ingestion/src/datahub/utilities/urns/urn_iter.py b/metadata-ingestion/src/datahub/utilities/urns/urn_iter.py index 261f95331af612..e13d439161064c 100644 --- a/metadata-ingestion/src/datahub/utilities/urns/urn_iter.py +++ b/metadata-ingestion/src/datahub/utilities/urns/urn_iter.py @@ -3,7 +3,11 @@ from avro.schema import Field, RecordSchema from datahub.emitter.mcp import MetadataChangeProposalWrapper -from datahub.metadata.schema_classes import DictWrapper +from datahub.metadata.schema_classes import ( + DictWrapper, + MetadataChangeEventClass, + MetadataChangeProposalClass, +) from datahub.utilities.urns.dataset_urn import DatasetUrn from datahub.utilities.urns.urn import Urn, guess_entity_type @@ -32,7 +36,7 @@ def list_urns_with_path( if isinstance(model, MetadataChangeProposalWrapper): if model.entityUrn: - urns.append((model.entityUrn, ["urn"])) + urns.append((model.entityUrn, ["entityUrn"])) if model.entityKeyAspect: urns.extend( _add_prefix_to_paths( @@ -83,7 +87,15 @@ def list_urns(model: Union[DictWrapper, MetadataChangeProposalWrapper]) -> List[ return [urn for urn, _ in list_urns_with_path(model)] -def transform_urns(model: DictWrapper, func: Callable[[str], str]) -> None: +def transform_urns( + model: Union[ + DictWrapper, + MetadataChangeEventClass, + MetadataChangeProposalClass, + MetadataChangeProposalWrapper, + ], + func: Callable[[str], str], +) -> None: """ Rewrites all URNs in the given object according to the given function. """ @@ -95,7 +107,9 @@ def transform_urns(model: DictWrapper, func: Callable[[str], str]) -> None: def _modify_at_path( - model: Union[DictWrapper, list], path: _Path, new_value: str + model: Union[DictWrapper, MetadataChangeProposalWrapper, list], + path: _Path, + new_value: str, ) -> None: assert len(path) > 0 @@ -103,6 +117,8 @@ def _modify_at_path( if isinstance(path[0], int): assert isinstance(model, list) model[path[0]] = new_value + elif isinstance(model, MetadataChangeProposalWrapper): + setattr(model, path[0], new_value) else: assert isinstance(model, DictWrapper) model._inner_dict[path[0]] = new_value @@ -120,7 +136,14 @@ def _lowercase_dataset_urn(dataset_urn: str) -> str: return str(cur_urn) -def lowercase_dataset_urns(model: DictWrapper) -> None: +def lowercase_dataset_urns( + model: Union[ + DictWrapper, + MetadataChangeEventClass, + MetadataChangeProposalClass, + MetadataChangeProposalWrapper, + ] +) -> None: def modify_urn(urn: str) -> str: if guess_entity_type(urn) == "dataset": return _lowercase_dataset_urn(urn) diff --git a/metadata-ingestion/tests/unit/api/source_helpers/test_source_helpers.py b/metadata-ingestion/tests/unit/api/source_helpers/test_source_helpers.py index b6ec6ebce240c8..b667af8bb41e98 100644 --- a/metadata-ingestion/tests/unit/api/source_helpers/test_source_helpers.py +++ b/metadata-ingestion/tests/unit/api/source_helpers/test_source_helpers.py @@ -16,6 +16,7 @@ from datahub.ingestion.api.source_helpers import ( auto_browse_path_v2, auto_empty_dataset_usage_statistics, + auto_lowercase_urns, auto_status_aspect, auto_workunit, ) @@ -275,6 +276,75 @@ def test_auto_browse_path_v2_legacy_browse_path(telemetry_ping_mock): assert paths["platform,dataset-2,PROD)"] == _make_browse_path_entries(["something"]) +def test_auto_lowercase_aspects(): + mcws = auto_workunit( + [ + MetadataChangeProposalWrapper( + entityUrn=make_dataset_urn( + "bigquery", "myProject.mySchema.myTable", "PROD" + ), + aspect=models.DatasetKeyClass( + "urn:li:dataPlatform:bigquery", "myProject.mySchema.myTable", "PROD" + ), + ), + MetadataChangeProposalWrapper( + entityUrn="urn:li:container:008e111aa1d250dd52e0fd5d4b307b1a", + aspect=models.ContainerPropertiesClass( + name="test", + ), + ), + models.MetadataChangeEventClass( + proposedSnapshot=models.DatasetSnapshotClass( + urn="urn:li:dataset:(urn:li:dataPlatform:bigquery,bigquery-Public-Data.Covid19_Aha.staffing,PROD)", + aspects=[ + models.DatasetPropertiesClass( + customProperties={ + "key": "value", + }, + ), + ], + ), + ), + ] + ) + + expected = [ + *list( + auto_workunit( + [ + MetadataChangeProposalWrapper( + entityUrn="urn:li:dataset:(urn:li:dataPlatform:bigquery,myproject.myschema.mytable,PROD)", + aspect=models.DatasetKeyClass( + "urn:li:dataPlatform:bigquery", + "myProject.mySchema.myTable", + "PROD", + ), + ), + MetadataChangeProposalWrapper( + entityUrn="urn:li:container:008e111aa1d250dd52e0fd5d4b307b1a", + aspect=models.ContainerPropertiesClass( + name="test", + ), + ), + models.MetadataChangeEventClass( + proposedSnapshot=models.DatasetSnapshotClass( + urn="urn:li:dataset:(urn:li:dataPlatform:bigquery,bigquery-public-data.covid19_aha.staffing,PROD)", + aspects=[ + models.DatasetPropertiesClass( + customProperties={ + "key": "value", + }, + ), + ], + ), + ), + ] + ) + ), + ] + assert list(auto_lowercase_urns(mcws)) == expected + + @patch("datahub.ingestion.api.source_helpers.telemetry.telemetry_instance.ping") def test_auto_browse_path_v2_container_over_legacy_browse_path(telemetry_ping_mock): structure = {"a": {"b": ["c"]}} From 8813ae2fb15a1f80d5f0ef433fce1f84e1a240b5 Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Thu, 12 Oct 2023 07:58:10 -0400 Subject: [PATCH 6/6] feat(ingest/s3): support .gzip and fix decompression bug (#8990) --- .../ingestion/source/data_lake_common/path_spec.py | 9 ++++++++- .../src/datahub/ingestion/source/s3/source.py | 8 +++++++- 2 files changed, 15 insertions(+), 2 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/data_lake_common/path_spec.py b/metadata-ingestion/src/datahub/ingestion/source/data_lake_common/path_spec.py index d1c949f48e2cd5..a35fb94614f722 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/data_lake_common/path_spec.py +++ b/metadata-ingestion/src/datahub/ingestion/source/data_lake_common/path_spec.py @@ -18,7 +18,14 @@ logger: logging.Logger = logging.getLogger(__name__) SUPPORTED_FILE_TYPES: List[str] = ["csv", "tsv", "json", "parquet", "avro"] -SUPPORTED_COMPRESSIONS: List[str] = ["gz", "bz2"] + +# These come from the smart_open library. +SUPPORTED_COMPRESSIONS: List[str] = [ + "gz", + "bz2", + # We have a monkeypatch on smart_open that aliases .gzip to .gz. + "gzip", +] class PathSpec(ConfigModel): diff --git a/metadata-ingestion/src/datahub/ingestion/source/s3/source.py b/metadata-ingestion/src/datahub/ingestion/source/s3/source.py index ac4433b7eb1f0c..eb49fcbb268c0a 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/s3/source.py +++ b/metadata-ingestion/src/datahub/ingestion/source/s3/source.py @@ -10,6 +10,7 @@ from pathlib import PurePath from typing import Any, Dict, Iterable, List, Optional, Tuple +import smart_open.compression as so_compression from more_itertools import peekable from pyspark.conf import SparkConf from pyspark.sql import SparkSession @@ -120,6 +121,9 @@ } PAGE_SIZE = 1000 +# Hack to support the .gzip extension with smart_open. +so_compression.register_compressor(".gzip", so_compression._COMPRESSOR_REGISTRY[".gz"]) + def get_column_type( report: SourceReport, dataset_name: str, column_type: str @@ -407,7 +411,9 @@ def get_fields(self, table_data: TableData, path_spec: PathSpec) -> List: table_data.full_path, "rb", transport_params={"client": s3_client} ) else: - file = open(table_data.full_path, "rb") + # We still use smart_open here to take advantage of the compression + # capabilities of smart_open. + file = smart_open(table_data.full_path, "rb") fields = []