From 412ffbcb86d160f408906f8b6ef9d6b05ff69c74 Mon Sep 17 00:00:00 2001 From: Jay Chia Date: Sat, 24 Feb 2024 00:08:47 -0800 Subject: [PATCH] Cleanup materialize_scan_task and documentation of MicroPartition --- src/daft-micropartition/src/micropartition.rs | 38 ++++++++++++------- 1 file changed, 25 insertions(+), 13 deletions(-) diff --git a/src/daft-micropartition/src/micropartition.rs b/src/daft-micropartition/src/micropartition.rs index fcec91427a..93a0acd652 100644 --- a/src/daft-micropartition/src/micropartition.rs +++ b/src/daft-micropartition/src/micropartition.rs @@ -60,23 +60,39 @@ impl Display for TableState { } } pub(crate) struct MicroPartition { + /// Schema of the MicroPartition + /// + /// This is technically redundant with the schema in `state`: + /// 1. If [`TableState::Loaded`]: the schema should match every underlying [`Table`] + /// 2. If [`TableState::Unloaded`]: the schema should match the underlying [`ScanTask::materialized_schema`] + /// + /// However this is still useful as an easy-to-access copy of the schema, as well as to handle the corner-case + /// of having 0 underlying [`Table`] objects (in an empty [`MicroPartition`]) pub(crate) schema: SchemaRef, + + /// State of the MicroPartition. Can be Loaded or Unloaded. pub(crate) state: Mutex, + + /// Metadata about the MicroPartition pub(crate) metadata: TableMetadata, + + /// Statistics about the MicroPartition + /// + /// If present, this must have the same [`Schema`] as [`MicroPartition::schema`], and this invariant + /// is enforced in the `MicroPartition::new_*` constructors. pub(crate) statistics: Option, } -/// Helper to run all the IO and compute required to materialize a ScanTask into a Vec +/// Helper to run all the IO and compute required to materialize a [`ScanTask`] into a `Vec
` +/// +/// All [`Table`] objects returned will have the same [`Schema`] as [`ScanTask::materialized_schema`]. /// /// # Arguments /// /// * `scan_task` - a batch of ScanTasks to materialize as Tables -/// * `cast_to_schema` - an Optional schema to cast all the resulting Tables to. If not provided, will use the schema -/// provided by the ScanTask /// * `io_stats` - an optional IOStats object to record the IO operations performed fn materialize_scan_task( scan_task: Arc, - cast_to_schema: Option, io_stats: Option, ) -> crate::Result<(Vec
, SchemaRef)> { let column_names = scan_task @@ -290,9 +306,9 @@ fn materialize_scan_task( } }; - // Schema to cast resultant tables into, ensuring that all Tables have the same schema. - // Note that we need to apply column pruning here if specified by the ScanTask - let cast_to_schema = cast_to_schema.unwrap_or_else(|| scan_task.materialized_schema()); + // Ensure that all Tables have the schema as specified by the ScanTask + // Note that we need to apply column pruning here if specified + let cast_to_schema = scan_task.materialized_schema(); let casted_table_values = table_values .iter() @@ -431,7 +447,7 @@ impl MicroPartition { // Perform an eager **data** read _ => { let statistics = scan_task.statistics.clone(); - let (tables, schema) = materialize_scan_task(scan_task, None, Some(io_stats))?; + let (tables, schema) = materialize_scan_task(scan_task, Some(io_stats))?; Ok(Self::new_loaded(schema, Arc::new(tables), statistics)) } } @@ -477,11 +493,7 @@ impl MicroPartition { let mut guard = self.state.lock().unwrap(); match guard.deref() { TableState::Unloaded(scan_task) => { - let (tables, _) = materialize_scan_task( - scan_task.clone(), - Some(self.schema.clone()), - Some(io_stats), - )?; + let (tables, _) = materialize_scan_task(scan_task.clone(), Some(io_stats))?; let table_values = Arc::new(tables); // Cache future accesses by setting the state to TableState::Loaded