Skip to content

Commit

Permalink
Wrap the field_id_mapping BTreeMap in an Arc for cheaper clones across parallel tokio tasks
Browse files (browse the repository at this point in the history)
  • Loading branch information
Jay Chia committed Feb 21, 2024
1 parent 792a857 commit 56562e5
Show file tree
Hide file tree
Showing 4 changed files with 14 additions and 11 deletions.
4 changes: 2 additions & 2 deletions src/daft-micropartition/src/micropartition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -705,7 +705,7 @@ pub(crate) fn read_parquet_into_micropartition(
num_parallel_tasks: usize,
multithreaded_io: bool,
schema_infer_options: &ParquetSchemaInferenceOptions,
field_id_mapping: &Option<BTreeMap<i32, Field>>,
field_id_mapping: &Option<Arc<BTreeMap<i32, Field>>>,
) -> DaftResult<MicroPartition> {
if let Some(so) = start_offset && so > 0 {
return Err(common_error::DaftError::ValueError("Micropartition Parquet Reader does not support non-zero start offsets".to_string()));
Expand Down Expand Up @@ -842,7 +842,7 @@ pub(crate) fn read_parquet_into_micropartition(
.collect::<Vec<_>>(),
FileFormatConfig::Parquet(ParquetSourceConfig {
coerce_int96_timestamp_unit: schema_infer_options.coerce_int96_timestamp_unit,
field_id_mapping: field_id_mapping.clone(), // TODO: consider Arcing this, could be expensive to clone
field_id_mapping: field_id_mapping.clone(),
})
.into(),
daft_schema.clone(),
Expand Down
8 changes: 4 additions & 4 deletions src/daft-parquet/src/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ pub(crate) struct ParquetReaderBuilder {
row_groups: Option<Vec<i64>>,
schema_inference_options: ParquetSchemaInferenceOptions,
predicate: Option<ExprRef>,
field_id_mapping: Option<BTreeMap<i32, Field>>,
field_id_mapping: Option<Arc<BTreeMap<i32, Field>>>,
}
use parquet2::read::decompress;

Expand Down Expand Up @@ -274,7 +274,7 @@ impl ParquetReaderBuilder {
self
}

pub fn set_field_id_mapping(mut self, field_id_mapping: BTreeMap<i32, Field>) -> Self {
pub fn set_field_id_mapping(mut self, field_id_mapping: Arc<BTreeMap<i32, Field>>) -> Self {
self.field_id_mapping = Some(field_id_mapping);
self
}
Expand Down Expand Up @@ -327,7 +327,7 @@ pub(crate) struct ParquetFileReader {
metadata: Arc<parquet2::metadata::FileMetaData>,
arrow_schema: arrow2::datatypes::SchemaRef,
row_ranges: Arc<Vec<RowGroupRange>>,
field_id_mapping: Option<BTreeMap<i32, Field>>,
field_id_mapping: Option<Arc<BTreeMap<i32, Field>>>,
}

fn get_column_name_mapping(
Expand Down Expand Up @@ -356,7 +356,7 @@ impl ParquetFileReader {
metadata: parquet2::metadata::FileMetaData,
arrow_schema: arrow2::datatypes::Schema,
row_ranges: Vec<RowGroupRange>,
field_id_mapping: Option<BTreeMap<i32, Field>>,
field_id_mapping: Option<Arc<BTreeMap<i32, Field>>>,
) -> super::Result<Self> {
Ok(ParquetFileReader {
uri,
Expand Down
4 changes: 2 additions & 2 deletions src/daft-parquet/src/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ async fn read_parquet_single(
io_client: Arc<IOClient>,
io_stats: Option<IOStatsRef>,
schema_infer_options: ParquetSchemaInferenceOptions,
field_id_mapping: Option<BTreeMap<i32, Field>>,
field_id_mapping: Option<Arc<BTreeMap<i32, Field>>>,
) -> DaftResult<Table> {
let original_columns = columns;
let original_num_rows = num_rows;
Expand Down Expand Up @@ -427,7 +427,7 @@ pub fn read_parquet_bulk(
num_parallel_tasks: usize,
runtime_handle: Arc<Runtime>,
schema_infer_options: &ParquetSchemaInferenceOptions,
field_id_mapping: &Option<BTreeMap<i32, Field>>,
field_id_mapping: &Option<Arc<BTreeMap<i32, Field>>>,
) -> DaftResult<Vec<Table>> {
let _rt_guard = runtime_handle.enter();
let owned_columns = columns.map(|s| s.iter().map(|v| String::from(*v)).collect::<Vec<_>>());
Expand Down
9 changes: 6 additions & 3 deletions src/daft-scan/src/file_format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ pub struct ParquetSourceConfig {
/// data according to the provided field_ids.
///
/// See: https://github.com/apache/parquet-format/blob/master/src/main/thrift/parquet.thrift#L456-L459
pub field_id_mapping: Option<BTreeMap<i32, Field>>,
pub field_id_mapping: Option<Arc<BTreeMap<i32, Field>>>,
}

impl ParquetSourceConfig {
Expand Down Expand Up @@ -136,8 +136,11 @@ impl ParquetSourceConfig {
coerce_int96_timestamp_unit: coerce_int96_timestamp_unit
.unwrap_or(TimeUnit::Nanoseconds.into())
.into(),
field_id_mapping: field_id_mapping
.map(|map| BTreeMap::from_iter(map.into_iter().map(|(k, v)| (k, v.field)))),
field_id_mapping: field_id_mapping.map(|map| {
Arc::new(BTreeMap::from_iter(
map.into_iter().map(|(k, v)| (k, v.field)),
))
}),
}
}

Expand Down

0 comments on commit 56562e5

Please sign in to comment.