From 04216e30bb2a1eab1993e48276d9f8e52d6b3121 Mon Sep 17 00:00:00 2001 From: Tamas Nemeth Date: Sat, 21 Oct 2023 17:21:54 +0200 Subject: [PATCH] feat(ingest/s3): S3 add partition to schema (#8900) Co-authored-by: Pedro Silva --- .../src/datahub/ingestion/source/s3/config.py | 5 ++- .../src/datahub/ingestion/source/s3/source.py | 33 +++++++++++++++++++ 2 files changed, 37 insertions(+), 1 deletion(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/s3/config.py b/metadata-ingestion/src/datahub/ingestion/source/s3/config.py index 9b5296f0b9dd50..3ef6476078f6fb 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/s3/config.py +++ b/metadata-ingestion/src/datahub/ingestion/source/s3/config.py @@ -75,7 +75,10 @@ class DataLakeSourceConfig( default=100, description="Maximum number of rows to use when inferring schemas for TSV and CSV files.", ) - + add_partition_columns_to_schema: bool = Field( + default=False, + description="Whether to add partition fields to the schema.", + ) verify_ssl: Union[bool, str] = Field( default=True, description="Either a boolean, in which case it controls whether we verify the server's TLS certificate, or a string, in which case it must be a path to a CA bundle to use.", diff --git a/metadata-ingestion/src/datahub/ingestion/source/s3/source.py b/metadata-ingestion/src/datahub/ingestion/source/s3/source.py index eb49fcbb268c0a..94c571eabad11a 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/s3/source.py +++ b/metadata-ingestion/src/datahub/ingestion/source/s3/source.py @@ -78,6 +78,7 @@ NullTypeClass, NumberTypeClass, RecordTypeClass, + SchemaField, SchemaFieldDataType, SchemaMetadata, StringTypeClass, @@ -90,6 +91,7 @@ OperationClass, OperationTypeClass, OtherSchemaClass, + SchemaFieldDataTypeClass, _Aspect, ) from datahub.telemetry import stats, telemetry @@ -458,8 +460,39 @@ def get_fields(self, table_data: TableData, path_spec: PathSpec) -> List: logger.debug(f"Extracted fields in schema: {fields}") fields = sorted(fields, key=lambda f: f.fieldPath) + if self.source_config.add_partition_columns_to_schema: + self.add_partition_columns_to_schema( + fields=fields, path_spec=path_spec, full_path=table_data.full_path + ) + return fields + def add_partition_columns_to_schema( + self, path_spec: PathSpec, full_path: str, fields: List[SchemaField] + ) -> None: + is_fieldpath_v2 = False + for field in fields: + if field.fieldPath.startswith("[version=2.0]"): + is_fieldpath_v2 = True + break + vars = path_spec.get_named_vars(full_path) + if vars is not None and "partition_key" in vars: + for partition_key in vars["partition_key"].values(): + fields.append( + SchemaField( + fieldPath=f"{partition_key}" + if not is_fieldpath_v2 + else f"[version=2.0].[type=string].{partition_key}", + nativeDataType="string", + type=SchemaFieldDataType(StringTypeClass()) + if not is_fieldpath_v2 + else SchemaFieldDataTypeClass(type=StringTypeClass()), + isPartitioningKey=True, + nullable=True, + recursive=False, + ) + ) + def get_table_profile( self, table_data: TableData, dataset_urn: str ) -> Iterable[MetadataWorkUnit]: