Skip to content

Commit

Permalink
Add recursive renaming of schema but needs rebase on iceberg schema c…
Browse files Browse the repository at this point in the history
…hanges
  • Loading branch information
Jay Chia committed Mar 4, 2024
1 parent ffbf56b commit 0038a87
Showing 1 changed file with 68 additions and 14 deletions.
82 changes: 68 additions & 14 deletions src/daft-parquet/src/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::{
use arrow2::io::parquet::read::schema::infer_schema_with_options;
use common_error::DaftResult;
use daft_core::{
datatypes::Field, schema::Schema, utils::arrow::cast_array_for_daft_if_needed, Series,
datatypes::Field, schema::Schema, utils::arrow::cast_array_for_daft_if_needed, DataType, Series,
};
use daft_dsl::ExprRef;
use daft_io::{IOClient, IOStatsRef};
Expand Down Expand Up @@ -105,23 +105,77 @@ fn rename_schema_recursively(
daft_schema: Schema,
field_id_mapping: &BTreeMap<i32, Field>,
) -> DaftResult<Schema> {
// TODO: perform this recursively
fn rename_dtype_recursively(
dtype: &DataType,
field_id_mapping: &BTreeMap<i32, Field>,
) -> Option<DataType> {
match dtype {
// Ensure recursive renaming for nested types
DataType::List(child) => rename_dtype_recursively(child.as_ref(), field_id_mapping)
.map(|new_dtype| DataType::List(Box::new(new_dtype))),
DataType::FixedSizeList(child, size) => {
rename_dtype_recursively(child.as_ref(), field_id_mapping)
.map(|new_dtype| DataType::FixedSizeList(Box::new(new_dtype), *size))
}
DataType::Struct(original_children) => {
let new_fields = original_children
.iter()
.map(|field| rename_field_recursively(field, field_id_mapping))
.collect::<Vec<_>>();
if new_fields.iter().all(|f| f.is_none()) {
None
} else {
Some(DataType::Struct(
new_fields
.into_iter()
.zip(original_children.iter())
.map(|(maybe_new_field, old_field)| {
maybe_new_field.unwrap_or_else(|| old_field.clone())
})
.collect(),
))
}
}
// All other types are renamed only at the top-level
_ => None,
}
}

fn rename_field_recursively(
field: &Field,
field_id_mapping: &BTreeMap<i32, Field>,
) -> Option<Field> {
let new_name = if let Some(field_id) = field.metadata.get("field_id") {
let field_id = str::parse::<i32>(field_id).unwrap();
let mapped_field = field_id_mapping.get(&field_id);
mapped_field.map(|mapped_field| &mapped_field.name)
} else {
None
};
let new_dtype = rename_dtype_recursively(&field.dtype, field_id_mapping);

match (new_name, new_dtype) {
(None, None) => None,
(new_name, new_dtype) => Some(
Field::new(
new_name.unwrap_or(&field.name),
new_dtype.unwrap_or(field.dtype.clone()),
)
.with_metadata(field.metadata.clone()),
),
}
}

Schema::new(
daft_schema
.fields
.into_iter()
.map(|(_, field)| {
if let Some(field_id) = field.metadata.get("field_id") {
let field_id = str::parse::<i32>(field_id).unwrap();
let mapped_field = field_id_mapping.get(&field_id);
match mapped_field {
None => field,
Some(mapped_field) => field.rename(&mapped_field.name),
}
} else {
field
}
})
.map(
|(_, field)| match rename_field_recursively(&field, field_id_mapping) {
None => field,
Some(new_field) => new_field,
},
)
.collect(),
)
}
Expand Down

0 comments on commit 0038a87

Please sign in to comment.