From 3387110b411d91c7a08baaab76e29caa3bcb3a28 Mon Sep 17 00:00:00 2001 From: skrydal Date: Mon, 14 Oct 2024 11:35:34 +0200 Subject: [PATCH] fix(ingestion/redshift): Fix for Redshift COPY-based lineage (#11552) --- .../ingestion/source/redshift/lineage_v2.py | 13 ++- .../ingestion/source/redshift/query.py | 89 ++++++------------- .../sql_parsing/sql_parsing_aggregator.py | 4 +- 3 files changed, 41 insertions(+), 65 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/redshift/lineage_v2.py b/metadata-ingestion/src/datahub/ingestion/source/redshift/lineage_v2.py index 4b7f710beed08f..4df64c80bad8a8 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/redshift/lineage_v2.py +++ b/metadata-ingestion/src/datahub/ingestion/source/redshift/lineage_v2.py @@ -334,19 +334,26 @@ def _process_view_lineage(self, lineage_row: LineageRow) -> None: ) def _process_copy_command(self, lineage_row: LineageRow) -> None: - source = self._lineage_v1._get_sources( + logger.debug(f"Processing COPY command for lineage row: {lineage_row}") + sources = self._lineage_v1._get_sources( lineage_type=LineageCollectorType.COPY, db_name=self.database, source_schema=None, source_table=None, ddl=None, filename=lineage_row.filename, - )[0] + ) + logger.debug(f"Recognized sources: {sources}") + source = sources[0] if not source: + logger.debug("Ignoring command since couldn't recognize proper source") return s3_urn = source[0].urn - + logger.debug(f"Recognized s3 dataset urn: {s3_urn}") if not lineage_row.target_schema or not lineage_row.target_table: + logger.debug( + f"Didn't find target schema (found: {lineage_row.target_schema}) or target table (found: {lineage_row.target_table})" + ) return target = self._make_filtered_target(lineage_row) if not target: diff --git a/metadata-ingestion/src/datahub/ingestion/source/redshift/query.py b/metadata-ingestion/src/datahub/ingestion/source/redshift/query.py index affbcd00b5107b..39370b93b561c5 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/redshift/query.py +++ b/metadata-ingestion/src/datahub/ingestion/source/redshift/query.py @@ -283,6 +283,34 @@ def alter_table_rename_query( AND SYS.query_text ILIKE '%alter table % rename to %' """ + @staticmethod + def list_copy_commands_sql( + db_name: str, start_time: datetime, end_time: datetime + ) -> str: + return """ + select + distinct + "schema" as target_schema, + "table" as target_table, + c.file_name as filename + from + SYS_QUERY_DETAIL as si + join SYS_LOAD_DETAIL as c on + si.query_id = c.query_id + join SVV_TABLE_INFO sti on + sti.table_id = si.table_id + where + database = '{db_name}' + and si.start_time >= '{start_time}' + and si.start_time < '{end_time}' + order by target_schema, target_table, si.start_time asc + """.format( + # We need the original database name for filtering + db_name=db_name, + start_time=start_time.strftime(redshift_datetime_format), + end_time=end_time.strftime(redshift_datetime_format), + ) + @staticmethod def additional_table_metadata_query() -> str: raise NotImplementedError @@ -317,12 +345,6 @@ def list_insert_create_queries_sql( ) -> str: raise NotImplementedError - @staticmethod - def list_copy_commands_sql( - db_name: str, start_time: datetime, end_time: datetime - ) -> str: - raise NotImplementedError - class RedshiftProvisionedQuery(RedshiftCommonQuery): @staticmethod @@ -536,34 +558,6 @@ def list_insert_create_queries_sql( end_time=end_time.strftime(redshift_datetime_format), ) - @staticmethod - def list_copy_commands_sql( - db_name: str, start_time: datetime, end_time: datetime - ) -> str: - return """ - select - distinct - "schema" as target_schema, - "table" as target_table, - filename - from - stl_insert as si - join stl_load_commits as c on - si.query = c.query - join SVV_TABLE_INFO sti on - sti.table_id = tbl - where - database = '{db_name}' - and si.starttime >= '{start_time}' - and si.starttime < '{end_time}' - order by target_schema, target_table, starttime asc - """.format( - # We need the original database name for filtering - db_name=db_name, - start_time=start_time.strftime(redshift_datetime_format), - end_time=end_time.strftime(redshift_datetime_format), - ) - @staticmethod def temp_table_ddl_query(start_time: datetime, end_time: datetime) -> str: start_time_str: str = start_time.strftime(redshift_datetime_format) @@ -941,33 +935,6 @@ def list_insert_create_queries_sql( # when loading from s3 using prefix with a single file it produces 2 lines (for file and just directory) - also # behaves like this when run in the old way - @staticmethod - def list_copy_commands_sql( - db_name: str, start_time: datetime, end_time: datetime - ) -> str: - return """ - select - distinct - "schema" as target_schema, - "table" as target_table, - c.file_name - from - SYS_QUERY_DETAIL as si - join SYS_LOAD_DETAIL as c on - si.query_id = c.query_id - join SVV_TABLE_INFO sti on - sti.table_id = si.table_id - where - database = '{db_name}' - and si.start_time >= '{start_time}' - and si.start_time < '{end_time}' - order by target_schema, target_table, si.start_time asc - """.format( - # We need the original database name for filtering - db_name=db_name, - start_time=start_time.strftime(redshift_datetime_format), - end_time=end_time.strftime(redshift_datetime_format), - ) # handles "create table IF ..." statements wrong probably - "create command" field contains only "create table if" in such cases # also similar happens if for example table name contains special characters quoted with " i.e. "test-table1" diff --git a/metadata-ingestion/src/datahub/sql_parsing/sql_parsing_aggregator.py b/metadata-ingestion/src/datahub/sql_parsing/sql_parsing_aggregator.py index 52934f9f72a70e..5f2709fe426605 100644 --- a/metadata-ingestion/src/datahub/sql_parsing/sql_parsing_aggregator.py +++ b/metadata-ingestion/src/datahub/sql_parsing/sql_parsing_aggregator.py @@ -613,7 +613,9 @@ def add_known_lineage_mapping( upstream_urn: The upstream dataset URN. downstream_urn: The downstream dataset URN. """ - + logger.debug( + f"Adding lineage to the map, downstream: {downstream_urn}, upstream: {upstream_urn}" + ) self.report.num_known_mapping_lineage += 1 # We generate a fake "query" object to hold the lineage.