diff --git a/src/daft-micropartition/src/micropartition.rs b/src/daft-micropartition/src/micropartition.rs
index de52d36f73..769a12d324 100644
--- a/src/daft-micropartition/src/micropartition.rs
+++ b/src/daft-micropartition/src/micropartition.rs
@@ -705,7 +705,7 @@ pub(crate) fn read_parquet_into_micropartition(
     num_parallel_tasks: usize,
     multithreaded_io: bool,
     schema_infer_options: &ParquetSchemaInferenceOptions,
-    field_id_mapping: &Option<BTreeMap<i32, Field>>,
+    field_id_mapping: &Option<Arc<BTreeMap<i32, Field>>>,
 ) -> DaftResult<MicroPartition> {
     if let Some(so) = start_offset && so > 0 {
         return Err(common_error::DaftError::ValueError("Micropartition Parquet Reader does not support non-zero start offsets".to_string()));
     }
@@ -842,7 +842,7 @@ pub(crate) fn read_parquet_into_micropartition(
             .collect::<Vec<_>>(),
         FileFormatConfig::Parquet(ParquetSourceConfig {
             coerce_int96_timestamp_unit: schema_infer_options.coerce_int96_timestamp_unit,
-            field_id_mapping: field_id_mapping.clone(), // TODO: consider Arcing this, could be expensive to clone
+            field_id_mapping: field_id_mapping.clone(),
         })
         .into(),
         daft_schema.clone(),
diff --git a/src/daft-parquet/src/file.rs b/src/daft-parquet/src/file.rs
index 3fef8a5bf9..09878ed9d2 100644
--- a/src/daft-parquet/src/file.rs
+++ b/src/daft-parquet/src/file.rs
@@ -39,7 +39,7 @@ pub(crate) struct ParquetReaderBuilder {
     row_groups: Option<Vec<i64>>,
     schema_inference_options: ParquetSchemaInferenceOptions,
     predicate: Option<Expr>,
-    field_id_mapping: Option<BTreeMap<i32, Field>>,
+    field_id_mapping: Option<Arc<BTreeMap<i32, Field>>>,
 }

 use parquet2::read::decompress;
@@ -274,7 +274,7 @@
         self
     }

-    pub fn set_field_id_mapping(mut self, field_id_mapping: BTreeMap<i32, Field>) -> Self {
+    pub fn set_field_id_mapping(mut self, field_id_mapping: Arc<BTreeMap<i32, Field>>) -> Self {
         self.field_id_mapping = Some(field_id_mapping);
         self
     }
@@ -327,7 +327,7 @@ pub(crate) struct ParquetFileReader {
     metadata: Arc<parquet2::metadata::FileMetaData>,
     arrow_schema: arrow2::datatypes::SchemaRef,
     row_ranges: Arc<Vec<RowGroupRange>>,
-    field_id_mapping: Option<BTreeMap<i32, Field>>,
+    field_id_mapping: Option<Arc<BTreeMap<i32, Field>>>,
 }

 fn get_column_name_mapping(
@@ -356,7 +356,7 @@ impl ParquetFileReader {
         metadata: parquet2::metadata::FileMetaData,
         arrow_schema: arrow2::datatypes::Schema,
         row_ranges: Vec<RowGroupRange>,
-        field_id_mapping: Option<BTreeMap<i32, Field>>,
+        field_id_mapping: Option<Arc<BTreeMap<i32, Field>>>,
     ) -> super::Result<Self> {
         Ok(ParquetFileReader {
             uri,
diff --git a/src/daft-parquet/src/read.rs b/src/daft-parquet/src/read.rs
index b0496e8825..53f8862810 100644
--- a/src/daft-parquet/src/read.rs
+++ b/src/daft-parquet/src/read.rs
@@ -66,7 +66,7 @@ async fn read_parquet_single(
     io_client: Arc<IOClient>,
     io_stats: Option<IOStatsRef>,
     schema_infer_options: ParquetSchemaInferenceOptions,
-    field_id_mapping: Option<BTreeMap<i32, Field>>,
+    field_id_mapping: Option<Arc<BTreeMap<i32, Field>>>,
 ) -> DaftResult<Table> {
     let original_columns = columns;
     let original_num_rows = num_rows;
@@ -427,7 +427,7 @@ pub fn read_parquet_bulk(
     num_parallel_tasks: usize,
     runtime_handle: Arc<Runtime>,
     schema_infer_options: &ParquetSchemaInferenceOptions,
-    field_id_mapping: &Option<BTreeMap<i32, Field>>,
+    field_id_mapping: &Option<Arc<BTreeMap<i32, Field>>>,
 ) -> DaftResult<Vec<Table>> {
     let _rt_guard = runtime_handle.enter();
     let owned_columns = columns.map(|s| s.iter().map(|v| String::from(*v)).collect::<Vec<_>>());
diff --git a/src/daft-scan/src/file_format.rs b/src/daft-scan/src/file_format.rs
index 2944c87b66..69702d446c 100644
--- a/src/daft-scan/src/file_format.rs
+++ b/src/daft-scan/src/file_format.rs
@@ -99,7 +99,7 @@ pub struct ParquetSourceConfig {
     /// data according to the provided field_ids.
     ///
     /// See: https://github.com/apache/parquet-format/blob/master/src/main/thrift/parquet.thrift#L456-L459
-    pub field_id_mapping: Option<BTreeMap<i32, Field>>,
+    pub field_id_mapping: Option<Arc<BTreeMap<i32, Field>>>,
 }

 impl ParquetSourceConfig {
@@ -136,8 +136,11 @@
             coerce_int96_timestamp_unit: coerce_int96_timestamp_unit
                 .unwrap_or(TimeUnit::Nanoseconds.into())
                 .into(),
-            field_id_mapping: field_id_mapping
-                .map(|map| BTreeMap::from_iter(map.into_iter().map(|(k, v)| (k, v.field)))),
+            field_id_mapping: field_id_mapping.map(|map| {
+                Arc::new(BTreeMap::from_iter(
+                    map.into_iter().map(|(k, v)| (k, v.field)),
+                ))
+            }),
         }
     }
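The diff threads `Arc<BTreeMap<i32, Field>>` through every reader in place of a bare `BTreeMap<i32, Field>`, which resolves the removed TODO: `ParquetSourceConfig` is cloned for each scan task, so each clone previously deep-copied the entire mapping. A minimal sketch of the pattern, using a hypothetical `Field` stand-in rather than `daft_core`'s actual type:

```rust
use std::collections::BTreeMap;
use std::sync::Arc;

// Hypothetical stand-in for daft_core::datatypes::Field, for illustration only.
#[derive(Clone)]
struct Field {
    name: String,
}

fn main() {
    // Build the field-id -> field mapping once, as ParquetSourceConfig::new does.
    let mapping: BTreeMap<i32, Field> = (0..10_000)
        .map(|id| (id, Field { name: format!("col_{id}") }))
        .collect();

    // Before this change: every clone of the config deep-copied the whole map.
    let deep_copy = mapping.clone(); // O(n): re-allocates every key and value

    // After: the map is wrapped in Arc once, and each per-scan-task clone is
    // just an atomic reference-count bump.
    let shared: Arc<BTreeMap<i32, Field>> = Arc::new(mapping);
    let per_task = shared.clone(); // O(1): no map traversal

    assert_eq!(deep_copy.len(), per_task.len());
    assert_eq!(deep_copy[&0].name, per_task[&0].name);
    assert_eq!(Arc::strong_count(&shared), 2);
}
```

Once built, the same `Arc` can be handed to `set_field_id_mapping` and cloned into each `ParquetFileReader` without ever copying the map itself; the trade-off is a pointer indirection on lookup, which is negligible next to the per-task clone cost.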