Move materialized scan tasks into Source nodes
desmondcheongzx committed Nov 23, 2024
1 parent 8458c74 commit 67cd40c
Showing 12 changed files with 87 additions and 160 deletions.
30 changes: 27 additions & 3 deletions src/common/scan-info/src/lib.rs
@@ -21,9 +21,33 @@ pub use python::register_modules;
pub use scan_operator::{ScanOperator, ScanOperatorRef};
pub use scan_task::{BoxScanTaskLikeIter, ScanTaskLike, ScanTaskLikeRef};

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum ScanState {
Operator(ScanOperatorRef),
Tasks(Vec<ScanTaskLikeRef>),
}

impl ScanState {
pub fn multiline_display(&self) -> Vec<String> {
match self {
Self::Operator(scan_op) => scan_op.0.multiline_display(),
Self::Tasks(scan_tasks) => {
vec![format!("Num Scan Tasks = {}", scan_tasks.len())]
}
}
}

pub fn get_scan_op(&self) -> &ScanOperatorRef {
match self {
Self::Operator(scan_op) => scan_op,
Self::Tasks(_) => panic!("Tried to get scan op from materialized physical scan info"),
}
}
}

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct PhysicalScanInfo {
pub scan_op: ScanOperatorRef,
pub scan_state: ScanState,
pub source_schema: SchemaRef,
pub partitioning_keys: Vec<PartitionField>,
pub pushdowns: Pushdowns,
@@ -38,7 +62,7 @@ impl PhysicalScanInfo {
pushdowns: Pushdowns,
) -> Self {
Self {
scan_op,
scan_state: ScanState::Operator(scan_op),
source_schema,
partitioning_keys,
pushdowns,
@@ -48,7 +72,7 @@
#[must_use]
pub fn with_pushdowns(&self, pushdowns: Pushdowns) -> Self {
Self {
scan_op: self.scan_op.clone(),
scan_state: self.scan_state.clone(),
source_schema: self.source_schema.clone(),
partitioning_keys: self.partitioning_keys.clone(),
pushdowns,
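
The new `ScanState` enum above is the heart of the change: a source either still carries the `ScanOperatorRef` that can produce scan tasks, or it already carries the materialized tasks. Below is a minimal standalone sketch of that two-state pattern, using hypothetical `ScanOperator`/`ScanTask` stand-ins rather than the real `common_scan_info` types.

```rust
// Hypothetical stand-ins for ScanOperatorRef / ScanTaskLikeRef; the real types
// live in common_scan_info and are not reproduced here.
#[derive(Debug, Clone)]
struct ScanOperator(String);

#[derive(Debug, Clone)]
struct ScanTask(String);

// Two-state pattern mirroring the commit: a source either still holds an
// operator that can produce scan tasks, or the already-materialized tasks.
#[derive(Debug, Clone)]
enum ScanState {
    Operator(ScanOperator),
    Tasks(Vec<ScanTask>),
}

impl ScanState {
    /// Materialize in place, turning `Operator` into `Tasks` exactly once.
    fn materialize(&mut self) {
        if let ScanState::Operator(op) = self {
            // The real code calls scan_op.to_scan_tasks(pushdowns, config) here.
            let tasks = vec![ScanTask(format!("task from {}", op.0))];
            *self = ScanState::Tasks(tasks);
        }
    }

    /// One-line summary, analogous to `multiline_display` above.
    fn display(&self) -> String {
        match self {
            ScanState::Operator(op) => format!("Operator = {}", op.0),
            ScanState::Tasks(tasks) => format!("Num Scan Tasks = {}", tasks.len()),
        }
    }
}

fn main() {
    let mut state = ScanState::Operator(ScanOperator("parquet scan".to_string()));
    println!("{}", state.display()); // Operator = parquet scan
    state.materialize();
    println!("{}", state.display()); // Num Scan Tasks = 1
}
```

Keeping both states inside `Source` is what lets the rest of this diff delete the separate `MaterializedScanSource` plan variant.
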
29 changes: 8 additions & 21 deletions src/daft-local-plan/src/translate.rs
@@ -1,9 +1,8 @@
use common_error::{DaftError, DaftResult};
use common_scan_info::ScanState;
use daft_core::join::JoinStrategy;
use daft_dsl::ExprRef;
use daft_logical_plan::{
ops::MaterializedScanSource, JoinType, LogicalPlan, LogicalPlanRef, SourceInfo,
};
use daft_logical_plan::{JoinType, LogicalPlan, LogicalPlanRef, SourceInfo};

use super::plan::{LocalPhysicalPlan, LocalPhysicalPlanRef};

@@ -17,7 +16,12 @@ pub fn translate(plan: &LogicalPlanRef) -> DaftResult<LocalPhysicalPlanRef> {
)),
SourceInfo::Physical(info) => {
// We should be able to pass the ScanOperator into the physical plan directly but we need to figure out the serialization story
let scan_tasks = info.scan_op.0.to_scan_tasks(info.pushdowns.clone(), None)?;
let scan_tasks = match &info.scan_state {
ScanState::Operator(scan_op) => {
scan_op.0.to_scan_tasks(info.pushdowns.clone(), None)?
}
ScanState::Tasks(scan_tasks) => scan_tasks.clone(),
};
if scan_tasks.is_empty() {
Ok(LocalPhysicalPlan::empty_scan(source.output_schema.clone()))
} else {
@@ -34,23 +38,6 @@ }
}
}
}
LogicalPlan::MaterializedScanSource(MaterializedScanSource {
scan_tasks,
pushdowns,
schema,
stats_state,
}) => {
if scan_tasks.is_empty() {
Ok(LocalPhysicalPlan::empty_scan(schema.clone()))
} else {
Ok(LocalPhysicalPlan::physical_scan(
scan_tasks.clone(),
pushdowns.clone(),
schema.clone(),
stats_state.clone(),
))
}
}
LogicalPlan::Filter(filter) => {
let input = translate(&filter.input)?;
Ok(LocalPhysicalPlan::filter(
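
In `translate`, the `Operator` branch builds a fresh task vector while the `Tasks` branch clones the already-materialized one. The sketch below is not what the commit does; it is only one possible alternative, showing how `std::borrow::Cow` could let a read-only consumer borrow materialized tasks instead of cloning them (hypothetical stand-in types, not the Daft API).

```rust
use std::borrow::Cow;

#[derive(Debug, Clone)]
struct ScanTask(String);

enum ScanState {
    Operator(String),     // stand-in for a ScanOperatorRef
    Tasks(Vec<ScanTask>), // already-materialized tasks
}

// Borrow the tasks when they are already materialized; build them otherwise.
fn scan_tasks(state: &ScanState) -> Cow<'_, [ScanTask]> {
    match state {
        ScanState::Operator(name) => {
            // Stand-in for scan_op.to_scan_tasks(pushdowns, execution_config).
            Cow::Owned(vec![ScanTask(format!("task from {name}"))])
        }
        ScanState::Tasks(tasks) => Cow::Borrowed(tasks.as_slice()),
    }
}

fn main() {
    let materialized = ScanState::Tasks(vec![ScanTask("t0".into())]);
    let tasks = scan_tasks(&materialized);
    if tasks.is_empty() {
        println!("empty scan");
    } else {
        println!("physical scan over {} task(s)", tasks.len());
    }
}
```
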
12 changes: 0 additions & 12 deletions src/daft-logical-plan/src/logical_plan.rs
@@ -14,7 +14,6 @@ use crate::stats::StatsState;
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub enum LogicalPlan {
Source(Source),
MaterializedScanSource(MaterializedScanSource),
Project(Project),
ActorPoolProject(ActorPoolProject),
Filter(Filter),
@@ -45,7 +44,6 @@ impl LogicalPlan {
pub fn schema(&self) -> SchemaRef {
match self {
Self::Source(Source { output_schema, .. }) => output_schema.clone(),
Self::MaterializedScanSource(MaterializedScanSource { schema, .. }) => schema.clone(),
Self::Project(Project {
projected_schema, ..
}) => projected_schema.clone(),
@@ -173,15 +171,13 @@ impl LogicalPlan {
Self::Intersect(_) => vec![IndexSet::new(), IndexSet::new()],
Self::Union(_) => vec![IndexSet::new(), IndexSet::new()],
Self::Source(_) => todo!(),
Self::MaterializedScanSource(_) => todo!(),
Self::Sink(_) => todo!(),
}
}

pub fn name(&self) -> &'static str {
match self {
Self::Source(..) => "Source",
Self::MaterializedScanSource(..) => "MaterializedScanSource",
Self::Project(..) => "Project",
Self::ActorPoolProject(..) => "ActorPoolProject",
Self::Filter(..) => "Filter",
@@ -206,7 +202,6 @@ impl LogicalPlan {
pub fn get_stats(&self) -> &StatsState {
match self {
Self::Source(Source { stats_state, .. })
| Self::MaterializedScanSource(MaterializedScanSource { stats_state, .. })
| Self::Project(Project { stats_state, .. })
| Self::ActorPoolProject(ActorPoolProject { stats_state, .. })
| Self::Filter(Filter { stats_state, .. })
@@ -239,9 +234,6 @@ impl LogicalPlan {
pub fn with_materialized_stats(self) -> Self {
match self {
Self::Source(plan) => Self::Source(plan.with_materialized_stats()),
Self::MaterializedScanSource(plan) => {
Self::MaterializedScanSource(plan.with_materialized_stats())
}
Self::Project(plan) => Self::Project(plan.with_materialized_stats()),
Self::ActorPoolProject(plan) => Self::ActorPoolProject(plan.with_materialized_stats()),
Self::Filter(plan) => Self::Filter(plan.with_materialized_stats()),
@@ -272,7 +264,6 @@ impl LogicalPlan {
pub fn multiline_display(&self) -> Vec<String> {
match self {
Self::Source(source) => source.multiline_display(),
Self::MaterializedScanSource(plan) => plan.multiline_display(),
Self::Project(projection) => projection.multiline_display(),
Self::ActorPoolProject(projection) => projection.multiline_display(),
Self::Filter(filter) => filter.multiline_display(),
@@ -299,7 +290,6 @@ impl LogicalPlan {
pub fn children(&self) -> Vec<&Self> {
match self {
Self::Source(..) => vec![],
Self::MaterializedScanSource(..) => vec![],
Self::Project(Project { input, .. }) => vec![input],
Self::ActorPoolProject(ActorPoolProject { input, .. }) => vec![input],
Self::Filter(Filter { input, .. }) => vec![input],
@@ -327,7 +317,6 @@ impl LogicalPlan {
match children {
[input] => match self {
Self::Source(_) => panic!("Source nodes don't have children, with_new_children() should never be called for Source ops"),
Self::MaterializedScanSource(_) => panic!("MaterializedScanSource nodes don't have children, with_new_children() should never be called for MaterializedScanSource ops"),
Self::Project(Project { projection, .. }) => Self::Project(Project::try_new(
input.clone(), projection.clone(),
).unwrap()),
@@ -460,7 +449,6 @@ macro_rules! impl_from_data_struct_for_logical_plan {
}

impl_from_data_struct_for_logical_plan!(Source);
impl_from_data_struct_for_logical_plan!(MaterializedScanSource);
impl_from_data_struct_for_logical_plan!(Project);
impl_from_data_struct_for_logical_plan!(Filter);
impl_from_data_struct_for_logical_plan!(Limit);
59 changes: 0 additions & 59 deletions src/daft-logical-plan/src/ops/materialized_scan_source.rs

This file was deleted.

2 changes: 0 additions & 2 deletions src/daft-logical-plan/src/ops/mod.rs
@@ -6,7 +6,6 @@ mod explode;
mod filter;
mod join;
mod limit;
mod materialized_scan_source;
mod monotonically_increasing_id;
mod pivot;
mod project;
@@ -26,7 +25,6 @@ pub use explode::Explode;
pub use filter::Filter;
pub use join::Join;
pub use limit::Limit;
pub use materialized_scan_source::MaterializedScanSource;
pub use monotonically_increasing_id::MonotonicallyIncreasingId;
pub use pivot::Pivot;
pub use project::Project;
48 changes: 27 additions & 21 deletions src/daft-logical-plan/src/ops/source.rs
@@ -1,11 +1,10 @@
use std::sync::Arc;

use common_daft_config::DaftExecutionConfig;
use common_scan_info::PhysicalScanInfo;
use common_scan_info::{PhysicalScanInfo, ScanState};
use daft_schema::schema::SchemaRef;

use crate::{
ops::MaterializedScanSource,
source_info::{InMemoryInfo, PlaceHolderInfo, SourceInfo},
stats::{ApproxStats, PlanStats, StatsState},
};
@@ -30,26 +29,33 @@ impl Source {
}
}

pub(crate) fn with_materialized_scan_source(
&self,
pub(crate) fn build_materialized_scan_source(
mut self,
execution_config: Option<&DaftExecutionConfig>,
) -> MaterializedScanSource {
match &*self.source_info {
SourceInfo::Physical(PhysicalScanInfo {
scan_op, pushdowns, ..
}) => {
let scan_tasks = scan_op
.0
.to_scan_tasks(pushdowns.clone(), execution_config)
.expect("Failed to get scan tasks from scan operator");
MaterializedScanSource::new(
scan_tasks,
pushdowns.clone(),
self.output_schema.clone(),
)
) -> Self {
if let Some(scan_info) = Arc::get_mut(&mut self.source_info) {
match scan_info {
SourceInfo::Physical(physical_scan_info) => {
match &mut physical_scan_info.scan_state {
ScanState::Operator(scan_op) => {
let scan_tasks = scan_op
.0
.to_scan_tasks(
physical_scan_info.pushdowns.clone(),
execution_config,
)
.expect("Failed to get scan tasks from scan operator");
physical_scan_info.scan_state = ScanState::Tasks(scan_tasks);
}
ScanState::Tasks(_) => {
panic!("Physical scan nodes are being materialized more than once");
}
}
}
_ => panic!("Only unmaterialized physical scan nodes can be materialized"),
}
_ => panic!("Only physical scan nodes can be materialized"),
}
self
}

pub(crate) fn with_materialized_stats(mut self) -> Self {
@@ -79,12 +85,12 @@ impl Source {
match self.source_info.as_ref() {
SourceInfo::Physical(PhysicalScanInfo {
source_schema,
scan_op,
scan_state: scan_op,
partitioning_keys,
pushdowns,
}) => {
use itertools::Itertools;
res.extend(scan_op.0.multiline_display());
res.extend(scan_op.multiline_display());

res.push(format!("File schema = {}", source_schema.short_string()));
res.push(format!(
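
`build_materialized_scan_source` now consumes `self` and rewrites `source_info` through `Arc::get_mut`, which only hands out a mutable reference while the `Arc` is uniquely owned; if another handle existed, the branch would simply be skipped. The sketch below (simplified stand-in structs, not the Daft types) shows that pattern next to `Arc::make_mut`, the clone-on-write fallback one could reach for when unique ownership is not guaranteed.

```rust
use std::sync::Arc;

#[derive(Clone, Debug)]
enum ScanState {
    Operator(String),   // stand-in for ScanOperatorRef
    Tasks(Vec<String>), // stand-in for Vec<ScanTaskLikeRef>
}

#[derive(Clone, Debug)]
struct SourceInfo {
    scan_state: ScanState,
}

// Materialize the scan state in place, mirroring build_materialized_scan_source.
fn materialize(info: &mut SourceInfo) {
    if let ScanState::Operator(op) = &info.scan_state {
        // Stand-in for to_scan_tasks(pushdowns, execution_config).
        let tasks = vec![format!("task from {op}")];
        info.scan_state = ScanState::Tasks(tasks);
    }
}

fn main() {
    // Uniquely owned Arc: get_mut succeeds and we mutate in place,
    // which is the situation the commit relies on.
    let mut unique = Arc::new(SourceInfo {
        scan_state: ScanState::Operator("parquet scan".to_string()),
    });
    if let Some(info) = Arc::get_mut(&mut unique) {
        materialize(info);
    }
    println!("{:?}", unique.scan_state);

    // Shared Arc: get_mut returns None and nothing would change.
    // make_mut is the clone-on-write fallback that still mutates.
    let mut shared = Arc::new(SourceInfo {
        scan_state: ScanState::Operator("csv scan".to_string()),
    });
    let _other_handle = Arc::clone(&shared);
    assert!(Arc::get_mut(&mut shared).is_none());
    materialize(Arc::make_mut(&mut shared));
    println!("{:?}", shared.scan_state);
}
```
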
@@ -15,6 +15,7 @@ use super::OptimizerRule;
use crate::LogicalPlan;

// Add stats to all logical plan nodes in a bottom up fashion.
// All scan nodes MUST be materialized before stats are enriched.
impl OptimizerRule for EnrichWithStats {
fn try_optimize(&self, plan: Arc<LogicalPlan>) -> DaftResult<Transformed<Arc<LogicalPlan>>> {
plan.transform_up(|c| {
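
The added comment encodes an ordering constraint between optimizer rules: `MaterializeScans` must run before `EnrichWithStats`, since scan statistics are derived from materialized scan tasks. Below is a toy sketch of such an ordered rule pipeline, with hypothetical rule and plan types rather than Daft's `OptimizerRule`/`Transformed` machinery.

```rust
// Hypothetical miniature of an ordered optimizer pipeline.
#[derive(Debug, Default)]
struct Plan {
    scan_tasks: Option<usize>, // None until MaterializeScans has run
    stats: Option<String>,     // None until EnrichWithStats has run
}

trait Rule {
    fn apply(&self, plan: Plan) -> Plan;
}

struct MaterializeScans;
impl Rule for MaterializeScans {
    fn apply(&self, mut plan: Plan) -> Plan {
        plan.scan_tasks = Some(4); // stand-in for to_scan_tasks(...)
        plan
    }
}

struct EnrichWithStats;
impl Rule for EnrichWithStats {
    fn apply(&self, mut plan: Plan) -> Plan {
        // Stats come from the materialized tasks, hence the ordering
        // requirement called out in the comment above.
        let tasks = plan
            .scan_tasks
            .expect("scans must be materialized before stats are enriched");
        plan.stats = Some(format!("approx rows from {tasks} scan tasks"));
        plan
    }
}

fn main() {
    // Rules run in this order; swapping them would hit the expect() above.
    let rules: Vec<Box<dyn Rule>> =
        vec![Box::new(MaterializeScans), Box::new(EnrichWithStats)];
    let plan = rules.iter().fold(Plan::default(), |p, r| r.apply(p));
    println!("{plan:?}");
}
```
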
@@ -20,26 +20,23 @@ use crate::{LogicalPlan, SourceInfo};
// Materialize the scan tasks inside all source nodes in a bottom up fashion.
impl OptimizerRule for MaterializeScans {
fn try_optimize(&self, plan: Arc<LogicalPlan>) -> DaftResult<Transformed<Arc<LogicalPlan>>> {
plan.transform_up(|node| self.try_optimize_node(node))
plan.transform_up(|node| self.try_optimize_node(Arc::unwrap_or_clone(node)))
}
}

impl MaterializeScans {
#[allow(clippy::only_used_in_recursion)]
fn try_optimize_node(
&self,
plan: Arc<LogicalPlan>,
) -> DaftResult<Transformed<Arc<LogicalPlan>>> {
match plan.as_ref() {
fn try_optimize_node(&self, plan: LogicalPlan) -> DaftResult<Transformed<Arc<LogicalPlan>>> {
match plan {
LogicalPlan::Source(source) => match &*source.source_info {
SourceInfo::Physical(_) => Ok(Transformed::yes(
source
.with_materialized_scan_source(self.execution_config.as_deref())
.build_materialized_scan_source(self.execution_config.as_deref())
.into(),
)),
_ => Ok(Transformed::no(plan)),
_ => Ok(Transformed::no(Arc::new(LogicalPlan::Source(source)))),
},
_ => Ok(Transformed::no(plan)),
_ => Ok(Transformed::no(Arc::new(plan))),
}
}
}
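
`MaterializeScans` now takes ownership of each node with `Arc::unwrap_or_clone` (stable since Rust 1.76), which moves the value out when the `Arc` is the sole handle and clones it otherwise, so `build_materialized_scan_source` can consume `self`. A tiny illustration with a hypothetical `Node` type:

```rust
use std::sync::Arc;

#[derive(Clone, Debug, PartialEq)]
struct Node {
    name: String,
}

fn main() {
    // Sole owner: the value is moved out of the Arc, no clone happens.
    let unique = Arc::new(Node { name: "source".to_string() });
    let owned: Node = Arc::unwrap_or_clone(unique);
    assert_eq!(owned.name, "source");

    // Shared: the other handle keeps the original, so the value is cloned.
    let shared = Arc::new(Node { name: "filter".to_string() });
    let other_handle = Arc::clone(&shared);
    let owned_clone: Node = Arc::unwrap_or_clone(shared);
    assert_eq!(owned_clone, *other_handle);
}
```
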
@@ -113,7 +113,7 @@ impl PushDownFilter {
needing_filter_op,
} = rewrite_predicate_for_partitioning(
&new_predicate,
external_info.scan_op.0.partitioning_keys(),
external_info.scan_state.get_scan_op().0.partitioning_keys(),
)?;
assert!(
partition_only_filter.len()
