From 5af4ee903b29ec7077fab9371f3d7c3a5bd04e0c Mon Sep 17 00:00:00 2001
From: Jay Chia <17691182+jaychia@users.noreply.github.com>
Date: Wed, 6 Mar 2024 14:11:01 -0800
Subject: [PATCH] [CHORE] Simplify cast to schema (#1982)

This PR changes how we use `cast_to_schema` in our codebase and includes some subtle fixes that make the code easier to refactor later for field IDs.

## Reducing user error with schemas

I added documentation to our structs detailing the correct semantics of the `schema` field on ScanTask and MicroPartition. Additionally:

1. `MicroPartition::new_unloaded` no longer takes a schema as input. Instead, the unloaded MicroPartition's schema is simply its ScanTask's `materialized_schema`.
2. `materialize_scan_task` no longer takes a `cast_to_schema` argument. Instead, it uses the ScanTask's `materialized_schema`, with `partition_spec()` providing the fill map, to correctly coerce all the materialized Tables (see the first sketch below).

## Refactoring the `read_parquet_into_micropartition` megafunction

**Share code**: Move shared code into a `_read_parquet_into_loaded_micropartition` helper, which is called from 2 locations in `read_parquet_into_micropartition`.

**Fix the Unloaded case**: Previously, in the unloaded MicroPartition case, we were creating a `ScanTask` using the schema inferred from the Parquet file. This is *wrong* behavior! I added a new `catalog_provided_schema` argument that is now correctly used when creating MicroPartitions in this function, in both the loaded and unloaded cases (see the second sketch below).

**Casting**: we now perform casting inside `read_parquet_into_micropartition`:

1. When we eagerly create loaded MicroPartitions, we call `cast_to_schema_with_fill` on the materialized tables.
2. When we create unloaded MicroPartitions, we create the ScanTask with the right schema, partition_spec, and column selection, so that when we later materialize the MicroPartition we correctly call `cast_to_schema_with_fill` on each Table. We also call `cast_to_schema_with_fill` on the statistics.
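For illustration, here is a minimal, self-contained sketch of the fill-map casting pattern that `materialize_scan_task` now follows: every materialized Table is cast to the ScanTask's `materialized_schema`, and partition columns missing from the file data are filled in from the partition spec's fill map. The `Table` and schema types below are simplified stand-ins, not Daft's actual types, and null handling is only approximated.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for a materialized table: column name -> values.
type Table = HashMap<String, Vec<String>>;

/// Cast `table` to `target_schema` (a list of column names here), filling any
/// missing columns from `fill_map` (a stand-in for `PartitionSpec::to_fill_map`).
fn cast_to_schema_with_fill(
    table: &Table,
    target_schema: &[&str],
    fill_map: Option<&HashMap<&str, String>>,
    num_rows: usize,
) -> Table {
    target_schema
        .iter()
        .map(|col| {
            let values = match table.get(*col) {
                Some(v) => v.clone(),
                None => {
                    // Column absent from the file data: take the partition value from the
                    // fill map if present, otherwise fill with "nulls" (empty strings here).
                    let fill = fill_map
                        .and_then(|m| m.get(col).cloned())
                        .unwrap_or_default();
                    vec![fill; num_rows]
                }
            };
            (col.to_string(), values)
        })
        .collect()
}

fn main() {
    // The file data only contains `a`; the partition column `date` comes from the fill map.
    let table: Table = HashMap::from([("a".to_string(), vec!["1".into(), "2".into()])]);
    let fill_map = HashMap::from([("date", "2024-03-06".to_string())]);
    let casted = cast_to_schema_with_fill(&table, &["a", "date"], Some(&fill_map), 2);
    assert_eq!(casted["date"], vec!["2024-03-06".to_string(); 2]);
    println!("{casted:?}");
}
```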
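And a sketch of the schema-resolution order that `read_parquet_into_micropartition` and the `_read_parquet_into_loaded_micropartition` helper now share: prefer the `catalog_provided_schema` when one is supplied, otherwise union the schemas inferred from the Parquet files, then prune to the requested column projection. Again, `Schema` here is a simplified stand-in rather than Daft's type.

```rust
use std::collections::BTreeSet;

/// Hypothetical stand-in for a schema: an ordered set of column names.
type Schema = BTreeSet<String>;

/// Mirrors the schema-resolution order: prefer a catalog-provided schema, otherwise
/// union the schemas inferred from each Parquet file, then prune to the projection.
fn resolve_schema(
    catalog_provided_schema: Option<Schema>,
    inferred_file_schemas: &[Schema],
    columns: Option<&[&str]>,
) -> Schema {
    let full = catalog_provided_schema.unwrap_or_else(|| {
        inferred_file_schemas
            .iter()
            .fold(Schema::new(), |mut acc, s| {
                acc.extend(s.iter().cloned()); // union of all inferred schemas
                acc
            })
    });
    match columns {
        // Keep only the projected columns (column pruning).
        Some(cols) => full
            .into_iter()
            .filter(|name| cols.contains(&name.as_str()))
            .collect(),
        None => full,
    }
}

fn main() {
    let file_schemas = vec![
        Schema::from(["a".to_string(), "b".to_string()]),
        Schema::from(["a".to_string(), "c".to_string()]),
    ];
    // No catalog schema: fall back to the union {a, b, c}, then prune to [a, c].
    let resolved = resolve_schema(None, &file_schemas, Some(&["a", "c"]));
    assert_eq!(resolved, Schema::from(["a".to_string(), "c".to_string()]));
    println!("{resolved:?}");
}
```

Keeping this resolution logic in one place is what lets both the predicate (eager-read) path and the no-statistics fallback path call the same helper.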
--------- Co-authored-by: Jay Chia --- Cargo.lock | 1 + src/daft-micropartition/Cargo.toml | 1 + src/daft-micropartition/src/micropartition.rs | 258 +++++++++++------- .../src/ops/cast_to_schema.rs | 40 +-- src/daft-micropartition/src/python.rs | 2 + src/daft-scan/src/lib.rs | 4 + 6 files changed, 182 insertions(+), 124 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 1b3d855dc3..cf57416e06 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1343,6 +1343,7 @@ dependencies = [ "pyo3-log", "serde", "snafu", + "tokio", ] [[package]] diff --git a/src/daft-micropartition/Cargo.toml b/src/daft-micropartition/Cargo.toml index 5c7e026dc2..a1a8df1a1e 100644 --- a/src/daft-micropartition/Cargo.toml +++ b/src/daft-micropartition/Cargo.toml @@ -18,6 +18,7 @@ pyo3 = {workspace = true, optional = true} pyo3-log = {workspace = true} serde = {workspace = true} snafu = {workspace = true} +tokio = {workspace = true} [features] default = ["python"] diff --git a/src/daft-micropartition/src/micropartition.rs b/src/daft-micropartition/src/micropartition.rs index 4f1aa147cc..654fe2695f 100644 --- a/src/daft-micropartition/src/micropartition.rs +++ b/src/daft-micropartition/src/micropartition.rs @@ -26,7 +26,7 @@ use snafu::ResultExt; use crate::PyIOSnafu; use crate::{DaftCSVSnafu, DaftCoreComputeSnafu}; -use daft_io::{IOConfig, IOStatsContext, IOStatsRef}; +use daft_io::{IOClient, IOConfig, IOStatsContext, IOStatsRef}; use daft_stats::TableStatistics; use daft_stats::{PartitionSpec, TableMetadata}; @@ -60,23 +60,39 @@ impl Display for TableState { } } pub(crate) struct MicroPartition { + /// Schema of the MicroPartition + /// + /// This is technically redundant with the schema in `state`: + /// 1. If [`TableState::Loaded`]: the schema should match every underlying [`Table`] + /// 2. If [`TableState::Unloaded`]: the schema should match the underlying [`ScanTask::materialized_schema`] + /// + /// However this is still useful as an easy-to-access copy of the schema, as well as to handle the corner-case + /// of having 0 underlying [`Table`] objects (in an empty [`MicroPartition`]) pub(crate) schema: SchemaRef, + + /// State of the MicroPartition. Can be Loaded or Unloaded. pub(crate) state: Mutex, + + /// Metadata about the MicroPartition pub(crate) metadata: TableMetadata, + + /// Statistics about the MicroPartition + /// + /// If present, this must have the same [`Schema`] as [`MicroPartition::schema`], and this invariant + /// is enforced in the `MicroPartition::new_*` constructors. pub(crate) statistics: Option, } -/// Helper to run all the IO and compute required to materialize a ScanTask into a Vec +/// Helper to run all the IO and compute required to materialize a [`ScanTask`] into a `Vec
<Table>` +/// +/// All [`Table`] objects returned will have the same [`Schema`] as [`ScanTask::materialized_schema`]. /// /// # Arguments /// /// * `scan_task` - a batch of ScanTasks to materialize as Tables -/// * `cast_to_schema` - an Optional schema to cast all the resulting Tables to. If not provided, will use the schema -/// provided by the ScanTask /// * `io_stats` - an optional IOStats object to record the IO operations performed fn materialize_scan_task( scan_task: Arc<ScanTask>, - cast_to_schema: Option<SchemaRef>, io_stats: Option<IOStatsRef>, ) -> crate::Result<(Vec<Table>
, SchemaRef)> { let column_names = scan_task @@ -288,9 +304,8 @@ fn materialize_scan_task( } }; - // Schema to cast resultant tables into, ensuring that all Tables have the same schema. - // Note that we need to apply column pruning here if specified by the ScanTask - let cast_to_schema = cast_to_schema.unwrap_or_else(|| scan_task.materialized_schema()); + // Ensure that all Tables have the schema as specified by [`ScanTask::materialized_schema`] + let cast_to_schema = scan_task.materialized_schema(); // If there is a partition spec and partition values aren't duplicated in the data, inline the partition values // into the table when casting the schema. @@ -310,11 +325,11 @@ impl MicroPartition { /// Schema invariants: /// 1. Each Loaded column statistic in `statistics` must be castable to the corresponding column in the MicroPartition's schema pub fn new_unloaded( - schema: SchemaRef, scan_task: Arc, metadata: TableMetadata, statistics: TableStatistics, ) -> Self { + let schema = scan_task.materialized_schema(); let fill_map = scan_task.partition_spec().map(|pspec| pspec.to_fill_map()); let statistics = statistics .cast_to_schema_with_fill(schema.clone(), fill_map.as_ref()) @@ -373,7 +388,6 @@ impl MicroPartition { // CASE: ScanTask provides all required metadata. // If the scan_task provides metadata (e.g. retrieved from a catalog) we can use it to create an unloaded MicroPartition (Some(metadata), Some(statistics), _, _) => Ok(Self::new_unloaded( - schema, scan_task.clone(), metadata.clone(), statistics.clone(), @@ -401,7 +415,7 @@ impl MicroPartition { .map(|cols| cols.iter().map(|s| s.as_str()).collect::>()); let row_groups = parquet_sources_to_row_groups(scan_task.sources.as_slice()); - let mp = read_parquet_into_micropartition( + read_parquet_into_micropartition( uris.as_slice(), columns.as_deref(), None, @@ -419,19 +433,16 @@ impl MicroPartition { &ParquetSchemaInferenceOptions { coerce_int96_timestamp_unit: *coerce_int96_timestamp_unit, }, + Some(schema.clone()), ) - .context(DaftCoreComputeSnafu)?; - - let fill_map = scan_task.partition_spec().map(|pspec| pspec.to_fill_map()); - mp.cast_to_schema_with_fill(schema, fill_map.as_ref()) - .context(DaftCoreComputeSnafu) + .context(DaftCoreComputeSnafu) } // CASE: Last resort fallback option // Perform an eager **data** read _ => { let statistics = scan_task.statistics.clone(); - let (tables, schema) = materialize_scan_task(scan_task, None, Some(io_stats))?; + let (tables, schema) = materialize_scan_task(scan_task, Some(io_stats))?; Ok(Self::new_loaded(schema, Arc::new(tables), statistics)) } } @@ -475,11 +486,7 @@ impl MicroPartition { let mut guard = self.state.lock().unwrap(); match guard.deref() { TableState::Unloaded(scan_task) => { - let (tables, _) = materialize_scan_task( - scan_task.clone(), - Some(self.schema.clone()), - Some(io_stats), - )?; + let (tables, _) = materialize_scan_task(scan_task.clone(), Some(io_stats))?; let table_values = Arc::new(tables); // Cache future accesses by setting the state to TableState::Loaded @@ -547,7 +554,10 @@ impl MicroPartition { } } -fn prune_fields_from_schema(schema: Schema, columns: Option<&[&str]>) -> DaftResult { +fn prune_fields_from_schema( + schema: Arc, + columns: Option<&[&str]>, +) -> DaftResult> { if let Some(columns) = columns { let avail_names = schema .fields @@ -567,11 +577,13 @@ fn prune_fields_from_schema(schema: Schema, columns: Option<&[&str]>) -> DaftRes } } let filtered_columns = schema + .as_ref() .fields - .into_values() + .values() .filter(|field| 
names_to_keep.contains(field.name.as_str())) + .cloned() .collect::>(); - Schema::new(filtered_columns) + Ok(Arc::new(Schema::new(filtered_columns)?)) } else { Ok(schema) } @@ -693,6 +705,65 @@ pub(crate) fn read_json_into_micropartition( } } +#[allow(clippy::too_many_arguments)] +fn _read_parquet_into_loaded_micropartition( + io_client: Arc, + runtime_handle: Arc, + uris: &[&str], + columns: Option<&[&str]>, + start_offset: Option, + num_rows: Option, + row_groups: Option>>>, + predicate: Option, + partition_spec: Option<&PartitionSpec>, + io_stats: Option, + num_parallel_tasks: usize, + schema_infer_options: &ParquetSchemaInferenceOptions, + catalog_provided_schema: Option, +) -> DaftResult { + let all_tables = read_parquet_bulk( + uris, + columns, + start_offset, + num_rows, + row_groups, + predicate, + io_client, + io_stats, + num_parallel_tasks, + runtime_handle, + schema_infer_options, + )?; + + // Prefer using the `catalog_provided_schema` but fall back onto inferred schema from Parquet files + let full_daft_schema = match catalog_provided_schema { + Some(catalog_provided_schema) => catalog_provided_schema, + None => { + let unioned_schema = all_tables + .iter() + .map(|t| t.schema.clone()) + .try_reduce(|l, r| DaftResult::Ok(l.union(&r)?.into()))?; + unioned_schema.expect("we need at least 1 schema") + } + }; + + let pruned_daft_schema = prune_fields_from_schema(full_daft_schema, columns)?; + + let fill_map = partition_spec.map(|pspec| pspec.to_fill_map()); + let all_tables = all_tables + .into_iter() + .map(|t| t.cast_to_schema_with_fill(&pruned_daft_schema, fill_map.as_ref())) + .collect::>>()?; + + // TODO: we can pass in stats here to optimize downstream workloads such as join. Make sure to correctly + // cast those statistics to the appropriate schema + fillmap as well. + Ok(MicroPartition::new_loaded( + pruned_daft_schema, + all_tables.into(), + None, + )) +} + #[allow(clippy::too_many_arguments)] pub(crate) fn read_parquet_into_micropartition( uris: &[&str], @@ -707,6 +778,7 @@ pub(crate) fn read_parquet_into_micropartition( num_parallel_tasks: usize, multithreaded_io: bool, schema_infer_options: &ParquetSchemaInferenceOptions, + catalog_provided_schema: Option, ) -> DaftResult { if let Some(so) = start_offset && so > 0 { return Err(common_error::DaftError::ValueError("Micropartition Parquet Reader does not support non-zero start offsets".to_string())); @@ -715,52 +787,32 @@ pub(crate) fn read_parquet_into_micropartition( // Run the required I/O to retrieve all the Parquet FileMetaData let runtime_handle = daft_io::get_runtime(multithreaded_io)?; let io_client = daft_io::get_io_client(multithreaded_io, io_config.clone())?; - if let Some(predicate) = predicate { - // We have a predicate, so we will perform eager read only reading what row groups we need. - let all_tables = read_parquet_bulk( + + // If we have a predicate, perform an eager read only reading what row groups we need. 
+ if predicate.is_some() { + return _read_parquet_into_loaded_micropartition( + io_client, + runtime_handle, uris, columns, - None, + start_offset, num_rows, row_groups, - Some(predicate.clone()), - io_client, + predicate, + partition_spec, io_stats, num_parallel_tasks, - runtime_handle, schema_infer_options, - )?; - - let unioned_schema = all_tables - .iter() - .map(|t| t.schema.clone()) - .try_reduce(|l, r| DaftResult::Ok(l.union(&r)?.into()))?; - let full_daft_schema = unioned_schema.expect("we need at least 1 schema"); - // Hack to avoid to owned schema - let full_daft_schema = Schema { - fields: full_daft_schema.fields.clone(), - }; - let pruned_daft_schema = prune_fields_from_schema(full_daft_schema, columns)?; - - let all_tables = all_tables - .into_iter() - .map(|t| t.cast_to_schema(&pruned_daft_schema)) - .collect::>>()?; - // TODO: we can pass in stats here to optimize downstream workloads such as join - return Ok(MicroPartition::new_loaded( - Arc::new(pruned_daft_schema), - all_tables.into(), - None, - )); + catalog_provided_schema, + ); } + // Attempt to read TableStatistics from the Parquet file let meta_io_client = io_client.clone(); let meta_io_stats = io_stats.clone(); let metadata = runtime_handle.block_on(async move { read_parquet_metadata_bulk(uris, meta_io_client, meta_io_stats).await })?; - - // Deserialize and collect relevant TableStatistics let schemas = metadata .iter() .map(|m| { @@ -789,40 +841,45 @@ pub(crate) fn read_parquet_into_micropartition( None }; - // Union and prune the schema using the specified `columns` - let unioned_schema = schemas.into_iter().try_reduce(|l, r| l.union(&r))?; - let full_daft_schema = unioned_schema.expect("we need at least 1 schema"); - let pruned_daft_schema = prune_fields_from_schema(full_daft_schema, columns)?; + // If statistics are provided by the Parquet file, we create an unloaded MicroPartition + // by constructing an appropriate ScanTask + if let Some(stats) = stats { + // Prefer using the `catalog_provided_schema` but fall back onto inferred schema from Parquet files + let scan_task_daft_schema = match catalog_provided_schema { + Some(catalog_provided_schema) => catalog_provided_schema, + None => { + let unioned_schema = schemas.into_iter().try_reduce(|l, r| l.union(&r))?; + Arc::new(unioned_schema.expect("we need at least 1 schema")) + } + }; - // Get total number of rows, accounting for selected `row_groups` and the indicated `num_rows` - let total_rows_no_limit = match &row_groups { - None => metadata.iter().map(|fm| fm.num_rows).sum(), - Some(row_groups) => metadata - .iter() - .zip(row_groups.iter()) - .map(|(fm, rg)| match rg { - Some(rg) => rg - .iter() - .map(|rg_idx| fm.row_groups.get(*rg_idx as usize).unwrap().num_rows()) - .sum::(), - None => fm.num_rows, - }) - .sum(), - }; - let total_rows = num_rows - .map(|num_rows| num_rows.min(total_rows_no_limit)) - .unwrap_or(total_rows_no_limit); + // Get total number of rows, accounting for selected `row_groups` and the indicated `num_rows` + let total_rows_no_limit = match &row_groups { + None => metadata.iter().map(|fm| fm.num_rows).sum(), + Some(row_groups) => metadata + .iter() + .zip(row_groups.iter()) + .map(|(fm, rg)| match rg { + Some(rg) => rg + .iter() + .map(|rg_idx| fm.row_groups.get(*rg_idx as usize).unwrap().num_rows()) + .sum::(), + None => fm.num_rows, + }) + .sum(), + }; + let total_rows = num_rows + .map(|num_rows| num_rows.min(total_rows_no_limit)) + .unwrap_or(total_rows_no_limit); - if let Some(stats) = stats { let owned_urls = uris.iter().map(|s| 
s.to_string()).collect::>(); - - let daft_schema = Arc::new(pruned_daft_schema); let size_bytes = metadata .iter() .map(|m| -> u64 { std::iter::Sum::sum(m.row_groups.iter().map(|m| m.total_byte_size() as u64)) }) .sum(); + let scan_task = ScanTask::new( owned_urls .into_iter() @@ -843,7 +900,7 @@ pub(crate) fn read_parquet_into_micropartition( coerce_int96_timestamp_unit: schema_infer_options.coerce_int96_timestamp_unit, }) .into(), - daft_schema.clone(), + scan_task_daft_schema, StorageConfig::Native( NativeStorageConfig::new_internal( multithreaded_io, @@ -861,43 +918,32 @@ pub(crate) fn read_parquet_into_micropartition( ), ); - let exprs = daft_schema - .fields - .keys() - .map(|n| daft_dsl::col(n.as_str())) - .collect::>(); - // use schema to update stats - let stats = stats.eval_expression_list(exprs.as_slice(), daft_schema.as_ref())?; + let fill_map = scan_task.partition_spec().map(|pspec| pspec.to_fill_map()); + let casted_stats = + stats.cast_to_schema_with_fill(scan_task.materialized_schema(), fill_map.as_ref())?; Ok(MicroPartition::new_unloaded( - scan_task.materialized_schema(), Arc::new(scan_task), TableMetadata { length: total_rows }, - stats, + casted_stats, )) } else { - let all_tables = read_parquet_bulk( + // If no TableStatistics are available, we perform an eager read + _read_parquet_into_loaded_micropartition( + io_client, + runtime_handle, uris, columns, start_offset, num_rows, row_groups, - None, - io_client, + predicate, + partition_spec, io_stats, num_parallel_tasks, - runtime_handle, schema_infer_options, - )?; - let all_tables = all_tables - .into_iter() - .map(|t| t.cast_to_schema(&pruned_daft_schema)) - .collect::>>()?; - Ok(MicroPartition::new_loaded( - Arc::new(pruned_daft_schema), - all_tables.into(), - None, - )) + catalog_provided_schema, + ) } } diff --git a/src/daft-micropartition/src/ops/cast_to_schema.rs b/src/daft-micropartition/src/ops/cast_to_schema.rs index 0f383ed330..a09cc885db 100644 --- a/src/daft-micropartition/src/ops/cast_to_schema.rs +++ b/src/daft-micropartition/src/ops/cast_to_schema.rs @@ -1,44 +1,48 @@ -use std::{collections::HashMap, ops::Deref, sync::Arc}; +use std::{ops::Deref, sync::Arc}; use common_error::DaftResult; use daft_core::schema::SchemaRef; -use daft_dsl::Expr; +use daft_scan::ScanTask; use crate::micropartition::{MicroPartition, TableState}; impl MicroPartition { pub fn cast_to_schema(&self, schema: SchemaRef) -> DaftResult { - self.cast_to_schema_with_fill(schema, None) - } - - pub fn cast_to_schema_with_fill( - &self, - schema: SchemaRef, - fill_map: Option<&HashMap<&str, Expr>>, - ) -> DaftResult { let schema_owned = schema.clone(); let pruned_statistics = self .statistics .as_ref() - .map(|stats| stats.cast_to_schema_with_fill(schema_owned, fill_map)) + .map(|stats| stats.cast_to_schema(schema_owned)) .transpose()?; let guard = self.state.lock().unwrap(); match guard.deref() { // Replace schema if Unloaded, which should be applied when data is lazily loaded - TableState::Unloaded(scan_task) => Ok(MicroPartition::new_unloaded( - schema.clone(), - scan_task.clone(), - self.metadata.clone(), - pruned_statistics.expect("Unloaded MicroPartition should have statistics"), - )), + TableState::Unloaded(scan_task) => { + let maybe_new_scan_task = if scan_task.schema == schema { + scan_task.clone() + } else { + Arc::new(ScanTask::new( + scan_task.sources.clone(), + scan_task.file_format_config.clone(), + schema, + scan_task.storage_config.clone(), + scan_task.pushdowns.clone(), + )) + }; + Ok(MicroPartition::new_unloaded( + 
maybe_new_scan_task, + self.metadata.clone(), + pruned_statistics.expect("Unloaded MicroPartition should have statistics"), + )) + } // If Tables are already loaded, we map `Table::cast_to_schema` on each Table TableState::Loaded(tables) => Ok(MicroPartition::new_loaded( schema.clone(), Arc::new( tables .iter() - .map(|tbl| tbl.cast_to_schema_with_fill(schema.as_ref(), fill_map)) + .map(|tbl| tbl.cast_to_schema(schema.as_ref())) .collect::>>()?, ), pruned_statistics, diff --git a/src/daft-micropartition/src/python.rs b/src/daft-micropartition/src/python.rs index 3cdd3216c9..2a7c487f27 100644 --- a/src/daft-micropartition/src/python.rs +++ b/src/daft-micropartition/src/python.rs @@ -544,6 +544,7 @@ impl PyMicroPartition { 1, multithreaded_io.unwrap_or(true), &schema_infer_options, + None, ) })?; Ok(mp.into()) @@ -585,6 +586,7 @@ impl PyMicroPartition { num_parallel_tasks.unwrap_or(128) as usize, multithreaded_io.unwrap_or(true), &schema_infer_options, + None, ) })?; Ok(mp.into()) diff --git a/src/daft-scan/src/lib.rs b/src/daft-scan/src/lib.rs index 10856738a2..1f7023a2b8 100644 --- a/src/daft-scan/src/lib.rs +++ b/src/daft-scan/src/lib.rs @@ -256,6 +256,10 @@ pub struct ScanTask { pub sources: Vec, /// Schema to use when reading the DataFileSources. + /// + /// Schema to use when reading the DataFileSources. This should always be passed in by the + /// ScanOperator implementation and should not have had any "pruning" applied. + /// /// Note that this is different than the schema of the data after pushdowns have been applied, /// which can be obtained with [`ScanTask::materialized_schema`] instead. pub schema: SchemaRef,