From e8e22c2b54add8826540c7d3062e9575beafa4af Mon Sep 17 00:00:00 2001
From: Jay Chia
Date: Wed, 21 Feb 2024 18:34:56 -0800
Subject: [PATCH] Update to use field_ids from field metadata in arrow2 fork

---
 Cargo.lock                   |  2 +-
 Cargo.toml                   |  5 +--
 src/daft-parquet/src/file.rs | 64 +++++++++++++++++++++---------------
 src/daft-table/src/lib.rs    |  2 +-
 4 files changed, 42 insertions(+), 31 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 6521e7095e..76a93d64ad 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -105,7 +105,7 @@ dependencies = [
 [[package]]
 name = "arrow2"
 version = "0.17.4"
-source = "git+https://github.com/Eventual-Inc/arrow2?rev=d5685eebf1d65c3f3d854370ad39f93dcd91971a#d5685eebf1d65c3f3d854370ad39f93dcd91971a"
+source = "git+https://github.com/Eventual-Inc/arrow2?rev=79654e8475#79654e8475e5e31b3e60a7d29e2a8fc7edac58ed"
 dependencies = [
  "ahash",
  "arrow-format",
diff --git a/Cargo.toml b/Cargo.toml
index eaf82e3000..261a9b80c3 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -112,10 +112,11 @@ tokio-util = "0.7.8"
 url = "2.4.0"
 
 [workspace.dependencies.arrow2]
-# branch = "daft-fork"
+# TODO: Update this to daft-fork
+# branch = "jay/fd-add-rename-test"
 git = "https://github.com/Eventual-Inc/arrow2"
 package = "arrow2"
-rev = "d5685eebf1d65c3f3d854370ad39f93dcd91971a"
+rev = "79654e8475"
 
 [workspace.dependencies.bincode]
 version = "1.3.3"
diff --git a/src/daft-parquet/src/file.rs b/src/daft-parquet/src/file.rs
index 09878ed9d2..2e21eefb34 100644
--- a/src/daft-parquet/src/file.rs
+++ b/src/daft-parquet/src/file.rs
@@ -325,7 +325,7 @@ pub(crate) struct RowGroupRange {
 pub(crate) struct ParquetFileReader {
     uri: String,
     metadata: Arc<parquet2::metadata::FileMetaData>,
-    arrow_schema: arrow2::datatypes::SchemaRef,
+    arrow_schema_from_pq: arrow2::datatypes::SchemaRef,
     row_ranges: Arc<Vec<RowGroupRange>>,
     field_id_mapping: Option<Arc<BTreeMap<i32, Field>>>,
 }
@@ -354,25 +354,25 @@ impl ParquetFileReader {
     fn new(
         uri: String,
         metadata: parquet2::metadata::FileMetaData,
-        arrow_schema: arrow2::datatypes::Schema,
+        arrow_schema_from_pq: arrow2::datatypes::Schema,
         row_ranges: Vec<RowGroupRange>,
         field_id_mapping: Option<Arc<BTreeMap<i32, Field>>>,
     ) -> super::Result<Self> {
         Ok(ParquetFileReader {
             uri,
             metadata: Arc::new(metadata),
-            arrow_schema: arrow_schema.into(),
+            arrow_schema_from_pq: arrow_schema_from_pq.into(),
             row_ranges: Arc::new(row_ranges),
             field_id_mapping,
         })
     }
 
     pub fn arrow_schema(&self) -> &Arc<arrow2::datatypes::Schema> {
-        &self.arrow_schema
+        &self.arrow_schema_from_pq
     }
 
     fn naive_read_plan(&self) -> super::Result<ReadPlanner> {
-        let arrow_fields = &self.arrow_schema.fields;
+        let arrow_fields = &self.arrow_schema_from_pq.fields;
 
         let mut read_planner = ReadPlanner::new(&self.uri);
 
@@ -430,10 +430,24 @@
     ) -> DaftResult<Table> {
         let metadata = self.metadata;
         let all_handles = self
-            .arrow_schema
+            .arrow_schema_from_pq
             .fields
             .iter()
-            .map(|field| {
+            .map(|pq_arrow_field| {
+                // Retrieve the appropriate field name from the field_id mapping
+                // TODO: We should do this recursively as well
+                let target_field_name = if let (Some(field_id_mapping), Some(field_id)) = (
+                    self.field_id_mapping.as_ref(),
+                    pq_arrow_field.metadata.get("field_id"),
+                ) {
+                    field_id_mapping
+                        .get(&str::parse::<i32>(field_id).unwrap())
+                        .map(|f| f.name.clone())
+                        .unwrap_or(pq_arrow_field.name.clone())
+                } else {
+                    pq_arrow_field.name.clone()
+                };
+
                 let owned_row_ranges = self.row_ranges.clone();
 
                 let field_handles = owned_row_ranges
@@ -441,7 +455,7 @@
                     .map(|row_range| {
                         let row_range = *row_range;
                         let rt_handle = tokio::runtime::Handle::current();
-                        let field = field.clone();
+                        let pq_arrow_field = pq_arrow_field.clone();
                         let owned_uri = self.uri.clone();
                         let rg = metadata
                             .row_groups
@@ -449,11 +463,12 @@
                             .expect("Row Group index should be in bounds");
                         let num_rows = rg.num_rows().min(row_range.start + row_range.num_rows);
                         let columns = rg.columns();
-                        let field_name = &field.name;
                         let filtered_cols_idx = columns
                             .iter()
                             .enumerate()
-                            .filter(|(_, x)| &x.descriptor().path_in_schema[0] == field_name)
+                            .filter(|(_, x)| {
+                                x.descriptor().path_in_schema[0] == pq_arrow_field.name
+                            })
                             .map(|(i, _)| i)
                             .collect::<Vec<_>>();
 
@@ -471,6 +486,7 @@
                             })
                             .collect::<Vec<_>>();
                         let metadata = metadata.clone();
+                        let cloned_target_field_name = target_field_name.clone();
                         let handle = tokio::task::spawn(async move {
                             let mut decompressed_iters =
                                 Vec::with_capacity(filtered_cols_idx.len());
@@ -516,7 +532,7 @@
                             let arr_iter = column_iter_to_arrays(
                                 decompressed_iters,
                                 ptypes.iter().collect(),
-                                field.clone(),
+                                pq_arrow_field.clone(),
                                 Some(2048),
                                 num_rows,
                             );
@@ -544,7 +560,7 @@
                                 .into_iter()
                                 .map(|a| {
                                     Series::try_from((
-                                        field.name.as_str(),
+                                        cloned_target_field_name.as_str(),
                                         cast_array_for_daft_if_needed(a),
                                     ))
                                 })
@@ -559,7 +575,7 @@
                     })
                     .collect::<DaftResult<Vec<_>>>()?;
                 let owned_uri = self.uri.clone();
-                let owned_field = field.clone();
+                let owned_field = pq_arrow_field.clone();
                 let concated_handle = tokio::task::spawn(async move {
                     let series_to_concat =
                         try_join_all(field_handles).await.context(JoinSnafu {
@@ -595,30 +611,24 @@
             })?
             .into_iter()
             .collect::<DaftResult<Vec<_>>>()?;
-        let daft_schema = daft_core::schema::Schema::try_from(self.arrow_schema.as_ref())?;
+        let daft_schema = daft_core::schema::Schema::try_from(self.arrow_schema_from_pq.as_ref())?;
 
+        // Apply `field_id_mapping` to the parsed daft_schema (if provided)
         let column_name_mapping = self
             .field_id_mapping
             .map(|field_id_mapping| get_column_name_mapping(&field_id_mapping, metadata.schema()));
-        let (daft_schema, all_series) = if let Some(column_name_mapping) = column_name_mapping {
+        let daft_schema = if let Some(column_name_mapping) = column_name_mapping {
             let new_daft_fields = daft_schema.fields.into_iter().map(|(name, field)| {
                 match column_name_mapping.get(&name) {
                     None => field,
                     Some(mapped_name) => field.rename(mapped_name),
                 }
             });
-            let new_daft_schema = daft_core::schema::Schema::new(new_daft_fields.collect())
-                .expect("Daft schema should be constructed from column name mapping");
-            let new_all_series = all_series
-                .into_iter()
-                .map(|s| match column_name_mapping.get(s.name()) {
-                    None => s,
-                    Some(mapped_name) => s.rename(mapped_name),
-                })
-                .collect();
-            (new_daft_schema, new_all_series)
+
+            daft_core::schema::Schema::new(new_daft_fields.collect())
+                .expect("Daft schema should be constructed from column name mapping")
         } else {
-            (daft_schema, all_series)
+            daft_schema
         };
 
         Table::new(daft_schema, all_series)
@@ -630,7 +640,7 @@
     ) -> DaftResult<Vec<Vec<Box<dyn arrow2::array::Array>>>> {
         let metadata = self.metadata;
         let all_handles = self
-            .arrow_schema
+            .arrow_schema_from_pq
             .fields
             .iter()
             .map(|field| {
diff --git a/src/daft-table/src/lib.rs b/src/daft-table/src/lib.rs
index 0dcd16544c..2a406b1f96 100644
--- a/src/daft-table/src/lib.rs
+++ b/src/daft-table/src/lib.rs
@@ -43,7 +43,7 @@ impl Table {
         let mut num_rows = 1;
 
         for (field, series) in schema.fields.values().zip(columns.iter()) {
-            if field != series.field() {
+            if field.name != series.field().name || field.dtype != series.field().dtype {
                 return Err(DaftError::SchemaMismatch(format!("While building a Table, we found that the Schema Field and the Series Field did not match. schema field: {field} vs series field: {}", series.field())));
             }
             if (series.len() != 1) && (series.len() != num_rows) {
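
---

Reviewer note (editorial commentary, not part of the patch): the core of this change is the `target_field_name` resolution added in the table-reading path of `ParquetFileReader`. Parquet field IDs stay stable across column renames (the `jay/fd-add-rename-test` branch name suggests renames are the motivating case), so resolving each column's name through `field_id_mapping` lets a file written under an old column name be read back under the current one. The sketch below restates that lookup using plain `std` types; `PqArrowField`, `MappedField`, and `resolve_target_field_name` are illustrative stand-ins for the arrow2/daft_core types, and it uses `.ok()` where the patch calls `.unwrap()` on the parsed field_id.

use std::collections::{BTreeMap, HashMap};

/// Stand-in for an arrow2 field parsed out of the Parquet file:
/// a name plus string-keyed metadata that may carry a "field_id".
struct PqArrowField {
    name: String,
    metadata: HashMap<String, String>,
}

/// Stand-in for the mapped Daft field; only the name matters here.
struct MappedField {
    name: String,
}

/// Mirrors the patch's `target_field_name` computation: if a mapping was
/// provided AND the Parquet field carries a parseable "field_id" entry,
/// use the mapped name; otherwise fall back to the name stored in the file.
fn resolve_target_field_name(
    pq_arrow_field: &PqArrowField,
    field_id_mapping: Option<&BTreeMap<i32, MappedField>>,
) -> String {
    match (field_id_mapping, pq_arrow_field.metadata.get("field_id")) {
        (Some(mapping), Some(field_id)) => field_id
            .parse::<i32>()
            .ok()
            .and_then(|id| mapping.get(&id))
            .map(|f| f.name.clone())
            .unwrap_or_else(|| pq_arrow_field.name.clone()),
        _ => pq_arrow_field.name.clone(),
    }
}

fn main() {
    // A file written before a rename: field_id 1 was stored as "old_name".
    let field = PqArrowField {
        name: "old_name".to_string(),
        metadata: HashMap::from([("field_id".to_string(), "1".to_string())]),
    };
    // The current table schema maps field_id 1 to "new_name".
    let mapping = BTreeMap::from([(1, MappedField { name: "new_name".to_string() })]);

    assert_eq!(resolve_target_field_name(&field, Some(&mapping)), "new_name");
    assert_eq!(resolve_target_field_name(&field, None), "old_name");
}

The `daft-table` hunk complements this: `Table::new` now compares only field names and dtypes, presumably so that fields which agree on name and dtype but differ in any remaining attributes of `Field` no longer trip the strict `field != series.field()` check after the rename path runs.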