From 9c72bd9ed7f1c2b4228fc656c5f8b6f31bf0d431 Mon Sep 17 00:00:00 2001 From: Kos Korchak <97058061+kkorchak@users.noreply.github.com> Date: Mon, 30 Oct 2023 14:12:07 -0400 Subject: [PATCH 1/9] fix(test): Fix for flaky download_lineage_results cypress test (#9132) --- .../cypress/cypress/e2e/lineage/download_lineage_results.js | 3 +++ .../tests/cypress/cypress/e2e/mutations/dataset_ownership.js | 4 ++++ 2 files changed, 7 insertions(+) diff --git a/smoke-test/tests/cypress/cypress/e2e/lineage/download_lineage_results.js b/smoke-test/tests/cypress/cypress/e2e/lineage/download_lineage_results.js index 315aa7b22b9da..dc6efc9f7df66 100644 --- a/smoke-test/tests/cypress/cypress/e2e/lineage/download_lineage_results.js +++ b/smoke-test/tests/cypress/cypress/e2e/lineage/download_lineage_results.js @@ -27,6 +27,9 @@ const downloadCsvFile = (filename) => { }; describe("download lineage results to .csv file", () => { + beforeEach(() => { + cy.on('uncaught:exception', (err, runnable) => { return false; }); + }); it("download and verify lineage results for 1st, 2nd and 3+ degree of dependencies", () => { cy.loginWithCredentials(); diff --git a/smoke-test/tests/cypress/cypress/e2e/mutations/dataset_ownership.js b/smoke-test/tests/cypress/cypress/e2e/mutations/dataset_ownership.js index 99ad9a68d35e1..465d7998b9f9a 100644 --- a/smoke-test/tests/cypress/cypress/e2e/mutations/dataset_ownership.js +++ b/smoke-test/tests/cypress/cypress/e2e/mutations/dataset_ownership.js @@ -29,6 +29,10 @@ const addOwner = (owner, type, elementId) => { } describe("add, remove ownership for dataset", () => { + beforeEach(() => { + cy.on('uncaught:exception', (err, runnable) => { return false; }); + }); + it("create test user and test group, add user to a group", () => { cy.loginWithCredentials(); cy.createUser(username, password, email); From 300cea373d6a94f05cf3bd95ab69bc503a28538e Mon Sep 17 00:00:00 2001 From: Pedro Silva Date: Mon, 30 Oct 2023 20:50:42 +0000 Subject: [PATCH 2/9] docs: Update 
updating-datahub.md (#9131) --- docs/how/updating-datahub.md | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/docs/how/updating-datahub.md b/docs/how/updating-datahub.md index 4d1535f28fa0a..28f11e4b6d707 100644 --- a/docs/how/updating-datahub.md +++ b/docs/how/updating-datahub.md @@ -4,10 +4,20 @@ This file documents any backwards-incompatible changes in DataHub and assists pe ## Next -- #9010 - In Redshift source's config `incremental_lineage` is set default to off. +### Breaking Changes + +### Potential Downtime + +### Deprecations + +### Other Notable Changes + +## 0.12.0 ### Breaking Changes +- #9044 - GraphQL APIs for adding ownership now expect either an `ownershipTypeUrn` referencing a custom ownership type or a (deprecated) `type`. Where before adding an ownership without a concrete type was allowed, this is no longer the case. For simplicity you can use the `type` parameter which will get translated to a custom ownership type internally if one exists for the type being added. +- #9010 - In Redshift source's config `incremental_lineage` is set default to off. - #8810 - Removed support for SQLAlchemy 1.3.x. Only SQLAlchemy 1.4.x is supported now. - #8942 - Removed `urn:li:corpuser:datahub` owner for the `Measure`, `Dimension` and `Temporal` tags emitted by Looker and LookML source connectors. 
From 58bcedcd6a091263c6dc3e1181c260233a80575d Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Mon, 30 Oct 2023 14:18:48 -0700 Subject: [PATCH 3/9] fix(ingest/clickhouse): pin version to solve column reflection regression (#9143) --- metadata-ingestion/setup.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/metadata-ingestion/setup.py b/metadata-ingestion/setup.py index 7f7826abe2095..b1c5510efd923 100644 --- a/metadata-ingestion/setup.py +++ b/metadata-ingestion/setup.py @@ -173,7 +173,9 @@ clickhouse_common = { # Clickhouse 0.2.0 adds support for SQLAlchemy 1.4.x - "clickhouse-sqlalchemy>=0.2.0", + # Disallow 0.2.5 because of https://github.com/xzkostyan/clickhouse-sqlalchemy/issues/272. + # Note that there's also a known issue around nested map types: https://github.com/xzkostyan/clickhouse-sqlalchemy/issues/269. + "clickhouse-sqlalchemy>=0.2.0,<0.2.5", } redshift_common = { From 51d6d1f4531dad133e06db75267fbea77e424d00 Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Mon, 30 Oct 2023 14:19:52 -0700 Subject: [PATCH 4/9] feat(ingest/looker): cleanup error handling (#9135) --- .../src/datahub/ingestion/api/workunit.py | 6 +++++- .../ingestion/source/looker/looker_lib_wrapper.py | 8 ++++++-- .../ingestion/source/looker/looker_source.py | 14 ++------------ .../ingestion/source/looker/lookml_source.py | 5 +---- 4 files changed, 14 insertions(+), 19 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/api/workunit.py b/metadata-ingestion/src/datahub/ingestion/api/workunit.py index 8eea3514a22af..b1c003ee27e12 100644 --- a/metadata-ingestion/src/datahub/ingestion/api/workunit.py +++ b/metadata-ingestion/src/datahub/ingestion/api/workunit.py @@ -22,7 +22,11 @@ class MetadataWorkUnit(WorkUnit): metadata: Union[ MetadataChangeEvent, MetadataChangeProposal, MetadataChangeProposalWrapper ] - # A workunit creator can determine if this workunit is allowed to fail + + # A workunit creator can determine if this workunit is allowed to 
fail. + # TODO: This flag was initially added during the rollout of the subType aspect + # to improve backwards compatibility, but is not really needed anymore and so + # should be removed. treat_errors_as_warnings: bool = False # When this is set to false, this MWU will be ignored by automatic helpers diff --git a/metadata-ingestion/src/datahub/ingestion/source/looker/looker_lib_wrapper.py b/metadata-ingestion/src/datahub/ingestion/source/looker/looker_lib_wrapper.py index cf132b7ef27f7..b00f74b71e792 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/looker/looker_lib_wrapper.py +++ b/metadata-ingestion/src/datahub/ingestion/source/looker/looker_lib_wrapper.py @@ -123,8 +123,12 @@ def get_user(self, id_: str, user_fields: str) -> Optional[User]: transport_options=self.transport_options, ) except SDKError as e: - logger.warning(f"Could not find user with id {id_}") - logger.warning(f"Failure was {e}") + if "Looker Not Found (404)" in str(e): + # User not found + logger.info(f"Could not find user with id {id_}: 404 error") + else: + logger.warning(f"Could not find user with id {id_}") + logger.warning(f"Failure was {e}") # User not found return None diff --git a/metadata-ingestion/src/datahub/ingestion/source/looker/looker_source.py b/metadata-ingestion/src/datahub/ingestion/source/looker/looker_source.py index a3df977582ca4..09683d790c14c 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/looker/looker_source.py +++ b/metadata-ingestion/src/datahub/ingestion/source/looker/looker_source.py @@ -926,14 +926,7 @@ def process_metrics_dimensions_and_fields_for_dashboard( mcps = chart_mcps mcps.append(dashboard_mcp) - workunits = [ - MetadataWorkUnit( - id=f"looker-{mcp.aspectName}-{mcp.entityUrn}", - mcp=mcp, - treat_errors_as_warnings=True, - ) - for mcp in mcps - ] + workunits = [mcp.as_workunit() for mcp in mcps] return workunits @@ -1320,10 +1313,7 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: 
id=f"looker-{event.proposedSnapshot.urn}", mce=event ) elif isinstance(event, MetadataChangeProposalWrapper): - # We want to treat subtype aspects as optional, so allowing failures in this aspect to be treated as warnings rather than failures - yield event.as_workunit( - treat_errors_as_warnings=event.aspectName in ["subTypes"] - ) + yield event.as_workunit() else: raise Exception(f"Unexpected type of event {event}") self.reporter.report_stage_end("explore_metadata") diff --git a/metadata-ingestion/src/datahub/ingestion/source/looker/lookml_source.py b/metadata-ingestion/src/datahub/ingestion/source/looker/lookml_source.py index e69c3b6e601bd..e6b78cc7a7745 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/looker/lookml_source.py +++ b/metadata-ingestion/src/datahub/ingestion/source/looker/lookml_source.py @@ -2171,10 +2171,7 @@ def get_internal_workunits(self) -> Iterable[MetadataWorkUnit]: # noqa: C901 for mcp in self._build_dataset_mcps( maybe_looker_view ): - # We want to treat mcp aspects as optional, so allowing failures in this aspect to be treated as warnings rather than failures - yield mcp.as_workunit( - treat_errors_as_warnings=True - ) + yield mcp.as_workunit() else: ( prev_model_name, From 0bd2d9a36cdf18575ac4e54126db5be33ec59d8a Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Mon, 30 Oct 2023 14:22:05 -0700 Subject: [PATCH 5/9] feat(ingest): add `entity_supports_aspect` helper (#9120) --- .../src/datahub/emitter/mcp_builder.py | 13 ++++++++++++- metadata-ingestion/tests/unit/test_mcp_builder.py | 9 +++++++++ 2 files changed, 21 insertions(+), 1 deletion(-) diff --git a/metadata-ingestion/src/datahub/emitter/mcp_builder.py b/metadata-ingestion/src/datahub/emitter/mcp_builder.py index 65e0c0d6ba60d..d50feba8b119c 100644 --- a/metadata-ingestion/src/datahub/emitter/mcp_builder.py +++ b/metadata-ingestion/src/datahub/emitter/mcp_builder.py @@ -1,9 +1,10 @@ -from typing import Dict, Iterable, List, Optional, TypeVar +from typing import Dict, 
Iterable, List, Optional, Type, TypeVar from pydantic.fields import Field from pydantic.main import BaseModel from datahub.emitter.mce_builder import ( + Aspect, datahub_guid, make_container_urn, make_data_platform_urn, @@ -18,6 +19,7 @@ ) from datahub.metadata.com.linkedin.pegasus2avro.container import ContainerProperties from datahub.metadata.schema_classes import ( + KEY_ASPECTS, ContainerClass, DomainsClass, EmbedClass, @@ -306,3 +308,12 @@ def create_embed_mcp(urn: str, embed_url: str) -> MetadataChangeProposalWrapper: entityUrn=urn, aspect=EmbedClass(renderUrl=embed_url), ) + + +def entity_supports_aspect(entity_type: str, aspect_type: Type[Aspect]) -> bool: + entity_key_aspect = KEY_ASPECTS[entity_type] + aspect_name = aspect_type.get_aspect_name() + + supported_aspects = entity_key_aspect.ASPECT_INFO["entityAspects"] + + return aspect_name in supported_aspects diff --git a/metadata-ingestion/tests/unit/test_mcp_builder.py b/metadata-ingestion/tests/unit/test_mcp_builder.py index 561b782ef9e46..e304edb24789c 100644 --- a/metadata-ingestion/tests/unit/test_mcp_builder.py +++ b/metadata-ingestion/tests/unit/test_mcp_builder.py @@ -1,4 +1,5 @@ import datahub.emitter.mcp_builder as builder +from datahub.metadata.schema_classes import StatusClass, TelemetryClientIdClass def test_guid_generator(): @@ -83,3 +84,11 @@ def test_guid_generators(): guid = key.guid() assert guid == guid_datahub + + +def test_entity_supports_aspect(): + assert builder.entity_supports_aspect("dataset", StatusClass) + assert not builder.entity_supports_aspect("telemetry", StatusClass) + + assert not builder.entity_supports_aspect("dataset", TelemetryClientIdClass) + assert builder.entity_supports_aspect("telemetry", TelemetryClientIdClass) From ce0f36b8bc74e3f0bab447408096347617804d92 Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Mon, 30 Oct 2023 14:23:19 -0700 Subject: [PATCH 6/9] feat(sqlparser): support more update syntaxes + fix bug with subqueries (#9105) --- 
.../src/datahub/utilities/sqlglot_lineage.py | 57 ++++++++++- .../test_postgres_select_subquery.json | 64 ++++++++++++ .../test_snowflake_update_from_table.json | 1 + .../test_snowflake_update_hardcoded.json | 4 +- .../goldens/test_snowflake_update_self.json | 29 ++++++ .../unit/sql_parsing/test_sqlglot_lineage.py | 98 +++++++++++++++++++ 6 files changed, 247 insertions(+), 6 deletions(-) create mode 100644 metadata-ingestion/tests/unit/sql_parsing/goldens/test_postgres_select_subquery.json create mode 100644 metadata-ingestion/tests/unit/sql_parsing/goldens/test_snowflake_update_self.json diff --git a/metadata-ingestion/src/datahub/utilities/sqlglot_lineage.py b/metadata-ingestion/src/datahub/utilities/sqlglot_lineage.py index 1d74b20569814..388388f9f4b38 100644 --- a/metadata-ingestion/src/datahub/utilities/sqlglot_lineage.py +++ b/metadata-ingestion/src/datahub/utilities/sqlglot_lineage.py @@ -12,8 +12,8 @@ import sqlglot.errors import sqlglot.lineage import sqlglot.optimizer.annotate_types +import sqlglot.optimizer.optimizer import sqlglot.optimizer.qualify -import sqlglot.optimizer.qualify_columns from pydantic import BaseModel from typing_extensions import TypedDict @@ -48,6 +48,19 @@ SQL_PARSE_RESULT_CACHE_SIZE = 1000 +RULES_BEFORE_TYPE_ANNOTATION: tuple = tuple( + filter( + # Skip pushdown_predicates because it sometimes throws exceptions, and we + # don't actually need it for anything. + lambda func: func.__name__ not in {"pushdown_predicates"}, + itertools.takewhile( + lambda func: func != sqlglot.optimizer.annotate_types.annotate_types, + sqlglot.optimizer.optimizer.RULES, + ), + ) +) + + class GraphQLSchemaField(TypedDict): fieldPath: str nativeDataType: str @@ -289,6 +302,10 @@ def _table_level_lineage( ) # TODO: If a CTAS has "LIMIT 0", it's not really lineage, just copying the schema. + # Update statements implicitly read from the table being updated, so add those back in. 
+ if isinstance(statement, sqlglot.exp.Update): + tables = tables | modified + return tables, modified @@ -568,17 +585,20 @@ def _schema_aware_fuzzy_column_resolve( # - the select instead of the full outer statement # - schema info # - column qualification enabled + # - running the full pre-type annotation optimizer # logger.debug("Schema: %s", sqlglot_db_schema.mapping) - statement = sqlglot.optimizer.qualify.qualify( + statement = sqlglot.optimizer.optimizer.optimize( statement, dialect=dialect, schema=sqlglot_db_schema, + qualify_columns=True, validate_qualify_columns=False, identify=True, # sqlglot calls the db -> schema -> table hierarchy "catalog", "db", "table". catalog=default_db, db=default_schema, + rules=RULES_BEFORE_TYPE_ANNOTATION, ) except (sqlglot.errors.OptimizeError, ValueError) as e: raise SqlUnderstandingError( @@ -748,6 +768,7 @@ def _extract_select_from_create( _UPDATE_ARGS_NOT_SUPPORTED_BY_SELECT: Set[str] = set( sqlglot.exp.Update.arg_types.keys() ) - set(sqlglot.exp.Select.arg_types.keys()) +_UPDATE_FROM_TABLE_ARGS_TO_MOVE = {"joins", "laterals", "pivot"} def _extract_select_from_update( @@ -774,17 +795,43 @@ def _extract_select_from_update( # they'll get caught later. new_expressions.append(expr) - return sqlglot.exp.Select( + # Special translation for the `from` clause. + extra_args = {} + original_from = statement.args.get("from") + if original_from and isinstance(original_from.this, sqlglot.exp.Table): + # Move joins, laterals, and pivots from the Update->From->Table->field + # to the top-level Select->field. + + for k in _UPDATE_FROM_TABLE_ARGS_TO_MOVE: + if k in original_from.this.args: + # Mutate the from table clause in-place. 
+ extra_args[k] = original_from.this.args.pop(k) + + select_statement = sqlglot.exp.Select( **{ **{ k: v for k, v in statement.args.items() if k not in _UPDATE_ARGS_NOT_SUPPORTED_BY_SELECT }, + **extra_args, "expressions": new_expressions, } ) + # Update statements always implicitly have the updated table in context. + # TODO: Retain table name alias. + if select_statement.args.get("from"): + # select_statement = sqlglot.parse_one(select_statement.sql(dialect=dialect)) + + select_statement = select_statement.join( + statement.this, append=True, join_kind="cross" + ) + else: + select_statement = select_statement.from_(statement.this) + + return select_statement + def _is_create_table_ddl(statement: sqlglot.exp.Expression) -> bool: return isinstance(statement, sqlglot.exp.Create) and isinstance( @@ -955,7 +1002,7 @@ def _sqlglot_lineage_inner( # Fetch schema info for the relevant tables. table_name_urn_mapping: Dict[_TableName, str] = {} table_name_schema_mapping: Dict[_TableName, SchemaInfo] = {} - for table in itertools.chain(tables, modified): + for table in tables | modified: # For select statements, qualification will be a no-op. For other statements, this # is where the qualification actually happens. qualified_table = table.qualified( @@ -971,7 +1018,7 @@ def _sqlglot_lineage_inner( # Also include the original, non-qualified table name in the urn mapping. 
table_name_urn_mapping[table] = urn - total_tables_discovered = len(tables) + len(modified) + total_tables_discovered = len(tables | modified) total_schemas_resolved = len(table_name_schema_mapping) debug_info = SqlParsingDebugInfo( confidence=0.9 if total_tables_discovered == total_schemas_resolved diff --git a/metadata-ingestion/tests/unit/sql_parsing/goldens/test_postgres_select_subquery.json b/metadata-ingestion/tests/unit/sql_parsing/goldens/test_postgres_select_subquery.json new file mode 100644 index 0000000000000..0c40ce120c934 --- /dev/null +++ b/metadata-ingestion/tests/unit/sql_parsing/goldens/test_postgres_select_subquery.json @@ -0,0 +1,64 @@ +{ + "query_type": "SELECT", + "in_tables": [ + "urn:li:dataset:(urn:li:dataPlatform:postgres,my_db.my_schema.table1,PROD)", + "urn:li:dataset:(urn:li:dataPlatform:postgres,my_db.my_schema.table2,PROD)" + ], + "out_tables": [], + "column_lineage": [ + { + "downstream": { + "table": null, + "column": "a", + "column_type": { + "type": { + "com.linkedin.pegasus2avro.schema.NumberType": {} + } + }, + "native_column_type": "INT" + }, + "upstreams": [ + { + "table": "urn:li:dataset:(urn:li:dataPlatform:postgres,my_db.my_schema.table1,PROD)", + "column": "a" + } + ] + }, + { + "downstream": { + "table": null, + "column": "b", + "column_type": { + "type": { + "com.linkedin.pegasus2avro.schema.NumberType": {} + } + }, + "native_column_type": "INT" + }, + "upstreams": [ + { + "table": "urn:li:dataset:(urn:li:dataPlatform:postgres,my_db.my_schema.table1,PROD)", + "column": "b" + } + ] + }, + { + "downstream": { + "table": null, + "column": "c", + "column_type": { + "type": { + "com.linkedin.pegasus2avro.schema.ArrayType": {} + } + }, + "native_column_type": "INT[]" + }, + "upstreams": [ + { + "table": "urn:li:dataset:(urn:li:dataPlatform:postgres,my_db.my_schema.table2,PROD)", + "column": "c" + } + ] + } + ] +} \ No newline at end of file diff --git 
a/metadata-ingestion/tests/unit/sql_parsing/goldens/test_snowflake_update_from_table.json b/metadata-ingestion/tests/unit/sql_parsing/goldens/test_snowflake_update_from_table.json index e2baa34e7fe28..d51001f969799 100644 --- a/metadata-ingestion/tests/unit/sql_parsing/goldens/test_snowflake_update_from_table.json +++ b/metadata-ingestion/tests/unit/sql_parsing/goldens/test_snowflake_update_from_table.json @@ -1,6 +1,7 @@ { "query_type": "UPDATE", "in_tables": [ + "urn:li:dataset:(urn:li:dataPlatform:snowflake,my_db.my_schema.my_table,PROD)", "urn:li:dataset:(urn:li:dataPlatform:snowflake,my_db.my_schema.table1,PROD)", "urn:li:dataset:(urn:li:dataPlatform:snowflake,my_db.my_schema.table2,PROD)" ], diff --git a/metadata-ingestion/tests/unit/sql_parsing/goldens/test_snowflake_update_hardcoded.json b/metadata-ingestion/tests/unit/sql_parsing/goldens/test_snowflake_update_hardcoded.json index b41ed61b37cdb..f421b28530c64 100644 --- a/metadata-ingestion/tests/unit/sql_parsing/goldens/test_snowflake_update_hardcoded.json +++ b/metadata-ingestion/tests/unit/sql_parsing/goldens/test_snowflake_update_hardcoded.json @@ -1,6 +1,8 @@ { "query_type": "UPDATE", - "in_tables": [], + "in_tables": [ + "urn:li:dataset:(urn:li:dataPlatform:snowflake,snowflake_sample_data.tpch_sf1.orders,PROD)" + ], "out_tables": [ "urn:li:dataset:(urn:li:dataPlatform:snowflake,snowflake_sample_data.tpch_sf1.orders,PROD)" ], diff --git a/metadata-ingestion/tests/unit/sql_parsing/goldens/test_snowflake_update_self.json b/metadata-ingestion/tests/unit/sql_parsing/goldens/test_snowflake_update_self.json new file mode 100644 index 0000000000000..c8cc32164a3eb --- /dev/null +++ b/metadata-ingestion/tests/unit/sql_parsing/goldens/test_snowflake_update_self.json @@ -0,0 +1,29 @@ +{ + "query_type": "UPDATE", + "in_tables": [ + "urn:li:dataset:(urn:li:dataPlatform:snowflake,snowflake_sample_data.tpch_sf1.orders,PROD)" + ], + "out_tables": [ + 
"urn:li:dataset:(urn:li:dataPlatform:snowflake,snowflake_sample_data.tpch_sf1.orders,PROD)" + ], + "column_lineage": [ + { + "downstream": { + "table": "urn:li:dataset:(urn:li:dataPlatform:snowflake,snowflake_sample_data.tpch_sf1.orders,PROD)", + "column": "orderkey", + "column_type": { + "type": { + "com.linkedin.pegasus2avro.schema.NumberType": {} + } + }, + "native_column_type": "DECIMAL" + }, + "upstreams": [ + { + "table": "urn:li:dataset:(urn:li:dataPlatform:snowflake,snowflake_sample_data.tpch_sf1.orders,PROD)", + "column": "orderkey" + } + ] + } + ] +} \ No newline at end of file diff --git a/metadata-ingestion/tests/unit/sql_parsing/test_sqlglot_lineage.py b/metadata-ingestion/tests/unit/sql_parsing/test_sqlglot_lineage.py index dfc5b486abd35..5559ebe1756a6 100644 --- a/metadata-ingestion/tests/unit/sql_parsing/test_sqlglot_lineage.py +++ b/metadata-ingestion/tests/unit/sql_parsing/test_sqlglot_lineage.py @@ -768,3 +768,101 @@ def test_snowflake_update_from_table(): }, expected_file=RESOURCE_DIR / "test_snowflake_update_from_table.json", ) + + +def test_snowflake_update_self(): + assert_sql_result( + """ +UPDATE snowflake_sample_data.tpch_sf1.orders +SET orderkey = orderkey + 1 +""", + dialect="snowflake", + schemas={ + "urn:li:dataset:(urn:li:dataPlatform:snowflake,snowflake_sample_data.tpch_sf1.orders,PROD)": { + "orderkey": "NUMBER(38,0)", + "totalprice": "NUMBER(12,2)", + }, + }, + expected_file=RESOURCE_DIR / "test_snowflake_update_self.json", + ) + + +def test_postgres_select_subquery(): + assert_sql_result( + """ +SELECT + a, + b, + (SELECT c FROM table2 WHERE table2.id = table1.id) as c +FROM table1 +""", + dialect="postgres", + default_db="my_db", + default_schema="my_schema", + schemas={ + "urn:li:dataset:(urn:li:dataPlatform:postgres,my_db.my_schema.table1,PROD)": { + "id": "INTEGER", + "a": "INTEGER", + "b": "INTEGER", + }, + "urn:li:dataset:(urn:li:dataPlatform:postgres,my_db.my_schema.table2,PROD)": { + "id": "INTEGER", + "c": "INTEGER", + }, 
+ }, + expected_file=RESOURCE_DIR / "test_postgres_select_subquery.json", + ) + + +@pytest.mark.skip(reason="We can't parse column-list syntax with sub-selects yet") +def test_postgres_update_subselect(): + assert_sql_result( + """ +UPDATE accounts SET sales_person_name = + (SELECT name FROM employees + WHERE employees.id = accounts.sales_person_id) +""", + dialect="postgres", + default_db="my_db", + default_schema="my_schema", + schemas={ + "urn:li:dataset:(urn:li:dataPlatform:postgres,my_db.my_schema.accounts,PROD)": { + "id": "INTEGER", + "sales_person_id": "INTEGER", + "sales_person_name": "VARCHAR(16777216)", + }, + "urn:li:dataset:(urn:li:dataPlatform:postgres,my_db.my_schema.employees,PROD)": { + "id": "INTEGER", + "name": "VARCHAR(16777216)", + }, + }, + expected_file=RESOURCE_DIR / "test_postgres_update_subselect.json", + ) + + +@pytest.mark.skip(reason="We can't parse column-list syntax with sub-selects yet") +def test_postgres_complex_update(): + # Example query from the postgres docs: + # https://www.postgresql.org/docs/current/sql-update.html + assert_sql_result( + """ +UPDATE accounts SET (contact_first_name, contact_last_name) = + (SELECT first_name, last_name FROM employees + WHERE employees.id = accounts.sales_person); +""", + dialect="postgres", + schemas={ + "urn:li:dataset:(urn:li:dataPlatform:postgres,my_db.my_schema.accounts,PROD)": { + "id": "INTEGER", + "contact_first_name": "VARCHAR(16777216)", + "contact_last_name": "VARCHAR(16777216)", + "sales_person": "INTEGER", + }, + "urn:li:dataset:(urn:li:dataPlatform:postgres,my_db.my_schema.employees,PROD)": { + "id": "INTEGER", + "first_name": "VARCHAR(16777216)", + "last_name": "VARCHAR(16777216)", + }, + }, + expected_file=RESOURCE_DIR / "test_postgres_complex_update.json", + ) From 94d438d44f2d18def4a422cd60150d2c9a78be49 Mon Sep 17 00:00:00 2001 From: sachinsaju <33017477+sachinsaju@users.noreply.github.com> Date: Tue, 31 Oct 2023 08:54:56 +0530 Subject: [PATCH 7/9] docs: correct broken doc 
links (#9137) Co-authored-by: Hyejin Yoon <0327jane@gmail.com> --- docs/deploy/aws.md | 2 +- docs/what-is-datahub/datahub-concepts.md | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/deploy/aws.md b/docs/deploy/aws.md index e0f57b4a0b0cb..6598b93c25e9a 100644 --- a/docs/deploy/aws.md +++ b/docs/deploy/aws.md @@ -15,7 +15,7 @@ This guide requires the following tools: - [kubectl](https://kubernetes.io/docs/tasks/tools/) to manage kubernetes resources - [helm](https://helm.sh/docs/intro/install/) to deploy the resources based on helm charts. Note, we only support Helm 3. -- [eksctl](https://eksctl.io/introduction/#installation) to create and manage clusters on EKS +- [eksctl](https://eksctl.io/installation/) to create and manage clusters on EKS - [AWS CLI](https://docs.aws.amazon.com/cli/latest/userguide/cli-chap-install.html) to manage AWS resources To use the above tools, you need to set up AWS credentials by following diff --git a/docs/what-is-datahub/datahub-concepts.md b/docs/what-is-datahub/datahub-concepts.md index 6328d97fa6a50..03b86fab0ede4 100644 --- a/docs/what-is-datahub/datahub-concepts.md +++ b/docs/what-is-datahub/datahub-concepts.md @@ -99,7 +99,7 @@ List of Data Platforms - Tableau - Vertica -Reference : [data_platforms.json](https://github.com/acryldata/datahub-fork/blob/acryl-main/metadata-service/war/src/main/resources/boot/data_platforms.json) +Reference : [data_platforms.json](https://github.com/datahub-project/datahub/blob/master/metadata-service/war/src/main/resources/boot/data_platforms.json) From ea1273281e3a65ab4d94d002ee19f91907a3eb84 Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Mon, 30 Oct 2023 20:57:59 -0700 Subject: [PATCH 8/9] feat(ingest): sql parser perf + asyncio fixes (#9119) --- metadata-ingestion/setup.py | 2 +- .../src/datahub/cli/docker_cli.py | 5 ++ .../src/datahub/upgrade/upgrade.py | 12 ++--- .../src/datahub/utilities/sqlglot_lineage.py | 5 +- .../goldens/test_select_from_union.json | 2 +- 
.../test_teradata_strange_operators.json | 46 +++++++++++++++++++ .../unit/sql_parsing/test_sqlglot_lineage.py | 14 ++++++ 7 files changed, 73 insertions(+), 13 deletions(-) create mode 100644 metadata-ingestion/tests/unit/sql_parsing/goldens/test_teradata_strange_operators.json diff --git a/metadata-ingestion/setup.py b/metadata-ingestion/setup.py index b1c5510efd923..151842bd84d0a 100644 --- a/metadata-ingestion/setup.py +++ b/metadata-ingestion/setup.py @@ -108,7 +108,7 @@ sqlglot_lib = { # Using an Acryl fork of sqlglot. # https://github.com/tobymao/sqlglot/compare/main...hsheth2:sqlglot:hsheth?expand=1 - "acryl-sqlglot==18.5.2.dev45", + "acryl-sqlglot==18.17.1.dev16", } sql_common = ( diff --git a/metadata-ingestion/src/datahub/cli/docker_cli.py b/metadata-ingestion/src/datahub/cli/docker_cli.py index 4afccfe711e34..77e3285d359ef 100644 --- a/metadata-ingestion/src/datahub/cli/docker_cli.py +++ b/metadata-ingestion/src/datahub/cli/docker_cli.py @@ -5,6 +5,7 @@ import os import pathlib import platform +import signal import subprocess import sys import tempfile @@ -770,6 +771,10 @@ def quickstart( # noqa: C901 logger.debug("docker compose up still running, sending SIGKILL") up_process.kill() up_process.wait() + else: + # If the docker process got a keyboard interrupt, raise one here. + if up_process.returncode in {128 + signal.SIGINT, -signal.SIGINT}: + raise KeyboardInterrupt # Check docker health every few seconds. 
status = check_docker_quickstart() diff --git a/metadata-ingestion/src/datahub/upgrade/upgrade.py b/metadata-ingestion/src/datahub/upgrade/upgrade.py index 30f19b8b84f35..acc7954ad25a6 100644 --- a/metadata-ingestion/src/datahub/upgrade/upgrade.py +++ b/metadata-ingestion/src/datahub/upgrade/upgrade.py @@ -1,6 +1,5 @@ import asyncio import contextlib -import functools import logging import sys from datetime import datetime, timedelta, timezone @@ -374,17 +373,14 @@ def check_upgrade(func: Callable[..., T]) -> Callable[..., T]: @wraps(func) def async_wrapper(*args: Any, **kwargs: Any) -> Any: async def run_inner_func(): - loop = asyncio.get_event_loop() - return await loop.run_in_executor( - None, functools.partial(func, *args, **kwargs) - ) + return func(*args, **kwargs) async def run_func_check_upgrade(): version_stats_future = asyncio.ensure_future(retrieve_version_stats()) - the_one_future = asyncio.ensure_future(run_inner_func()) - ret = await the_one_future + main_func_future = asyncio.ensure_future(run_inner_func()) + ret = await main_func_future - # the one future has returned + # the main future has returned # we check the other futures quickly try: version_stats = await asyncio.wait_for(version_stats_future, 0.5) diff --git a/metadata-ingestion/src/datahub/utilities/sqlglot_lineage.py b/metadata-ingestion/src/datahub/utilities/sqlglot_lineage.py index 388388f9f4b38..6413275ac63a6 100644 --- a/metadata-ingestion/src/datahub/utilities/sqlglot_lineage.py +++ b/metadata-ingestion/src/datahub/utilities/sqlglot_lineage.py @@ -106,6 +106,7 @@ def get_query_type_of_sql(expression: sqlglot.exp.Expression) -> QueryType: sqlglot.exp.Update: QueryType.UPDATE, sqlglot.exp.Delete: QueryType.DELETE, sqlglot.exp.Merge: QueryType.MERGE, + sqlglot.exp.Subqueryable: QueryType.SELECT, # unions, etc. 
are also selects } for cls, query_type in mapping.items(): @@ -820,10 +821,8 @@ def _extract_select_from_update( ) # Update statements always implicitly have the updated table in context. - # TODO: Retain table name alias. + # TODO: Retain table name alias, if one was present. if select_statement.args.get("from"): - # select_statement = sqlglot.parse_one(select_statement.sql(dialect=dialect)) - select_statement = select_statement.join( statement.this, append=True, join_kind="cross" ) diff --git a/metadata-ingestion/tests/unit/sql_parsing/goldens/test_select_from_union.json b/metadata-ingestion/tests/unit/sql_parsing/goldens/test_select_from_union.json index 902aa010c8afc..5d1d421f49a2a 100644 --- a/metadata-ingestion/tests/unit/sql_parsing/goldens/test_select_from_union.json +++ b/metadata-ingestion/tests/unit/sql_parsing/goldens/test_select_from_union.json @@ -1,5 +1,5 @@ { - "query_type": "UNKNOWN", + "query_type": "SELECT", "in_tables": [ "urn:li:dataset:(urn:li:dataPlatform:snowflake,snowflake_sample_data.tpch_sf10.orders,PROD)", "urn:li:dataset:(urn:li:dataPlatform:snowflake,snowflake_sample_data.tpch_sf100.orders,PROD)" diff --git a/metadata-ingestion/tests/unit/sql_parsing/goldens/test_teradata_strange_operators.json b/metadata-ingestion/tests/unit/sql_parsing/goldens/test_teradata_strange_operators.json new file mode 100644 index 0000000000000..4b21a2512ccd1 --- /dev/null +++ b/metadata-ingestion/tests/unit/sql_parsing/goldens/test_teradata_strange_operators.json @@ -0,0 +1,46 @@ +{ + "query_type": "SELECT", + "in_tables": [ + "urn:li:dataset:(urn:li:dataPlatform:teradata,dbc.table1,PROD)", + "urn:li:dataset:(urn:li:dataPlatform:teradata,dbc.table2,PROD)" + ], + "out_tables": [], + "column_lineage": [ + { + "downstream": { + "table": null, + "column": "col1", + "column_type": null, + "native_column_type": null + }, + "upstreams": [ + { + "table": "urn:li:dataset:(urn:li:dataPlatform:teradata,dbc.table1,PROD)", + "column": "col1" + }, + { + "table": 
"urn:li:dataset:(urn:li:dataPlatform:teradata,dbc.table2,PROD)", + "column": "col1" + } + ] + }, + { + "downstream": { + "table": null, + "column": "col2", + "column_type": null, + "native_column_type": null + }, + "upstreams": [ + { + "table": "urn:li:dataset:(urn:li:dataPlatform:teradata,dbc.table1,PROD)", + "column": "col2" + }, + { + "table": "urn:li:dataset:(urn:li:dataPlatform:teradata,dbc.table2,PROD)", + "column": "col2" + } + ] + } + ] +} \ No newline at end of file diff --git a/metadata-ingestion/tests/unit/sql_parsing/test_sqlglot_lineage.py b/metadata-ingestion/tests/unit/sql_parsing/test_sqlglot_lineage.py index 5559ebe1756a6..3b9fa0d55f18d 100644 --- a/metadata-ingestion/tests/unit/sql_parsing/test_sqlglot_lineage.py +++ b/metadata-ingestion/tests/unit/sql_parsing/test_sqlglot_lineage.py @@ -675,6 +675,20 @@ def test_teradata_default_normalization(): ) +def test_teradata_strange_operators(): + assert_sql_result( + """ +select col1, col2 from dbc.table1 +where col1 eq 'value1' +minus +select col1, col2 from dbc.table2 +""", + dialect="teradata", + default_schema="dbc", + expected_file=RESOURCE_DIR / "test_teradata_strange_operators.json", + ) + + def test_snowflake_update_hardcoded(): assert_sql_result( """ From b565a657d2235b82e65dfbe0bfcc11c97c3d9b79 Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Mon, 30 Oct 2023 23:35:12 -0700 Subject: [PATCH 9/9] feat(quickstart): fix broker InconsistentClusterIdException issues (#9148) --- docker/docker-compose-with-cassandra.yml | 6 +++++- docker/docker-compose-without-neo4j.yml | 6 +++++- docker/docker-compose.yml | 6 +++++- docker/quickstart/docker-compose-m1.quickstart.yml | 4 +++- .../docker-compose-without-neo4j-m1.quickstart.yml | 4 +++- .../quickstart/docker-compose-without-neo4j.quickstart.yml | 4 +++- docker/quickstart/docker-compose.quickstart.yml | 4 +++- 7 files changed, 27 insertions(+), 7 deletions(-) diff --git a/docker/docker-compose-with-cassandra.yml 
b/docker/docker-compose-with-cassandra.yml index 9543e67da07f2..39f4341600572 100644 --- a/docker/docker-compose-with-cassandra.yml +++ b/docker/docker-compose-with-cassandra.yml @@ -200,7 +200,10 @@ services: retries: 5 timeout: 5s volumes: - - zkdata:/var/lib/zookeeper + # See https://stackoverflow.com/a/61008432 for why we need two volumes. + # See also: https://docs.confluent.io/platform/current/installation/docker/operations/external-volumes.html#data-volumes-for-kafka-and-zk + - zkdata:/var/lib/zookeeper/data + - zklogs:/var/lib/zookeeper/log networks: default: name: datahub_network @@ -210,3 +213,4 @@ volumes: neo4jdata: broker: zkdata: + zklogs: diff --git a/docker/docker-compose-without-neo4j.yml b/docker/docker-compose-without-neo4j.yml index 022362782f742..235e89e340551 100644 --- a/docker/docker-compose-without-neo4j.yml +++ b/docker/docker-compose-without-neo4j.yml @@ -174,7 +174,10 @@ services: retries: 3 timeout: 5s volumes: - - zkdata:/var/lib/zookeeper + # See https://stackoverflow.com/a/61008432 for why we need two volumes. + # See also: https://docs.confluent.io/platform/current/installation/docker/operations/external-volumes.html#data-volumes-for-kafka-and-zk + - zkdata:/var/lib/zookeeper/data + - zklogs:/var/lib/zookeeper/log networks: default: name: datahub_network @@ -182,3 +185,4 @@ volumes: esdata: broker: zkdata: + zklogs: diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index a486689e050a2..46da8c6fdbd2a 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -195,7 +195,10 @@ services: retries: 3 timeout: 5s volumes: - - zkdata:/var/lib/zookeeper + # See https://stackoverflow.com/a/61008432 for why we need two volumes. 
+ # See also: https://docs.confluent.io/platform/current/installation/docker/operations/external-volumes.html#data-volumes-for-kafka-and-zk + - zkdata:/var/lib/zookeeper/data + - zklogs:/var/lib/zookeeper/log networks: default: name: datahub_network @@ -204,3 +207,4 @@ volumes: neo4jdata: broker: zkdata: + zklogs: diff --git a/docker/quickstart/docker-compose-m1.quickstart.yml b/docker/quickstart/docker-compose-m1.quickstart.yml index c5de687d335b9..3b6d02c83d0f0 100644 --- a/docker/quickstart/docker-compose-m1.quickstart.yml +++ b/docker/quickstart/docker-compose-m1.quickstart.yml @@ -300,7 +300,8 @@ services: ports: - ${DATAHUB_MAPPED_ZK_PORT:-2181}:2181 volumes: - - zkdata:/var/lib/zookeeper + - zkdata:/var/lib/zookeeper/data + - zklogs:/var/lib/zookeeper/log version: '3.9' volumes: broker: null @@ -308,3 +309,4 @@ volumes: mysqldata: null neo4jdata: null zkdata: null + zklogs: null diff --git a/docker/quickstart/docker-compose-without-neo4j-m1.quickstart.yml b/docker/quickstart/docker-compose-without-neo4j-m1.quickstart.yml index b6935f24c5ce2..e45bafc3da480 100644 --- a/docker/quickstart/docker-compose-without-neo4j-m1.quickstart.yml +++ b/docker/quickstart/docker-compose-without-neo4j-m1.quickstart.yml @@ -274,10 +274,12 @@ services: ports: - ${DATAHUB_MAPPED_ZK_PORT:-2181}:2181 volumes: - - zkdata:/var/lib/zookeeper + - zkdata:/var/lib/zookeeper/data + - zklogs:/var/lib/zookeeper/log version: '3.9' volumes: broker: null esdata: null mysqldata: null zkdata: null + zklogs: null diff --git a/docker/quickstart/docker-compose-without-neo4j.quickstart.yml b/docker/quickstart/docker-compose-without-neo4j.quickstart.yml index 4ff8bbd70da85..020ef5e9a97b9 100644 --- a/docker/quickstart/docker-compose-without-neo4j.quickstart.yml +++ b/docker/quickstart/docker-compose-without-neo4j.quickstart.yml @@ -274,10 +274,12 @@ services: ports: - ${DATAHUB_MAPPED_ZK_PORT:-2181}:2181 volumes: - - zkdata:/var/lib/zookeeper + - zkdata:/var/lib/zookeeper/data + - 
zklogs:/var/lib/zookeeper/log version: '3.9' volumes: broker: null esdata: null mysqldata: null zkdata: null + zklogs: null diff --git a/docker/quickstart/docker-compose.quickstart.yml b/docker/quickstart/docker-compose.quickstart.yml index f2950ebab2c9d..8adc2b9063b84 100644 --- a/docker/quickstart/docker-compose.quickstart.yml +++ b/docker/quickstart/docker-compose.quickstart.yml @@ -300,7 +300,8 @@ services: ports: - ${DATAHUB_MAPPED_ZK_PORT:-2181}:2181 volumes: - - zkdata:/var/lib/zookeeper + - zkdata:/var/lib/zookeeper/data + - zklogs:/var/lib/zookeeper/log version: '3.9' volumes: broker: null @@ -308,3 +309,4 @@ volumes: mysqldata: null neo4jdata: null zkdata: null + zklogs: null