From 57d245131fd90cac789ea9a63d4a17be53dbff24 Mon Sep 17 00:00:00 2001 From: Jay Chia Date: Wed, 21 Feb 2024 19:18:29 -0800 Subject: [PATCH] Add todos --- src/daft-micropartition/src/python.rs | 4 ++-- src/daft-parquet/src/file.rs | 6 ++++-- src/daft-parquet/src/read.rs | 2 +- src/daft-scan/src/file_format.rs | 4 ++-- 4 files changed, 9 insertions(+), 7 deletions(-) diff --git a/src/daft-micropartition/src/python.rs b/src/daft-micropartition/src/python.rs index 704987ded0..4c16e2b70f 100644 --- a/src/daft-micropartition/src/python.rs +++ b/src/daft-micropartition/src/python.rs @@ -543,7 +543,7 @@ impl PyMicroPartition { 1, multithreaded_io.unwrap_or(true), &schema_infer_options, - &None, // TODO: pass in field_id mapping + &None, ) })?; Ok(mp.into()) @@ -584,7 +584,7 @@ impl PyMicroPartition { num_parallel_tasks.unwrap_or(128) as usize, multithreaded_io.unwrap_or(true), &schema_infer_options, - &None, // TODO: pass in field_id mapping + &None, ) })?; Ok(mp.into()) diff --git a/src/daft-parquet/src/file.rs b/src/daft-parquet/src/file.rs index efb67f784f..b6b96d9227 100644 --- a/src/daft-parquet/src/file.rs +++ b/src/daft-parquet/src/file.rs @@ -250,6 +250,7 @@ impl ParquetReaderBuilder { } pub fn prune_columns>(mut self, columns: &[S]) -> super::Result { + // TODO: perform pruning of columns on names AFTER applying the field_id_mapping let avail_names = self .parquet_schema() .fields() @@ -439,8 +440,7 @@ impl ParquetFileReader { .fields .iter() .map(|pq_arrow_field| { - // Retrieve the appropriate field name from the field_id mapping - // TODO: We should do this recursively as well + // Retrieve the intended target field name (might differ from the Parquet field name, depending on the field_id_mapping) let target_field_name = if let (Some(field_id_mapping), Some(field_id)) = ( self.field_id_mapping.as_ref(), pq_arrow_field.metadata.get("field_id"), @@ -564,6 +564,8 @@ impl ParquetFileReader { all_arrays .into_iter() .map(|a| { + // TODO: Need to perform recursive renaming of Series here. Hopefully arrow array + // has the correct metadata and that was correctly transferred to the Series... Series::try_from(( cloned_target_field_name.as_str(), cast_array_for_daft_if_needed(a), diff --git a/src/daft-parquet/src/read.rs b/src/daft-parquet/src/read.rs index 53f8862810..f932d02349 100644 --- a/src/daft-parquet/src/read.rs +++ b/src/daft-parquet/src/read.rs @@ -379,7 +379,7 @@ pub fn read_parquet( io_client, io_stats, schema_infer_options, - None, // TODO: Add field_id_mapping + None, ) .await }) diff --git a/src/daft-scan/src/file_format.rs b/src/daft-scan/src/file_format.rs index 69702d446c..098962d3df 100644 --- a/src/daft-scan/src/file_format.rs +++ b/src/daft-scan/src/file_format.rs @@ -111,10 +111,10 @@ impl ParquetSourceConfig { )); if let Some(mapping) = &self.field_id_mapping { res.push(format!( - "Field ID to column names = [{}]", + "Field ID to Fields = {{{}}}", mapping .iter() - .map(|(f, c)| format!("{f}: {c}")) + .map(|(fid, f)| format!("{fid}: {f}")) .collect::>() .join(",") ));