Skip to content

Commit

Permalink
Cleanup materialize_scan_task and documentation of MicroPartition
Browse files Browse the repository at this point in the history
  • Loading branch information
Jay Chia committed Feb 24, 2024
1 parent d5ec521 commit 412ffbc
Showing 1 changed file with 25 additions and 13 deletions.
38 changes: 25 additions & 13 deletions src/daft-micropartition/src/micropartition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,23 +60,39 @@ impl Display for TableState {
}
}
/// A partition of data whose contents are either already held as [`Table`] objects or are still
/// described by a [`ScanTask`] that has not been read yet.
///
/// Which of the two forms the partition is currently in is tracked by [`MicroPartition::state`].
pub(crate) struct MicroPartition {
/// Schema of the MicroPartition
///
/// This is technically redundant with the schema in `state`:
/// 1. If [`TableState::Loaded`]: the schema should match every underlying [`Table`]
/// 2. If [`TableState::Unloaded`]: the schema should match the underlying [`ScanTask::materialized_schema`]
///
/// It is kept anyway as an easy-to-access copy of the schema, and to handle the corner case of an
/// empty [`MicroPartition`], which has 0 underlying [`Table`] objects to read a schema from.
pub(crate) schema: SchemaRef,

/// State of the MicroPartition: either [`TableState::Loaded`] or [`TableState::Unloaded`].
///
/// Guarded by a [`Mutex`] because the state is replaced in place: once an unloaded partition has
/// been materialized, the state is set to [`TableState::Loaded`] so that future accesses reuse
/// the tables that were already read.
pub(crate) state: Mutex<TableState>,

/// Metadata about the MicroPartition
pub(crate) metadata: TableMetadata,

/// Statistics about the MicroPartition, if any are available
///
/// If present, this must have the same [`Schema`] as [`MicroPartition::schema`]. This invariant
/// is enforced in the `MicroPartition::new_*` constructors.
pub(crate) statistics: Option<TableStatistics>,
}

/// Helper to run all the IO and compute required to materialize a [`ScanTask`] into a `Vec<Table>`
///
/// All [`Table`] objects returned will have the same [`Schema`] as [`ScanTask::materialized_schema`].
///
/// # Arguments
///
/// * `scan_task` - a batch of ScanTasks to materialize as Tables
/// * `cast_to_schema` - an Optional schema to cast all the resulting Tables to. If not provided, will use the schema
/// provided by the ScanTask
/// * `io_stats` - an optional IOStats object to record the IO operations performed
fn materialize_scan_task(
scan_task: Arc<ScanTask>,
cast_to_schema: Option<SchemaRef>,
io_stats: Option<IOStatsRef>,
) -> crate::Result<(Vec<Table>, SchemaRef)> {
let column_names = scan_task
Expand Down Expand Up @@ -290,9 +306,9 @@ fn materialize_scan_task(
}
};

// Schema to cast resultant tables into, ensuring that all Tables have the same schema.
// Note that we need to apply column pruning here if specified by the ScanTask
let cast_to_schema = cast_to_schema.unwrap_or_else(|| scan_task.materialized_schema());
// Ensure that all Tables have the schema as specified by the ScanTask
// Note that we need to apply column pruning here if specified
let cast_to_schema = scan_task.materialized_schema();

let casted_table_values = table_values
.iter()
Expand Down Expand Up @@ -431,7 +447,7 @@ impl MicroPartition {
// Perform an eager **data** read
_ => {
let statistics = scan_task.statistics.clone();
let (tables, schema) = materialize_scan_task(scan_task, None, Some(io_stats))?;
let (tables, schema) = materialize_scan_task(scan_task, Some(io_stats))?;
Ok(Self::new_loaded(schema, Arc::new(tables), statistics))
}
}
Expand Down Expand Up @@ -477,11 +493,7 @@ impl MicroPartition {
let mut guard = self.state.lock().unwrap();
match guard.deref() {
TableState::Unloaded(scan_task) => {
let (tables, _) = materialize_scan_task(
scan_task.clone(),
Some(self.schema.clone()),
Some(io_stats),
)?;
let (tables, _) = materialize_scan_task(scan_task.clone(), Some(io_stats))?;
let table_values = Arc::new(tables);

// Cache future accesses by setting the state to TableState::Loaded
Expand Down

0 comments on commit 412ffbc

Please sign in to comment.