Skip to content

Commit

Permalink
fix(ingestion/redshift): Fix for Redshift COPY-based lineage (datahub…
Browse files Browse the repository at this point in the history
  • Loading branch information
skrydal authored Oct 14, 2024
1 parent 38ac100 commit 3387110
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 65 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -334,19 +334,26 @@ def _process_view_lineage(self, lineage_row: LineageRow) -> None:
)

def _process_copy_command(self, lineage_row: LineageRow) -> None:
source = self._lineage_v1._get_sources(
logger.debug(f"Processing COPY command for lineage row: {lineage_row}")
sources = self._lineage_v1._get_sources(
lineage_type=LineageCollectorType.COPY,
db_name=self.database,
source_schema=None,
source_table=None,
ddl=None,
filename=lineage_row.filename,
)[0]
)
logger.debug(f"Recognized sources: {sources}")
source = sources[0]
if not source:
logger.debug("Ignoring command since couldn't recognize proper source")
return
s3_urn = source[0].urn

logger.debug(f"Recognized s3 dataset urn: {s3_urn}")
if not lineage_row.target_schema or not lineage_row.target_table:
logger.debug(
f"Didn't find target schema (found: {lineage_row.target_schema}) or target table (found: {lineage_row.target_table})"
)
return
target = self._make_filtered_target(lineage_row)
if not target:
Expand Down
89 changes: 28 additions & 61 deletions metadata-ingestion/src/datahub/ingestion/source/redshift/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,34 @@ def alter_table_rename_query(
AND SYS.query_text ILIKE '%alter table % rename to %'
"""

@staticmethod
def list_copy_commands_sql(
    db_name: str, start_time: datetime, end_time: datetime
) -> str:
    """Build the SQL that enumerates COPY-command load targets.

    Pairs each load query (SYS_QUERY_DETAIL) with the S3 file it ingested
    (SYS_LOAD_DETAIL) and resolves the destination schema/table via
    SVV_TABLE_INFO, limited to *db_name* within [start_time, end_time).

    :param db_name: original database name used for filtering.
    :param start_time: inclusive lower bound of the query window.
    :param end_time: exclusive upper bound of the query window.
    :return: the rendered SQL text.
    """
    # NOTE(review): values are interpolated straight into the SQL text
    # (no bind parameters) — assumes trusted, config-sourced inputs.
    window_start = start_time.strftime(redshift_datetime_format)
    window_end = end_time.strftime(redshift_datetime_format)
    return f"""
            select
                distinct
                    "schema" as target_schema,
                    "table" as target_table,
                    c.file_name as filename
            from
                SYS_QUERY_DETAIL as si
            join SYS_LOAD_DETAIL as c on
                si.query_id = c.query_id
            join SVV_TABLE_INFO sti on
                sti.table_id = si.table_id
            where
                database = '{db_name}'
                and si.start_time >= '{window_start}'
                and si.start_time < '{window_end}'
            order by target_schema, target_table, si.start_time asc
        """

@staticmethod
def additional_table_metadata_query() -> str:
    """Abstract hook: subclasses return the dialect-specific SQL for extra table metadata."""
    raise NotImplementedError
Expand Down Expand Up @@ -317,12 +345,6 @@ def list_insert_create_queries_sql(
) -> str:
raise NotImplementedError

@staticmethod
def list_copy_commands_sql(
    db_name: str, start_time: datetime, end_time: datetime
) -> str:
    """Abstract hook: subclasses return the dialect-specific SQL listing COPY commands for *db_name* within [start_time, end_time)."""
    raise NotImplementedError


class RedshiftProvisionedQuery(RedshiftCommonQuery):
@staticmethod
Expand Down Expand Up @@ -536,34 +558,6 @@ def list_insert_create_queries_sql(
end_time=end_time.strftime(redshift_datetime_format),
)

@staticmethod
def list_copy_commands_sql(
    db_name: str, start_time: datetime, end_time: datetime
) -> str:
    """Build the SQL that enumerates COPY-command load targets (provisioned clusters).

    Joins stl_insert with stl_load_commits to pair each load query with
    its source file, and with SVV_TABLE_INFO to resolve the destination
    schema/table, limited to *db_name* within [start_time, end_time).

    :param db_name: original database name used for filtering.
    :param start_time: inclusive lower bound of the query window.
    :param end_time: exclusive upper bound of the query window.
    :return: the rendered SQL text.
    """
    # We need the original database name for filtering; the time bounds
    # are rendered with the shared Redshift timestamp format.
    substitutions = {
        "db_name": db_name,
        "start_time": start_time.strftime(redshift_datetime_format),
        "end_time": end_time.strftime(redshift_datetime_format),
    }
    return """
            select
                distinct
                    "schema" as target_schema,
                    "table" as target_table,
                    filename
            from
                stl_insert as si
            join stl_load_commits as c on
                si.query = c.query
            join SVV_TABLE_INFO sti on
                sti.table_id = tbl
            where
                database = '{db_name}'
                and si.starttime >= '{start_time}'
                and si.starttime < '{end_time}'
            order by target_schema, target_table, starttime asc
        """.format(**substitutions)

@staticmethod
def temp_table_ddl_query(start_time: datetime, end_time: datetime) -> str:
start_time_str: str = start_time.strftime(redshift_datetime_format)
Expand Down Expand Up @@ -941,33 +935,6 @@ def list_insert_create_queries_sql(

# when loading from s3 using prefix with a single file it produces 2 lines (for file and just directory) - also
# behaves like this when run in the old way
@staticmethod
def list_copy_commands_sql(
    db_name: str, start_time: datetime, end_time: datetime
) -> str:
    """Build the SQL that enumerates COPY-command load targets (serverless SYS views).

    Pairs each load query (SYS_QUERY_DETAIL) with the file it ingested
    (SYS_LOAD_DETAIL) and resolves the destination schema/table via
    SVV_TABLE_INFO, limited to *db_name* within [start_time, end_time).

    :param db_name: original database name used for filtering.
    :param start_time: inclusive lower bound of the query window.
    :param end_time: exclusive upper bound of the query window.
    :return: the rendered SQL text.
    """
    # Bounds are rendered with the shared Redshift timestamp format and
    # interpolated directly into the SQL text.
    window_start = start_time.strftime(redshift_datetime_format)
    window_end = end_time.strftime(redshift_datetime_format)
    return f"""
            select
                distinct
                    "schema" as target_schema,
                    "table" as target_table,
                    c.file_name
            from
                SYS_QUERY_DETAIL as si
            join SYS_LOAD_DETAIL as c on
                si.query_id = c.query_id
            join SVV_TABLE_INFO sti on
                sti.table_id = si.table_id
            where
                database = '{db_name}'
                and si.start_time >= '{window_start}'
                and si.start_time < '{window_end}'
            order by target_schema, target_table, si.start_time asc
        """

# handles "create table IF ..." statements wrong probably - "create command" field contains only "create table if" in such cases
# also similar happens if for example table name contains special characters quoted with " i.e. "test-table1"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -613,7 +613,9 @@ def add_known_lineage_mapping(
upstream_urn: The upstream dataset URN.
downstream_urn: The downstream dataset URN.
"""

logger.debug(
f"Adding lineage to the map, downstream: {downstream_urn}, upstream: {upstream_urn}"
)
self.report.num_known_mapping_lineage += 1

# We generate a fake "query" object to hold the lineage.
Expand Down

0 comments on commit 3387110

Please sign in to comment.