Skip to content

Commit

Permalink
Refactor MicroPartition::new_unloaded and MicroPartition.cast_to_schema
Browse files (browse the repository at this point in the history)
  • Loading branch information
Jay Chia committed Feb 24, 2024
1 parent 6fef22a commit 82ad347
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 19 deletions.
4 changes: 1 addition & 3 deletions src/daft-micropartition/src/micropartition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -308,11 +308,11 @@ impl MicroPartition {
/// Schema invariants:
/// 1. Each Loaded column statistic in `statistics` must be castable to the corresponding column in the MicroPartition's schema
pub fn new_unloaded(
schema: SchemaRef,
scan_task: Arc<ScanTask>,
metadata: TableMetadata,
statistics: TableStatistics,
) -> Self {
let schema = scan_task.materialized_schema();
MicroPartition {
schema: schema.clone(),
state: Mutex::new(TableState::Unloaded(scan_task)),
Expand Down Expand Up @@ -376,7 +376,6 @@ impl MicroPartition {
// CASE: ScanTask provides all required metadata.
// If the scan_task provides metadata (e.g. retrieved from a catalog) we can use it to create an unloaded MicroPartition
(Some(metadata), Some(statistics), _, _) => Ok(Self::new_unloaded(
schema,
scan_task.clone(),
metadata.clone(),
statistics.clone(),
Expand Down Expand Up @@ -922,7 +921,6 @@ pub(crate) fn read_parquet_into_micropartition(
);

Ok(MicroPartition::new_unloaded(
scan_task.materialized_schema(),
Arc::new(scan_task),
TableMetadata { length: total_rows },
stats,
Expand Down
41 changes: 25 additions & 16 deletions src/daft-micropartition/src/ops/cast_to_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,30 +2,39 @@ use std::{ops::Deref, sync::Arc};

use common_error::DaftResult;
use daft_core::schema::SchemaRef;
use daft_scan::ScanTask;

use crate::micropartition::{MicroPartition, TableState};

use daft_stats::TableStatistics;

impl MicroPartition {
pub fn cast_to_schema(&self, schema: SchemaRef) -> DaftResult<Self> {
let pruned_statistics = self.statistics.clone().map(|stats| TableStatistics {
columns: stats
.columns
.into_iter()
.filter(|(key, _)| schema.names().contains(key))
.collect(),
});
let casted_statistics = self
.statistics
.clone()
.map(|stats| stats.cast_to_schema(schema.clone()))
.transpose()?;

let guard = self.state.lock().unwrap();
match guard.deref() {
// Replace schema if Unloaded, which should be applied when data is lazily loaded
TableState::Unloaded(scan_task) => Ok(MicroPartition::new_unloaded(
schema.clone(),
scan_task.clone(),
self.metadata.clone(),
pruned_statistics.expect("Unloaded MicroPartition should have statistics"),
)),
TableState::Unloaded(scan_task) => {
let maybe_new_scan_task = if scan_task.schema == schema {
scan_task.clone()
} else {
Arc::new(ScanTask::new(
scan_task.sources.clone(),
scan_task.file_format_config.clone(),
schema,
scan_task.storage_config.clone(),
scan_task.pushdowns.clone(),
))
};
Ok(MicroPartition::new_unloaded(
maybe_new_scan_task,
self.metadata.clone(),
casted_statistics.expect("Unloaded MicroPartition should have statistics"),
))
}
// If Tables are already loaded, we map `Table::cast_to_schema` on each Table
TableState::Loaded(tables) => Ok(MicroPartition::new_loaded(
schema.clone(),
Expand All @@ -35,7 +44,7 @@ impl MicroPartition {
.map(|tbl| tbl.cast_to_schema(schema.as_ref()))
.collect::<DaftResult<Vec<_>>>()?,
),
pruned_statistics,
casted_statistics,
)),
}
}
Expand Down

0 comments on commit 82ad347

Please sign in to comment.