
Commit

Refactor read_parquet_into_micropartition
Jay Chia committed Feb 24, 2024
1 parent 3c78161 commit 1343aa5
Showing 4 changed files with 112 additions and 113 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions src/daft-micropartition/Cargo.toml
@@ -18,6 +18,7 @@ pyo3 = {workspace = true, optional = true}
pyo3-log = {workspace = true}
serde = {workspace = true}
snafu = {workspace = true}
tokio = {workspace = true}

[features]
default = ["python"]
221 changes: 108 additions & 113 deletions src/daft-micropartition/src/micropartition.rs
@@ -26,7 +26,7 @@ use snafu::ResultExt;
use crate::PyIOSnafu;
use crate::{DaftCSVSnafu, DaftCoreComputeSnafu};

use daft_io::{IOConfig, IOStatsContext, IOStatsRef};
use daft_io::{IOClient, IOConfig, IOStatsContext, IOStatsRef};
use daft_stats::TableMetadata;
use daft_stats::TableStatistics;

@@ -422,6 +422,7 @@ impl MicroPartition {
&ParquetSchemaInferenceOptions {
coerce_int96_timestamp_unit: *coerce_int96_timestamp_unit,
},
Some(schema.clone()),
field_id_mapping,
)
.context(DaftCoreComputeSnafu)?;
@@ -697,29 +698,66 @@ pub(crate) fn read_json_into_micropartition(
}
}

// TODO: Deduplicate this with the other `rename_schema_recursively` function in file.rs
fn rename_schema_recursively(
daft_schema: Schema,
field_id_mapping: &BTreeMap<i32, Field>,
) -> DaftResult<Schema> {
Schema::new(
daft_schema
.fields
.into_iter()
.map(|(_, field)| {
if let Some(field_id) = field.metadata.get("field_id") {
let field_id = str::parse::<i32>(field_id).unwrap();
let mapped_field = field_id_mapping.get(&field_id);
match mapped_field {
None => field,
Some(mapped_field) => field.rename(&mapped_field.name),
}
} else {
field
}
})
.collect(),
)
#[allow(clippy::too_many_arguments)]
fn _read_parquet_into_loaded_micropartition(
io_client: Arc<IOClient>,
runtime_handle: Arc<tokio::runtime::Runtime>,
uris: &[&str],
columns: Option<&[&str]>,
start_offset: Option<usize>,
num_rows: Option<usize>,
row_groups: Option<Vec<Option<Vec<i64>>>>,
predicate: Option<ExprRef>,
io_stats: Option<IOStatsRef>,
num_parallel_tasks: usize,
schema_infer_options: &ParquetSchemaInferenceOptions,
catalog_provided_schema: Option<SchemaRef>,
field_id_mapping: &Option<Arc<BTreeMap<i32, Field>>>,
) -> DaftResult<MicroPartition> {
let all_tables = read_parquet_bulk(
uris,
columns,
start_offset,
num_rows,
row_groups,
predicate,
io_client,
io_stats,
num_parallel_tasks,
runtime_handle,
schema_infer_options,
field_id_mapping,
)?;

// Prefer using the `catalog_provided_schema` but fall back onto inferred schema from Parquet files
let full_daft_schema = match catalog_provided_schema {
Some(catalog_provided_schema) => catalog_provided_schema,
None => {
let unioned_schema = all_tables
.iter()
.map(|t| t.schema.clone())
.try_reduce(|l, r| DaftResult::Ok(l.union(&r)?.into()))?;
unioned_schema.expect("we need at least 1 schema")
}
};

// Hack to avoid to owned schema
let full_daft_schema = Schema {
fields: full_daft_schema.fields.clone(),
};
let pruned_daft_schema = prune_fields_from_schema(full_daft_schema, columns)?;

let all_tables = all_tables
.into_iter()
.map(|t| t.cast_to_schema(&pruned_daft_schema))
.collect::<DaftResult<Vec<_>>>()?;

// TODO: we can pass in stats here to optimize downstream workloads such as join
Ok(MicroPartition::new_loaded(
Arc::new(pruned_daft_schema),
all_tables.into(),
None,
))
}
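
Note: the new `_read_parquet_into_loaded_micropartition` helper above centralizes the eager-read path: prefer a catalog-provided schema, otherwise union the schemas inferred from each Parquet file, prune the result to the requested `columns`, and cast every table to the pruned schema. The following is a minimal self-contained sketch of that resolution order only; `MiniSchema`, `resolve_schema`, and `prune` are illustrative stand-ins, not daft's `Schema`/`Table` API.

```rust
use std::collections::BTreeMap;

// Stand-in schema: field name -> type name. This is NOT daft's Schema type;
// it only models the resolution order used by the helper above.
type MiniSchema = BTreeMap<String, String>;

// Prefer the catalog-provided schema; otherwise union the schemas inferred
// from each Parquet file (mirrors the `match catalog_provided_schema` above).
fn resolve_schema(
    catalog_provided: Option<MiniSchema>,
    inferred_per_file: Vec<MiniSchema>,
) -> MiniSchema {
    match catalog_provided {
        Some(schema) => schema,
        None => inferred_per_file
            .into_iter()
            .fold(MiniSchema::new(), |mut acc, s| {
                acc.extend(s); // union of fields across files
                acc
            }),
    }
}

// Keep only the requested columns (column pruning); no projection keeps everything.
fn prune(schema: MiniSchema, columns: Option<&[&str]>) -> MiniSchema {
    match columns {
        None => schema,
        Some(cols) => schema
            .into_iter()
            .filter(|(name, _)| cols.contains(&name.as_str()))
            .collect(),
    }
}

fn main() {
    let file_a = MiniSchema::from([("id".to_string(), "Int64".to_string())]);
    let file_b = MiniSchema::from([
        ("id".to_string(), "Int64".to_string()),
        ("name".to_string(), "Utf8".to_string()),
    ]);
    // No catalog schema: union the per-file schemas, then prune to `id`.
    let resolved = resolve_schema(None, vec![file_a, file_b]);
    let cols: &[&str] = &["id"];
    let pruned = prune(resolved, Some(cols));
    println!("{:?}", pruned); // {"id": "Int64"}
}
```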

#[allow(clippy::too_many_arguments)]
@@ -735,6 +773,7 @@ pub(crate) fn read_parquet_into_micropartition(
num_parallel_tasks: usize,
multithreaded_io: bool,
schema_infer_options: &ParquetSchemaInferenceOptions,
catalog_provided_schema: Option<SchemaRef>,
field_id_mapping: &Option<Arc<BTreeMap<i32, Field>>>,
) -> DaftResult<MicroPartition> {
if let Some(so) = start_offset && so > 0 {
@@ -745,44 +784,23 @@
let runtime_handle = daft_io::get_runtime(multithreaded_io)?;
let io_client = daft_io::get_io_client(multithreaded_io, io_config.clone())?;

if let Some(predicate) = predicate {
if predicate.is_some() {
// We have a predicate, so we will perform eager read only reading what row groups we need.
let all_tables = read_parquet_bulk(
return _read_parquet_into_loaded_micropartition(
io_client,
runtime_handle,
uris,
columns,
None,
start_offset,
num_rows,
row_groups,
Some(predicate.clone()),
io_client,
predicate,
io_stats,
num_parallel_tasks,
runtime_handle,
schema_infer_options,
catalog_provided_schema,
field_id_mapping,
)?;

let unioned_schema = all_tables
.iter()
.map(|t| t.schema.clone())
.try_reduce(|l, r| DaftResult::Ok(l.union(&r)?.into()))?;
let full_daft_schema = unioned_schema.expect("we need at least 1 schema");
// Hack to avoid to owned schema
let full_daft_schema = Schema {
fields: full_daft_schema.fields.clone(),
};
let pruned_daft_schema = prune_fields_from_schema(full_daft_schema, columns)?;

let all_tables = all_tables
.into_iter()
.map(|t| t.cast_to_schema(&pruned_daft_schema))
.collect::<DaftResult<Vec<_>>>()?;
// TODO: we can pass in stats here to optimize downstream workloads such as join
return Ok(MicroPartition::new_loaded(
Arc::new(pruned_daft_schema),
all_tables.into(),
None,
));
);
}

let meta_io_client = io_client.clone();
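
Note: with this change, the predicate fast path above and the no-statistics fallback shown in the later hunks both route through the shared `_read_parquet_into_loaded_micropartition` helper; only the predicate-free, statistics-bearing case stays unloaded behind a ScanTask. A rough model of that dispatch, using stand-in types rather than the real `MicroPartition`/`ScanTask` (names are illustrative):

```rust
// Stand-in result type: either data materialized now, or a deferred scan
// described by metadata/statistics. Not the real MicroPartition.
#[derive(Debug)]
enum MiniPartition {
    Loaded { num_tables: usize },
    Unloaded { total_rows: usize },
}

// Mirrors the refactored control flow: a pushdown predicate (or missing
// statistics) forces an eager, loaded read; otherwise the read stays deferred.
fn dispatch(has_predicate: bool, stats_available: bool, file_rows: &[usize]) -> MiniPartition {
    if has_predicate || !stats_available {
        // Eager path: actually read the files (modeled as one table per file).
        MiniPartition::Loaded {
            num_tables: file_rows.len(),
        }
    } else {
        // Deferred path: keep only metadata; rows can be counted without I/O.
        MiniPartition::Unloaded {
            total_rows: file_rows.iter().sum(),
        }
    }
}

fn main() {
    let file_rows = [100, 250, 50];
    println!("{:?}", dispatch(true, true, &file_rows)); // Loaded { num_tables: 3 }
    println!("{:?}", dispatch(false, true, &file_rows)); // Unloaded { total_rows: 400 }
}
```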
@@ -824,46 +842,40 @@ pub(crate) fn read_parquet_into_micropartition(
None
};

// Union and prune the schema using the specified `columns`
let resolved_schemas = if let Some(field_id_mapping) = field_id_mapping {
pq_file_schemas
.into_iter()
.map(|pq_file_schema| {
rename_schema_recursively(pq_file_schema, field_id_mapping.as_ref())
})
.collect::<DaftResult<Vec<_>>>()?
} else {
pq_file_schemas
};
let unioned_schema = resolved_schemas
.into_iter()
.try_reduce(|l, r| l.union(&r))?;
let full_daft_schema = unioned_schema.expect("we need at least 1 schema");
let pruned_daft_schema = prune_fields_from_schema(full_daft_schema, columns)?;
if let Some(stats) = stats {
// Statistics are provided by the Parquet file, so we create an unloaded MicroPartition
// by constructing an appropriate ScanTask

// Prefer using the `catalog_provided_schema` but fall back onto inferred schema from Parquet files
let scan_task_daft_schema = match catalog_provided_schema {
Some(catalog_provided_schema) => catalog_provided_schema,
None => {
let unioned_schema = pq_file_schemas.into_iter().try_reduce(|l, r| l.union(&r))?;
Arc::new(unioned_schema.expect("we need at least 1 schema"))
}
};

// Get total number of rows, accounting for selected `row_groups` and the indicated `num_rows`
let total_rows_no_limit = match &row_groups {
None => metadata.iter().map(|fm| fm.num_rows).sum(),
Some(row_groups) => metadata
.iter()
.zip(row_groups.iter())
.map(|(fm, rg)| match rg {
Some(rg) => rg
.iter()
.map(|rg_idx| fm.row_groups.get(*rg_idx as usize).unwrap().num_rows())
.sum::<usize>(),
None => fm.num_rows,
})
.sum(),
};
let total_rows = num_rows
.map(|num_rows| num_rows.min(total_rows_no_limit))
.unwrap_or(total_rows_no_limit);
// Get total number of rows, accounting for selected `row_groups` and the indicated `num_rows`
let total_rows_no_limit = match &row_groups {
None => metadata.iter().map(|fm| fm.num_rows).sum(),
Some(row_groups) => metadata
.iter()
.zip(row_groups.iter())
.map(|(fm, rg)| match rg {
Some(rg) => rg
.iter()
.map(|rg_idx| fm.row_groups.get(*rg_idx as usize).unwrap().num_rows())
.sum::<usize>(),
None => fm.num_rows,
})
.sum(),
};
let total_rows = num_rows
.map(|num_rows| num_rows.min(total_rows_no_limit))
.unwrap_or(total_rows_no_limit);

if let Some(stats) = stats {
let owned_urls = uris.iter().map(|s| s.to_string()).collect::<Vec<_>>();

let daft_schema = Arc::new(pruned_daft_schema);
let size_bytes = metadata
.iter()
.map(|m| -> u64 {
Expand Down Expand Up @@ -891,7 +903,7 @@ pub(crate) fn read_parquet_into_micropartition(
field_id_mapping: field_id_mapping.clone(),
})
.into(),
daft_schema.clone(),
scan_task_daft_schema,
StorageConfig::Native(
NativeStorageConfig::new_internal(
multithreaded_io,
@@ -909,45 +921,28 @@
),
);

let exprs = daft_schema
.fields
.keys()
.map(|n| daft_dsl::col(n.as_str()))
.collect::<Vec<_>>();

// use schema to update stats
let stats = stats.eval_expression_list(exprs.as_slice(), daft_schema.as_ref())?;

Ok(MicroPartition::new_unloaded(
scan_task.materialized_schema(),
Arc::new(scan_task),
TableMetadata { length: total_rows },
stats,
))
} else {
let all_tables = read_parquet_bulk(
_read_parquet_into_loaded_micropartition(
io_client,
runtime_handle,
uris,
columns,
start_offset,
num_rows,
row_groups,
None,
io_client,
predicate,
io_stats,
num_parallel_tasks,
runtime_handle,
schema_infer_options,
catalog_provided_schema,
field_id_mapping,
)?;
let all_tables = all_tables
.into_iter()
.map(|t| t.cast_to_schema(&pruned_daft_schema))
.collect::<DaftResult<Vec<_>>>()?;
Ok(MicroPartition::new_loaded(
Arc::new(pruned_daft_schema),
all_tables.into(),
None,
))
)
}
}

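Note: the hunks above also move the row-count accounting into the statistics branch: total rows are summed per file, unless specific row groups were selected, and any `num_rows` limit caps the result. A small self-contained sketch of that arithmetic with stand-in metadata structs (not the actual Parquet metadata types used by daft):

```rust
// Stand-in per-file metadata: total rows plus per-row-group row counts.
struct FileMeta {
    num_rows: usize,
    row_group_rows: Vec<usize>,
}

// Mirrors `total_rows_no_limit` / `total_rows` above: sum whole files unless a
// row-group selection is given, then apply the optional `num_rows` limit.
fn total_rows(
    metadata: &[FileMeta],
    row_groups: Option<&[Option<Vec<usize>>]>,
    num_rows: Option<usize>,
) -> usize {
    let no_limit: usize = match row_groups {
        None => metadata.iter().map(|fm| fm.num_rows).sum(),
        Some(row_groups) => metadata
            .iter()
            .zip(row_groups.iter())
            .map(|(fm, rg)| match rg {
                Some(rg) => rg.iter().map(|idx| fm.row_group_rows[*idx]).sum::<usize>(),
                None => fm.num_rows,
            })
            .sum(),
    };
    num_rows.map(|n| n.min(no_limit)).unwrap_or(no_limit)
}

fn main() {
    let metadata = [
        FileMeta { num_rows: 300, row_group_rows: vec![100, 100, 100] },
        FileMeta { num_rows: 200, row_group_rows: vec![200] },
    ];
    // Only row groups 0 and 2 of the first file, the whole second file: 200 + 200 = 400.
    let row_groups = vec![Some(vec![0, 2]), None];
    assert_eq!(total_rows(&metadata, Some(row_groups.as_slice()), None), 400);
    // A num_rows limit of 150 caps the total.
    assert_eq!(total_rows(&metadata, Some(row_groups.as_slice()), Some(150)), 150);
    println!("ok");
}
```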
2 changes: 2 additions & 0 deletions src/daft-micropartition/src/python.rs
@@ -543,6 +543,7 @@ impl PyMicroPartition {
1,
multithreaded_io.unwrap_or(true),
&schema_infer_options,
None,
&None,
)
})?;
@@ -584,6 +585,7 @@ impl PyMicroPartition {
num_parallel_tasks.unwrap_or(128) as usize,
multithreaded_io.unwrap_or(true),
&schema_infer_options,
None,
&None,
)
})?;
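Note: the Python bindings pass `None` for the new `catalog_provided_schema` argument, so their behavior is unchanged; only a caller that already knows the schema (for example, a catalog integration) would pass `Some(...)`. A hedged sketch of that calling pattern with simplified types; `read_with_schema_hint` and the `Schema` struct here are hypothetical, not daft's API:

```rust
use std::sync::Arc;

// Simplified stand-in for a schema; daft's real SchemaRef wraps a richer type.
#[derive(Debug, Clone)]
struct Schema {
    fields: Vec<String>,
}

// A reader that can optionally skip schema inference when the caller already
// knows the schema (the role of the new `catalog_provided_schema` parameter).
fn read_with_schema_hint(uris: &[&str], catalog_provided_schema: Option<Arc<Schema>>) -> Arc<Schema> {
    catalog_provided_schema.unwrap_or_else(|| {
        // Fallback: "infer" a schema from the files (stubbed out here).
        Arc::new(Schema {
            fields: vec![format!("inferred_from_{}_files", uris.len())],
        })
    })
}

fn main() {
    let uris = ["s3://bucket/a.parquet", "s3://bucket/b.parquet"];

    // Existing call sites (like the Python bindings above) pass None.
    let inferred = read_with_schema_hint(&uris, None);

    // A catalog-aware call site can supply the schema up front.
    let hinted = read_with_schema_hint(
        &uris,
        Some(Arc::new(Schema { fields: vec!["id".into(), "name".into()] })),
    );

    println!("{:?} / {:?}", inferred, hinted);
}
```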
