From 0038a87e97cd7fd5f34a076a0cf46ebfbc8a5b33 Mon Sep 17 00:00:00 2001 From: Jay Chia Date: Mon, 26 Feb 2024 16:28:32 -0800 Subject: [PATCH] Add recursive renaming of schema but needs rebase on iceberg schema changes --- src/daft-parquet/src/file.rs | 82 ++++++++++++++++++++++++++++++------ 1 file changed, 68 insertions(+), 14 deletions(-) diff --git a/src/daft-parquet/src/file.rs b/src/daft-parquet/src/file.rs index 782d9e81cb..6cda2804c8 100644 --- a/src/daft-parquet/src/file.rs +++ b/src/daft-parquet/src/file.rs @@ -6,7 +6,7 @@ use std::{ use arrow2::io::parquet::read::schema::infer_schema_with_options; use common_error::DaftResult; use daft_core::{ - datatypes::Field, schema::Schema, utils::arrow::cast_array_for_daft_if_needed, Series, + datatypes::Field, schema::Schema, utils::arrow::cast_array_for_daft_if_needed, DataType, Series, }; use daft_dsl::ExprRef; use daft_io::{IOClient, IOStatsRef}; @@ -105,23 +105,77 @@ fn rename_schema_recursively( daft_schema: Schema, field_id_mapping: &BTreeMap, ) -> DaftResult { - // TODO: perform this recursively + fn rename_dtype_recursively( + dtype: &DataType, + field_id_mapping: &BTreeMap, + ) -> Option { + match dtype { + // Ensure recursive renaming for nested types + DataType::List(child) => rename_dtype_recursively(child.as_ref(), field_id_mapping) + .map(|new_dtype| DataType::List(Box::new(new_dtype))), + DataType::FixedSizeList(child, size) => { + rename_dtype_recursively(child.as_ref(), field_id_mapping) + .map(|new_dtype| DataType::FixedSizeList(Box::new(new_dtype), *size)) + } + DataType::Struct(original_children) => { + let new_fields = original_children + .iter() + .map(|field| rename_field_recursively(field, field_id_mapping)) + .collect::>(); + if new_fields.iter().all(|f| f.is_none()) { + None + } else { + Some(DataType::Struct( + new_fields + .into_iter() + .zip(original_children.iter()) + .map(|(maybe_new_field, old_field)| { + maybe_new_field.unwrap_or_else(|| old_field.clone()) + }) + .collect(), + )) + } + } + // All other types are renamed only at the top-level + _ => None, + } + } + + fn rename_field_recursively( + field: &Field, + field_id_mapping: &BTreeMap, + ) -> Option { + let new_name = if let Some(field_id) = field.metadata.get("field_id") { + let field_id = str::parse::(field_id).unwrap(); + let mapped_field = field_id_mapping.get(&field_id); + mapped_field.map(|mapped_field| &mapped_field.name) + } else { + None + }; + let new_dtype = rename_dtype_recursively(&field.dtype, field_id_mapping); + + match (new_name, new_dtype) { + (None, None) => None, + (new_name, new_dtype) => Some( + Field::new( + new_name.unwrap_or(&field.name), + new_dtype.unwrap_or(field.dtype.clone()), + ) + .with_metadata(field.metadata.clone()), + ), + } + } + Schema::new( daft_schema .fields .into_iter() - .map(|(_, field)| { - if let Some(field_id) = field.metadata.get("field_id") { - let field_id = str::parse::(field_id).unwrap(); - let mapped_field = field_id_mapping.get(&field_id); - match mapped_field { - None => field, - Some(mapped_field) => field.rename(&mapped_field.name), - } - } else { - field - } - }) + .map( + |(_, field)| match rename_field_recursively(&field, field_id_mapping) { + None => field, + Some(new_field) => new_field, + }, + ) .collect(), ) }